werkspot / message-queue
一个通用的消息队列。
Requires
- php: ^7.1
- php-amqplib/php-amqplib: ^2.6
- psr/simple-cache: ^1.0
- ramsey/uuid: ^3.5
- roave/security-advisories: dev-master
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.0
- mockery/mockery: v1.0.0-alpha1
- phpunit/phpunit: ^6.0
This package is auto-updated.
Last update: 2020-10-21 07:22:30 UTC
README
本项目是什么
一个能够异步将消息发送到目标地的库,尽快或指定日期和时间。
要发送的消息可以是任何内容,但其序列化必须由MessageRepositoryInterface处理,其实现必须由使用此库的代码提供。
目的地可以通过任何字符串指定,但该字符串的解释以及将消息实际发送到目的地的有效交付必须由MessageDeliveryServiceInterface处理,其实现必须由使用此库的代码提供。
此MessageQueue使用两个内部队列,一个用于计划发送的消息(ScheduledQueue,使用MySQL等持久化机制),另一个用于准备发送的消息(DeliveryQueue,使用rabbitMq)。
为什么存在这个项目
消息队列有助于运行异步任务,尽快或指定日期和时间,从而平衡服务器在时间上的负载,并允许更快地响应用户,因为他们不需要等待任务在线完成,这些任务可以异步完成,例如发送电子邮件。
在这个库的基础上,我们可以构建一个消息总线,该总线可以决定消息是否应该同步或异步发送。反过来,在消息总线的基础上,我们可以构建一个命令总线,它只向一个目的地发送一条消息,或者一个事件总线,它可以向多个目的地发送一条消息。
用法
MessageQueueService
是消息队列的入口点。
$messageQueueService = new MessageQueueService( new ScheduledQueueService( new MessageRepository(/*...*/) // implemented by the code using this library ) ); $messageQueueService->enqueueMessage( $someObjectOrStringOrWhatever, // some payload to deliver, persisted by the MessageRepository '{"deliver_to": "SomeServiceId"}', // destination to be decoded by the delivery service (MessageDeliveryServiceInterface) new DateTimeImmutable(), // delivery date and time 5, // priority [] // some whatever metadata );
要将消息从ScheduledQueue移动到DeliveryQueue,我们需要在后台运行一个ScheduledQueueToDeliveryQueueWorker。要将消息从DeliveryQueue移动到实际目的地,我们需要至少运行一个DeliveryQueueToHandlerWorker。
我们的$scheduledQueueWorker
将通过CLI命令运行,该命令将被进程管理工具(如Supervisor)保持存活。
$scheduledQueueWorker = new ScheduledQueueToDeliveryQueueWorker( new ScheduledQueueService(new MessageRepository(/*...*/)), new AmqpProducer(new AMQPLazyConnection(/*...*/), new UuidMessageIdGenerator()), 'some_queue_name', new SomeLogger(/*...*/) ); $scheduledQueueWorker->moveMessageBatch(50);
与$scheduledQueueWorker
一样,$deliveryQueueWorker
也通过CLI命令启动,并由进程管理工具(如Supervisor)保持存活。
$logger = new SomeLogger(/*...*/); $deliveryQueueWorker = new DeliveryQueueToHandlerWorker( new AmqpConsumer( new AMQPLazyConnection(/*...*/), new AmqpMessageHandler( new MessageHandler(/*...*/), new SomeCache(/*...*/), new PersistenceClient(/*...*/), $logger ), $logger ), 'some_queue_name' ); $deliveryQueueWorker->startConsuming(300);
安装
要安装库,请运行以下命令并获取最新版本
composer require werkspot/message-queue
测试
要执行测试,请运行
make test
覆盖率
要生成测试覆盖率,请运行
make test_with_coverage
代码规范
要修复代码规范,请运行
make cs-fix