arekxv / php-data-streamer
1.0.1
2021-07-29 13:28 UTC
Requires
- php: >=7.4
- ext-json: *
- ext-mbstring: *
Requires (Dev)
- codeception/codeception: ^4.1
- codeception/module-asserts: ^1.3
- codeception/verify: ^2.1
Suggests
- ext-redis: To use the native PHP extension Redis driver.
- predis/predis: To use the pure PHP redis driver.
README
数据流库,处理多个Redis驱动程序,并提供对Redis Streams和消费者组的简单访问。
支持php-redis和predis驱动程序。
安装
通过composer: composer require arekxv/php-data-streamer
用法
此库支持读取数据流和写入数据流。
读取和处理消息
要将您的代码设置为监听和读取消息,代码应设置如下
use ArekX\DataStreamer\Data\ArrayMessage; use ArekX\DataStreamer\Data\CallableHandler; use ArekX\DataStreamer\Data\CallableParser; use ArekX\DataStreamer\Data\Settings; use ArekX\DataStreamer\Drivers\RedisDriver; use ArekX\DataStreamer\StreamReader; // Specify a redis driver to use $driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used $driver->connect([ 'host' => '127.0.0.1' ]); // Specify a message converter from array into instances // This can be one callable via setDefaultBuilder or per type in setBuilder $parser = new CallableParser(); $parser->setDefaultBuilder(fn(string $id, string $type, array $payload) => ArrayMessage::create($type, $payload, $id)); // Set handler for messages this can be a default one for all messages // or a custom one per Message::getType() $handler = new CallableHandler(); $handler->setDefaultHandler(function (ArrayMessage $message) { echo "{$message->getId()}: " . json_encode($message->getPayload()) . PHP_EOL; return true; // Returning true means that message was handled successfully. }); // Settings object which holds the configuration for the stream. $settings = new Settings([ 'stream' => 'data-stream', 'consumerGroup' => 'my-consumer-group', 'consumerName' => 'my-consumer-consumer', ]); // Initialize data stream reader. $reader = new StreamReader($driver, $parser, $handler, $settings); // Run infinite loop to process messages. echo "Listening..." . PHP_EOL; $reader->runLoop();
发送消息
要发送消息,您可以使用以下代码发送数据
use ArekX\DataStreamer\Data\ArrayMessage; use ArekX\DataStreamer\Data\PayloadMessageConverter; use ArekX\DataStreamer\Data\Settings; use ArekX\DataStreamer\Drivers\RedisDriver; use ArekX\DataStreamer\StreamWriter; // Specify a redis driver to use $driver = new RedisDriver(); // or new \ArekX\DataStreamer\Drivers\PredisDriver() if Predis package is used $driver->connect([ 'host' => '127.0.0.1' ]); // Settings object which holds the configuration for the stream. $settings = new Settings([ 'stream' => 'data-stream' ]); // Define a converter which will convert a message into an array // suitable for sending across the data stream. $converter = new PayloadMessageConverter(); // Initialize a stream writer $writer = new StreamWriter($driver, $settings, $converter); // Send message to the data stream $writer->write(ArrayMessage::create('test-type', [ 'key' => 'value', 'key2' => 'value2' ]));
文档
文档可在以下位置找到:http://php-data-streamer.rtfd.io/
您还可以手动构建文档或直接从docs文件夹中读取。
构建
- 安装Python 3和PIP
- 运行
pip install mkdocs - 运行
mkdocs serve以提供服务或mkdocs build以构建。
测试
要运行测试,请运行composer test。
许可证
版权所有 Aleksandar Panic
根据Apache License,版本2.0(“许可证”);除非遵守许可证规定,否则不得使用此文件。您可以在以下位置获得许可证副本:
https://apache.ac.cn/licenses/LICENSE2.0 或 在此存储库中
除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”提供,不提供任何明示或暗示的保证或条件。有关许可证的具体语言和权限限制,请参阅许可证。