kbrw/riak-bundle

允许您的应用程序与Riak数据存储进行交互

安装次数: 11,965

依赖关系: 0

建议者: 0

安全性: 0

星标: 15

关注者: 6

分支: 8

公开问题: 0

类型:symfony-bundle

1.6.0 2014-08-28 09:49 UTC

README

Build Status

目录

特性

RiakBundle旨在简化与Riak数据库的交互。它允许开发者专注于存储对象而不是通信API。支持的功能包括

  • 支持多集群
  • 配置集群:协议、域名、端口、并行支持的最大连接数、bucket列表、...
  • 允许开发者直接将一些代码添加到Guzzle中:使用代理、记录或统计所有调用、...
  • 支持并行调用(curl_multi)将对象插入、删除和从bucket中检索
  • 检索、编辑和保存bucket属性
  • 列出和统计bucket内的键
  • 在bucket上进行搜索
  • 通过流畅的接口执行mapreduce操作
  • 控制台任务,用于列出和统计bucket内的键,删除单个键或整个bucket
  • 支持自定义搜索参数(由自定义Riak Search构建使用)
  • 支持bucket继承,允许您在bucket上定义自定义服务

路线图

  • 支持二级索引的插入和检索操作
  • 支持在YAML配置中扩展bucket设置(n_val、...)
  • 将性能仪表板添加到Symfony调试工具栏

依赖项

RiakBundle需要一些其他bundle库才能运行

  • Guzzle:通过curl处理HTTP请求
  • JMSSerializer:处理序列化操作

安装

目前仅支持使用composer安装,但源代码可以轻松修改以在之外使用。要在经典symfony项目中安装RiakBundle,只需更新您的composer.json并添加或编辑以下行

"kbrw/riak-bundle": "1.0.*"

您可能还需要在composer.json中将"minimum-stability"调整为"dev"以使其正常工作。

然后,您只需将一些bundle添加到您的app/AppKernel.php中

new JMS\SerializerBundle\JMSSerializerBundle(),
new Kbrw\RiakBundle\RiakBundle(),

配置

在下文各节中,我们假设您的基础设施由2个Riak集群组成:一个用于存储后端对象,另一个用于存储所有日志。后端集群包含两个预定义的存储桶。一个用于存储JSON格式的某些用户,另一个用于存储XML格式的某些城市。在日志集群中,您每天创建一个新的存储桶(我知道这很奇怪,但为什么不呢……)因此,您不能在此集群中预先配置存储桶。此外,您已经进行了稳健性活动,并知道后端集群可以支持多达500个并行连接,而您的前端在并行情况下永远不会被超过5个连接所影响。因此,您知道每次前端请求可以发送100个后端请求,但不能更多。如果您尝试一次性存储、获取或删除超过100个对象,只有前100个将被处理。其他对象将不得不等待有空闲槽位。RiakBundle将为您处理这种槽位机制。

所有配置都可以在config.yml文件下的新riak命名空间中进行。

示例

riak:
  clusters:
    backend:
      domain: "127.0.0.1"
      port: "8098"
      client_id: "frontend"
      max_parallel_calls: 100
      buckets:
        users:
          fqcn: 'MyCompany\MyBundle\Model\User'
          format: json
        cities:
          fqcn: 'MyCompany\MyBundle\Model\City'
          format: xml
    log:
      domain: "127.0.0.1"
      port: "8099"
      client_id: "frontend"

基本用法

访问集群

每个集群都成为名为"riak.cluster."的服务。在我们的示例中,您可以使用以下方式访问您的两个集群:

$backendCluster = $container->get("riak.cluster.backend");
$logCluster = $container->get("riak.cluster.log");

定义存储桶内容

Riak存储基于文本的对象,因此您需要提供对象的基于文本版本。我们推荐的最佳实践是创建一个注释对象,它代表您的模型。例如,一个User类可以这样实现:

<?php

namespace MyCompany\MyBundle\Model;

use JMS\Serializer\Annotation as Ser;

/** 
 * @Ser\AccessType("public_method") 
 * @Ser\XmlRoot("user")
 */
class User
{
    /**
     * @Ser\Type("string") 
     * @Ser\SerializedName("id")
     * @var string
     */
    protected $id;
    
