leadtech/boot-rabbit-mq

1.2.1 2015-08-25 13:57 UTC

This package is auto-updated.

Last update: 2024-08-29 04:35:40 UTC


README

这个库提供了一种简单快速的方式来实现RabbitMQ。要实现RabbitMQ,您首先需要创建一个QueueTemplate类的实例。这个类代表消费者和生产者之间的单行通信。生产者和消费者类将使用相同的模板。一旦模板可用,您必须子类化AbstractConsumer类并实现handle方法。传入的消息被委托给这个方法。handle方法返回true或false,表示成功状态。

要实现生产者,只需实例化或子类化Boot\RabbitMQ\Producer\ProducerBoot\RabbitMQ\Producer\BatchProducer,并在构造函数中提供队列模板实例。有一个命令可供使用,您可以使用它通过控制台发布消息。

完整示例

一个容错队列的完整示例。消息将被持久化,并在重启后继续存在。客户端被配置为手动发送ACK/NACK信号。

创建worker.php

<?php
// Autoload dependencies
require_once __DIR__ . '/../vendor/autoload.php';

use Boot\RabbitMQ\Strategy\FaultTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Consumer\AbstractConsumer;
use Boot\RabbitMQ\RabbitMQ;
use Boot\RabbitMQ\Consumer\Event\ConsumerSuccessEvent;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent;

class ExampleConsumer extends AbstractConsumer
{
    /**
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     * @return bool
     */
    public function handle(\PhpAmqpLib\Message\AMQPMessage $message)
    {
        echo "Received message #{$message->body['sequence_number']}\n";

        // Return true for success, an ACK signal is sent to the server.
        // Alternatively an exception or returning false will result in a NACK signal instead.
        return true;
    }

}

// Create event dispatcher (optional)
$eventDispatcher = new EventDispatcher();

// Create queue template
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
    'some_queue_name',
    new AMQPConnection('localhost', 5672, 'guest', 'guest'),
    new FaultTolerantBehaviour,
    $eventDispatcher
);

$queueTemplate->setExclusive(false);

$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
    echo "Receiving a new message. Sequence number: {$event->getMessage()->body['sequence_number']}\n";
});


$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ConsumerSuccessEvent $event){
    echo "Successfully processed message. Sequence number: {$event->getMessage()->body['sequence_number']}\n\n";
});

$consumer = new ExampleConsumer($queueTemplate);
$consumer->connect();
$consumer->listen();

while($consumer->isBusy()) {
    $consumer->wait();
}

创建producer.php

<?php
// Autoload dependencies
require_once __DIR__ . '/../vendor/autoload.php';
use Boot\RabbitMQ\Strategy\FaultTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Producer\Producer;

$eventDispatcher = new EventDispatcher();
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
    'some_queue_name',
    new AMQPConnection('localhost', 5672, 'guest', 'guest'),
    new FaultTolerantBehaviour
);

$queueTemplate->setExclusive(false);


$producer = new Producer($queueTemplate);
$producer->connect();

for($i=0;$i<=10;$i++) {
    $producer->publish([
        'sequence_number' => time() . '-' . $i
    ]);
}

使用boot实现队列工作者作为命令行

Boot是一个基于symfony组件(如DependencyInjection、EventDispatcher和Console组件)的最小化框架,并完全依赖于composer进行包管理和自动加载。Boot是为了提供一种非常基本但优雅的方式来引导应用程序而创建的。

Boot使创建一个用于运行队列工作者的控制台应用程序变得非常容易。

这是引导控制台应用程序的一个示例

#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';

// Get environment
$input = new ArgvInput();
$env = $input->getParameterOption(['--env', '-e'], 'dev');

// Build application
$rootDir = realpath(__DIR__ . '/..');
$app = (new \Boot\Builder($rootDir))
    ->appName('SomeConsumerApp')                                     # The name of the application
    ->caching('cache', false)                                        # Enable/disable caching of the service container
    ->environment($env)                                              # Set environment
    ->path('resources/config')                                       # Service configuration (order matters)
    ->path('src/Search/Resources/config')                            # Service configuration (order matters)
    ->parameter('project_dir', $rootDir)                             # Register parameters to the service container.
    ->beforeOptimization(new CommandCompilerPass)                    # Automatically register the commands to the console. Console commands must be tagged with a console_command tag.
    ->build()
