oro/message-queue

此软件包已被弃用且不再维护。作者建议使用 oro/platform 软件包。

ORO 消息队列组件

dev-master / 1.x-dev 2018-04-17 09:44 UTC

This package is auto-updated.

Last update: 2021-04-19 20:04:16 UTC


README

注意:本文发布在Oro文档库中。

该组件通过不同的传输方式在应用程序中集成了消息队列。它包含多个层级。

最底层称为传输层,提供传输协议的抽象。消费层提供消费消息的工具,例如cli命令、信号处理、日志、扩展。它位于传输层之上。

客户端层提供尽可能少的配置即可启动生产/消费消息的能力。

外部链接

什么是消息队列

消息队列提供了一种异步通信协议,这意味着消息的发送者和接收者不需要同时与消息队列进行交互。放入队列的消息将被存储,直到接收者检索它们。消息不包含关于先前和后续消息的信息。

因此,以下情况下应使用消息队列:

  • 可以异步执行一个进程。
  • 进程不会影响用户体验。
  • 需要并行执行进程以获得更快的性能。
  • 需要保证处理。
  • 需要可扩展性。

发布/订阅消息

OroMessageQueue使用发布/订阅消息。这意味着发送应用程序通过特定的主题发布(发送)消息,而消费者找到该主题的订阅者。发布/订阅消息允许解耦信息提供者与该信息的消费者。发送应用程序和接收应用程序不需要了解对方即可发送和接收信息。

字典

  • 消息 - 包含消息主题的信息消息,指示哪个消息处理器将处理它,以及消息体 - 必要的参数数组,例如实体ID或通道名称。消息由消息生产者创建并发送到“队列尾部”。当消息到来时,它由使用消息处理器消费者进行处理。消息还包含一些额外的设置(见消息设置)。
  • 消息队列 - 一个FIFO队列,直到消息被处理才存储队列消息。可以有一个或多个队列。如果我们只使用一个队列,那么事情会简单得多。如果有多个队列,那么事情会复杂得多,但有时也更灵活。
  • 消费者 - 从队列中接收消息并进行处理的组件。它一次处理一条消息:一旦一条消息处理完毕,则处理下一条消息。对于每条消息,消费者运行一个订阅了消息主题消息处理器(如果存在的话)。如果有多个处理器订阅了同一个主题,它们可以并行运行(实际上消息是通过代理发送的,如果代理看到有多个接收者,它会复制消息为每个接收者创建一个单独的消息)。可以有多个消费者,它们可以在不同的服务器上工作。这样做是为了提高性能。在实现消息处理器时,开发者应该记住有多个消费者在不同的服务器上工作
  • 消息处理器 - 处理队列消息(即包含在消费者处理指定主题的消息时应运行的代码)。
  • 消息主题 - 指示应该为该消息执行哪个消息处理器的标识符。一个处理器可以订阅多个主题。也可以有多个进程订阅同一个主题。
  • 任务 - 消息处理器可以直接处理消息或创建一个任务。任务在数据库中创建,允许监控处理状态、开始和结束时间、中断进程。如果将一个过程分解成一组并行过程,任务还可以监控和控制整个过程。有关详细信息,请参阅任务部分。

消息设置

  • 主题 - 指的是上面的“消息主题”。
  • 正文 - 包含一些数据的字符串或JSON编码数组。
  • 优先级 - 可以是MessagePriority::VERY_LOWMessagePriority::LOWMessagePriority::NORMALMessagePriority::HIGHMessagePriority::VERY_HIGH。识别优先级很简单:有五个队列,每个优先级一个队列。消费者处理来自VERY_HIGH队列的消息。如果没有消息在VERY_HIGH队列中,消费者处理HIGH队列的消息,依此类推。因此,如果所有其他队列都为空,消费者则处理VERY_LOW队列的消息。
  • 过期 - 在该时间数秒后,消息应从队列中移除而未进行处理的秒数。
  • 延迟 - 消息在发送到队列之前应延迟的秒数。

消息处理器

消息处理器是处理队列消息的类。它们实现了MessageProcessorInterface。此外,它们通常订阅特定的主题并实现TopicSubscriberInterface