    /**
     * @Ser\Type("string") 
     * @Ser\SerializedName("email")
     * @var string
     */
    protected $email;
    
    function __construct($id = null, $email = null)
    {
        $this->setId($id);
        $this->setEmail($email);
    }
    
    public function getId()
    {
        return $this->id;
    }

    public function setId($id)
    {
        $this->id = $id;
    }
    
    public function getEmail()
    {
        return $this->email;
    }

    public function setEmail($email)
    {
        $this->email = $email;
    }
}

RiakBundle可以自动处理序列化和反序列化,这样您将始终与User实例一起工作,而不会与它的基于文本(JSON、XML或YAML)表示形式一起工作。您的模型类和序列化方法可以在配置中定义,如上所述,或者您可以在存储桶实例上指定它,如下所示:

$logCluster = $container->get("riak.cluster.log");
$bucket = $logCluster->getBucket("log_" . date('Y-m-d'));
$bucket->setFullyQualifiedClassName("MyCompany\MyBundle\Model\LogEntry");
$bucket->setFormat("json");

将数据插入或更新到存储桶中

一旦配置了存储桶,您只需创建一个要存储的对象的实例,并要求RiakBundle为您存储它。示例:

$backendCluster = $container->get("riak.cluster.backend");
$user = new \MyCompany\MyBundle\Model\User("remi", "remi.alvado@yahoo.fr");
$backendCluster->getBucket("user")->put(array("remi" => $user));

您甚至可以使用对Riak的并行调用同时写入多个用户。

$backendCluster = $container->get("riak.cluster.backend");
$grenoble = new \MyCompany\MyBundle\Model\City("38000", "Grenoble");
$paris = new \MyCompany\MyBundle\Model\City("75000", "Paris");
$backendCluster->getBucket("city")->put(
  array(
    "38000" => $grenoble,
    "75000" => $paris
  )
);

相同的机制可用于更新数据。

$backendCluster = $container->get("riak.cluster.backend");
$paris = $backendCluster->getBucket("city")->uniq("paris");
$paris->setName("paris intra muros");
$backendCluster->getBucket("city")->put(array("75000" => $paris));

从存储桶中获取数据

一旦您在存储桶中有一些数据,您就可以开始获取它们。实际上,即使没有数据,您也可以开始获取数据。Bucket类提供了两种根据您的需求获取数据的方式:

示例

// Using fetch to get multiple objects
$backendCluster = $container->get("riak.cluster.backend");
$datas = $backendCluster->getBucket("city")->fetch(array("paris", "grenoble"));
$cities = $datas->getStructuredObjects(); // $cities will be an array of \MyCompany\MyBundle\Model\City instances

// Using fetch to get one object
$backendCluster = $container->get("riak.cluster.backend");
$datas = $backendCluster->getBucket("city")->fetch(array("paris"));
$city = $datas->first()->getStructuredContent(); // $city will be a \MyCompany\MyBundle\Model\City instance

// Using uniq to get one object
$backendCluster = $container->get("riak.cluster.backend");
$city = $backendCluster->getBucket("city")->uniq("paris"); // $city will be a \MyCompany\MyBundle\Model\City instance

从存储桶中删除数据

您只需提供一个要删除的键的列表。示例:

// delete a list of key/value pairs
$backendCluster = $container->get("riak.cluster.backend");
$backendCluster->getBucket("city")->delete(array("paris", "grenoble"));

// delete one single key/value pair
$backendCluster = $container->get("riak.cluster.backend");
$backendCluster->getBucket("city")->delete("paris");

列出bucket内的键

Riak没有像SQL数据库那样容易的方法来列出存储桶中的所有键。为了做到这一点,它必须遍历集群中的所有键。因此,即使您针对一个非常小的存储桶进行查询,这项操作在大型集群上也可能非常耗时。此外,即使RiakBundle使用"keys=stream"参数从Riak流式传输键而不是要求Riak在一个响应中返回它们,请记住,PHP可能无法很好地处理数百万值的数组。要显示存储桶中的所有键:

// delete a list of key/value pairs
$backendCluster = $container->get("riak.cluster.backend");
foreach($backendCluster->getBucket("city")->keys() as $key) {
  echo "$key\n";
}

统计bucket内的键数

有时,存储桶中的键太多而无法获取,但您可能想要计数。要计算存储桶中的所有键:

