kbrw / riak-bundle
允许您的应用程序与Riak数据存储进行交互
Requires
- php: >=5.3.2
- guzzle/guzzle: 3.*
- jms/serializer-bundle: 0.11.*
- symfony/framework-bundle: >=2.1,<3.0-dev
README
目录
特性
RiakBundle旨在简化与Riak数据库的交互。它允许开发者专注于存储对象而不是通信API。支持的功能包括
- 支持多集群
- 配置集群:协议、域名、端口、并行支持的最大连接数、bucket列表、...
- 允许开发者直接将一些代码添加到Guzzle中:使用代理、记录或统计所有调用、...
- 支持并行调用(curl_multi)将对象插入、删除和从bucket中检索
- 检索、编辑和保存bucket属性
- 列出和统计bucket内的键
- 在bucket上进行搜索
- 通过流畅的接口执行mapreduce操作
- 控制台任务,用于列出和统计bucket内的键,删除单个键或整个bucket
- 支持自定义搜索参数(由自定义Riak Search构建使用)
- 支持bucket继承,允许您在bucket上定义自定义服务
路线图
- 支持二级索引的插入和检索操作
- 支持在YAML配置中扩展bucket设置(n_val、...)
- 将性能仪表板添加到Symfony调试工具栏
依赖项
RiakBundle需要一些其他bundle库才能运行
- Guzzle:通过curl处理HTTP请求
- JMSSerializer:处理序列化操作
安装
目前仅支持使用composer安装,但源代码可以轻松修改以在之外使用。要在经典symfony项目中安装RiakBundle,只需更新您的composer.json并添加或编辑以下行
"kbrw/riak-bundle": "1.0.*"
您可能还需要在composer.json中将"minimum-stability"调整为"dev"以使其正常工作。
然后,您只需将一些bundle添加到您的app/AppKernel.php中
new JMS\SerializerBundle\JMSSerializerBundle(), new Kbrw\RiakBundle\RiakBundle(),
配置
在下文各节中,我们假设您的基础设施由2个Riak集群组成:一个用于存储后端对象,另一个用于存储所有日志。后端集群包含两个预定义的存储桶。一个用于存储JSON格式的某些用户,另一个用于存储XML格式的某些城市。在日志集群中,您每天创建一个新的存储桶(我知道这很奇怪,但为什么不呢……)因此,您不能在此集群中预先配置存储桶。此外,您已经进行了稳健性活动,并知道后端集群可以支持多达500个并行连接,而您的前端在并行情况下永远不会被超过5个连接所影响。因此,您知道每次前端请求可以发送100个后端请求,但不能更多。如果您尝试一次性存储、获取或删除超过100个对象,只有前100个将被处理。其他对象将不得不等待有空闲槽位。RiakBundle将为您处理这种槽位机制。
所有配置都可以在config.yml文件下的新riak
命名空间中进行。
示例
riak: clusters: backend: domain: "127.0.0.1" port: "8098" client_id: "frontend" max_parallel_calls: 100 buckets: users: fqcn: 'MyCompany\MyBundle\Model\User' format: json cities: fqcn: 'MyCompany\MyBundle\Model\City' format: xml log: domain: "127.0.0.1" port: "8099" client_id: "frontend"
基本用法
访问集群
每个集群都成为名为"riak.cluster."的服务。在我们的示例中,您可以使用以下方式访问您的两个集群:
$backendCluster = $container->get("riak.cluster.backend"); $logCluster = $container->get("riak.cluster.log");
定义存储桶内容
Riak存储基于文本的对象,因此您需要提供对象的基于文本版本。我们推荐的最佳实践是创建一个注释对象,它代表您的模型。例如,一个User类可以这样实现:
<?php namespace MyCompany\MyBundle\Model; use JMS\Serializer\Annotation as Ser; /** * @Ser\AccessType("public_method") * @Ser\XmlRoot("user") */ class User { /** * @Ser\Type("string") * @Ser\SerializedName("id") * @var string */ protected $id; /** * @Ser\Type("string") * @Ser\SerializedName("email") * @var string */ protected $email; function __construct($id = null, $email = null) { $this->setId($id); $this->setEmail($email); } public function getId() { return $this->id; } public function setId($id) { $this->id = $id; } public function getEmail() { return $this->email; } public function setEmail($email) { $this->email = $email; } }
RiakBundle可以自动处理序列化和反序列化,这样您将始终与User实例一起工作,而不会与它的基于文本(JSON、XML或YAML)表示形式一起工作。您的模型类和序列化方法可以在配置中定义,如上所述,或者您可以在存储桶实例上指定它,如下所示:
$logCluster = $container->get("riak.cluster.log"); $bucket = $logCluster->getBucket("log_" . date('Y-m-d')); $bucket->setFullyQualifiedClassName("MyCompany\MyBundle\Model\LogEntry"); $bucket->setFormat("json");
将数据插入或更新到存储桶中
一旦配置了存储桶,您只需创建一个要存储的对象的实例,并要求RiakBundle为您存储它。示例:
$backendCluster = $container->get("riak.cluster.backend"); $user = new \MyCompany\MyBundle\Model\User("remi", "remi.alvado@yahoo.fr"); $backendCluster->getBucket("user")->put(array("remi" => $user));
您甚至可以使用对Riak的并行调用同时写入多个用户。
$backendCluster = $container->get("riak.cluster.backend"); $grenoble = new \MyCompany\MyBundle\Model\City("38000", "Grenoble"); $paris = new \MyCompany\MyBundle\Model\City("75000", "Paris"); $backendCluster->getBucket("city")->put( array( "38000" => $grenoble, "75000" => $paris ) );
相同的机制可用于更新数据。
$backendCluster = $container->get("riak.cluster.backend"); $paris = $backendCluster->getBucket("city")->uniq("paris"); $paris->setName("paris intra muros"); $backendCluster->getBucket("city")->put(array("75000" => $paris));
从存储桶中获取数据
一旦您在存储桶中有一些数据,您就可以开始获取它们。实际上,即使没有数据,您也可以开始获取数据。Bucket类提供了两种根据您的需求获取数据的方式:
- 使用
uniq($key)
函数获取单个数据 - 使用
fetch($keys)
函数获取数据列表 内部,这两个方法执行相同的事情,但"uniq"将提供\Kbrw\RiakBundle\Model\KV\Data实例,而"fetch"将提供\Kbrw\RiakBundle\Model\KV\Datas实例。\Kbrw\RiakBundle\Model\KV\Data类允许您访问实际对象(User、City等)以及响应头,如VClock、Last-Modified等。
示例
// Using fetch to get multiple objects $backendCluster = $container->get("riak.cluster.backend"); $datas = $backendCluster->getBucket("city")->fetch(array("paris", "grenoble")); $cities = $datas->getStructuredObjects(); // $cities will be an array of \MyCompany\MyBundle\Model\City instances // Using fetch to get one object $backendCluster = $container->get("riak.cluster.backend"); $datas = $backendCluster->getBucket("city")->fetch(array("paris")); $city = $datas->first()->getStructuredContent(); // $city will be a \MyCompany\MyBundle\Model\City instance // Using uniq to get one object $backendCluster = $container->get("riak.cluster.backend"); $city = $backendCluster->getBucket("city")->uniq("paris"); // $city will be a \MyCompany\MyBundle\Model\City instance
从存储桶中删除数据
您只需提供一个要删除的键的列表。示例:
// delete a list of key/value pairs $backendCluster = $container->get("riak.cluster.backend"); $backendCluster->getBucket("city")->delete(array("paris", "grenoble")); // delete one single key/value pair $backendCluster = $container->get("riak.cluster.backend"); $backendCluster->getBucket("city")->delete("paris");
列出bucket内的键
Riak没有像SQL数据库那样容易的方法来列出存储桶中的所有键。为了做到这一点,它必须遍历集群中的所有键。因此,即使您针对一个非常小的存储桶进行查询,这项操作在大型集群上也可能非常耗时。此外,即使RiakBundle使用"keys=stream"参数从Riak流式传输键而不是要求Riak在一个响应中返回它们,请记住,PHP可能无法很好地处理数百万值的数组。要显示存储桶中的所有键:
// delete a list of key/value pairs $backendCluster = $container->get("riak.cluster.backend"); foreach($backendCluster->getBucket("city")->keys() as $key) { echo "$key\n"; }
统计bucket内的键数
有时,存储桶中的键太多而无法获取,但您可能想要计数。要计算存储桶中的所有键:
// delete a list of key/value pairs $backendCluster = $container->get("riak.cluster.backend"); echo "'city' bucket contains" . $backendCluster->getBucket("city")->count() . " key(s)."
列出集群中的存储桶
如果您需要列出集群内的所有存储桶,可以使用以下示例
$backendCluster = $container->get("riak.cluster.backend"); print_r(backendCluster->bucketNames());
使用 Riak Search 在存储桶内搜索项目
Riak 支持类似 Solr(和 -light)的搜索引擎。RiakBundle 允许您在每个启用了搜索功能的存储桶上搜索项目。搜索可以使用简单的类似 Solr 的字符串查询或使用完全合格的\Kbrw\RiakBundle\Model\Search\Query 实例执行。示例
// with string based query $backendCluster = $container->get("riak.cluster.backend"); $usersBucket = $backendCluster->getBucket("users"); $response = $usersBucket->search("id:rem*"); // with full Query instance $backendCluster = $container->get("riak.cluster.backend"); $usersBucket = $backendCluster->getBucket("users"); $query = new \Kbrw\RiakBundle\Model\Search\Query("id:rem*"); $query->addFieldInList("id"); // only look in "id" field for each object stored in the bucket $query->setRows(5); // return only 5 results $response = $usersBucket->search($query);
在集群上执行 MapReduce 请求
Riak 允许开发者在整个集群上执行 mapreduce 操作。RiakBundle 通过流畅的接口提供同样的可能性。Mapreduce 操作由两个主要部分组成
- 选择 map 阶段将执行其上的键。开发者可以选择针对整个存储桶、预定义的键子集或可过滤的键集执行 mapreduce 操作。以下将描述这三种可能性。
- 使用 JavaScript 函数、Erlang 模块等定义 map 和 reduce 阶段。以下将定义一些可能性,但您也可以查看源代码或 API 来了解更多信息。
通用示例
$result = $this->cluster->mapReduce() ->on("meals") ->map('function(riakObject) {...}') ->link('meals', 'menu_') // to follow links pointing to any key matching "menu*" on "meals" bucket ->reduce('function(riakObject) {...}') ->responseShouldBe("\Some\JMS\Serializable\Type") ->send();
基本示例
// count how many times 'pizza' are served, meal by meal on all seasons $bucket = $this->cluster->getBucket("meals", true); $bucket->delete($bucket->keys()); $bucket->put(array("summer-1" => "pizza salad pasta meat sushi")); $bucket->put(array("summer-2" => "pizza pizza pizza pizza pizza")); $bucket->put(array("winter-1" => "cheese cheese patatoes meat vegetables")); $bucket->put(array("autumn-1" => "pizza pizza pizza mushroom meat")); $result = $this->cluster->mapReduce() ->on("meals") ->map(' function(riakObject) { var m = riakObject.values[0].data.match("pizza"); return [[riakObject.key, (m ? m.length : 0 )]]; } ') ->responseShouldBe("array<string, string>") ->send();
键过滤器示例
// count how many times 'pizza' are served, meal by meal only on winter and autumn // Apply a specific timeout (10sec) as well $result = $this->cluster->mapReduce() ->filter("meals") ->tokenize("-", 1) ->or() ->eq("winter") ->eq("autumn") ->end() ->done() ->map(' function(riakObject) { var m = riakObject.values[0].data.match("pizza"); return [[riakObject.key, (m ? m.length : 0 )]]; } ') ->timeout(10000) ->responseShouldBe("array<string, string>") ->send();
键过滤器示例
// use an erlang function to execute something on meals $result = $this->cluster->mapReduce() ->on("meals") ->configureMapPhase() ->setLanguage("erlang") ->setModule("riak_mapreduce") ->setFunction("map_object_value") ->done() ->responseShouldBe("array<Acme\DemoBundle\Model\Meal>") ->send();
异常
如果 Riak 不可用或已关闭,RiakBundle 将抛出 RiakUnavailableException
高级用法
加载和编辑存储桶配置
Riak 允许开发者自定义一些存储桶配置,例如 nval,即每个数据在集群中存储的副本数量。请参阅 Riak 文档以获取更详细的信息。
使用 RiakBundle,您可以轻松更新存储桶配置。\Kbrw\RiakBundle\Model\Bucket\Bucket 类不仅是执行存储桶操作的场所,还包含您可以管理的存储桶属性。示例
$backendCluster = $container->get("riak.cluster.backend"); $users = $backendCluster->addBucket("users", true); // the second parameter will force RiakBundle to fetch properties for this bucket $users->getProps()->setNVal(5); $users->save();
启用自动索引
Riak 支持从 Riak KV 到 Riak Search 的 JSON / XML / Erlang 数据的自动索引。此功能需要在每个存储桶级别上激活。RiakBundle 允许您轻松完成此操作
示例
$backendCluster = $container->get("riak.cluster.backend"); $usersBucket = $backendCluster->getBucket("users"); $usersBucket->enableSearchIndexing(); $usersBucket->save();
操作头部信息
Riak 不仅将对象与键关联,还关联一些头部信息。其中一些是预定义的(Last-Modified、X-Riak-Vclock 等),但您也可以添加自己的自定义头部信息。如上所述,put($objects)
方法接受一个关联数组的对象。这些对象可以是您要存储在 Riak 中的数据的简单表示,也可以是\Kbrw\RiakBundle\Model\KV\Data 实例(与 fetch 和 uniq 方法返回的相同)。在此对象上,您可以使用 headerBag 属性定义自己的自定义头部信息,该属性是\Symfony\Component\HttpFoundation\HeaderBag。示例
$remi = new \MyCompany\MyBundle\Model\User("remi", "remi.alvado@yahoo.fr"); $data = new \Kbrw\RiakBundle\Model\KV\Data("remi"); $data->setContent($remi); $data->getHeaderBag()->setHeader("X-Signup-Date", date('Y-m-d')); $backendCluster = $container->get("riak.cluster.backend"); $backendCluster->getBucket("users")->put($remi);
可以使用相同的机制来处理带有 X-Riak-Vclock 头部的 Riak 合并问题。
定义自己的存储桶类
有时,您可能需要定义自己的存储桶类,以便可以覆盖一些现有方法,添加新方法等。这可以通过像这样修改配置来实现
riak: clusters: backend: domain: "127.0.0.1" port: "8098" client_id: "frontend" max_parallel_calls: 100 buckets: points_of_interests: fqcn: 'MyCompany\MyBundle\Model\PointOfInterest' format: json class: 'Acme\DemoBundle\Model\AcmeBucket'
使用此配置,"points_of_interests" 存储桶将被初始化为 Acme\DemoBundle\Model\AcmeBucket
而不是常规存储桶。现在,您可以实现此类(它扩展了 \Kbrw\RiakBundle\Model\Bucket\Bucket
)以满足您的要求。同样,每次向集群添加存储桶时,都会抛出一个名为 "riak.bucket.add" 的事件到 EventDispatcher。此 \Symfony\Component\EventDispatcher\GenericEvent
包含新创建的存储桶,您可以按自己的意愿对其进行操作。有关更详细的信息,您可以查看此示例。
管理任务
常用选项
在集群上运行的所有任务(所以...所有任务)都支持以下选项
-c
或--cluster
:集群名称。如果未提供,则将在命令行中询问
在桶上运行的所有任务都支持以下选项
-b
或--bucket
:桶名称。如果未提供,则将在命令行中询问
列出或计算项的所有任务都支持以下选项
-r
或--raw
:如果提供,将使用原始输出格式。主要用于bash脚本。
删除项的所有任务都支持以下选项
-y
或--yes
:如果提供,将跳过所有是/否问题。主要用于bash脚本。
列出现有bucket
php app/console riak:cluster:list -c backend
删除所有bucket
php app/console riak:cluster:deleteAll -c backend
列出bucket内的键
php app/console riak:bucket:list -c backend -b meals
统计bucket内的键数
php app/console riak:bucket:count -c backend -b meals
删除bucket内的所有键
php app/console riak:bucket:deleteAll -c backend -b meals
删除bucket内的单个键
php app/console riak:bucket:delete -c backend -b meals -k summer-2