arekxv/php-data-streamer

1.0.1 2021-07-29 13:28 UTC

This package is auto-updated.

Last update: 2024-09-29 06:08:57 UTC


README

Documentation Status Build Status Scrutinizer Code Quality Code Coverage Code Intelligence Status

数据流库,处理多个Redis驱动程序,并提供对Redis Streams和消费者组的简单访问。

支持php-redis和predis驱动程序。

安装

通过composer: composer require arekxv/php-data-streamer

用法

此库支持读取数据流和写入数据流。

读取和处理消息

要将您的代码设置为监听和读取消息,代码应设置如下

use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\CallableHandler;
use ArekX\DataStreamer\Data\CallableParser;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamReader;

// Specify a redis driver to use
$driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used
$driver->connect([
    'host' => '127.0.0.1'
]);

// Specify a message converter from array into instances
// This can be one callable via setDefaultBuilder or per type in setBuilder
$parser = new CallableParser();
$parser->setDefaultBuilder(fn(string $id, string $type, array $payload) => ArrayMessage::create($type, $payload, $id));

// Set handler for messages this can be a default one for all messages
// or a custom one per Message::getType()
$handler = new CallableHandler();
$handler->setDefaultHandler(function (ArrayMessage $message) {
    echo "{$message->getId()}: " . json_encode($message->getPayload()) . PHP_EOL;
    return true; // Returning true means that message was handled successfully.
});

// Settings object which holds the configuration for the stream.
$settings = new Settings([
    'stream' => 'data-stream',
    'consumerGroup' => 'my-consumer-group',
    'consumerName' => 'my-consumer-consumer',
]);

// Initialize data stream reader.
$reader = new StreamReader($driver, $parser, $handler, $settings);

// Run infinite loop to process messages.
echo "Listening..." . PHP_EOL;
$reader->runLoop();

发送消息

要发送消息,您可以使用以下代码发送数据

use ArekX\DataStreamer\Data\ArrayMessage;
use ArekX\DataStreamer\Data\PayloadMessageConverter;
use ArekX\DataStreamer\Data\Settings;
use ArekX\DataStreamer\Drivers\RedisDriver;
use ArekX\DataStreamer\StreamWriter;

// Specify a redis driver to use
$driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used
$driver->connect([
    'host' => '127.0.0.1'
]);

// Settings object which holds the configuration for the stream.
$settings = new Settings([
    'stream' => 'data-stream'
]);

// Define a converter which will convert a message into an array
// suitable for sending across the data stream.
$converter = new PayloadMessageConverter();

// Initialize a stream writer
$writer = new StreamWriter($driver, $settings, $converter);

// Send message to the data stream
$writer->write(ArrayMessage::create('test-type', [
    'key' => 'value',
    'key2' => 'value2'
]));

文档

文档可在以下位置找到:http://php-data-streamer.rtfd.io/

您还可以手动构建文档或直接从docs文件夹中读取。

构建

  1. 安装Python 3和PIP
  2. 运行pip install mkdocs
  3. 运行mkdocs serve以提供服务或mkdocs build以构建。

测试

要运行测试,请运行composer test

许可证

版权所有 Aleksandar Panic

根据Apache License,版本2.0(“许可证”);除非遵守许可证规定,否则不得使用此文件。您可以在以下位置获得许可证副本:

https://apache.ac.cn/licenses/LICENSE2.0在此存储库中

除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”提供,不提供任何明示或暗示的保证或条件。有关许可证的具体语言和权限限制,请参阅许可证。