guidepilot / php-underground-radio
一个PHP库,允许通过实现消息队列架构和发布/订阅服务的各种服务发送消息
This package is auto-updated.
Last update: 2024-09-20 12:45:41 UTC
README
免责声明:此项目处于非常早期的发展阶段,不应在生产环境中使用。当前代码仅是一个初步草案。尽管如此,它可能对某些人已经有所助益。欢迎提交拉取请求...
简介
UndergroundRadio 库允许通过实现消息队列架构和发布/订阅服务的各种服务发送消息。
尽管该架构为任何合适服务的集成提供了一个抽象接口,但目前只有基于 Redis 的实现。这里重点关注使用现代 Redis 功能,如 Streams 和 Pub/Sub。
此库受到 Enqueue 库和 queue-interop 协议的启发,但在某些方面采取了不同的方法。
安装
此包需要 PHP 8.1 或更高版本!
使用 composer 安装
$ composer require guidepilot/php-underground-radio
pub/sub 模式的使用示例
此模式也称为广播或扇出架构。
通过频道进行简单的消息生产者
use GuidePilot\UndergroundRadio\Broadcast\Channel; use GuidePilot\UndergroundRadio\JsonSerializer; use GuidePilot\UndergroundRadio\Message; use GuidePilot\UndergroundRadio\Producer; use GuidePilot\UndergroundRadio\RedisConfig; use GuidePilot\UndergroundRadio\RedisRadioContext; $redisConfig = new RedisConfig('localhost'); $serializer = new JsonSerializer(); $context = new RedisRadioContext($redisConfig, $serializer); $producer = new Producer($context); $channel = new Channel('fooChannel'); $message = new Message(uniqid()); $message->addHeader('cli-test', "1"); $message->setBody('Hello world!'); $producer->send($message, $channel);
简单的消息订阅者
use GuidePilot\UndergroundRadio\Broadcast\Channel; use GuidePilot\UndergroundRadio\Broadcast\Interfaces\SubscriptionMessageProcessor; use GuidePilot\UndergroundRadio\Broadcast\Subscriber; use GuidePilot\UndergroundRadio\JsonSerializer; use GuidePilot\UndergroundRadio\RedisConfig; use GuidePilot\UndergroundRadio\RedisRadioContext; $redisConfig = new RedisConfig('localhost'); $serializer = new JsonSerializer(); $context = new RedisRadioContext($redisConfig, $serializer); $subscriber = new Subscriber($context); $channel = new Channel('fooChannel'); $subscriber->subscribe($channel, new class implements SubscriptionMessageProcessor { public function processMessage(\GuidePilot\UndergroundRadio\Interfaces\Message $message, \GuidePilot\UndergroundRadio\Broadcast\Interfaces\Channel $channel) { echo "--- New message from {$channel->getDestinationIdentifier()} ---".PHP_EOL; print_r($message); echo PHP_EOL; } });
消息队列模式的使用示例
通过队列进行简单的消息生产者
use GuidePilot\UndergroundRadio\JsonSerializer; use GuidePilot\UndergroundRadio\Message; use GuidePilot\UndergroundRadio\Producer; use GuidePilot\UndergroundRadio\Queue\CappedQueue; use GuidePilot\UndergroundRadio\RedisConfig; use GuidePilot\UndergroundRadio\RedisRadioContext; $redisConfig = new RedisConfig('localhost'); $serializer = new JsonSerializer(); $context = new RedisRadioContext($redisConfig, $serializer); $producer = new Producer($context); $queue = new CappedQueue('fooQueue', 42); $message = new Message(uniqid()); $message->addHeader('cli-test', "1"); $message->setBody('Hello queue world! (capped)'); $producer->send($message, $queue);
简单的队列消费者
use GuidePilot\UndergroundRadio\Interfaces\Message; use GuidePilot\UndergroundRadio\PhpSerializer; use GuidePilot\UndergroundRadio\Queue\Interfaces\ProcessorResult; use GuidePilot\UndergroundRadio\Queue\Interfaces\QueueMessageProcessor; use GuidePilot\UndergroundRadio\Queue\Queue; use GuidePilot\UndergroundRadio\Queue\QueueConsumer; use GuidePilot\UndergroundRadio\Queue\QueueConsumerGroup; use GuidePilot\UndergroundRadio\RedisConfig; use GuidePilot\UndergroundRadio\RedisRadioContext; $redisConfig = new RedisConfig('localhost'); $serializer = new PhpSerializer(); $context = new RedisRadioContext($redisConfig, $serializer); $group = new QueueConsumerGroup('worker'); $consumer = new QueueConsumer($context, 'worker-0', $group); $queue = new Queue('fooQueue'); $consumer->consume($queue, new class implements QueueMessageProcessor { public function processMessage(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue): ProcessorResult { echo "--- New message from {$queue->getDestinationIdentifier()} ---".PHP_EOL; print_r($message); echo PHP_EOL; return ProcessorResult::Acknowledge; } public function handleMaxRequeueCountReached(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue) { echo "!!! Message {$message->getMessageId()} reached max requeue count !!!".PHP_EOL; } });
许可证
它是在 MIT 许可证 下发布的。