werkspot/message-queue

此包已被弃用,不再维护。未建议替代包。

一个通用的消息队列。

dev-master / 1.0.x-dev 2017-09-19 15:00 UTC

This package is auto-updated.

Last update: 2020-10-21 07:22:30 UTC


README

Author Software License Latest Version Total Downloads

Build Status Coverage Status Quality Score

本项目是什么

一个能够异步将消息发送到目标地的库,尽快或指定日期和时间。

要发送的消息可以是任何内容,但其序列化必须由MessageRepositoryInterface处理,其实现必须由使用此库的代码提供。

目的地可以通过任何字符串指定,但该字符串的解释以及将消息实际发送到目的地的有效交付必须由MessageDeliveryServiceInterface处理,其实现必须由使用此库的代码提供。

此MessageQueue使用两个内部队列,一个用于计划发送的消息(ScheduledQueue,使用MySQL等持久化机制),另一个用于准备发送的消息(DeliveryQueue,使用rabbitMq)。

为什么存在这个项目

消息队列有助于运行异步任务,尽快或指定日期和时间,从而平衡服务器在时间上的负载,并允许更快地响应用户,因为他们不需要等待任务在线完成,这些任务可以异步完成,例如发送电子邮件。

在这个库的基础上,我们可以构建一个消息总线,该总线可以决定消息是否应该同步或异步发送。反过来,在消息总线的基础上,我们可以构建一个命令总线,它只向一个目的地发送一条消息,或者一个事件总线,它可以向多个目的地发送一条消息。

用法

MessageQueueService是消息队列的入口点。

    $messageQueueService = new MessageQueueService(
        new ScheduledQueueService(
            new MessageRepository(/*...*/) // implemented by the code using this library
        )
    );
    
    $messageQueueService->enqueueMessage(
        $someObjectOrStringOrWhatever,      // some payload to deliver, persisted by the MessageRepository
        '{"deliver_to": "SomeServiceId"}',  // destination to be decoded by the delivery service (MessageDeliveryServiceInterface)
        new DateTimeImmutable(),            // delivery date and time
        5,                                  // priority
        []                                  // some whatever metadata
    );

要将消息从ScheduledQueue移动到DeliveryQueue,我们需要在后台运行一个ScheduledQueueToDeliveryQueueWorker。要将消息从DeliveryQueue移动到实际目的地,我们需要至少运行一个DeliveryQueueToHandlerWorker。

我们的$scheduledQueueWorker将通过CLI命令运行,该命令将被进程管理工具(如Supervisor)保持存活。

    $scheduledQueueWorker = new ScheduledQueueToDeliveryQueueWorker(
        new ScheduledQueueService(new MessageRepository(/*...*/)),
        new AmqpProducer(new AMQPLazyConnection(/*...*/), new UuidMessageIdGenerator()),
        'some_queue_name',
        new SomeLogger(/*...*/)
    );
    
    $scheduledQueueWorker->moveMessageBatch(50);

$scheduledQueueWorker一样,$deliveryQueueWorker也通过CLI命令启动,并由进程管理工具(如Supervisor)保持存活。

    $logger = new SomeLogger(/*...*/);
    
    $deliveryQueueWorker = new DeliveryQueueToHandlerWorker(
        new AmqpConsumer(
            new AMQPLazyConnection(/*...*/),
            new AmqpMessageHandler(
                new MessageHandler(/*...*/),
                new SomeCache(/*...*/),
                new PersistenceClient(/*...*/),
                $logger
            ),
            $logger
        ),
        'some_queue_name'
    );
    
    $deliveryQueueWorker->startConsuming(300);

安装

要安装库,请运行以下命令并获取最新版本

composer require werkspot/message-queue

测试

要执行测试,请运行

make test

覆盖率

要生成测试覆盖率,请运行

make test_with_coverage

代码规范

要修复代码规范,请运行

make cs-fix