oro/message-queue-bundle

此包已被弃用且不再维护。作者建议使用 oro/platform 包。

ORO 消息队列包

安装: 0

依赖: 0

推荐者: 0

安全: 0

星标: 9

关注者: 49

分支: 2

类型:symfony-bundle

dev-master / 1.0.x-dev 2018-05-18 13:53 UTC

This package is auto-updated.

Last update: 2021-04-19 20:05:25 UTC


README

注意: 本文发布在 Oro 文档库中。

OroMessageQueueBundle 将 OroMessageQueue 组件集成到 OroPlatform 中,从而为所有应用组件提供消息队列处理能力。

目录

概述

该包集成了 OroMessageQueue 组件。它添加了易于使用的配置层,注册服务和将它们绑定在一起,注册便捷的 CLI 命令。

任务

该包提供了一种实体和一个 Web 界面用于 任务。因此,任务在数据库中创建,并有一个 Web 界面,您可以在其中监控任务状态和中断任务。

使用方法

首先,您必须配置一个传输层并设置一个作为默认传输。对于配置设置

# config/config.yml

oro_message_queue:
    transport:
        default: '%message_queue_transport%'
        '%message_queue_transport%': '%message_queue_transport_config%'
    client: ~

我们可以通过参数配置支持的传输之一

DBAL 传输

# config/parameters.yml

    message_queue_transport: DBAL
    message_queue_transport_config: ~

DBAL 传输选项

一旦配置完成,您就可以开始生产消息

<?php

/** @var Oro\Component\MessageQueue\Client\MessageProducer $messageProducer **/
$messageProducer = $container->get('oro_message_queue.message_producer');

$messageProducer->send('aFooTopic', 'Something has happened');

要消费消息,您必须首先创建一个消息处理器

<?php
use Oro\Component\MessageQueue\Consumption\MessageProcessor;

class FooMessageProcessor implements MessageProcessor, TopicSubscriberInterface
{
    public function process(Message $message, Session $session)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}

将其注册为容器服务并订阅主题

oro_channel.async.change_integration_status_processor:
    class: 'FooMessageProcessor'
    tags:
        - { name: 'oro_message_queue.client.message_processor' }

现在您可以开始消费消息

./bin/console oro:message-queue:consume

注意:在消费消息时添加 -vvv 以了解正在发生的事情。那里有很多有价值的调试信息。

消费者选项

  • --message-limit=MESSAGE-LIMIT 消费 n 条消息后退出
  • --time-limit=TIME-LIMIT 在此时间内消费消息
  • --memory-limit=MEMORY-LIMIT 消费消息直到进程达到此内存限制(以 MB 为单位)

建议使用 --memory-limit 选项进行常规消费者使用。如果设置了该选项,消费者在处理完每条消息后都会检查使用的内存量,并在超出时终止。例如,如果消费者运行

./bin/console oro:message-queue:consume --memory-limit=700

那么

  • 消费者处理消息
  • 消费者检查使用的内存量
  • 如果它超过选项值(即 705 MB 或 780Mb 或 1300 Mb),消费者将终止(Supervisord 将重新运行它)
  • 否则它将继续处理消息。

我们建议将此选项设置为比 PHP 内存限制低 2-3 倍的值。这有助于在处理消息时避免 PHP 内存限制错误。

如果使用 DBAL 传输,我们建议将 --time-limit 选项设置为 5-10 分钟,以避免数据库连接问题

消费者中断

消费者可以通过多种原因正常中断消息处理

  • 内存不足(如果设置了选项)
  • 超时(如果设置了选项)
  • 消息限制超过(如果设置了选项)
  • 由事件强制中断
    • 如果缓存被清除
    • 如果架构被更新
    • 如果维护模式被关闭

正常中断仅发生在消息处理之后。如果在消息处理期间触发事件,消费者完成消息处理并在处理完成后中断。

此外,如果在消息处理期间抛出异常,消费者也会中断。

Supervisord