process(MessageInterface $message, SessionInterface $session)方法描述了在接收到消息时应执行的操作。它可以直接执行操作或创建一个任务。它还可以生成一个新消息以异步运行另一个处理器。

处理状态

接收到的消息可以被处理、拒绝,并重新入队。还可以抛出异常。

在以下情况下,消息处理器将返回self::ACK

  • 如果消息处理成功。
  • 如果创建的任务返回了true

这意味着消息已成功处理并从队列中删除。

在以下情况下,消息处理器将返回self::REJECT

  • 如果消息损坏。
  • 如果创建的任务返回了false

这意味着消息未处理,并从队列中移除,因为它无法处理,永远不会变为可处理状态(例如,缺少必填参数或出现其他永久性错误)。

可能有两种选择

  • 消息由于正常工作而变得无法处理。例如,当消息被发送到一个在发送时存在的实体,但有人删除了它。该实体将不会再次出现,我们可以拒绝该消息。这是正常的工作流程,因此不需要用户干预。
  • 消息由于故障而变得无法处理。例如,当实体ID无效或缺失时。这是异常行为,消息也应被拒绝,但处理器需要用户的注意(例如,记录一个关键错误或甚至抛出异常)。

如果消息暂时无法处理,例如,由于服务器过载导致的连接超时,则process方法应返回self::REQUEUE。消息将再次返回队列并稍后处理。如果在处理或作业运行期间抛出异常,也会发生这种情况。

重新排队消息的工作流程(处理器返回self::REQUEUE)如下

  1. 消费者处理一条消息(运行消息处理器的process方法)。
  2. process方法返回self::REQUEUE
  3. 消费者将消息(即消息的副本)放在队列的末尾,并将redelivery标志设置为true。
  4. 消费者继续消息处理(重新排队的消息在队列的末尾)。
  5. 当重新排队的消息轮到时,DelayRedeliveredMessageExtension会起作用,并为重新排队的消息设置延迟。
  6. 延迟设置的时间过去后,消息再次处理。

在消息处理器内部抛出异常时,重新排队消息的工作流程略有不同

  1. 消费者处理一条消息(运行消息处理器的process方法)。
  2. process方法内部抛出一个异常。
  3. 消费者记录异常,并将消息(即消息的副本)放在队列的末尾,将redelivery标志设置为true。然后消费者因异常而失败。
  4. 在此阶段应重新运行消费者。可以手动进行,也可以使用类似supervisord的工具自动进行。开发时首选手动重新运行,因为开发人员应检查消息处理过程中抛出的异常。回归测试或生产环境首选自动重新运行。
  5. 消费者继续消息处理(失败的消息在队列的末尾)。
  6. 当失败的消息轮到时,DelayRedeliveredMessageExtension会起作用,并为失败的消息设置延迟。
  7. 延迟时间过去后,消息再次处理,消费者可以再次失败。

示例

处理器收到一个带有实体ID的消息。它找到实体并更改其状态,而不创建任何作业。

    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $body = JSON::decode($message->getBody());
        
        if (! isset($body['id'])) {
            $this->logger->critical(
                sprintf('Got invalid message, id is empty: "%s"', $message->getBody()),
                ['message' => $message]
            );

            return self::REJECT;
        }
        
        $em = $this->getEntityManager();
        $repository = $em->getRepository(SomeEntity::class);
        
        $entity = $repository->find($body['id']);
        
        if(! $entity) {
            $this->logger->error(
                sprintf('Cannot find an entity with id: "%s"', $body['id']),
                ['message' => $message]
            );

            return self::REJECT;            
        }
        
        $entity->setStatus('success');
        $em->persist($entity);
        $em->flush();
        
        return self::ACK;
      }

总的来说,可能有三种情况

  • 处理器收到一个带有实体ID的消息。找到了实体。处理器的方法改变了实体状态,并返回self::ACK。
  • 处理器收到一个带有实体ID的消息。未找到实体。这是可能的,如果实体在消息在队列中时被删除(即发送后但在处理之前)。这是预期行为,但处理器拒绝消息,因为实体不存在,并且以后不会出现。请注意,我们使用错误记录级别。
  • 处理器收到一个包含空实体ID的消息。这是意外行为。发送该消息的代码中肯定存在错误。我们拒绝该消息,但使用关键日志级别来通知用户需要干预。