;

/** @var ConsoleApplication $console */
$console = $app->get('console');
$console->getDefinition()->addOption(
    new InputOption('--env', '-e', InputOption::VALUE_REQUIRED, 'The environment name.', 'dev')
);

$console->run();

安装

在安装RabbitMQ之后,您需要设置一个项目。如果您选择使用boot,请检查示例文件夹。在那里您将找到一个可用的控制台应用程序。如果您想从头开始,您需要在composer.json文件中包含以下包。如果您不想使用boot并创建自己的symfony\console实现,则也可以使用此包,无需安装boot。

安装Boot

"require": {
    "leadtech/boot": "^1.0",
  }

安装Boot RabbitMQ

"require": {
    "leadtech/boot-rabbit-mq": "^1.0"
  }

要求

系统要求

  • PHP >= 5.4
  • RabbitMQ服务器

必需的包

  • videlalvaro/php-amqplib
  • symfony/event-dispatcher
  • symfony/console
  • monolog/monolog

QueueTemplate

责任

QueueTemplate的责任是提供一个针对生产者和消费者之间单行通信的特定设置。消费者和生产者都依赖于队列模板提供的相同设置。

QueueTemplate包含

  • 连接(生产者和消费者都必须使用相同的配置)。
  • 消息序列化器。
  • RabbitMQ特定的选项,例如队列名称、交换、被动性、专用连接等。
  • 队列策略。
  • 事件调度器。

什么是队列策略?

RabbitMQ是一个强大的队列服务器,有无数种使用方式。这个库提供了两种开箱即用的配置,提供容错解决方案或基本但更快的设置。配置可能需要消费端和产生端应用程序的特定设置。

许多功能的缺点是,在实现过程中更容易出错。特别是当你必须处理多个团队时。通过提供单个配置点,队列模板防止排队逻辑分散到组件和/或应用程序中。该策略应同时简化RabbitMQ的实现,并提高团队和应用程序中实现的可靠性。

创建一个在内存中性能更好但不会在崩溃中存活的队列

$queueTemplate = new QueueTemplate('some_queue_name', $connection, new Boot\RabbitMQ\Strategy\BasicBehaviour);

或者,为了创建一个容错队列,我们可以这样做

$queueTemplate = new QueueTemplate('some_queue_name', $connection, new Boot\RabbitMQ\Strategy\FaultTolerantBehaviour);

实现自定义策略

要实现自定义策略,只需创建一个扩展Boot\RabbitMQ\Strategy\QueueStrategy的对象。

/**
 * Class SomeCustomBehaviour
 * @package Boot\RabbitMQ\Strategy
 */
class SomeCustomBehaviour extends QueueStrategy
{

   /**
     * @param QueueTemplate $queueTemplate
     * @param array $data
     *
     * @return AMQPMessage
     */
    public function createMessage(QueueTemplate $queueTemplate, array $data)
    {
        return new AMQPMessage(
            $queueTemplate->getSerializer()->serialize($data)
        );
    }

   /**
     * @param QueueTemplate $queueTemplate
     */
    public function declareQueue(QueueTemplate $queueTemplate)
    {
       // ...
    }

   /**
     * @param QueueTemplate $queueTemplate
     */
    public function declareQualityOfService(QueueTemplate $queueTemplate)
    {
        // ...
    }

   /**
     * Whether an (n)ack signal must be sent to the server. Depending on the setup this may or may not happen automatically.
     *
     * @return bool
     */
    public function doAckManually()
    {
        // ...
    }
}

你有一个可能对其他人有用的出色策略吗?请随时分享 ;-)

消费者

责任

消费者负责监听和处理从特定队列接收到的消息。消费者和生产者类都与QueueTemplate类紧密耦合。队列模板为消费者和生产者提供单个配置。

实现你的消费者

要开始,只需创建一个Boot\RabbitMQ\Consumer\AbstractConsumer的子类。

你必须实现以下方法

/**
 * @param AMQPMessage $message
 * @return bool
 */