// delete a list of key/value pairs
$backendCluster = $container->get("riak.cluster.backend");
echo "'city' bucket contains" . $backendCluster->getBucket("city")->count() . " key(s)."

列出集群中的存储桶

如果您需要列出集群内的所有存储桶,可以使用以下示例

$backendCluster = $container->get("riak.cluster.backend");
print_r(backendCluster->bucketNames());

使用 Riak Search 在存储桶内搜索项目

Riak 支持类似 Solr(和 -light)的搜索引擎。RiakBundle 允许您在每个启用了搜索功能的存储桶上搜索项目。搜索可以使用简单的类似 Solr 的字符串查询或使用完全合格的\Kbrw\RiakBundle\Model\Search\Query 实例执行。示例

// with string based query
$backendCluster = $container->get("riak.cluster.backend");
$usersBucket = $backendCluster->getBucket("users");
$response = $usersBucket->search("id:rem*");

// with full Query instance
$backendCluster = $container->get("riak.cluster.backend");
$usersBucket = $backendCluster->getBucket("users");
$query = new \Kbrw\RiakBundle\Model\Search\Query("id:rem*");
$query->addFieldInList("id"); // only look in "id" field for each object stored in the bucket
$query->setRows(5); // return only 5 results
$response = $usersBucket->search($query);

在集群上执行 MapReduce 请求

Riak 允许开发者在整个集群上执行 mapreduce 操作。RiakBundle 通过流畅的接口提供同样的可能性。Mapreduce 操作由两个主要部分组成

  • 选择 map 阶段将执行其上的键。开发者可以选择针对整个存储桶、预定义的键子集或可过滤的键集执行 mapreduce 操作。以下将描述这三种可能性。
  • 使用 JavaScript 函数、Erlang 模块等定义 map 和 reduce 阶段。以下将定义一些可能性,但您也可以查看源代码或 API 来了解更多信息。

通用示例

$result = $this->cluster->mapReduce()
  ->on("meals")
  ->map('function(riakObject) {...}')
  ->link('meals', 'menu_') // to follow links pointing to any key matching "menu*" on "meals" bucket
  ->reduce('function(riakObject) {...}')
  ->responseShouldBe("\Some\JMS\Serializable\Type")
  ->send();

基本示例