任务

消息处理器可以通过创建或不创建任务来实现。

没有理想的准则来帮助决定是否应该创建任务。开发者应根据每次情况决定哪种方法更好。

以下是一些建议

如果我们可以在以下情况下跳过任务创建

  • 我们有一个易于快速执行的操作,例如状态更改等。
  • 我们的操作看起来像事件监听器。

以下情况下我们应该始终创建任务

  • 操作复杂且可能长时间执行。
  • 我们需要监控执行状态。
  • 我们需要运行一个唯一的任务,即不允许在先前的任务完成之前运行具有相同名称的任务。
  • 我们需要逐步执行操作,即消息流程有多个步骤(任务),一个接一个地执行。
  • 我们需要将任务拆分为一组子任务以并行运行,并监控整个任务的状态。

任务通常使用JobRunner运行。

JobRunner

JobRunner创建并运行任务。通常使用以下方法之一

runUnique

public function runUnique($ownerId, $name, \Closure $runCallback)

运行 $runCallback。它不允许同时运行具有相同名称的其他任务。

createDelayed

public function createDelayed($name, \Closure $startCallback)

一个异步运行(发送自己的消息)的子任务。它只能在其他任务内部运行。

runDelayed

public function runDelayed($jobId, \Closure $runCallback)

此方法用于处理器内部的消息,该消息使用createDelayed发送。

$runCallback 闭包通常返回 truefalse,任务状态取决于返回值。有关详细信息,请参阅任务状态部分。

要在任务的作用域内重用现有的处理器逻辑,可以将其装饰为 DelayedJobRunnerDecoratingProcessor,这将执行runDelayed,将控制权传递给指定的处理器,然后处理适用于 runDelayed 的结果格式。

一个依赖任务

当您的任务流程有多个步骤但您希望在所有步骤完成后发送新的消息时,请使用依赖任务。

在下面的示例中,创建了一个根任务。一旦其工作完成,它就会向队列发送两个带有 'topic1' 和 'topic2' 主题的消息。

class MessageProcessor implements MessageProcessorInterface
{
    /**
     * @var JobRunner
     */
    private $jobRunner;