public function handle(AMQPMessage $message)
{
   print_r($message->body);

   // Return true on success, if the message could not be processed and you need the message to be enqueued again than return false.
   // Note that the client should be configured to sent ack/nack signals manually. (See FaultTolerantBehaviour strategy)

   return true;
}

实例化消费者

创建消费者对象。消费者依赖于QueueTemplate类。应向消费者提供相同的模板。

$queueTemplate = $app->get('queueTemplate'); // Get fully configured queue template from whatever component one could use.
$consumer = new SomeMessageConsumer($queueTemplate, 'some_optional_consumer_name');

尽管这样做可行,但最好使用依赖注入来配置组件。如果消费者作为独立应用程序实现,我建议查看PHPBoot存储库,该存储库实现了Symfony2服务容器和控制台组件的轻量级实现。我还会在那里添加这个库的现成示例。(在你阅读这段文字时,它可能已经存在了。)

处理传入的消息

开始监听传入的消息。

if($consumer->connect()) {
    $consumer->listen();
}

connect方法将

  • 创建到服务器的连接。消费者将从队列模板获取配置的连接对象。
  • 声明队列,如队列模板和其策略中定义的那样
  • 声明服务质量(qos),如队列模板和其策略中定义的那样

listen方法将

  • 创建一个通道并订阅特定队列。内部我们提供了一个回调,它将为每个接收到的消息执行。由于AbstractConsumer实现了魔法__invoke方法,我们可以将消费者实例用作有效的回调。当__invoke方法被调用时,我们将调用具体消费者的handle方法。handle方法必须返回true或false。这在手动发送ack/nack信号的情况下尤为重要。

我们还没有处理消息。要开始接收传入的消息,我们必须执行

// Start receiving
while ($consumer->isBusy()) {
    // Wait for messages. Any incoming message is delegated to the consumer object
    $consumer->wait();
}

监听消费者事件

客户提供了额外的功能,使其他组件能够轻松地附加额外的功能。以下事件已实现

  • ON_RECEIVE
  • ON_CONSUMER_ERROR
  • ON_CONSUMER_SUCCESS

提示:查看symfony2文档以获取有关EventDispatcher和事件的完整文档。还有更多注册监听器的方法。(事件订阅者,使用对象而不是函数等。)

添加监听器

use Boot\RabbitMQ\RabbitMQ;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent

$eventDispatcher = new EventDispatcher();

// ON_RECEIVE
$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
    var_dump($event->getMessage());
    var_dump($event->getConsumer());
});

// ON_CONSUMER_ERROR
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_ERROR, function(ReceiveEvent $event){
    // ...
});

// ON_CONSUMER_SUCCESS
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ReceiveEvent $event){
    // ...
});

控制台命令

该库提供了一个从命令行运行消费者的简单方法。

实例化ConsumerCommand

   /** @var Symfony\Component\Console\Application $console */
   $console->add(
       new Boot\RabbitMQ\Command\ConsumerCommand(
           'consume:my-foo-consumer',                         # Provide a command name
           null,                                              # Provide a logger that implements LoggerInterface. (most mainstream php loggers do, see PSR guidelines)
           'Start consumer to process my foo messages.',      # A description for this command
           1                                                  # Optional sleep time after handling a message. Provide interval in seconds.
       );
   );
   $console->run();

通过执行:php /path/to/app/console consume:my-foo-consumer开始消费

提供的解决方案适用于任何基于symfony2组件构建或基于symfony2的控制台应用程序。如果您在这个项目中不使用symfony2,我建议您查看PHPBoot仓库。Boot提供了一个非常简约的微框架,仅基于composer构建,包含symfony2服务容器(可选)内置对控制台组件的支持。Boot提供了一个强大的构建器,您可以使用它来设置您的应用程序。组件设计为与框架无关、灵活且轻量级。

生产者

责任

生产者负责向队列发布消息。生产者使用与消费者相同的队列模板。此库提供了生产者类以及另一个用于批量操作的生产者。不需要子类化生产者,尽管如果您需要可以这样做。

示例

使用生产者发布消息。

