printed / rabbitmq-queue-bundle
RabbitMQ生产者和消费者助手包装器
Requires
- php: >=7.0
- doctrine/cache: ^1.6
- doctrine/doctrine-bundle: ~1.4
- doctrine/orm: ~2.5
- monolog/monolog: ~1.11
- php-amqplib/rabbitmq-bundle: ^1.13.0
- php-http/guzzle6-adapter: ^1.1
- psr/log: ^1.0
- ramsey/uuid: ~3.4
- richardfullmer/rabbitmq-management-api: ^2.0
- symfony/config: ^3.4 || ^4.0
- symfony/console: ^3.4 || ^4.0
- symfony/dependency-injection: ^3.4 || ^4.0
- symfony/expression-language: ^3.4 || ^4.0
- symfony/filesystem: ^3.4 || ^4.0
- symfony/http-foundation: ^3.4 || ^4.0
- symfony/http-kernel: ^3.4 || ^4.0
- symfony/validator: ^3.4 || ^4.0
This package is not auto-updated.
Last update: 2024-09-18 19:12:44 UTC
README
Symfony的RabbitMQ包装器包,旨在提高您使用消费者和生产者的体验。该包依赖于php-amqplib/rabbitmq-bundle
包。
显著特性
- 可取消的队列任务
- 队列任务完成进度跟踪
- 一个基于supervisord的零停机时间部署程序
- (通过
php-amqplib/rabbitmq-bundle
) 队列消费者的优雅最大执行时间 - 队列任务重试
设置与依赖
- PHP
>=7.0
- https://packagist.org.cn/packages/symfony/symfony
^3.4|^4.0
- https://packagist.org.cn/packages/doctrine/orm
~2.5
- https://packagist.org.cn/packages/monolog/monolog
~1.11
- https://packagist.org.cn/packages/ramsey/uuid
~3.4
- https://packagist.org.cn/packages/php-amqplib/rabbitmq-bundle
~1.6
我们假设您熟悉php-amqplib/rabbitmq-bundle
的配置和设置。
对memcached用户的特别说明
请打开您的项目中的vendor/doctrine/cache/lib/Doctrine/Common/Cache/MemcachedCache.php
文件,查看您是否可以找到以下代码片段
protected function doContains($id) { return false !== $this->memcached->get($id) || $this->memcached->getResultCode() !== Memcached::RES_NOTFOUND; }
如果找到了,请考虑将doctrine/cache
的版本升级到至少1.7.0
,否则CacheQueueMaintenanceStrategy
可能会在连接到您的memcached服务器出现任何问题时说维护模式已启动。
包配置
printedcom_rabbitmq_queue_bundle: options: # Name of the service that acts as a default producer in RabbitMQ. See below this code snippet for details. default_rabbitmq_producer_name: 'default_rabbitmq_producer' # Name of the service that implements the queue maintenance mode. Use one of the following: # # 1. 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy' (recommended) # 2. 'printed.bundle.queue.service.queue_maintenance.filesystem_queue_maintenance_strategy' # # To understand the difference, see the comments in the source code. queue_maintenance_strategy__service_name: 'printed.bundle.queue.service.queue_maintenance.cache_queue_maintenance_strategy' # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy cache_queue_maintenance_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache' # Name of the service that implements the New Deployments Detection feature. Choose one of the following: # # 1. 'printed.bundle.queue.service.new_deployments_detector.noop_strategy' - disables this functionality # 2. 'printed.bundle.queue.service.new_deployments_detector.cache_strategy' new_deployments_detector_strategy__service_name: 'printed.bundle.queue.service.new_deployments_detector.cache_strategy' # Name of a cache service that implements the requirements outlined in the CacheQueueMaintenanceStrategy new_deployments_detector_strategy__cache_service_name: 'rabbitmq_queue_bundle_cache' # Exit code used to exit a worker when it's detected that it's running old code consumer_exit_code__running_using_old_code: 15 # With tools like supervisord, it's important to have consumers running without exiting for a specified amount of time # in order to prove that the script started successfully. This is a problem if a consumer manages to start, connect to rabbitmq # and fail due to exception being thrown during execution of the task faster than the specified amount of time. The following # option makes the consumer not fail too fast. You essentially want to put your supervisord's "startsecs" value here (in seconds). # Bear in mind that the underlying code will always add 1 second to whatever value you put here in order to compensate for # fraction of seconds that aren't taken into account during evaluating of how long the script has been running for. This means that # you just need to put the value from supervisord's config "as is" without worrying about race conditions. # # You can disable this feature by either setting this option to null or not mentioning this option at all. I.e. by default this option # is disabled. minimal_runtime_in_seconds_on_consumer_exception: 1 rabbitmq_user: '%rabbitmq_user%' rabbitmq_password: '%rabbitmq_pass%' # Pass '/' or don't set this option if you don't know what rabbtimq vhost is. rabbitmq_vhost: '%rabbitmq_vhost%' # This is used only by commands that call the rabbit management api. You don't need to do # anything with this key if you don't use those commands. rabbitmq_api_base_url: 'http://%rabbitmq_host%:15672'
rabbitmq-queue-bundle.default_rabbitmq_producer_name
预期您至少有一个以下配置的RabbitMQ生产者
producers: default: connection: default service_alias: default_rabbitmq_producer
service_alias
的值应在rabbitmq-queue-bundle.default_rabbitmq_producer_name
参数中提供。此配置创建一个RabbitMQ生产者,它通过队列名称将任务派发到适当的队列。这也被称为RabbitMQ中的“默认”生产者。
重要通知:为您的消费者使用专用的EntityManager。
AbstractQueueConsumer应理想地使用专用的独立EntityManager来处理QueueTasks。这是为了防止当应用程序的默认EntityManager停止操作时(例如,由于“已关闭”错误或由于实体管理器的单元工作已被清除)对QueueTasks进行更改时出现问题。
为了实现这一点,请为DoctrineBundle使用多EntityManager配置,然后将专用的EntityManager提供给消费者构造函数。
示例配置
# DoctrineBundle configuration doctrine: # (...) orm: # (...) default_entity_manager: default entity_managers: default: auto_mapping: true queue_consumer: mappings: Printed\Bundle\Queue: type: annotation prefix: Printed\Bundle\Queue\Entity dir: '%kernel.project_dir%/vendor/printed/rabbitmq-queue-bundle/src/Printed/Bundle/Queue/Entity' # Registering your queue consumer e.g. in services.yml acme.queue.consumer.my_queue_task1: class: 'Acme\Bundle\Queue\Consumer\MyQueueTask1QueueConsumer' arguments: - '@doctrine.orm.default_entity_manager' - '@validator' - '@monolog.logger' - '@Psr\Container\ContainerInterface' - '@printed.bundle.queue.service.service_container_parameters' - '@doctrine.orm.queue_consumer_entity_manager' tags: - container.service_subscriber
用法
假设您已为前面的包完成配置并运行基本的Symfony演示应用程序,以下步骤应该相当容易遵循。根据需要可以做出一些小的更改,但示例旨在帮助传达要点。
示例rabbitmq.yml配置
old_sound_rabbit_mq: connections: default: host: '%rabbitmq_host%' port: '%rabbitmq_port%' user: '%rabbitmq_user%' password: '%rabbitmq_pass%' vhost: '/' lazy: true connection_timeout: 3 # this needs to be 2x of the heartbeat option read_write_timeout: 7200 # don't forget that you should enable tcp keepalive in rabbitmq as well: https://rabbitmq.cn/networking.html#socket-gen-tcp-options keepalive: true # keep this value high because https://github.com/php-amqplib/RabbitMqBundle/issues/301 heartbeat: 3600 producers: default: connection: default service_alias: default_rabbitmq_producer consumers: upload_picture: &consumer_template connection: default queue_options: { name: 'upload-picture' } callback: upload_picture_service qos_options: { prefetch_size: 0, prefetch_count: 1, global: false } graceful_max_execution: timeout: 1800 exit_code: 10 render_treasure_map: <<: *consumer_template queue_options: { name: 'render_treasure_map' } callback: render_treasure_map_service
Monolog
消费者公开了一个属性$this->logger
,它将是一个注册在Symfony中的monolog/monolog
实例,具有queue
通道。请随意处理通道,但至少需要在您的monolog配置中添加channels: [queue]
。
生产(有效载荷)
我们使用称为 payloads
的类来包含消费者的数据(也称为工作者)。当此类被构建并交给派发器时,将首先使用 symfony/validator
进行验证,然后序列化并存储到数据库中。新创建的记录的 ID 将与在抽象方法 getExchangeName
中定义的交换机一起发送到 RabbitMQ 队列。这允许在不需要担心目的地并保持代码更新时,轻松地调度 payloads。
namespace AppBundle\Queue\Payload; use Printed\Bundle\Queue\Queue\AbstractQueuePayload; use Symfony\Component\Validator\Constraints as Assert; /** * {@inheritdoc} */ class ExampleQueuePayload extends AbstractQueuePayload { /** * @Assert\NotNull() * @Assert\Type(type="integer") */ protected $data; /** * {@inheritdoc} */ public static function getExchangeName(): string { return 'example_exchange'; } // public function getData() // public function setData() // .. }
要调度此 payload,您可以使用通过此包提供的派发器类。您可以通过 printed.bundle.queue.service.queue_task_dispatcher
对容器进行访问。例如 ..
class MyServiceClass { private $queueTaskDispatcher; public function __construct(QueueTaskDispatcher $queueTaskDispatcher) { $this->queueTaskDispatcher = $queueTaskDispatcher; } public function doSomething() { // do something.. $payload = new ExampleQueuePayload; $payload->setData($myData); $this->queueTaskDispatcher->dispatch($payload); } }
services: MyServiceClass: class: MyServiceClass arguments: - '@printed.bundle.queue.service.queue_task_dispatcher'
消费(工作者)
如 php-amqplib/rabbitmq-bundle
中所述,您的消费者需要注册为服务并分配到配置中的消费者定义。这里也是一样的,但我们需要一些所有消费者都通用的参数。
services: app_bundle.queue.consumer.example_consumer: class: 'AppBundle\Queue\Consumer\ExampleQueueConsumer' arguments: - '@doctrine.orm.entity_manager' - '@validator' - '@monolog.logger.queue' - '@Psr\Container\ContainerInterface' - '@printed.bundle.queue.service.service_container_parameters' public: false tags: - container.service_subscriber
请注意,需要使用 container.service_subscriber
标签才能使用 ServiceSubscriber Symfony 功能 链接。
然后,您的工作将非常熟悉,只是进行了一些更改和增强。
namespace AppBundle\Queue\Consumer; use AppBundle\Queue\Payload\ExampleQueuePayload; use Printed\Bundle\Queue\Exception\Consumer\QueueFatalErrorException; use Printed\Bundle\Queue\Queue\AbstractQueueConsumer; use Printed\Bundle\Queue\Queue\AbstractQueuePayload; use Symfony\Component\HttpFoundation\Request; /** * {@inheritdoc} */ class ExampleQueueConsumer extends AbstractQueueConsumer { /** * {@inheritDoc} */ public static function getSubscribedServices(): array { return array_merge(parent::getSubscribedServices(), [ 'some_service' => MySomeService::class, ]); } /** * {@inheritdoc} * * @param ExampleQueuePayload $payload */ public function run(AbstractQueuePayload $payload): bool { // You hava access to all the standard things .. $someService = $this->locator->get('some_service'); $usefulParameter = $this->containerParameters->get('my_app.useful_parameter'); $this->em->persist($entity); $this->logger->info('Oh noes'); // Setting a task response data. $this->task->setResponseData(['something' => true]); // Do lots of things .. // And a few more things .. return self::TASK_COMPLETE; } }
任务必须始终以布尔值退出,true
表示通过,false
表示失败。为了使此信息更加详细,您可以使用常量 TASK_COMPLETE
和 TASK_FAILED
,它们只是那些布尔值。当将任务标记为失败时,任务对尝试的计数在将其送回队列进行另一次尝试之前会增加。每个任务都会获得由消费者中的 getAttemptLimit()
方法指定的尝试限制。您可以自由覆盖此限制,但默认限制设置为 10
。在那些您知道任务将永远失败且不希望它使用剩余尝试的情况下,您可以抛出 QueueFatalErrorException
异常。消息将被记录到常规日志中,尝试限制将被设置为最大值,这将防止作业生成更多尝试。
运行这些消费者与提到的包相同:./bin/console rabbitmq:consumer example_consumer -vv
维护和部署
为了优雅地关闭您的队列或停止其进度,我们定义了以下控制台命令。这将允许轻松维护队列消费者/工作者。
queue:maintenance:up
queue:maintenance:down
queue:maintenance:wait
本质上,queue:maintenance:up
将阻止新作业被工作者处理。当维护模式处于开启状态时,将作业发送到工作者,工作者将立即以代码 0
退出。这对于使用类似 supervisord
的东西非常有用,因为 supervisord
默认不会重启退出状态为该状态的程序。
重要的是要理解 queue:maintenance:up
的主要目的是阻止新作业被处理。该命令不是用于停止/重启工作者(尽管大多数情况下确实会发生)。请使用以下描述的“新部署检测”功能来重启工作者。
queue:maintenance:wait
将轮询数据库中的运行任务,此命令仅在没有任何任务被标记为运行时退出。使用 -r
参数可以配置它轮询数据库的秒数。此外,作为 symfony 命令,您可以在部署工具中抑制输出 -q
。
然后,当然,queue:maintenance:down
将允许队列再次运行。尽管您需要手动重启它们,因为它们已经退出了。
在您禁用维护模式之前,您需要确保所有旧空闲工作者都已重新启动。有几种方法可以做到这一点。此捆绑包为您提供了一个所有工作者在检测到新部署时退出的方法。这个特性叫做“新部署检测”,需要您调用queue:store-new-deployment-stamp-command
并传递一个可以用来比较部署的字符串(通常时间戳就足够了)。确保您配置此捆绑包以实际使用此功能。
本质上,您的构建脚本可能看起来像这样
./bin/console queue:maintenance:up
./bin/console queue:maintenance:wait -q -r 10
# deploy code
# database migrations
# cache cleaning/warming
./bin/console queue:store-new-deployment-stamp-command `date +%s`
./bin/console queue:maintenance:down
# "supervisorctl reread"
# "supervisorctl update"
由于任务及其全部负载数据都存储在数据库中,因此可以重新启动所有任务。如果您需要升级或迁移RabbitMQ实例,这将很有用。此时我们创建了此命令(抱歉),但是通过复制您最初启动任务的流程,只是循环遍历数据库中标记为pending
或status = 1
的所有条目,就足够简单了。有关更多信息,请参阅QueueTaskInterface
。
提示
RabbitMQ vhosts
如果您使用一个RabbitMQ服务器来托管应用程序的多个部署的队列(例如,在一个RabbitMQ服务器上托管测试、预发布、回归环境),此功能可以让您这样做而不会遇到队列名称冲突。
要使用它,设置以下选项
old_sound_rabbit_mq.connections.default.vhost
printedcom_rabbitmq_queue_bundle.rabbitmq_vhost
进程的Supervisord组。
这允许您将所有队列消费者收集在一个Supervisord进程组中,这反过来又允许您通过该组名称启动和停止它们,并允许您在单个机器上运行来自多个应用程序环境的队列消费者,而不会遇到名称冲突。
这也是停止您的消费者、部署您的代码、重新读取Supervisord配置以及重新启动消费者的更合适的方式。这取代了此捆绑包的“新部署检测”功能。
队列任务优先级
RabbitMQ提供了一种按不同优先级分配和处理队列任务的方法。此捆绑包了解此功能,并提供了一种利用它的方法。请参阅:AbstractQueuePayload::__construct()
,$queueMessageProperties
参数。
此功能有用之处之一是当您得出结论,您有n个消费者大部分都在做无用功,但仍然占用服务器的资源(RAM)时。您可以重新组织您的代码,使用单个队列消费者监听单个、共享的队列,并处理具有不同优先级的队列任务。此捆绑包不提供任何关于此类实现的意见,但是具有5个优先级的共享队列(最低、低、正常、高、最高)是一个合理的设置。
测试和贡献
遗憾的是目前没有测试,实际上没有什么可以测试的。但如果有任何东西要贡献,请现在打开一个pull-request。只需保持代码干净!