    /**
     * @var DependentJobService
     */
    private $dependentJob;

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique(
            $message->getMessageId(),
            'oro:index:reindex',
            function (JobRunner $runner, Job $job) use ($data) {
                // register two dependent jobs
                // next messages will be sent to queue when that job and all children are finished
                $context = $this->dependentJob->createDependentJobContext($job->getRootJob());
                $context->addDependentJob('topic1', 'message1');
                $context->addDependentJob('topic2', 'message2', MessagePriority::VERY_HIGH);

                $this->dependentJob->saveDependentJob($context);

                // some work to do

                return true; // if you want to ACK message or false to REJECT
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
}

依赖任务只能添加到根任务中(即使用 runUnique 创建的任务,而不是使用 runDelayed 创建的任务)。

任务结构

对于处理过程,创建了一个两层任务层次结构,其中

  • 根任务可以有多个子任务。

  • 子任务可以有一个根任务。

  • 子任务不能有自己子任务。

  • 根作业不能有它自己的根作业。

  • 如果我们只使用runUnique,则会创建具有相同名称的父作业和子作业。

  • 如果我们同时在runUnique中使用createDelayed,则会为runUnique创建父作业和子作业。然后每次调用createDelayed都会为runUnique父作业添加另一个子作业。

作业状态

  • 单个作业:当消费者正在处理消息,并且没有创建任何子作业时调用JobRunner方法的runUnique
    • 创建根作业,传入参数的闭包运行。作业获得Job::STATUS_RUNNING状态,作业startedAt字段设置为当前时间。
    • 如果闭包返回true,作业状态更改为Job::STATUS_SUCCESS,作业stoppedAt字段更改为当前时间。
    • 如果闭包返回false或抛出异常,作业状态更改为Job::STATUS_FAILED,作业stoppedAt字段更改为当前时间。
    • 如果有人中断作业,它停止工作并获得Job::STATUS_CANCELLED状态,作业stoppedAt字段更改为当前时间。
    • 如果创建了新的唯一作业,但之前的作业尚未完成,则检查其执行时间。如果执行时间超过配置的time_before_stale时间,(见陈旧作业),则设置Job::STATUS_STALE状态。
  • 子作业:当消费者正在处理消息时,调用JobRunner方法的runUnique并使用createDelayed创建子作业
    • 创建根作业,传入参数的闭包运行。作业获得Job::STATUS_RUNNING状态,作业startedAt字段设置为当前时间。
    • 当调用JobRunner方法的createDelayed时,会创建子作业并获得Job::STATUS_NEW状态。作业的消息发送到消息队列。
    • 当消费者正在处理子作业的消息并调用JobRunner方法的runDelayed时,闭包运行,子作业获得Job::STATUS_RUNNING状态。
    • 如果闭包返回true,子作业状态更改为Job::STATUS_SUCCESS,作业stoppedAt字段更改为当前时间。
    • 如果闭包返回false或抛出异常,子作业状态更改为Job::STATUS_FAILED,作业stoppedAt字段更改为当前时间。
    • 当所有子作业停止时,根作业状态根据子作业状态更改。
    • 如果有人中断子作业,它停止工作并获得Job::STATUS_CANCELLED状态,作业stoppedAt字段更改为当前时间。
    • 如果有人中断根作业,已运行的子作业将完成其工作并获得根据工作结果的状态(见上述描述)。尚未运行的子作业将被取消并获得Job::STATUS_CANCELLED状态。
    • 如果根作业状态更改为Job::STATUS_STALE,其子作业将自动获得相同的状态。(见陈旧作业
  • 此外:如果作业闭包返回true,运行此作业的过程方法应返回self::ACK。如果作业闭包返回false,运行此作业的过程方法应返回self::REJECT

陈旧作业

无法创建具有相同名称的两个唯一作业。这就是为什么如果一个唯一作业无法完成其工作,它可能会阻塞另一个作业。要处理这种情况,您可以使用陈旧作业功能。

默认情况下,JobProcessor 使用 NullJobConfigurationProvider,因此唯一的作业永远不会过时。如果您想改变这种行为,您需要创建自己的提供者,该提供者实现 JobConfigurationProviderInterface。

方法 JobConfigurationProvider::getTimeBeforeStaleForJobName($jobName); 应返回秒数,在之后作业将被视为“过时”。如果您不希望作业过时,则返回 null 或 -1。

以下示例中,所有作业将在一小时后被视为“过时”。

<?php

use Oro\Component\MessageQueue\Provider\JobConfigurationProviderInterface;

class JobConfigurationProvider implements JobConfigurationProviderInterface
{
    /**
     * {@inheritdoc}
     */
    public function getTimeBeforeStaleForJobName($jobName)
    {
        return 3600;
    }
}

$jobProcessor = new JobProcessor(/* arguments */);
$jobProcessor->setJobConfigurationProvider(new JobConfigurationProvider());

在这种情况下,如果创建第二个具有相同名称的唯一作业,并且之前的作业已超过一小时未更新,并且它未启动子作业,则它将获得 Job::STATUS_STALE 状态,并且将创建新作业。

此外,如果处理器尝试完成“过时”作业,则将删除该作业。

流程

简单流程

通常,消息流程看起来如下所示

Simple Message Flow

然而,如果有多个处理器订阅了同一个主题,流程会变得更加复杂。客户端的消息生产者将消息发送到路由消息处理器。它获取消息并搜索对这种消息感兴趣的真实接收者。然后它将消息的副本发送给所有这些接收者。每个目标消息处理器接收其消息副本并进行处理。

并行运行多个进程的简单方法

让我们假设我们想要并行运行两个进程。在这种情况下,我们可以创建一个具有第一个进程的处理器 B 和一个具有第二个进程的处理器 C。然后我们可以创建处理器 A,向其中注入消息生产者,并将消息发送到处理器 B 和处理器 C。消息被放入队列中,当轮到它们时,消费者运行进程 B 和 C。这可以并行完成。

Simple Parallel Process Running Flow

代码示例

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        if ({$message is invalid}) {
            $this->logger->critical(
                sprintf('Got invalid message: "%s"', $message->getBody()),
                ['message' => $message]
            );

            return self::REJECT;
        }

        foreach ($data['ids'] as $id) {
            $this->producer->send(Topics::DO_SOMETHING_WITH_ENTITY, [
                'id' => $id,
                'targetClass' => $data['targetClass'],
                'targetId' => $data['targetId'],
            ]);
        }

        $this->logger->info(sprintf(
            'Sent "%s" messages',
            count($data['ids'])
        ));

        return self::ACK;
    }

示例中的处理器接受一些实体 ID 的数组,并将消息 Topics:DO_SOMETHING_WITH_ENTITY 发送到每个 ID。这些消息被放入消息队列,并在轮到它们时进行处理。如果运行多个消费者,则可以并行完成。

这种方法很简单,并且工作得很好,尽管它有几个缺点。

  • 我们没有一种方法来 监控 进程的 状态,除了读取日志文件。在上面的示例中,我们不知道有多少实体正在处理,以及有多少实体仍在队列中。我们也不知道有多少实体处理成功,以及有多少在处理过程中收到错误。
  • 我们无法确保 唯一 运行。
  • 我们无法轻松 中断 运行的进程。

通过创建根作业和子作业使用 runUnique/createDelayed/runDelayed 运行并行作业的流程

这种方法运行并行作业比之前的方法更合适,尽管它稍微复杂一些。然而,这是并行进程实现的推荐方法。

任务与之前相同。我们想要并行运行两个进程。我们也在创建处理器 A、B 和 C,但它们略有不同。

我们将 JobRunner 注入到 处理器 A 中。在 process 方法内部,它运行 runUnique 方法。在 runUnique 的闭包中,它运行 createDelayed 方法为 处理器 B处理器 C,将 jobId 参数传递给其闭包。在 createDelayed 的闭包中,为 处理器 B处理器 C 创建并发送消息。我们还应该将 jobId 参数添加到消息体中,除了所需的参数。

处理器B和C也有所不同。它们的处理方法调用runDelayed方法,传递接收到的jobId参数。

以下是优点:

  • 唯一运行。由于我们在处理器A中使用runUnique方法,它的新实例在完成所有作业之前无法运行。
  • 作业在数据库中创建。为处理器A创建一个根作业,并为处理器B和C添加子作业。
  • 状态监控。我们可以看到所有子作业的状态:新建表示刚刚创建,运行中表示正在运行的作业,成功表示成功完成的作业,失败表示失败的作业。
  • 根作业状态为运行中,直到所有子作业完成。
  • 中断。我们可以中断子作业或根作业。如果我们中断根作业,所有正在运行的子作业将完成其工作。尚未开始的子作业将不会启动。

Running Parallel Jobs - a Root Job with async Sub-jobs

createDelayed和runDelayed的使用示例

处理器订阅了Topics::DO_BIG_JOB,并运行一个唯一的大作业(作业名称为Topics::DO_BIG_JOB - 与主题名称相同,因此将无法同时运行另一个大作业)处理器创建一系列延迟作业,每个作业发送Topics::DO_SMALL_JOB消息。

    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $bigJobParts = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique( //a root job is creating here 
            $message->getMessageId(),
            Topics::DO_BIG_JOB,
            function (JobRunner $jobRunner) use ($bigJobParts) {

                foreach ($bigJobParts as $smallJob) {
                    $jobRunner->createDelayed( // child jobs are creating here and get new status
                        sprintf('%s:%s', Topics::DO_SMALL_JOB, $smallJob),
                        function (JobRunner $jobRunner, Job $child) use ($smallJob) {
                            $this->producer->send(Topics::DO_SMALL_JOB, [ // messages for child jobs are sent here
                                'smallJob' => $smallJob,
                                'jobId' => $child->getId(), // the created child jobs ids are passing as message body params
                            ]);
                        }
                    );
                }

                return true;
            }
        );

        return $result ? self::ACK : self::REJECT;
    }

处理器订阅了Topics::DO_SMALL_JOB,并运行创建的延迟作业。

    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $payload = JSON::decode($message->getBody());