use Boot\RabbitMQ\Producer\Producer
/** @var QueueTemplate $queueTemplate*/
$producer = new Producer($queueTemplate);
$producer->connect();
$producer->publish([
    'message'          => 'some example message',
    'some-other-field' => 'some other value',
    'published'        => time()
]);

使用批量生产者一次发布多个消息。

use Boot\RabbitMQ\Producer\BatchProducer
/** @var QueueTemplate $queueTemplate*/
$producer = new BatchProducer($queueTemplate);
$producer->connect();

// Publish message 1
$producer->publish([
    'message'          => 'some example message',
    'published'        => time()
]);
// Publish message 2
$producer->publish([
    'message'          => 'some example message',
    'published'        => time()
]);
// Commit
$producer->commit();

控制台

我们已使您能够在短时间内快速启动。在开发/测试期间,您可以注册ConsoleProducerCommand类的实例。此命令允许您通过运行命令来简单地发布消息。此命令依赖于生产者对象,并使用生产者和队列模板来设置连接。当您需要时,开发自己的命令也非常简单。只需扩展Boot\RabbitMQ\Command\AbstractProducerCommand类,它应该几乎可以直接使用。命令必须实现一个produce方法。检查Boot\RabbitMQ\Command\ConsoleProducerCommand的源代码以了解其工作原理。(仅几行代码,我保证:-))

创建命令的示例(假设您不使用依赖注入或symfony2等。

/**
 * @var Symfony\Component\Console\Application $console
 * @var Boot\RabbitMQ\Producer\Producer       $producer
 */
$console->add(
    new Boot\RabbitMQ\Command\ConsoleProducerCommand(
        'produce:my-foo-producer',                         # Provide a command name
        $producer,                                         # Provide the producer
        null,                                              # Provide a logger that implements LoggerInterface. (most mainstream php loggers do, see PSR guidelines)
    );
);
$console->run();

要发布相同消息10次,请执行:php /path/to/app/console produce:my-foo-producer --repeat=10 --base64=0 "This is my message"

序列化器

责任

序列化器负责处理RabbitMQ的消息序列化。消费者和生产者都不应知道序列化过程是如何工作的。生产者和消费者将自动使用相同的转换逻辑,通过请求相同的对应队列模板来获取序列化器。如果没有定义序列化器,我们将自动使用包含的JsonSerializer类。

实现自己的序列化器

要实现自己的序列化器,只需创建一个实现了'Boot\RabbitMQ\Serializer\SerializerInterface'的对象。假设我们想使用加密来保护超级秘密对象。我们可以通过创建类似于以下示例的序列化器来实现此功能。只需将此序列化器的实例注入到队列模板中,您就可以开始了。

例如


use Boot\RabbitMQ\Serializer\SerializerInterface;

class EncryptedJsonSerializer implements SerializerInterface
{
   /** @var string */
   private $secretKey;

   /**
     * @param string $secretKey
     */
    public function serialize($secretKey)
    {
        $this->secretKey = $secretKey;
    }


   /**
     * @param array $data
     * @return string
     */
    public function serialize(array $data)
    {
        return $this->encrypt(serialize($data));
    }

   /**
     * @param $data
     * @return array
     */
    public function unserialize($data)
    {
        return unserialize($this->decrpyt($data), true);
    }

   /**
     * @param string $string
     * @return string
     */
    protected function encrypt($string)
    {
        return trim(base64_encode(mcrypt_encrypt(MCRYPT_RIJNDAEL_256, $this->secretKey, $string, MCRYPT_MODE_ECB, mcrypt_create_iv(mcrypt_get_iv_size(MCRYPT_RIJNDAEL_256, MCRYPT_MODE_ECB), MCRYPT_RAND))));
    }

   /**
     * @param string $string
     * @return string
     */
    protected function decrypt($string)
    {
        return trim(mcrypt_decrypt(MCRYPT_RIJNDAEL_256, $this->secretKey, base64_decode($string), MCRYPT_MODE_ECB, mcrypt_create_iv(mcrypt_get_iv_size(MCRYPT_RIJNDAEL_256, MCRYPT_MODE_ECB), MCRYPT_RAND)));
    }


}

在编写此示例时,我意识到这将是很有用的添加。所以我添加了一个使用加密的序列化器。:-)