guidepilot/php-underground-radio

A PHP library for sending messages through various services that implement message queue architectures and publish/subscribe services.

dev-main 2022-10-20 08:46 UTC

This package is auto-updated.

Last update: 2024-09-20 12:45:41 UTC


README

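Disclaimer: This project is at a very early stage of development and should not be used in production. The current code is only a first draft. Even so, it may already be useful to some people. Pull requests are welcome...

Introduction

The UndergroundRadio library allows sending messages through various services that implement message queue architectures and publish/subscribe services.

Although the architecture provides an abstract interface for integrating any suitable service, only a Redis-based implementation exists at present. The focus is on modern Redis features such as Streams and Pub/Sub.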
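To illustrate what an abstract interface with interchangeable backends can look like, here is a minimal sketch. The names `Transport`, `InMemoryTransport` and `sendGreeting` are hypothetical and are not part of UndergroundRadio; the in-memory class merely stands in for a real backend such as Redis.

```php
<?php
declare(strict_types=1);

// Hypothetical interface for illustration; not the library's actual API.
interface Transport
{
    public function publish(string $destination, string $payload): void;

    /** @return string[] payloads stored for the given destination */
    public function fetch(string $destination): array;
}

// In-memory stand-in for a real backend such as Redis.
final class InMemoryTransport implements Transport
{
    /** @var array<string, string[]> */
    private array $store = [];

    public function publish(string $destination, string $payload): void
    {
        $this->store[$destination][] = $payload;
    }

    public function fetch(string $destination): array
    {
        return $this->store[$destination] ?? [];
    }
}

// Calling code depends only on the interface, so backends can be swapped.
function sendGreeting(Transport $transport): void
{
    $transport->publish('fooChannel', 'Hello world!');
}

$transport = new InMemoryTransport();
sendGreeting($transport);
print_r($transport->fetch('fooChannel'));
```

Because `sendGreeting()` only knows the interface, a different backend could be passed in without changing the calling code.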

This library is inspired by the Enqueue library and the queue-interop protocol, but takes a different approach in some respects.

Installation

This package requires PHP 8.1 or later!

Install it with composer:

$ composer require guidepilot/php-underground-radio

Usage examples for the pub/sub pattern

This pattern is also known as a broadcast or fan-out architecture.
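The fan-out idea can be sketched without Redis: every subscriber registered on a channel receives its own copy of each published message. The class `InMemoryBroadcaster` below is hypothetical and only models the behaviour; it is not how the library is implemented.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory broadcaster; names are illustrative only.
final class InMemoryBroadcaster
{
    /** @var array<string, callable[]> handlers keyed by channel name */
    private array $subscribers = [];

    public function subscribe(string $channel, callable $handler): void
    {
        $this->subscribers[$channel][] = $handler;
    }

    // Delivers the payload to every current subscriber and returns how many received it.
    public function publish(string $channel, string $payload): int
    {
        $handlers = $this->subscribers[$channel] ?? [];
        foreach ($handlers as $handler) {
            $handler($payload);
        }

        return count($handlers);
    }
}

$radio = new InMemoryBroadcaster();
$received = [];

$radio->subscribe('fooChannel', function (string $payload) use (&$received): void {
    $received[] = "A: {$payload}";
});
$radio->subscribe('fooChannel', function (string $payload) use (&$received): void {
    $received[] = "B: {$payload}";
});

$delivered = $radio->publish('fooChannel', 'Hello world!');
echo "Delivered to {$delivered} subscribers", PHP_EOL;
print_r($received);
```

A message published to a channel with no subscribers is simply lost in this sketch, which matches the fire-and-forget nature of Redis Pub/Sub.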

A simple message producer publishing to a channel

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
$channel = new Channel('fooChannel');

$message = new Message(uniqid());
$message->addHeader('cli-test', "1");
$message->setBody('Hello world!');

$producer->send($message, $channel);

A simple message subscriber

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\Broadcast\Interfaces\SubscriptionMessageProcessor;
use GuidePilot\UndergroundRadio\Broadcast\Subscriber;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;


$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$subscriber = new Subscriber($context);
$channel = new Channel('fooChannel');

$subscriber->subscribe($channel, new class implements SubscriptionMessageProcessor {

    public function processMessage(\GuidePilot\UndergroundRadio\Interfaces\Message $message, \GuidePilot\UndergroundRadio\Broadcast\Interfaces\Channel $channel) {
        echo "--- New message from {$channel->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;
    }
});

Usage examples for the message queue pattern

A simple message producer sending to a queue

use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\Queue\CappedQueue;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
$queue = new CappedQueue('fooQueue', 42);

$message = new Message(uniqid());
$message->addHeader('cli-test', "1");
$message->setBody('Hello queue world! (capped)');

$producer->send($message, $queue);
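The README does not explain what the second argument of `CappedQueue` means. Assuming it is a maximum length, with the oldest entries dropped once the limit is exceeded (similar in spirit to the `MAXLEN` option of Redis `XADD`), the behaviour could be sketched as follows. `InMemoryCappedQueue` is a hypothetical class used only for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory capped queue. Assumption: "capped" means a maximum
// length, and the oldest entries are discarded first.
final class InMemoryCappedQueue
{
    /** @var string[] */
    private array $entries = [];

    public function __construct(private int $maxLength)
    {
        if ($maxLength < 1) {
            throw new InvalidArgumentException('maxLength must be at least 1');
        }
    }

    public function push(string $payload): void
    {
        $this->entries[] = $payload;

        // Trim from the front so that only the newest $maxLength entries remain.
        if (count($this->entries) > $this->maxLength) {
            $this->entries = array_slice($this->entries, -$this->maxLength);
        }
    }

    /** @return string[] */
    public function all(): array
    {
        return $this->entries;
    }
}

$queue = new InMemoryCappedQueue(3);
foreach (['m1', 'm2', 'm3', 'm4', 'm5'] as $payload) {
    $queue->push($payload);
}

print_r($queue->all()); // only m3, m4 and m5 remain
```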

A simple queue consumer

use GuidePilot\UndergroundRadio\Interfaces\Message;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Queue\Interfaces\ProcessorResult;
use GuidePilot\UndergroundRadio\Queue\Interfaces\QueueMessageProcessor;
use GuidePilot\UndergroundRadio\Queue\Queue;
use GuidePilot\UndergroundRadio\Queue\QueueConsumer;
use GuidePilot\UndergroundRadio\Queue\QueueConsumerGroup;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
// Use the same serializer as the queue producer example so its JSON payloads can be decoded.
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$group = new QueueConsumerGroup('worker');
$consumer = new QueueConsumer($context, 'worker-0', $group);
$queue = new Queue('fooQueue');

$consumer->consume($queue, new class implements QueueMessageProcessor {

    public function processMessage(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue): ProcessorResult {
        echo "--- New message from {$queue->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;

        return ProcessorResult::Acknowledge;
    }

    public function handleMaxRequeueCountReached(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue) {
        echo "!!! Message {$message->getMessageId()} reached max requeue count !!!".PHP_EOL;
    }

});
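The consumer above returns a result for each message and is notified when a message reaches its maximum requeue count. A minimal sketch of that acknowledge/requeue cycle, independent of Redis, might look like this. The `Outcome` enum, the `drain()` function and the requeue limit parameter are hypothetical; the library's `ProcessorResult` may define different cases and its requeue handling may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical result type; the library's ProcessorResult may define other cases.
enum Outcome
{
    case Acknowledge;
    case Requeue;
}

/**
 * Processes messages in order. A message is appended to the end of the queue
 * again when the processor asks for a requeue, until it has been requeued
 * $maxRequeues times; after that it is dropped.
 *
 * @param string[] $messages
 * @param callable $processor receives the message and its requeue count, returns Outcome
 * @return array{acknowledged: string[], dropped: string[]}
 */
function drain(array $messages, callable $processor, int $maxRequeues): array
{
    $pending = [];
    foreach ($messages as $message) {
        $pending[] = [$message, 0];
    }

    $acknowledged = [];
    $dropped = [];

    while ($pending !== []) {
        [$message, $requeues] = array_shift($pending);

        if ($processor($message, $requeues) === Outcome::Acknowledge) {
            $acknowledged[] = $message;
        } elseif ($requeues >= $maxRequeues) {
            // Comparable to the point where handleMaxRequeueCountReached() would be called.
            $dropped[] = $message;
        } else {
            $pending[] = [$message, $requeues + 1];
        }
    }

    return ['acknowledged' => $acknowledged, 'dropped' => $dropped];
}

$result = drain(
    ['ok', 'flaky', 'broken'],
    fn (string $message, int $requeues): Outcome => match (true) {
        $message === 'ok' => Outcome::Acknowledge,
        $message === 'flaky' && $requeues >= 1 => Outcome::Acknowledge,
        default => Outcome::Requeue,
    },
    2
);

print_r($result);
```

In this run `ok` is acknowledged immediately, `flaky` succeeds on its second attempt, and `broken` is dropped after two requeues.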

License

This library is released under the MIT License.