如您之前所读,消费者可以通过多种原因正常中断消息处理。在上述所有情况下,应重新运行中断的消费者。因此,您必须保持运行 oro:message-queue:consume 命令,为此我们建议您将此责任委托给 Supervisord。通过以下程序配置,supervisord 会保持运行四个同时实例的 oro:message-queue:consume 命令,并关注在实例由于任何原因死亡时重新启动。请注意,在 [program:oro_message_consumer] 中定义的 程序名称 必须与在同一 supervisord 服务器上部署的其他任何实例唯一,即使它们仅为测试目的。例如,设置以下程序 [program:prod_oro_message_consumer][program:dev_oro_message_consumer]

[program:oro_message_consumer]
command=/path/to/bin/console --env=prod --no-debug oro:message-queue:consume
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
startsecs=0
user=apache
redirect_stderr=true

消息队列的名称前缀

要在单个 RabbitMQ 实例上使用多个独立的消息队列,请配置消息队列的名称前缀。例如

# config/config.yml

oro_message_queue:
    ...
    client:
        prefix: mq_oro_platform_test
        router_destination: queue_name
        default_destination: queue_name

router_destsinationdefault_destionation 中,放置特定于您环境的队列名称。在 prefix 选项中,提供一个应添加到队列名称前缀的字符串。

内部结构

结构

如果您仅打算使用组件,则可以跳过此部分。组件分为几个层级

  • 传输 - 传输 API 为程序提供了一种创建、发送、接收和读取消息的通用方式。受 Java 消息服务 启发
  • 路由器 - RecipientList 模式的实现。
  • 消费 - 该层提供简化消息消费的工具。它提供了一个命令行工具、队列消费者、消息处理器以及扩展它的方法。
  • 客户端 - 提供了高级抽象。它提供了一种易于使用的抽象来生产和处理消息。它还减少了配置代理的需要。

Component structure

流程

客户端的消息生产者将消息发送到路由器消息处理器。它接收消息并寻找对这种消息感兴趣的真实接收者。然后,它向他们发送消息的副本。每个目标消息处理器接收其消息副本并进行处理。

Message flow

消息本身具有头部和体部,它们在通过系统传输时发生变化。

Message structure

自定义传输

如果您需要实现自定义提供者,请查看传输的接口。您必须提供它们的实现

关键类

  • MessageProducer - 客户端的消息生产者,您将始终使用它来发送消息
  • MessageProcessorInterface - 执行工作的每个类都必须实现此接口
  • TopicSubscriberInterface - 类似于EventSubscriberInterface。它允许您在一个地方保持处理代码和它所订阅的主题。
  • MessageConsumeCommand - 您用于消费消息的命令。
  • QueueConsumer - 在命令内部工作的一个类,它会监视新的消息,一旦获取到消息,就将其传递给消息处理器。

单元和功能测试

为了在单元和功能测试中测试消息是否已发送,您可以使用MessageQueueExtension特性。此特性有两种实现,一种用于单元测试,另一种用于功能测试

此外,如果您需要用于管理发送消息的自定义逻辑,可以使用Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueAssertTraitOro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueAssertTrait 特性。

在您开始在使用功能测试中的特性之前,您需要为test环境注册oro_message_queue.test.message_collector服务。

# config/config_test.yml

services:
    oro_message_queue.test.message_collector:
        class: Oro\Bundle\MessageQueueBundle\Test\Functional\MessageCollector
        decorates: oro_message_queue.client.message_producer
        arguments:
            - '@oro_message_queue.test.message_collector.inner'

以下示例展示了如何测试消息是否已发送。

<?php
namespace Acme\Bundle\AcmeBundle\Tests\Functional;

use Oro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueExtension;
use Oro\Bundle\TestFrameworkBundle\Test\WebTestCase;

class SomeTest extends WebTestCase
{
    use MessageQueueExtension;

    public function testSingleMessage()
    {
        // assert that a message was sent to a topic
        self::assertMessageSent('aFooTopic', 'Something has happened');

        // assert that at least one message was sent to a topic
        // can be used if a message is not matter
        self::assertMessageSent('aFooTopic');
    }

