kaliop / queueingbundle
Kaliop Queueing Bundle
Requires
- ext-curl: *
- php-amqplib/rabbitmq-bundle: ^1.2
- symfony/config: ^2.4 || ^3.0 || ^4.0
- symfony/console: ^2.4 || ^3.0 || ^4.0
- symfony/dependency-injection: ^2.4 || ^3.0 || ^4.0
- symfony/event-dispatcher: ^2.4 || ^3.0 || ^4.0
- symfony/http-kernel: ^2.4 || ^3.0 || ^4.0
- symfony/process: ^2.4 || ^3.0 || ^4.0
Requires (Dev)
- ext-posix: *
- phpunit/phpunit: ^5.7 || ^7.5
- symfony/finder: ^2.4 || ^3.0 || ^4.0
- symfony/framework-bundle: ^2.4 || ^3.0 || ^4.0
- symfony/monolog-bundle: ^2.4 || ^3.0 || ^4.0
Suggests
- phpxmlrpc/phpxmlrpc: Used by the message consumer which executes xmlrpc calls. Ver 4.0 or later required
Conflicts
- kaliop/queueingbundle-sqs: <=0.2
README
一个提供与消息队列系统相关功能的Symfony Bundle。
主要用途
- 简化消息生产者和消费者的编写
- 保护应用程序的其余部分不受所使用消息系统的影响
- 简化在现有应用程序中引入队列系统,允许远程执行现有控制台命令/服务等...
- 帮助创建作为守护进程运行的工作处理消息消费者,克服PHP内在的稳定性问题。
目前仅支持通过RabbitMqBundle的RabbitMQ作为消息代理;支持AMQP协议(版本0.9)的其他代理可能也可以工作,但未经测试。
其他消息系统的支持可用在单独的包中
- AWS SQS
- 基于STOMP的代理(已测试:Apache ActiveMQ和Apollo)
实现的功能
-
一个MessageProducer类,可用于将任何Symfony控制台命令的执行分配给分布式工作者
-
一个控制台命令,可用于测试上述功能(分配现有控制台命令的执行)
-
一个MessageConsumer类,实现了上述功能的补充部分
使用时请注意安全:您通常不希望允许任何人能够向队列中发布命令并盲目执行。您可以实施的基本限制是仅允许通过队列消息执行可用的命令;这通过parameters.yml设置
-
MessageConsumer和MessageProducer类用于分配HTTP调用的执行,例如将链接检查任务分配给多个并发工作者
-
MessageConsumer和MessageProducer类用于将XMLRPC调用分配到远程服务器(注意,您需要安装phpxmlrpc\phpxmlrpc包才能使此功能工作)
-
MessageConsumer和MessageProducer类可以分配由Symfony服务公开的方法的执行。您可以实施的基本限制是仅允许通过队列消息执行可用的服务方法;这通过parameters.yml设置
-
一个事件:kaliop_queueing.message_received,您的服务可以通过使用tag kaliop_queueing.event_listener来监听。这允许过滤接收到的消息,例如引入安全、日志记录或其他横切关注点。要“吞没”已消费的消息,您的事件监听器应简单地调用stopPropagation()事件
-
一个事件:kaliop_queueing.message_consumed,您的服务可以通过使用tag kaliop_queueing.event_listener来监听
-
一个事件:kaliop_queueing.message_consumption_failed,您的服务可以通过使用tag kaliop_queueing.event_listener来监听
-
一个事件:kaliop_queueing.message_sent,您的服务可以通过使用tag kaliop_queueing.event_listener来监听
-
一个用于消费消息的控制台命令,类似于rabbitmq:consumer命令,但具有更多选项,例如支持多个驱动和超时
-
一个用于“守护进程化”(即如果未执行则重启)多个PHP进程的控制台命令,这些进程是“工作者”(即消息消费者)
-
一个用于调试和管理队列的控制台命令,通过转储其配置和当前消息计数以及清除和删除它们(确切功能取决于每个驱动)
-
一个用于调试驱动器的控制台命令——目前它只是简单地列出它们
-
一个MessageProducer类,可以从中派生出消息生产者
-
一个MessageConsumer类,可以从中派生出消息消费者
入门教程
设置
-
安装并启动RabbitMQ。目前您不需要设置交换和队列,但安装管理插件是个好主意。
-
安装包。确保您已安装了Symfony中的oldsound/rabbitmq-bundle包(如果您使用Composer,这会自动完成)。
-
在kernel类的registerBundles()中启用两个 KaliopQueueingBundle包和OldSoundRabbitMqBundle。
-
如果不是在开发环境中,请清除所有缓存。
配置 - 测试
我们将配置服务器,以便可以将控制台命令的执行委托给远程系统。首先,将使用相同的Symfony安装作为消息生产者和消费者。
-
首先测试从这个包中执行一个简单的控制台命令是否可以在本地执行。
php console kaliop_queueing:echoback "hello world" -f "testoutput.txt"
-
检查是否已注册了该包的'rabbitmq'驱动程序。
php console kaliop_queueing:managedriver list
-
在配置文件中,根据rabbitmq-bundle文档定义生产者和消费者。
Resources/config中的rabbitmq_sample.yml文件提供了一个配置示例,用于定义用于分发symfony控制台命令执行的队列。
-
通过列出它们来检查生产者和消费者是否设置正确。
php console kaliop_queueing:managequeue list-configured
在结果中,标记为1的队列是生产者,标记为2的队列是消费者。
-
启动一个消费者,将其置于后台。
php console kaliop_queueing:consumer <queue> --label="testconsumer" -w &
注意,上面的内容应替换为步骤8中的消费者名称。
-
测试现在会发生什么:当您将echoback的执行入队时,消费者应立即触发它。
php console kaliop_queueing:queuecommand <queue> kaliop_queueing:echoback "hello world again" option.f.testoutput2.txt cat testoutput2.txt tail logs/<env>.log
注意,上面的内容应替换为步骤8中的生产者名称。
-
终止消费者,删除创建的测试输出文件。
配置 - 迁移到生产环境
-
实现自定义消息生产者和消费者,通过配置将它们连接到Rabbit队列。
-
安排 watchdog 的执行,以便它可以自动启动消费者。
-
在配置文件中,将您希望作为守护进程运行的工人定义为参数。有关详细信息,请参阅parameters.yml文件。
-
在crontab中设置类似以下内容
* * * * * cd $APP && $PHP console kaliop_queueing:workerswatchdog > /dev/null
-
-
务必保护您的网络!!
如果您正在运行执行symfony控制台命令或symfony服务的消费者,请注意,目前它们根本不提供任何认证机制。任何可以向其队列发送消息的人都可以执行相关代码。
-
如果您正在运行执行symfony控制台命令或symfony服务的消费者,请至少通过在parameters.yml中配置值来设置一些基本的安全性,通过过滤接受的消息。
代码示例
发送消息
-
设置新的消息生产者
-
创建MessageProducer的子类;
-
实现一个
publish
方法,该方法内部调用doPublish
;namespace Hello\World; use Kaliop\QueueingBundle\Service\MessageProducer; class Producer { public function publish() { $this->doPublish($someData, $aRoutingKey); } }
-
将其声明为服务;
services: hello.world.producer: class: Hello\World\Producer
-
-
执行
$driver = $container->get('kaliop_queueing.drivermanager')->getDriver($driverName); $container->get('hello.world.producer') ->setDriver($driver) ->setQueueName($queueName); ->publish($stuff...);
-
如果您想使上述代码更简单,您可以将特定的消息生产者定义为服务,只要您使用的是Symfony 2.4或更高版本。
示例配置:此服务使用'sqs'驱动程序和一个名为'aQueue'的队列。
services: hello.world.producer: class: %kaliop_queueing.message_producer.console_command.class% calls: - [ setDriver, [ "@=service('kaliop_queueing.drivermanager').getDriver('sqs')" ] ] - [ setQueueName, [ 'aQueue' ] ]
和代码
$container->get('hello.world.producer')->publish($args, $routingKey, ...);
接收消息
-
设置新的消息消费者
- 创建MessageConsumer的子类;
- 实现一个
consume
方法; - 将其声明为服务;
- 使用特定于驱动程序的配置将服务连接到所需的队列;
-
执行
$driver = $container->get('kaliop_queueing.drivermanager')->getDriver($driverName); $driver->getConsumer($queueName) // optional ->setRoutingKey($key); ->consume($nrOfMessages);
可用的控制台命令
-
php console kaliop_queueing:queuecommand [-i=] [-ttl=] [-r=] [--novalidate] <args*>
将消息发送到队列,指定执行给定的symfony控制台命令
-
php console kaliop_queueing:queuemessage [-i=] [-ttl=] [-r=] [-c=] [-m=]
将消息发送到队列,使用预格式化的有效负载
-
php console kaliop_queueing:consumer [-w] [-r=] [-m=] [-t=timeout]
启动一个工作进程,该进程从指定的队列中消费消息。
-
php控制台kaliop_queueing:managedriver list []
管理指定的驱动程序或列出已安装的驱动程序
-
php控制台kaliop_queueing:managequeue [-i=] list-configured|purge|delete|info [] [--argument=]*
管理指定的队列:获取其状态信息、删除它或从消息中清除它。也可以列出所有队列
-
php控制台kaliop_queueing:watchdog start|stop|check
检查所有配置的工人进程是否正在执行,如果它们没有运行则重新启动它们
可用的事件
-
Kaliop\QueueingBundle\Event\EventsList::MESSAGE_RECEIVED在从队列获取消息并在消费之前发出。它可以用来取消消费。如果从队列中接收的消息无法根据其预期的格式(默认为json)进行解码,则不会发出此事件。
-
Kaliop\QueueingBundle\Event\EventsList::MESSAGE_CONSUMED在从队列中消费消息时发出。
-
Kaliop\QueueingBundle\Event\EventsList::MESSAGE_CONSUMPTION_FAILED在消息消费者在尝试解码和消费消息时抛出异常时发出。这通常用于替代MESSAGE_CONSUMED事件。
-
Kaliop\QueueingBundle\Event\EventsList::PROCESS_STARTED在watchdog启动进程时发出。
-
Kaliop\QueueingBundle\Event\EventsList::PROCESS_STOPPED在watchdog停止进程时发出。
所有标记为'kaliop_queueing.event_listener'的过滤器都可以设置为在所有队列上运行,或者只在单个队列上运行。为了在单个队列上设置过滤器,请使用以下语法
Tags:
- { name: kaliop_queueing.event_listener, event: kaliop_queueing.message_received, queue: aQueueName }
注意:这些事件不是由Symfony2的事件调度器发出的,因此您不能使用kernel.event_listener
标签或@DI\Observe
注解来注册监听器。请参阅services.yml中的示例了解如何使用它们。
稳定性很重要
为什么使用消息消费者来执行Symfony控制台命令,而不是在消费者类中完成所有工作是一个好主意?
答案是:增加了稳定性。
默认情况下,消息消费者作为守护进程运行,即它会长时间执行。这意味着它容易受到代码中的内存泄漏的影响。它还可能在发生致命错误时意外停止,以及可能遭受数据库连接中断(这通常发生在长连接时)或其他资源锁定。
通过使用消费者进程仅“监听”传入的消息,并在每次收到“工作”控制台命令时启动一个独立进程,我们使消费者更简单、更稳定。它不连接到数据库,也不泄漏内存。
缺点是,为每条接收到的消息启动一个控制台进程需要相当长的时间,系统的吞吐量会降低。
如果您知道“CGI模式”如何为执行PHP的web服务器工作,那么,这正是我们在做的事情,只是从amqp请求而不是http请求开始(还有,我们的监听器一次只产生一个工作者,而web服务器会并行产生多个)。
性能很重要
选项1
如果您的消费者必须以最短的时间处理大量消息,并且您仍然想要“每个消息一个PHP进程”的模型提供的良好稳定性保证,您只需简单地设置多个消费者并行运行。根据工作者的工作类型,您通常希望并行执行与可用CPU数量相同的消费者。您可以使用watchdog
控制台命令来并行启动消费者。
选项2
如果您更喜欢从更快的执行时间中受益,同时降低稳定性保证,您可以在消费者内部
- 处理所有预期的工作
- 使用-l选项启动消费者命令,该选项在内存增长无界之前结束
- 使用-m或-t选项启动消费者命令,该选项在经过一定时间或消费一定数量的消息后结束
- 使用
watchdog
控制台命令确保消费者在自杀后立即重新启动
此模式的工作方式类似于 'Apache MPM' 配置,但 PHP 不会在每次请求结束时清理其全局状态。
如果您想除了内存使用和超时之外,让消费者在自定义条件下自杀,您可以简单地添加一个从 Watchdog
继承的事件监听器并实现 check
方法。
请注意,消费者一次只能处理一条消息,因此您可能需要并行执行多个消费者。
加分项
如果您想在不重写任何代码的情况下测试 'CGI-like' 处理模式和 'MPM-like' 模式的区别,您可以
-
将您的消费者代码编写为 Symfony 控制台命令
-
基准测试它们每秒可以处理多少条消息
-
在配置文件中,将您的队列的回调从
callback: kaliop_queueing.message_consumer.console_command
改为
callback: kaliop_queueing.message_consumer.inprocess_console_command
-
再次基准测试
当然,您还可以像使用 Apache 中的 MaxRequestsPerChildren 指令一样使用 -l 和 -m 选项
选项 3
在追求无妥协性能和稳定性方面,第三种方法稍微复杂一些。步骤如下
- 将您的消费者代码编写为作为 '网页'(例如,Symfony 控制器)执行
- 设置一个只能从本地主机访问的 web 服务器,在这些页面上可以执行这些页面
- 编写您的消费者代码,使其将传入的消息转换为对本地服务器的 HTTP 请求
继续使用 web 服务器的类比,这更类似于 FastCGI 配置。
食谱
错误管理
(待文档化)
ACK/NACK,究竟是什么意思
(待文档化)
在执行控制台命令失败时重新排队消息
(待文档化。目前,请查看 services.yml 中示例过滤器服务定义,它执行此操作)
捆绑队列和路由键 - 它们如何映射到消息系统
(待文档化)
实现新的驱动程序
(待文档化)
更多文档
-
一套幻灯片,为 2015 年的 phpsummercamp 准备:https://docs.google.com/presentation/d/16rjSyejWGx4z7lIUYzvB5sXS8wMuHQc5N3QdIbkgj1A/pub?start=false&loop=false&delayms=10000#slide=id.p
-
另一套幻灯片,来自 2015 年的 Forum PHP:http://www.slideshare.net/gggeek/rabbits-indians-and-symfony-meets-queueing-brokers
类似包
这里所做的努力绝非独一无二;似乎已经有大量的 PHP 包处理队列并从传输协议的细节中抽象出来。
以下既不是一项背书声明,也不是任何衡量标准下的 definitive 列表,更多的是对库的开发者的一种提醒,告诉他们哪里可以找到灵感并借用代码 ;-)
-
zendframework/zend-queue - https://github.com/zendframework/ZendQueue
-
slm/queue - https://github.com/juriansluiman/SlmQueue
-
jms/job-queue-bundle - https://github.com/schmittjoh/JMSJobQueueBundle
-
bernard/bernard - https://github.com/bernardphp/bernard
-
wowo/wowo-queue-bundle - https://github.com/wowo/WowoQueueBundle
-
grimkirill/queue - https://github.com/grimkirill/queue
-
swarrot/swarrot - https://github.com/swarrot/swarrot
-
M6Web 团队的不同包:https://github.com/M6Web