asiries335 / redis-stream-php
Redis stream php
v0.0.7
2020-10-02 21:45 UTC
Requires
- dto/dto: ^3.2
- predis/predis: ^1.1
- react/event-loop: ^1.1.1
Requires (Dev)
- larapack/dd: ^1.1
- mockery/mockery: ^1.3
- phpmetrics/phpmetrics: ^2.7
- phpunit/php-code-coverage: ^7.0
- phpunit/phpunit: ^8.5
README
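This package lets PHP applications work with Redis streams.
Features
- Add a message to a stream
- Delete a message from a stream
- Find a message in a stream by its ID
- Get the collection of messages in a stream
- Create a consumer group for a stream
- Delete a consumer group from a stream
- Delete a consumer from a group
- Listen to a stream, implemented on top of reactphp/event-loop (https://github.com/reactphp/event-loop)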
Info
Requires Redis version >= 5.0
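The version requirement can be checked before using the package. The following is a minimal sketch, assuming the raw text of the Redis INFO command is available as a string; the function names `parseRedisVersion` and `supportsStreams` are hypothetical and are not part of this package.

```php
<?php
declare(strict_types=1);

/**
 * Extract the server version from the text returned by the Redis INFO command.
 * Returns null when no redis_version line is present.
 */
function parseRedisVersion(string $info): ?string
{
    if (preg_match('/^redis_version:(\S+)/m', $info, $matches) === 1) {
        return $matches[1];
    }
    return null;
}

/** True when the reported version is at least 5.0.0, the minimum stated above. */
function supportsStreams(string $info): bool
{
    $version = parseRedisVersion($info);
    return $version !== null && version_compare($version, '5.0.0', '>=');
}

// Sample INFO text, made up for this illustration.
$sampleInfo = "# Server\r\nredis_version:6.2.6\r\nredis_mode:standalone\r\n";
var_dump(supportsStreams($sampleInfo));
```

Note that some clients return INFO as an array rather than text; in that case the `redis_version` entry can be passed to `version_compare` directly.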
Installation
composer require asiries335/redis-stream-php
Usage
Getting started
<?php

use Asiries335\redisSteamPhp\Dto\StreamCommandCallTransporter;

// Example use in your app.
class Config implements \Asiries335\redisSteamPhp\ClientRedisStreamPhpInterface
{
    private $client;

    public function __construct()
    {
        $this->client = new \Redis();
        // The port is passed as an integer, as phpredis expects.
        $this->client->connect('127.0.0.1', 6379);
    }

    /**
     * Runs a Redis command.
     *
     * @param StreamCommandCallTransporter $commandCallTransporter
     *
     * @return mixed
     *
     * @throws \Dto\Exceptions\InvalidDataTypeException
     * @throws \Dto\Exceptions\InvalidKeyException
     */
    public function call(StreamCommandCallTransporter $commandCallTransporter)
    {
        // Example use: forward the command name and its arguments to phpredis.
        return $this->client->rawCommand(
            $commandCallTransporter->get('command')->toScalar(),
            ...$commandCallTransporter->get('args')->toArray()
        );
    }
}

$client = new \Asiries335\redisSteamPhp\Client(new Config());
Add a message to a stream
$client->stream('test')->add(
    'key',
    [
        'id'   => 123,
        'name' => 'Barney',
        'age'  => 25,
    ]
);
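The result dumps in this README show the message body stored as a JSON string under a single field. The sketch below shows what the arguments of the underlying XADD call might look like under that assumption. The helper `buildXaddArguments` is hypothetical and is not the package's actual code; `JSON_THROW_ON_ERROR` needs PHP 7.3 or later.

```php
<?php
declare(strict_types=1);

/**
 * Build the argument list for a raw XADD call.
 * Assumption: the payload is stored as one field ($key) whose value is JSON.
 */
function buildXaddArguments(string $stream, string $key, array $payload): array
{
    $body = json_encode($payload, JSON_THROW_ON_ERROR);

    // '*' asks Redis to generate the entry ID itself.
    return [$stream, '*', $key, $body];
}

$args = buildXaddArguments('test', 'key', ['id' => 123, 'name' => 'Barney', 'age' => 25]);
print_r($args);
```

With a phpredis connection, such a list could be passed on as `rawCommand('XADD', ...$args)`.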
Find a message by ID
$message = $client->stream('test')->findById('1599404282894-0');

// Result:
// Asiries335\redisSteamPhp\Data\Message {
//   -_id: "1599404282894-0"
//   -_key: "user"
//   -_body: "{"id":123,"name":"Barney","age":25}"
// }
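A Redis stream entry ID has the form `<milliseconds>-<sequence>`, and the body shown above is a JSON string. The following sketch works on those two raw values only, because the accessor methods of the `Message` class are not documented here; `parseStreamId` is a hypothetical helper, and a 64-bit PHP build is assumed for the millisecond timestamp.

```php
<?php
declare(strict_types=1);

/**
 * Split a Redis stream entry ID of the form "<milliseconds>-<sequence>".
 *
 * @return int[] [milliseconds since the Unix epoch, sequence number]
 */
function parseStreamId(string $id): array
{
    if (preg_match('/^(\d+)-(\d+)$/', $id, $m) !== 1) {
        throw new InvalidArgumentException("Not a stream entry ID: {$id}");
    }

    return [(int) $m[1], (int) $m[2]];
}

[$milliseconds, $sequence] = parseStreamId('1599404282894-0');

// Decode the JSON body into an associative array.
$body = json_decode('{"id":123,"name":"Barney","age":25}', true);

// Print the creation time encoded in the ID, then one decoded field.
echo gmdate('Y-m-d H:i:s', intdiv($milliseconds, 1000)), " UTC\n";
echo $body['name'], "\n";
```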
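Delete a message
$client->stream('test')->delete('key');
More information: https://redis.ac.cn/commands/xdel (XDEL removes entries by their ID, so the argument here is presumably a message ID such as 1599404282894-0.)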
Get the collection of messages in a stream
// Get data from stream.
$collection = $client->stream('test')->get();

// Result:
// Asiries335\redisSteamPhp\Data\Collection {
//   -_name: "test"
//   -_messages: [
//     0 => Asiries335\redisSteamPhp\Data\Message {
//       -_id: "1588098124977-0"
//       -_key: "key"
//       -_body: "{"id":123,"name":"Barney","age":25}"
//     }
//     1 => Asiries335\redisSteamPhp\Data\Message {
//       -_id: "1588098124977-1"
//       -_key: "key"
//       -_body: "{"id":124,"name":"Smith","age":30}"
//     }
//     2 => Asiries335\redisSteamPhp\Data\Message {
//       -_id: "1588500979608-0"
//       -_key: "key"
//       -_body: "{"id":163,"name":"Alex","age":20}"
//     }
//   ]
// }
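For orientation, a raw XRANGE reply is a list of entries, each holding an ID and a flat list of alternating field names and values. The sketch below turns such a reply into records with the same three parts as the dump above. Which Redis command the package issues internally is an assumption here, and `normalizeXrangeReply` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Convert a raw XRANGE reply into flat records.
 * Each reply entry is [id, [field, value, field, value, ...]].
 */
function normalizeXrangeReply(array $reply): array
{
    $records = [];
    foreach ($reply as [$id, $fields]) {
        // Fields and values alternate, so walk the list two items at a time.
        for ($i = 0; $i + 1 < count($fields); $i += 2) {
            $records[] = [
                'id'   => $id,
                'key'  => $fields[$i],
                'body' => $fields[$i + 1],
            ];
        }
    }

    return $records;
}

// Sample reply shaped like the first two messages in the dump above.
$reply = [
    ['1588098124977-0', ['key', '{"id":123,"name":"Barney","age":25}']],
    ['1588098124977-1', ['key', '{"id":124,"name":"Smith","age":30}']],
];

$records = normalizeXrangeReply($reply);
echo count($records), "\n";
```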
Listen to a stream
This feature is built on the reactphp/event-loop package: https://github.com/reactphp/event-loop
$client->stream('test')->listen(
    function (\Asiries335\redisSteamPhp\Data\Message $message) {
        // Your code...
    }
);
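One common way to listen to a stream is to poll it repeatedly, remember the last ID seen, and hand each newer entry to a callback. The sketch below shows a single polling step against an in-memory stand-in. It is not the package's implementation: `pollOnce`, `streamIdToPair`, and the `$fetch` callable are hypothetical names. With react/event-loop, a step like this could be scheduled through the loop's `addPeriodicTimer`.

```php
<?php
declare(strict_types=1);

/** Turn "<milliseconds>-<sequence>" into [milliseconds, sequence] for comparison. */
function streamIdToPair(string $id): array
{
    return array_map('intval', explode('-', $id));
}

/**
 * Run one polling step: fetch entries newer than $lastId, pass each to
 * $handler, and return the ID to resume from on the next step.
 *
 * $fetch is a hypothetical callable that returns a list of [id, body] pairs.
 */
function pollOnce(callable $fetch, string $lastId, callable $handler): string
{
    foreach ($fetch($lastId) as [$id, $body]) {
        $handler($id, $body);
        $lastId = $id;
    }

    return $lastId;
}

// In-memory stand-in for a stream, used only for this illustration.
$entries = [['1-0', 'first'], ['2-0', 'second'], ['3-0', 'third']];
$fetch = function (string $lastId) use ($entries): array {
    return array_values(array_filter(
        $entries,
        function (array $entry) use ($lastId): bool {
            return streamIdToPair($entry[0]) > streamIdToPair($lastId);
        }
    ));
};

$seen = [];
$handler = function (string $id, string $body) use (&$seen): void {
    $seen[] = $body;
};

// Resume after entry 1-0: only the two newer entries reach the handler.
$lastId = pollOnce($fetch, '1-0', $handler);
```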
Create a new consumer group
$streamName = 'test';
$groupName = 'demo-group-1';
$isShowFullHistoryStream = false;

// Returns bool or ErrorException.
$client->streamGroupConsumer($streamName)->create($groupName, $isShowFullHistoryStream);
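In Redis, `XGROUP CREATE` takes a start ID: `0` lets the group read the whole stream history, while `$` delivers only entries added after the group is created. The sketch below assumes that `$isShowFullHistoryStream` selects between those two values. That mapping is a guess based on the parameter name, and `buildGroupCreateArguments` is a hypothetical helper, not part of the package.

```php
<?php
declare(strict_types=1);

/**
 * Build the arguments for a raw XGROUP CREATE call.
 * Assumption: true maps to start ID "0" (full history) and
 * false maps to "$" (only entries added after the group exists).
 */
function buildGroupCreateArguments(string $stream, string $group, bool $fullHistory): array
{
    return ['CREATE', $stream, $group, $fullHistory ? '0' : '$'];
}

print_r(buildGroupCreateArguments('test', 'demo-group-1', false));
```

With a phpredis connection, the list could be sent as `rawCommand('XGROUP', ...$arguments)`.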
Destroy a consumer group
$streamName = 'test';
$groupName = 'demo-group-1';

// Returns bool or ErrorException.
$client->streamGroupConsumer($streamName)->destroy($groupName);
Delete a consumer from a group
$streamName = 'test';
$groupName = 'demo-group-1';
$consumerName = 'consumer-name';

// Returns bool or ErrorException.
$client->streamGroupConsumer($streamName)->deleteConsumer($groupName, $consumerName);