    public function testSeveralMessages()
    {
        // assert that exactly given messages were sent to a topic
        self::assertMessagesSent(
            'aFooTopic',
            [
                'Something has happened',
                'Something else has happened',
            ]
        );
        // assert that the exactly given number of messages were sent to a topic
        // can be used if messages are not matter
        self::assertMessagesCount('aFooTopic', 2);
        // also assertCountMessages alias can be used to do the same assertion
        self::assertCountMessages('aFooTopic');
    }

    public function testNoMessages()
    {
        // assert that no any message was sent to a topic
        self::assertMessagesEmpty('aFooTopic');
        // also assertEmptyMessages alias can be used to do the same assertion
        self::assertEmptyMessages('aFooTopic');
    }

    public function testAllMessages()
    {
        // assert that exactly given messages were sent
        // NOTE: use this assertion with caution because it is possible
        // that messages not related to a testing functionality were sent as well
        self::assertAllMessagesSent(
            [
                ['topic' => 'aFooTopic', 'message' => 'Something has happened'],
                ['topic' => 'aFooTopic', 'message' => 'Something else has happened'],
            ]
        );
    }
}

在单元测试中,您通常需要将消息生产者传递给您要测试的服务。要在单元测试中获取消息生产者的正确实例,请使用self::getMessageProducer(),例如。

<?php
namespace Acme\Bundle\AcmeBundle\Tests\Unit;

use Acme\Bundle\AcmeBundle\SomeClass;
use Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueExtension;

class SomeTest extends \PHPUnit_Framework_TestCase
{
    use MessageQueueExtension;

    public function testSingleMessage()
    {
        $instance = new SomeClass(self::getMessageProducer());
        
        $instance->doSomethind();

        self::assertMessageSent('aFooTopic', 'Something has happened');
    }
}

过时作业

无法创建两个具有相同名称的唯一作业。这就是为什么如果一个唯一作业无法完成其工作,它可能会阻止另一个作业。

为了避免这种情况,您可以设置唯一作业执行的最大时间。如果作业运行时间超过该时间,则可以创建唯一作业的新副本(具有相同的名称)。在这种情况下,旧作业被标记为“过时”。有关详细信息,请参阅过时作业

您可以在config.yml文件中配置time_before_stale参数,提供秒数

oro_message_queue:
    time_before_stale:
        default: 1800
        jobs:
            bundle_name.processor_name.entity_name.user: 3600
            bundle_name.processor_name.entity_name: 2000
            bundle_name.processor_name: -1

解析器首先通过作业的完整名称查找作业。如果根据完整名称找不到作业,解析器将尝试匹配作业名称的最长部分(从左到右读取)。

在上面的示例中

  • bundle_name.processor_name.entity_name.user将在3600秒后过时
  • bundle_name.processor_name.entity_name.organisation 将在 2000 秒后 过期
  • bundle_name.processor_name.other_name.some_job永远不会 过期。
  • bundle_name.other_processor.other_name.some_job 将在 1800 秒后 过期
  • processor_name.entity_name.user 将在 1800 秒后 过期

消费者心跳

管理员必须了解系统中消费者(是否至少有一个存活)的状态。

这由消费者心跳功能处理,其工作方式如下:

  • 在启动后以及配置时间周期后,每个消费者都会调用 ConsumerHeartbeat 服务的 tick 方法,向系统报告消费者是活着的。
  • 执行 oro:cron:message-queue:consumer_heartbeat_check 的 cron 命令定期执行,以检查消费者的状态。如果没有找到任何存活的消费者,则发送 oro/message_queue_state 套接字消息,通知所有登录用户系统可能工作不正确(因为消费者不可用)。
  • 当用户登录时也会执行相同的检查。这样做是为了尽快通知用户问题。

可以通过在应用程序配置文件中使用 consumer_heartbeat_update_period 选项来更改检查周期。

oro_message_queue:
    consumer:
        heartbeat_update_period: 20 #the update period was set to 20 minutes 

heartbeat_update_period 选项的默认值是 15 分钟。

要禁用消费者心跳功能,将 heartbeat_update_period 选项设置为 0。