rf/amazon-kinesis-client-library-php

0.0.2 2014-04-13 14:19 UTC

This package is not auto-updated.

Last update: 2024-09-28 15:54:46 UTC


README

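Overview

A wrapper library for the Amazon Kinesis client module (http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Kinesis.KinesisClient.html).

It aims to simplify management of the Shard Id and Sequence Number obtained when reading from a stream. Storage management is also available as an option.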
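
To illustrate what that bookkeeping involves, here is a minimal sketch of remembering the last sequence number read for each shard. It is not this library's implementation: `ShardCheckpointFile` and its methods are hypothetical names, the single-JSON-file layout is an assumption made for the example, and the sequence number is a made-up placeholder.

```php
<?php

/** Hypothetical file-backed map of shard id => last sequence number read. */
class ShardCheckpointFile
{
    private $path;
    private $checkpoints = [];

    public function __construct(string $path)
    {
        $this->path = $path;
        if (is_file($path)) {
            // Restore checkpoints written by a previous run.
            $decoded = json_decode(file_get_contents($path), true);
            $this->checkpoints = is_array($decoded) ? $decoded : [];
        }
    }

    /** Returns the stored sequence number, or null if the shard is unseen. */
    public function get(string $shardId): ?string
    {
        return $this->checkpoints[$shardId] ?? null;
    }

    public function set(string $shardId, string $sequenceNumber): void
    {
        $this->checkpoints[$shardId] = $sequenceNumber;
    }

    /** Persists all checkpoints so the next run can resume after them. */
    public function save(): void
    {
        file_put_contents($this->path, json_encode($this->checkpoints), LOCK_EX);
    }
}

$path = sys_get_temp_dir() . '/kinesis-checkpoints-example.json';
$store = new ShardCheckpointFile($path);
// Placeholder sequence number, for illustration only.
$store->set('shardId-000000000000', '49500000000000000000000000000000000000000000000000000001');
$store->save();

// A later run reads the file and picks up where the previous one stopped.
$resumed = new ShardCheckpointFile($path);
echo $resumed->get('shardId-000000000000'), PHP_EOL;
```

Sequence numbers are kept as strings here because they are far too long to fit in a PHP integer.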

Requirements

Installation

Using Composer

composer.json

{
    "require": {
        "rf/amazon-kinesis-client-library-php": "0.0.1"
    }
}

Copying the directory

git clone https://github.com/fukuiretu/amazon-kinesis-client-library-php.git
cp -r amazon-kinesis-client-library-php/src/Rf <path/to/your_project>

Usage

Case: Putting data

<?php

require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;

define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
  'key' => 'XXXXX',
  'secret' => 'XXXXX',
  'region' => Region::VIRGINIA
));
$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);

while (true) {
    $sample_data = date('YmdHis');
    $kinesis_proxy->putRecord($sample_data, mt_rand(1, 1000000));

    echo $sample_data, PHP_EOL;
    sleep(1);
}

Case: Using the Memcache data store

<?php

require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;

define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
  'key' => 'XXXXX',
  'secret' => 'XXXXX',
  'region' => Region::VIRGINIA
));

$memcache = new Memcache();
$memcache->addServer("localhost", 11211);

$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
$kinesis_storage_manager = new KinesisStorageManager($kinesis_proxy, new KinesisShardMemcacheDataStore($memcache));

$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords(null, 10, 5);
foreach ($data_records as $data_record) {
  echo $data_record->getData(), PHP_EOL;
}

$kinesis_storage_manager->saveAll();

Case: Using the file data store


<?php

require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;

define('STREAM_NAME', 'kinesis-trial');

$kinesis = KinesisClient::factory(array(
  'key' => 'XXXXX',
  'secret' => 'XXXXX',
  'region' => Region::VIRGINIA
));

$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
$kinesis_storage_manager = new KinesisStorageManager($kinesis_proxy, new KinesisShardFileDataStore('/tmp/amazon-kinesis'));

$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords(null, 10, 5);
foreach ($data_records as $data_record) {
  echo $data_record->getData(), PHP_EOL;
}

$kinesis_storage_manager->saveAll();

If you do not use Composer, load the bundled autoloader in place of vendor/autoload.php:

require 'src/Rf/Aws/AutoLoader.php';
AutoLoader::register();

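Specifying the ShardId and fetch count

By default, data records are fetched from all shards, up to a maximum of 5,000 records (1,000 records * 5 loops). The following example fetches up to 10,000 records (1,000 records * 10 loops) from the shard "shardId-000000000000".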

$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords('shardId-000000000000', 1000, 10);
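
The arithmetic behind those limits can be sketched as follows. This is an illustrative model only, not the library's implementation: `fetchUpTo` and the `$fetchBatch` callback are hypothetical names, and it assumes each loop requests at most `$limit` records and stops early once a batch comes back empty.

```php
<?php

/**
 * Hypothetical model of a capped fetch: at most $limit records per loop,
 * at most $maxLoopCount loops, so never more than $limit * $maxLoopCount.
 */
function fetchUpTo(callable $fetchBatch, int $limit, int $maxLoopCount): array
{
    $records = [];
    for ($i = 0; $i < $maxLoopCount; $i++) {
        // Ask the (stand-in) source for one batch of at most $limit records.
        $batch = $fetchBatch($limit);
        if ($batch === []) {
            break; // nothing left to read
        }
        $records = array_merge($records, $batch);
    }
    return $records;
}

// Stand-in source holding 20,000 fake records.
$remaining = range(1, 20000);
$fetchBatch = function (int $limit) use (&$remaining): array {
    // Remove and return the next $limit records from the front.
    return array_splice($remaining, 0, $limit);
};

echo count(fetchUpTo($fetchBatch, 1000, 5)), PHP_EOL;  // 5000
echo count(fetchUpTo($fetchBatch, 1000, 10)), PHP_EOL; // 10000
```

Under these assumptions the cap is simply the product of the two arguments, and fewer records are returned when the source runs out first.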

TODO

  • Support for additional storage backends: DynamoDB, Redis, etc.
  • Parallel processing.

Copyright

Copyright:: Copyright (c) 2014- Fukui ReTu
License:: MIT