oro / message-queue-bundle
Requires
- php: 7.1.*
- oro/message-queue: dev-master
- symfony/framework-bundle: 2.3.*
This package is auto-updated.
Last update: 2021-04-19 20:05:25 UTC
README
注意: 本文发布在 Oro 文档库中。
OroMessageQueueBundle 将 OroMessageQueue 组件集成到 OroPlatform 中,从而为所有应用组件提供消息队列处理能力。
目录
概述
该包集成了 OroMessageQueue 组件。它添加了易于使用的配置层,注册服务和将它们绑定在一起,注册便捷的 CLI 命令。
任务
该包提供了一种实体和一个 Web 界面用于 任务。因此,任务在数据库中创建,并有一个 Web 界面,您可以在其中监控任务状态和中断任务。
使用方法
首先,您必须配置一个传输层并设置一个作为默认传输。对于配置设置
# config/config.yml oro_message_queue: transport: default: '%message_queue_transport%' '%message_queue_transport%': '%message_queue_transport_config%' client: ~
我们可以通过参数配置支持的传输之一
DBAL 传输
# config/parameters.yml message_queue_transport: DBAL message_queue_transport_config: ~
一旦配置完成,您就可以开始生产消息
<?php /** @var Oro\Component\MessageQueue\Client\MessageProducer $messageProducer **/ $messageProducer = $container->get('oro_message_queue.message_producer'); $messageProducer->send('aFooTopic', 'Something has happened');
要消费消息,您必须首先创建一个消息处理器
<?php use Oro\Component\MessageQueue\Consumption\MessageProcessor; class FooMessageProcessor implements MessageProcessor, TopicSubscriberInterface { public function process(Message $message, Session $session) { echo $message->getBody(); return self::ACK; // return self::REJECT; // when the message is broken // return self::REQUEUE; // the message is fine but you want to postpone processing } public static function getSubscribedTopics() { return ['aFooTopic']; } }
将其注册为容器服务并订阅主题
oro_channel.async.change_integration_status_processor: class: 'FooMessageProcessor' tags: - { name: 'oro_message_queue.client.message_processor' }
现在您可以开始消费消息
./bin/console oro:message-queue:consume
注意:在消费消息时添加 -vvv 以了解正在发生的事情。那里有很多有价值的调试信息。
消费者选项
--message-limit=MESSAGE-LIMIT
消费 n 条消息后退出--time-limit=TIME-LIMIT
在此时间内消费消息--memory-limit=MEMORY-LIMIT
消费消息直到进程达到此内存限制(以 MB 为单位)
建议使用 --memory-limit
选项进行常规消费者使用。如果设置了该选项,消费者在处理完每条消息后都会检查使用的内存量,并在超出时终止。例如,如果消费者运行
./bin/console oro:message-queue:consume --memory-limit=700
那么
- 消费者处理消息
- 消费者检查使用的内存量
- 如果它超过选项值(即 705 MB 或 780Mb 或 1300 Mb),消费者将终止(Supervisord 将重新运行它)
- 否则它将继续处理消息。
我们建议将此选项设置为比 PHP 内存限制低 2-3 倍的值。这有助于在处理消息时避免 PHP 内存限制错误。
如果使用 DBAL
传输,我们建议将 --time-limit
选项设置为 5-10 分钟,以避免数据库连接问题
消费者中断
消费者可以通过多种原因正常中断消息处理
- 内存不足(如果设置了选项)
- 超时(如果设置了选项)
- 消息限制超过(如果设置了选项)
- 由事件强制中断
- 如果缓存被清除
- 如果架构被更新
- 如果维护模式被关闭
正常中断仅发生在消息处理之后。如果在消息处理期间触发事件,消费者完成消息处理并在处理完成后中断。
此外,如果在消息处理期间抛出异常,消费者也会中断。
Supervisord
如您之前所读,消费者可以通过多种原因正常中断消息处理。在上述所有情况下,应重新运行中断的消费者。因此,您必须保持运行 oro:message-queue:consume
命令,为此我们建议您将此责任委托给 Supervisord。通过以下程序配置,supervisord 会保持运行四个同时实例的 oro:message-queue:consume
命令,并关注在实例由于任何原因死亡时重新启动。请注意,在 [program:oro_message_consumer]
中定义的 程序名称 必须与在同一 supervisord 服务器上部署的其他任何实例唯一,即使它们仅为测试目的。例如,设置以下程序 [program:prod_oro_message_consumer]
和 [program:dev_oro_message_consumer]
。
[program:oro_message_consumer] command=/path/to/bin/console --env=prod --no-debug oro:message-queue:consume process_name=%(program_name)s_%(process_num)02d numprocs=4 autostart=true autorestart=true startsecs=0 user=apache redirect_stderr=true
消息队列的名称前缀
要在单个 RabbitMQ 实例上使用多个独立的消息队列,请配置消息队列的名称前缀。例如
# config/config.yml oro_message_queue: ... client: prefix: mq_oro_platform_test router_destination: queue_name default_destination: queue_name
在 router_destsination
和 default_destionation
中,放置特定于您环境的队列名称。在 prefix 选项中,提供一个应添加到队列名称前缀的字符串。
内部结构
结构
如果您仅打算使用组件,则可以跳过此部分。组件分为几个层级
- 传输 - 传输 API 为程序提供了一种创建、发送、接收和读取消息的通用方式。受 Java 消息服务 启发
- 路由器 - RecipientList 模式的实现。
- 消费 - 该层提供简化消息消费的工具。它提供了一个命令行工具、队列消费者、消息处理器以及扩展它的方法。
- 客户端 - 提供了高级抽象。它提供了一种易于使用的抽象来生产和处理消息。它还减少了配置代理的需要。
流程
客户端的消息生产者将消息发送到路由器消息处理器。它接收消息并寻找对这种消息感兴趣的真实接收者。然后,它向他们发送消息的副本。每个目标消息处理器接收其消息副本并进行处理。
消息本身具有头部和体部,它们在通过系统传输时发生变化。
自定义传输
如果您需要实现自定义提供者,请查看传输的接口。您必须提供它们的实现
关键类
- MessageProducer - 客户端的消息生产者,您将始终使用它来发送消息
- MessageProcessorInterface - 执行工作的每个类都必须实现此接口
- TopicSubscriberInterface - 类似于EventSubscriberInterface。它允许您在一个地方保持处理代码和它所订阅的主题。
- MessageConsumeCommand - 您用于消费消息的命令。
- QueueConsumer - 在命令内部工作的一个类,它会监视新的消息,一旦获取到消息,就将其传递给消息处理器。
单元和功能测试
为了在单元和功能测试中测试消息是否已发送,您可以使用MessageQueueExtension
特性。此特性有两种实现,一种用于单元测试,另一种用于功能测试
- Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueExtension 用于单元测试
- Oro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueExtension 用于功能测试
此外,如果您需要用于管理发送消息的自定义逻辑,可以使用Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueAssertTrait 或 Oro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueAssertTrait 特性。
在您开始在使用功能测试中的特性之前,您需要为test
环境注册oro_message_queue.test.message_collector
服务。
# config/config_test.yml services: oro_message_queue.test.message_collector: class: Oro\Bundle\MessageQueueBundle\Test\Functional\MessageCollector decorates: oro_message_queue.client.message_producer arguments: - '@oro_message_queue.test.message_collector.inner'
以下示例展示了如何测试消息是否已发送。
<?php namespace Acme\Bundle\AcmeBundle\Tests\Functional; use Oro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueExtension; use Oro\Bundle\TestFrameworkBundle\Test\WebTestCase; class SomeTest extends WebTestCase { use MessageQueueExtension; public function testSingleMessage() { // assert that a message was sent to a topic self::assertMessageSent('aFooTopic', 'Something has happened'); // assert that at least one message was sent to a topic // can be used if a message is not matter self::assertMessageSent('aFooTopic'); } public function testSeveralMessages() { // assert that exactly given messages were sent to a topic self::assertMessagesSent( 'aFooTopic', [ 'Something has happened', 'Something else has happened', ] ); // assert that the exactly given number of messages were sent to a topic // can be used if messages are not matter self::assertMessagesCount('aFooTopic', 2); // also assertCountMessages alias can be used to do the same assertion self::assertCountMessages('aFooTopic'); } public function testNoMessages() { // assert that no any message was sent to a topic self::assertMessagesEmpty('aFooTopic'); // also assertEmptyMessages alias can be used to do the same assertion self::assertEmptyMessages('aFooTopic'); } public function testAllMessages() { // assert that exactly given messages were sent // NOTE: use this assertion with caution because it is possible // that messages not related to a testing functionality were sent as well self::assertAllMessagesSent( [ ['topic' => 'aFooTopic', 'message' => 'Something has happened'], ['topic' => 'aFooTopic', 'message' => 'Something else has happened'], ] ); } }
在单元测试中,您通常需要将消息生产者传递给您要测试的服务。要在单元测试中获取消息生产者的正确实例,请使用self::getMessageProducer()
,例如。
<?php namespace Acme\Bundle\AcmeBundle\Tests\Unit; use Acme\Bundle\AcmeBundle\SomeClass; use Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueExtension; class SomeTest extends \PHPUnit_Framework_TestCase { use MessageQueueExtension; public function testSingleMessage() { $instance = new SomeClass(self::getMessageProducer()); $instance->doSomethind(); self::assertMessageSent('aFooTopic', 'Something has happened'); } }
过时作业
无法创建两个具有相同名称的唯一作业。这就是为什么如果一个唯一作业无法完成其工作,它可能会阻止另一个作业。
为了避免这种情况,您可以设置唯一作业执行的最大时间。如果作业运行时间超过该时间,则可以创建唯一作业的新副本(具有相同的名称)。在这种情况下,旧作业被标记为“过时”。有关详细信息,请参阅过时作业。
您可以在config.yml文件中配置time_before_stale参数,提供秒数
oro_message_queue: time_before_stale: default: 1800 jobs: bundle_name.processor_name.entity_name.user: 3600 bundle_name.processor_name.entity_name: 2000 bundle_name.processor_name: -1
解析器首先通过作业的完整名称查找作业。如果根据完整名称找不到作业,解析器将尝试匹配作业名称的最长部分(从左到右读取)。
在上面的示例中
- bundle_name.processor_name.entity_name.user将在3600秒后过时
- bundle_name.processor_name.entity_name.organisation 将在 2000 秒后 过期。
- bundle_name.processor_name.other_name.some_job 将 永远不会 过期。
- bundle_name.other_processor.other_name.some_job 将在 1800 秒后 过期。
- processor_name.entity_name.user 将在 1800 秒后 过期。
消费者心跳
管理员必须了解系统中消费者(是否至少有一个存活)的状态。
这由消费者心跳功能处理,其工作方式如下:
- 在启动后以及配置时间周期后,每个消费者都会调用 ConsumerHeartbeat 服务的
tick
方法,向系统报告消费者是活着的。 - 执行 oro:cron:message-queue:consumer_heartbeat_check 的 cron 命令定期执行,以检查消费者的状态。如果没有找到任何存活的消费者,则发送
oro/message_queue_state
套接字消息,通知所有登录用户系统可能工作不正确(因为消费者不可用)。 - 当用户登录时也会执行相同的检查。这样做是为了尽快通知用户问题。
可以通过在应用程序配置文件中使用 consumer_heartbeat_update_period
选项来更改检查周期。
oro_message_queue: consumer: heartbeat_update_period: 20 #the update period was set to 20 minutes
heartbeat_update_period
选项的默认值是 15 分钟。
要禁用消费者心跳功能,将 heartbeat_update_period
选项设置为 0。