asiries335/redis-stream-php

v0.0.7 2020-10-02 21:45 UTC

README


This package lets you work with Redis streams from PHP.

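Features

  1. Add a message to a stream
  2. Delete a message from a stream
  3. Find a message in a stream by its ID
  4. Get a collection of messages from a stream
  5. Create a consumer group for a stream
  6. Delete a consumer group from a stream
  7. Delete a consumer from a group
  8. Listen to a stream, built on https://github.com/reactphp/event-loop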

Info

Requires Redis version >= 5.0
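
The version requirement above can be checked at startup. The following is a minimal sketch, not part of this package: the helper name redisSupportsStreams is hypothetical, and the commented usage assumes the phpredis extension, whose info() call returns an array containing a redis_version entry.

```php
<?php
// Hypothetical helper, not provided by asiries335/redis-stream-php.
// Streams (XADD, XDEL, XREAD, ...) were introduced in Redis 5.0.
function redisSupportsStreams(string $redisVersion): bool
{
    // Compare against '5.0' so that both '5.0' and '5.0.0' pass.
    return version_compare($redisVersion, '5.0', '>=');
}

// Assumed usage with the phpredis extension:
// $redis = new \Redis();
// $redis->connect('127.0.0.1', 6379);
// $info = $redis->info('server');
// if (!redisSupportsStreams($info['redis_version'])) {
//     throw new \RuntimeException('Redis >= 5.0 is required for streams');
// }
```

Failing early this way gives a clearer error than an "unknown command" reply from an older server.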

Installation

composer require asiries335/redis-stream-php

Usage

Getting started

<?php
use Asiries335\redisSteamPhp\Dto\StreamCommandCallTransporter;

// Example use in your app.
class Config implements \Asiries335\redisSteamPhp\ClientRedisStreamPhpInterface {

    private $client;

    public function __construct()
    {
        $this->client = new \Redis();
        $this->client->connect('127.0.0.1', 6379);
    }

    /**
     * Runs a Redis command.
     *
     * @param StreamCommandCallTransporter $commandCallTransporter
     *
     * @return mixed
     *
     * @throws \Dto\Exceptions\InvalidDataTypeException
     * @throws \Dto\Exceptions\InvalidKeyException
     */
    public function call(StreamCommandCallTransporter $commandCallTransporter)
    {
        // Example use.
        return $this->client->rawCommand(
            $commandCallTransporter->get('command')->toScalar(),
            ...$commandCallTransporter->get('args')->toArray()
        );
    }
}

$client = new \Asiries335\redisSteamPhp\Client(new Config());

Add a message to the stream

$client->stream('test')->add(
    'key',
    [
        'id'   => 123,
        'name' => 'Barney',
        'age'  => 25,
    ]
);

Find a message by ID

$message = $client->stream('test')->findById('1599404282894-0');

// result.
Asiries335\redisSteamPhp\Data\Message {
  -_id: "1599404282894-0"
  -_key: "user"
  -_body: "{"id":123,"name":"Barney","age":25}"
}
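
An ID such as the one above follows the Redis stream convention for auto-generated IDs: a millisecond Unix timestamp, a hyphen, and a sequence number. The sketch below splits an ID into those two parts. The function parseStreamId is a hypothetical helper, not part of this package, and it assumes 64-bit PHP so that the millisecond value fits in an int.

```php
<?php
// Hypothetical helper, not provided by asiries335/redis-stream-php.
// Splits a stream entry ID of the form "<milliseconds>-<sequence>".
function parseStreamId(string $id): array
{
    if (preg_match('/^(\d+)-(\d+)$/', $id, $m) !== 1) {
        throw new InvalidArgumentException("Not a stream entry ID: {$id}");
    }

    return [
        'ms'  => (int) $m[1], // milliseconds since the Unix epoch
        'seq' => (int) $m[2], // sequence number within that millisecond
    ];
}

$parts = parseStreamId('1599404282894-0');

// Convert the millisecond part to a readable UTC time.
echo gmdate('Y-m-d H:i:s', intdiv($parts['ms'], 1000)), PHP_EOL;
```

This applies to IDs generated by the server; IDs supplied explicitly by a producer need not encode a real timestamp.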

Delete a message

$client->stream('test')->delete('key');

More info: https://redis.ac.cn/commands/xdel

Get a collection of messages from the stream

// Get data from stream.
$collection = $client->stream('test')->get();

// result.

 Asiries335\redisSteamPhp\Data\Collection {
  -_name: "test"
  -_messages: [
    0 => Asiries335\redisSteamPhp\Data\Message {
      -_id: "1588098124977-0"
      -_key: "key"
      -_body: "{"id":123,"name":"Barney","age":25}"
    }
    1 => Asiries335\redisSteamPhp\Data\Message {
      -_id: "1588098124977-1"
      -_key: "key"
      -_body: "{"id":124,"name":"Smith","age":30}"
    }
    2 => Asiries335\redisSteamPhp\Data\Message {
      -_id: "1588500979608-0"
      -_key: "key"
      -_body: "{"id":163,"name":"Alex","age":20}"
    }
  ]
}
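
The _body values in the dump above are JSON strings. How to read them from a Message object is not shown in this README, so the sketch below makes no assumption about accessors: it starts from the plain strings and decodes them with json_decode. The helper decodeBody is hypothetical, and JSON_THROW_ON_ERROR requires PHP 7.3 or later.

```php
<?php
// Body strings copied from the dump above.
$bodies = [
    '{"id":123,"name":"Barney","age":25}',
    '{"id":124,"name":"Smith","age":30}',
    '{"id":163,"name":"Alex","age":20}',
];

// Hypothetical helper, not provided by asiries335/redis-stream-php.
// Decodes one JSON body into an associative array, throwing on invalid JSON.
function decodeBody(string $body): array
{
    return json_decode($body, true, 512, JSON_THROW_ON_ERROR);
}

$people = array_map('decodeBody', $bodies);

// Collect a single field from every decoded message.
$names = array_column($people, 'name');
```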

Listen to a stream

This feature is built on the package https://github.com/reactphp/event-loop

$client->stream('test')->listen(
    function (\Asiries335\redisSteamPhp\Data\Message $message) {
        // Your code...
    }
);

Create a new consumer group

$streamName = 'test';
$groupName  = 'demo-group-1';
$isShowFullHistoryStream = false;

// return bool or ErrorException.
$client->streamGroupConsumer($streamName)->create($groupName, $isShowFullHistoryStream);

Destroy a consumer group

$streamName = 'test';
$groupName  = 'demo-group-1';

// return bool or ErrorException.
$client->streamGroupConsumer($streamName)->destroy($groupName);

Delete a consumer from a group

$streamName = 'test';
$groupName  = 'demo-group-1';
$consumerName = 'consumer-name';

// return bool or ErrorException.
$client->streamGroupConsumer($streamName)->deleteConsumer($groupName, $consumerName);