        $result = $this->jobRunner->runDelayed($payload['jobId'], function (JobRunner $jobRunner) use ($payload) {
            //the child job status with the id $payload['jobId'] is changed from new to running
            
            $smallJobData = $payload['smallJob'];
            
            if (! $this->checkDataValidity($smallJobData))) {
                $this->logger->error(
                    sprintf('Invalid data received: "%s"', $smallJobData),
                    ['message' => $payload]
                );

                return false; //the child job status with the id $payload['jobId'] is changed from running to failed
            }

            return true; //the child job status with the id $payload['jobId'] is changed from running to success
        });

        return $result ? self::ACK : self::REJECT;
    }

为大作业创建一个根作业,并为小作业创建一系列子作业。

更多示例

仅运行单个作业(即具有一步的作业使用runUnique)

class MessageProcessor implements MessageProcessorInterface
{
    /**
     * @var JobRunner
     */
    private $jobRunner;

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique(
            $message->getMessageId(),
            'oro:index:reindex',
            function (JobRunner $runner, Job $job) use ($data) {
                // do your job

                return true; // if you want to ACK message or false to REJECT
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
}

作业流程有两个或更多步骤

class Step1MessageProcessor implements MessageProcessorInterface
{
    /**
     * @var JobRunner
     */
    private $jobRunner;

    /**
     * @var MessageProducerInterface
     */
    private $producer;

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique(
            $message->getMessageId(),
            'oro:index:reindex',
            function (JobRunner $runner, Job $job) use ($data) {
                // for example first step generates tasks for step two

                foreach ($entities as $entity) {
                    // every job name must be unique
                    $jobName = 'oro:index:index-single-entity:' . $entity->getId();
                    $runner->createDelayed(
                        $jobName,
                        function (JobRunner $runner, Job $childJob) use ($entity) {
                            $this->producer->send('oro:index:index-single-entity', [
                                'entityId' => $entity->getId(),
                                'jobId' => $childJob->getId(),
                            ])
                    });
                }

                return true; // if you want to ACK message or false to REJECT
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
}

class Step2MessageProcessor implements MessageProcessorInterface
{
    /**
     * @var JobRunner
     */
    private $jobRunner;

    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        $result = $this->jobRunner->runDelayed(
            $data['jobId'],
            function (JobRunner $runner, Job $job) use ($data) {
                // do your job

                return true; // if you want to ACK message or false to REJECT
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
}

用法

以下是一个仅使用传输层产生消息的示例

<?php

use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\DriverManager;

$doctrineConnection = DriverManager::getConnection(
    ['url' => 'mysql://user:secret@localhost/mydb'],
    new Configuration
);

$connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

$session = $connection->createSession();

$queue = $session->createQueue('aQueue');
$message = $session->createMessage('Something has happened');

$session->createProducer()->send($queue, $message);

$session->close();
$connection->close();

以下是一个仅使用传输层消费消息的示例

use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\DriverManager;

$doctrineConnection = DriverManager::getConnection(
    ['url' => 'mysql://user:secret@localhost/mydb'],
    new Configuration
);

$connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

$session = $connection->createSession();

$queue = $session->createQueue('aQueue');
$consumer = $session->createConsumer($queue);

while (true) {
    if ($message = $consumer->receive()) {
        echo $message->getBody();

        $consumer->acknowledge($message);
    }
}

$session->close();
$connection->close();

以下是一个仅使用消费层消费消息的示例

<?php
use Oro\Component\MessageQueue\Consumption\MessageProcessor;

class FooMessageProcessor implements MessageProcessor
{
    public function process(Message $message, Session $session)
    {
        echo $message->getBody();

        return self::ACK;
    }
}
<?php
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\DriverManager;
use Oro\Component\MessageQueue\Consumption\ChainExtension;
use Oro\Component\MessageQueue\Consumption\QueueConsumer;
use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;

$doctrineConnection = DriverManager::getConnection(
    ['url' => 'mysql://user:secret@localhost/mydb'],
    new Configuration
);

$connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

$queueConsumer = new QueueConsumer($connection, new ChainExtension([]));
$queueConsumer->bind('aQueue', new FooMessageProcessor());

try {
    $queueConsumer->consume();
} finally {
    $queueConsumer->getConnection()->close();
}