leadtech / boot-rabbit-mq
Requires
- php: >=5.4.0
- monolog/monolog: ^1.16
- symfony/console: ^2.7
- symfony/event-dispatcher: ^2.7
- videlalvaro/php-amqplib: ^2.5
Requires (Dev)
- devster/ubench: ^1.2
- fabpot/php-cs-fixer: ^1.8
- mikey179/vfsstream: ^1.5
- phpmd/phpmd: ^2.2
- phpunit/phpunit: ^4.8
README
这个库提供了一种简单快速的方式来实现RabbitMQ。要实现RabbitMQ,您首先需要创建一个QueueTemplate类的实例。这个类代表消费者和生产者之间的单行通信。生产者和消费者类将使用相同的模板。一旦模板可用,您必须子类化AbstractConsumer类并实现handle方法。传入的消息被委托给这个方法。handle方法返回true或false,表示成功状态。
要实现生产者,只需实例化或子类化Boot\RabbitMQ\Producer\Producer或Boot\RabbitMQ\Producer\BatchProducer,并在构造函数中提供队列模板实例。有一个命令可供使用,您可以使用它通过控制台发布消息。
完整示例
一个容错队列的完整示例。消息将被持久化,并在重启后继续存在。客户端被配置为手动发送ACK/NACK信号。
创建worker.php
<?php
// Autoload dependencies
require_once __DIR__ . '/../vendor/autoload.php';
use Boot\RabbitMQ\Strategy\FaultTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Consumer\AbstractConsumer;
use Boot\RabbitMQ\RabbitMQ;
use Boot\RabbitMQ\Consumer\Event\ConsumerSuccessEvent;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent;
class ExampleConsumer extends AbstractConsumer
{
/**
* @param \PhpAmqpLib\Message\AMQPMessage $message
* @return bool
*/
public function handle(\PhpAmqpLib\Message\AMQPMessage $message)
{
echo "Received message #{$message->body['sequence_number']}\n";
// Return true for success, an ACK signal is sent to the server.
// Alternatively an exception or returning false will result in a NACK signal instead.
return true;
}
}
// Create event dispatcher (optional)
$eventDispatcher = new EventDispatcher();
// Create queue template
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
'some_queue_name',
new AMQPConnection('localhost', 5672, 'guest', 'guest'),
new FaultTolerantBehaviour,
$eventDispatcher
);
$queueTemplate->setExclusive(false);
$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
echo "Receiving a new message. Sequence number: {$event->getMessage()->body['sequence_number']}\n";
});
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ConsumerSuccessEvent $event){
echo "Successfully processed message. Sequence number: {$event->getMessage()->body['sequence_number']}\n\n";
});
$consumer = new ExampleConsumer($queueTemplate);
$consumer->connect();
$consumer->listen();
while($consumer->isBusy()) {
$consumer->wait();
}
创建producer.php
<?php
// Autoload dependencies
require_once __DIR__ . '/../vendor/autoload.php';
use Boot\RabbitMQ\Strategy\FaultTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Producer\Producer;
$eventDispatcher = new EventDispatcher();
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
'some_queue_name',
new AMQPConnection('localhost', 5672, 'guest', 'guest'),
new FaultTolerantBehaviour
);
$queueTemplate->setExclusive(false);
$producer = new Producer($queueTemplate);
$producer->connect();
for($i=0;$i<=10;$i++) {
$producer->publish([
'sequence_number' => time() . '-' . $i
]);
}
使用boot实现队列工作者作为命令行
Boot是一个基于symfony组件(如DependencyInjection、EventDispatcher和Console组件)的最小化框架,并完全依赖于composer进行包管理和自动加载。Boot是为了提供一种非常基本但优雅的方式来引导应用程序而创建的。
Boot使创建一个用于运行队列工作者的控制台应用程序变得非常容易。
这是引导控制台应用程序的一个示例
#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';
// Get environment
$input = new ArgvInput();
$env = $input->getParameterOption(['--env', '-e'], 'dev');
// Build application
$rootDir = realpath(__DIR__ . '/..');
$app = (new \Boot\Builder($rootDir))
->appName('SomeConsumerApp') # The name of the application
->caching('cache', false) # Enable/disable caching of the service container
->environment($env) # Set environment
->path('resources/config') # Service configuration (order matters)
->path('src/Search/Resources/config') # Service configuration (order matters)
->parameter('project_dir', $rootDir) # Register parameters to the service container.
->beforeOptimization(new CommandCompilerPass) # Automatically register the commands to the console. Console commands must be tagged with a console_command tag.
->build()
;
/** @var ConsoleApplication $console */
$console = $app->get('console');
$console->getDefinition()->addOption(
new InputOption('--env', '-e', InputOption::VALUE_REQUIRED, 'The environment name.', 'dev')
);
$console->run();
安装
在安装RabbitMQ之后,您需要设置一个项目。如果您选择使用boot,请检查示例文件夹。在那里您将找到一个可用的控制台应用程序。如果您想从头开始,您需要在composer.json文件中包含以下包。如果您不想使用boot并创建自己的symfony\console实现,则也可以使用此包,无需安装boot。
安装Boot
"require": {
"leadtech/boot": "^1.0",
}
安装Boot RabbitMQ
"require": {
"leadtech/boot-rabbit-mq": "^1.0"
}
要求
系统要求
- PHP >= 5.4
- RabbitMQ服务器
必需的包
- videlalvaro/php-amqplib
- symfony/event-dispatcher
- symfony/console
- monolog/monolog
QueueTemplate
责任
QueueTemplate的责任是提供一个针对生产者和消费者之间单行通信的特定设置。消费者和生产者都依赖于队列模板提供的相同设置。
QueueTemplate包含
- 连接(生产者和消费者都必须使用相同的配置)。
- 消息序列化器。
- RabbitMQ特定的选项,例如队列名称、交换、被动性、专用连接等。
- 队列策略。
- 事件调度器。
什么是队列策略?
RabbitMQ是一个强大的队列服务器,有无数种使用方式。这个库提供了两种开箱即用的配置,提供容错解决方案或基本但更快的设置。配置可能需要消费端和产生端应用程序的特定设置。
许多功能的缺点是,在实现过程中更容易出错。特别是当你必须处理多个团队时。通过提供单个配置点,队列模板防止排队逻辑分散到组件和/或应用程序中。该策略应同时简化RabbitMQ的实现,并提高团队和应用程序中实现的可靠性。
创建一个在内存中性能更好但不会在崩溃中存活的队列
$queueTemplate = new QueueTemplate('some_queue_name', $connection, new Boot\RabbitMQ\Strategy\BasicBehaviour);
或者,为了创建一个容错队列,我们可以这样做
$queueTemplate = new QueueTemplate('some_queue_name', $connection, new Boot\RabbitMQ\Strategy\FaultTolerantBehaviour);
实现自定义策略
要实现自定义策略,只需创建一个扩展Boot\RabbitMQ\Strategy\QueueStrategy的对象。
/**
* Class SomeCustomBehaviour
* @package Boot\RabbitMQ\Strategy
*/
class SomeCustomBehaviour extends QueueStrategy
{
/**
* @param QueueTemplate $queueTemplate
* @param array $data
*
* @return AMQPMessage
*/
public function createMessage(QueueTemplate $queueTemplate, array $data)
{
return new AMQPMessage(
$queueTemplate->getSerializer()->serialize($data)
);
}
/**
* @param QueueTemplate $queueTemplate
*/
public function declareQueue(QueueTemplate $queueTemplate)
{
// ...
}
/**
* @param QueueTemplate $queueTemplate
*/
public function declareQualityOfService(QueueTemplate $queueTemplate)
{
// ...
}
/**
* Whether an (n)ack signal must be sent to the server. Depending on the setup this may or may not happen automatically.
*
* @return bool
*/
public function doAckManually()
{
// ...
}
}
你有一个可能对其他人有用的出色策略吗?请随时分享 ;-)
消费者
责任
消费者负责监听和处理从特定队列接收到的消息。消费者和生产者类都与QueueTemplate类紧密耦合。队列模板为消费者和生产者提供单个配置。
实现你的消费者
要开始,只需创建一个Boot\RabbitMQ\Consumer\AbstractConsumer的子类。
你必须实现以下方法
/**
* @param AMQPMessage $message
* @return bool
*/
public function handle(AMQPMessage $message)
{
print_r($message->body);
// Return true on success, if the message could not be processed and you need the message to be enqueued again than return false.
// Note that the client should be configured to sent ack/nack signals manually. (See FaultTolerantBehaviour strategy)
return true;
}
实例化消费者
创建消费者对象。消费者依赖于QueueTemplate类。应向消费者提供相同的模板。
$queueTemplate = $app->get('queueTemplate'); // Get fully configured queue template from whatever component one could use.
$consumer = new SomeMessageConsumer($queueTemplate, 'some_optional_consumer_name');
尽管这样做可行,但最好使用依赖注入来配置组件。如果消费者作为独立应用程序实现,我建议查看PHPBoot存储库,该存储库实现了Symfony2服务容器和控制台组件的轻量级实现。我还会在那里添加这个库的现成示例。(在你阅读这段文字时,它可能已经存在了。)
处理传入的消息
开始监听传入的消息。
if($consumer->connect()) {
$consumer->listen();
}
connect方法将
- 创建到服务器的连接。消费者将从队列模板获取配置的连接对象。
- 声明队列,如队列模板和其策略中定义的那样
- 声明服务质量(qos),如队列模板和其策略中定义的那样
listen方法将
- 创建一个通道并订阅特定队列。内部我们提供了一个回调,它将为每个接收到的消息执行。由于AbstractConsumer实现了魔法__invoke方法,我们可以将消费者实例用作有效的回调。当__invoke方法被调用时,我们将调用具体消费者的handle方法。handle方法必须返回true或false。这在手动发送ack/nack信号的情况下尤为重要。
我们还没有处理消息。要开始接收传入的消息,我们必须执行
// Start receiving
while ($consumer->isBusy()) {
// Wait for messages. Any incoming message is delegated to the consumer object
$consumer->wait();
}
监听消费者事件
客户提供了额外的功能,使其他组件能够轻松地附加额外的功能。以下事件已实现
- ON_RECEIVE
- ON_CONSUMER_ERROR
- ON_CONSUMER_SUCCESS
提示:查看symfony2文档以获取有关EventDispatcher和事件的完整文档。还有更多注册监听器的方法。(事件订阅者,使用对象而不是函数等。)
添加监听器
use Boot\RabbitMQ\RabbitMQ;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent
$eventDispatcher = new EventDispatcher();
// ON_RECEIVE
$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
var_dump($event->getMessage());
var_dump($event->getConsumer());
});
// ON_CONSUMER_ERROR
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_ERROR, function(ReceiveEvent $event){
// ...
});
// ON_CONSUMER_SUCCESS
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ReceiveEvent $event){
// ...
});
控制台命令
该库提供了一个从命令行运行消费者的简单方法。
实例化ConsumerCommand
/** @var Symfony\Component\Console\Application $console */
$console->add(
new Boot\RabbitMQ\Command\ConsumerCommand(
'consume:my-foo-consumer', # Provide a command name
null, # Provide a logger that implements LoggerInterface. (most mainstream php loggers do, see PSR guidelines)
'Start consumer to process my foo messages.', # A description for this command
1 # Optional sleep time after handling a message. Provide interval in seconds.
);
);
$console->run();
通过执行:php /path/to/app/console consume:my-foo-consumer开始消费
提供的解决方案适用于任何基于symfony2组件构建或基于symfony2的控制台应用程序。如果您在这个项目中不使用symfony2,我建议您查看PHPBoot仓库。Boot提供了一个非常简约的微框架,仅基于composer构建,包含symfony2服务容器(可选)内置对控制台组件的支持。Boot提供了一个强大的构建器,您可以使用它来设置您的应用程序。组件设计为与框架无关、灵活且轻量级。
生产者
责任
生产者负责向队列发布消息。生产者使用与消费者相同的队列模板。此库提供了生产者类以及另一个用于批量操作的生产者。不需要子类化生产者,尽管如果您需要可以这样做。
示例
使用生产者发布消息。
use Boot\RabbitMQ\Producer\Producer
/** @var QueueTemplate $queueTemplate*/
$producer = new Producer($queueTemplate);
$producer->connect();
$producer->publish([
'message' => 'some example message',
'some-other-field' => 'some other value',
'published' => time()
]);
使用批量生产者一次发布多个消息。
use Boot\RabbitMQ\Producer\BatchProducer
/** @var QueueTemplate $queueTemplate*/
$producer = new BatchProducer($queueTemplate);
$producer->connect();
// Publish message 1
$producer->publish([
'message' => 'some example message',
'published' => time()
]);
// Publish message 2
$producer->publish([
'message' => 'some example message',
'published' => time()
]);
// Commit
$producer->commit();
控制台
我们已使您能够在短时间内快速启动。在开发/测试期间,您可以注册ConsoleProducerCommand类的实例。此命令允许您通过运行命令来简单地发布消息。此命令依赖于生产者对象,并使用生产者和队列模板来设置连接。当您需要时,开发自己的命令也非常简单。只需扩展Boot\RabbitMQ\Command\AbstractProducerCommand类,它应该几乎可以直接使用。命令必须实现一个produce方法。检查Boot\RabbitMQ\Command\ConsoleProducerCommand的源代码以了解其工作原理。(仅几行代码,我保证:-))
创建命令的示例(假设您不使用依赖注入或symfony2等。
/**
* @var Symfony\Component\Console\Application $console
* @var Boot\RabbitMQ\Producer\Producer $producer
*/
$console->add(
new Boot\RabbitMQ\Command\ConsoleProducerCommand(
'produce:my-foo-producer', # Provide a command name
$producer, # Provide the producer
null, # Provide a logger that implements LoggerInterface. (most mainstream php loggers do, see PSR guidelines)
);
);
$console->run();
要发布相同消息10次,请执行:php /path/to/app/console produce:my-foo-producer --repeat=10 --base64=0 "This is my message"
序列化器
责任
序列化器负责处理RabbitMQ的消息序列化。消费者和生产者都不应知道序列化过程是如何工作的。生产者和消费者将自动使用相同的转换逻辑,通过请求相同的对应队列模板来获取序列化器。如果没有定义序列化器,我们将自动使用包含的JsonSerializer类。
实现自己的序列化器
要实现自己的序列化器,只需创建一个实现了'Boot\RabbitMQ\Serializer\SerializerInterface'的对象。假设我们想使用加密来保护超级秘密对象。我们可以通过创建类似于以下示例的序列化器来实现此功能。只需将此序列化器的实例注入到队列模板中,您就可以开始了。
例如
use Boot\RabbitMQ\Serializer\SerializerInterface;
class EncryptedJsonSerializer implements SerializerInterface
{
/** @var string */
private $secretKey;
/**
* @param string $secretKey
*/
public function serialize($secretKey)
{
$this->secretKey = $secretKey;
}
/**
* @param array $data
* @return string
*/
public function serialize(array $data)
{
return $this->encrypt(serialize($data));
}
/**
* @param $data
* @return array
*/
public function unserialize($data)
{
return unserialize($this->decrpyt($data), true);
}
/**
* @param string $string
* @return string
*/
protected function encrypt($string)
{
return trim(base64_encode(mcrypt_encrypt(MCRYPT_RIJNDAEL_256, $this->secretKey, $string, MCRYPT_MODE_ECB, mcrypt_create_iv(mcrypt_get_iv_size(MCRYPT_RIJNDAEL_256, MCRYPT_MODE_ECB), MCRYPT_RAND))));
}
/**
* @param string $string
* @return string
*/
protected function decrypt($string)
{
return trim(mcrypt_decrypt(MCRYPT_RIJNDAEL_256, $this->secretKey, base64_decode($string), MCRYPT_MODE_ECB, mcrypt_create_iv(mcrypt_get_iv_size(MCRYPT_RIJNDAEL_256, MCRYPT_MODE_ECB), MCRYPT_RAND)));
}
}
在编写此示例时,我意识到这将是很有用的添加。所以我添加了一个使用加密的序列化器。:-)