rf / amazon-kinesis-client-library-php
Amazon Kinesis 包装类
0.0.2
2014-04-13 14:19 UTC
Requires
- php: >=5.3.3
- aws/aws-sdk-php: 2.5.2
Requires (Dev)
- phpunit/phpunit: 4.0.9
This package is not auto-updated.
Last update: 2024-09-28 15:54:46 UTC
README
概览
Amazon Kinesis 客户端模块的包装库。(http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Kinesis.KinesisClient.html)
我想简化从流中提取的Shard Id和Sequence Number的管理。另外,存储管理也是一个选项。
要求
- PHP >= 5.3.3
- aws-sdk-php (https://github.com/aws/aws-sdk-php)
- [可选] memcache.so (http://pecl.php.net/package/memcache) AND memcached (http://memcached.org/)
安装
使用Composer
composer.json
{
"require": {
"rf/amazon-kinesis-client-library-php": "0.0.1"
}
}
复制目录
git clone https://github.com/fukuiretu/amazon-kinesis-client-library-php.git
cp -r src/Rf <path/to/your_project>
用法
案例:数据推送
require 'vendor/autoload.php';
use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;
define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
'key' => 'XXXXX',
'secret' => 'XXXXX',
'region' => Region::VIRGINIA
));
$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
while (true) {
$sample_data = date('YmdHis');
$kinesis_proxy->putRecord($sample_data, mt_rand(1, 1000000));
echo $sample_data, PHP_EOL;
sleep(1);
}
案例:使用Memcache数据存储
<?php
require 'vendor/autoload.php';
use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;
define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
'key' => 'XXXXX',
'secret' => 'XXXXX',
'region' => Region::VIRGINIA
));
$memcache = new Memcache();
$memcache->addServer("localhost", 11211);
$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
$kinesis_storage_manager = new KinesisStorageManager($kinesis_proxy, new KinesisShardMemcacheDataStore($memcache));
$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords(null, 10, 5);
foreach ($data_records as $data_record) {
echo $data_record->getData(), PHP_EOL;
}
$kinesis_storage_manager->saveAll();
案例:使用文件数据存储
<?php
require 'vendor/autoload.php';
use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;
define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
'key' => 'XXXXX',
'secret' => 'XXXXX',
'region' => Region::VIRGINIA
));
$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
$kinesis_storage_manager = new KinesisStorageManager($kinesis_proxy, new KinesisShardFileDataStore('/tmp/amazon-kinesis'));
$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords(null, 10, 5);
foreach ($data_records as $data_record) {
echo $data_record->getData(), PHP_EOL;
}
$kinesis_storage_manager->saveAll();
如果你不使用Composer,以下是需要的内容:
require 'src/Rf/Aws/AutoLoader.php';
AutoLoader::register();
指定ShardId,获取数量
默认情况下,针对所有分片,获取数据记录的上限为5,000(1000条记录 * 5次循环)。以下是一个示例,从"shardId-000000000000"分片获取最多10,000(1000条记录 * 10次循环)的数据记录。
$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords('shardId-000000000000', 1000, 10);
待办事项
- 支持存储的增加。DynamoDB、Redis等...
- 并行处理。
版权
版权:: 版权 (c) 2014- Fukui ReTu 许可:: MIT