// count how many times 'pizza' are served, meal by meal on all seasons
$bucket = $this->cluster->getBucket("meals", true);
$bucket->delete($bucket->keys());
$bucket->put(array("summer-1" => "pizza salad pasta meat sushi"));
$bucket->put(array("summer-2" => "pizza pizza pizza pizza pizza"));
$bucket->put(array("winter-1" => "cheese cheese patatoes meat vegetables"));
$bucket->put(array("autumn-1" => "pizza pizza pizza mushroom meat"));
$result = $this->cluster->mapReduce()
  ->on("meals")
  ->map('
      function(riakObject) {   
          var m =  riakObject.values[0].data.match("pizza");
          return  [[riakObject.key, (m ? m.length : 0 )]];
      }    
  ')
  ->responseShouldBe("array<string, string>")
  ->send();

键过滤器示例

// count how many times 'pizza' are served, meal by meal only on winter and autumn
// Apply a specific timeout (10sec) as well
$result = $this->cluster->mapReduce()
  ->filter("meals")
    ->tokenize("-", 1)
    ->or()
      ->eq("winter")
      ->eq("autumn")
    ->end()
  ->done()
  ->map('
      function(riakObject) {   
          var m =  riakObject.values[0].data.match("pizza");
          return  [[riakObject.key, (m ? m.length : 0 )]];
      }    
  ')
  ->timeout(10000)
  ->responseShouldBe("array<string, string>")
  ->send();

键过滤器示例

// use an erlang function to execute something on meals
$result = $this->cluster->mapReduce()
  ->on("meals")
  ->configureMapPhase()
    ->setLanguage("erlang")
    ->setModule("riak_mapreduce")
    ->setFunction("map_object_value")
  ->done()
  ->responseShouldBe("array<Acme\DemoBundle\Model\Meal>")
  ->send();

异常

如果 Riak 不可用或已关闭,RiakBundle 将抛出 RiakUnavailableException

高级用法

加载和编辑存储桶配置

Riak 允许开发者自定义一些存储桶配置,例如 nval,即每个数据在集群中存储的副本数量。请参阅 Riak 文档以获取更详细的信息。

使用 RiakBundle,您可以轻松更新存储桶配置。\Kbrw\RiakBundle\Model\Bucket\Bucket 类不仅是执行存储桶操作的场所,还包含您可以管理的存储桶属性。示例

$backendCluster = $container->get("riak.cluster.backend");
$users = $backendCluster->addBucket("users", true); // the second parameter will force RiakBundle to fetch properties for this bucket
$users->getProps()->setNVal(5);
$users->save();

启用自动索引

Riak 支持从 Riak KV 到 Riak Search 的 JSON / XML / Erlang 数据的自动索引。此功能需要在每个存储桶级别上激活。RiakBundle 允许您轻松完成此操作

示例

$backendCluster = $container->get("riak.cluster.backend");
$usersBucket = $backendCluster->getBucket("users");
$usersBucket->enableSearchIndexing();
$usersBucket->save();

操作头部信息

Riak 不仅将对象与键关联,还关联一些头部信息。其中一些是预定义的(Last-Modified、X-Riak-Vclock 等),但您也可以添加自己的自定义头部信息。如上所述,put($objects) 方法接受一个关联数组的对象。这些对象可以是您要存储在 Riak 中的数据的简单表示,也可以是\Kbrw\RiakBundle\Model\KV\Data 实例(与 fetch 和 uniq 方法返回的相同)。在此对象上,您可以使用 headerBag 属性定义自己的自定义头部信息,该属性是\Symfony\Component\HttpFoundation\HeaderBag。示例

$remi = new \MyCompany\MyBundle\Model\User("remi", "remi.alvado@yahoo.fr");
$data = new \Kbrw\RiakBundle\Model\KV\Data("remi");
$data->setContent($remi);
$data->getHeaderBag()->setHeader("X-Signup-Date", date('Y-m-d'));

$backendCluster = $container->get("riak.cluster.backend");
$backendCluster->getBucket("users")->put($remi);

可以使用相同的机制来处理带有 X-Riak-Vclock 头部的 Riak 合并问题。

定义自己的存储桶类

有时,您可能需要定义自己的存储桶类,以便可以覆盖一些现有方法,添加新方法等。这可以通过像这样修改配置来实现

riak:
  clusters:
    backend:
      domain: "127.0.0.1"
      port: "8098"
      client_id: "frontend"
      max_parallel_calls: 100
      buckets:
        points_of_interests:
          fqcn: 'MyCompany\MyBundle\Model\PointOfInterest'
          format: json
          class: 'Acme\DemoBundle\Model\AcmeBucket'

使用此配置,"points_of_interests" 存储桶将被初始化为 Acme\DemoBundle\Model\AcmeBucket 而不是常规存储桶。现在,您可以实现此类(它扩展了 \Kbrw\RiakBundle\Model\Bucket\Bucket)以满足您的要求。同样,每次向集群添加存储桶时,都会抛出一个名为 "riak.bucket.add" 的事件到 EventDispatcher。此 \Symfony\Component\EventDispatcher\GenericEvent 包含新创建的存储桶,您可以按自己的意愿对其进行操作。有关更详细的信息,您可以查看此示例

管理任务

常用选项

在集群上运行的所有任务(所以...所有任务)都支持以下选项

  • -c--cluster:集群名称。如果未提供,则将在命令行中询问

在桶上运行的所有任务都支持以下选项

  • -b--bucket:桶名称。如果未提供,则将在命令行中询问

列出或计算项的所有任务都支持以下选项

  • -r--raw:如果提供,将使用原始输出格式。主要用于bash脚本。

删除项的所有任务都支持以下选项

  • -y--yes:如果提供,将跳过所有是/否问题。主要用于bash脚本。

列出现有bucket

php app/console riak:cluster:list -c backend

删除所有bucket

php app/console riak:cluster:deleteAll -c backend

列出bucket内的键

php app/console riak:bucket:list -c backend -b meals

统计bucket内的键数

php app/console riak:bucket:count -c backend -b meals

删除bucket内的所有键

php app/console riak:bucket:deleteAll -c backend -b meals

删除bucket内的单个键

php app/console riak:bucket:delete -c backend -b meals -k summer-2