skrz / bunny-bundle
Symfony 扩展,让使用 RabbitMQ 变得简单。
Requires
- php: >=5.5.9
- bunny/bunny: ~0.2.2|~0.4
- skrz/autowiring-bundle: ~1.1|~2.0
- skrz/meta: >=3.0, <4.0
Requires (Dev)
- phpunit/phpunit: ~4.6
- symfony/framework-bundle: ~2.7|~3.0|~4.0
- symfony/yaml: ~2.7|~3.0|~4.0
This package is not auto-updated.
Last update: 2024-09-16 11:06:15 UTC
README
从 RabbitMQ 队列生成和生产类型安全的消息
安装
作为 Composer 依赖添加
$ composer require skrz/bunny-bundle
然后将 BunnyBundle
添加到 Symfony Kernel
use Skrz\Bundle\BunnyBundle\SkrzBunnyBundle; class AppKernel { public function registerBundles() { return [ ... new SkrzBunnyBundle() ... ]; } }
用法
BunnyBundle
连接 Skrz\Meta
和 Skrz\Bundle\AutowiringBundle
,使您可以从 RabbitMQ 生成和消费类型安全的消息。
BunnyBundle
创建了 2 个新的类型(见 AutowiringBundle
的描述)
@Consumer
- 消费者开始监听指定队列/交换机上的消息。每当消息到达时,都会调用handleMessage
方法。@Producer
- 生产者必须继承自Skrz\Bundle\BunnyBundle\AbstractProducer
。它们将类型安全的消息发布到指定的交换机。
当 BunnyBundle
添加到 Symfony kernel 时,它会注册 3 个命令
bunny:setup
- 根据配置创建交换机、队列以及它们之间的绑定。bunny:consumer
- 启动指定的消费者。bunny:producer
- 一个用于发送 JSON 序列化消息、路由键并使用指定生产者发送它们的实用命令。这对于调试很有用。
在 services.yml
中设置
BunnyBundle
使用 bunny
容器扩展密钥。
bunny: host: %bunny.host% # default: 127.0.0.1 port: %bunny.port% # default: 5672 vhost: %bunny.vhost% # default: / user: %bunny.user% # default: guest password: %bunny.password% # default: guest # make heartbeat as long as longest message processing time in any consumer might take heartbeat: 120 # in seconds = 2 minutes, default: 60 seconds exchanges: change: durable: true # durable means exchange won't be deleted on broker restart type: topic # topic exchanges route messages by given routing key # see https://rabbitmq.cn/tutorials/amqp-concepts.html#exchange-topic # other possible types: direct, fanout, headers change_done: durable: true type: topic bindings: - exchange: change # RabbitMQ-specific functionality = exchange-to-exchange bindings routing_key: "#" queues: product_categorize: durable: true bindings: - exchange: change routing_key: "change.product.#"
配置所有交换机、队列以及它们之间的绑定后,运行 bunny:setup
$ ./console bunny:setup
代理实体应按配置创建。
注意,bunny:setup
不会尝试解决任何冲突的声明,例如,一次将队列声明为持久性,第二次不声明为持久性,您必须自己解决这些问题。
编写生产者
我们的示例将是数据变更的异步处理。假设您有产品和分类,并希望根据产品标题和分类标题自动将产品分类。但是,分类算法相当昂贵,因此必须异步执行。我们将任何产品或分类的变更发布到 change
交换机。
从数据模型开始
class Product { /** @var int */ protected $id; /** @var string */ protected $title; // ... getters, setters, etc. } class Category { /** @var int */ protected $id; /** @var string */ protected $title; // ... getters, setters, etc. } class Change { /** @var Product change in product */ protected $product; /** @var Category change in category */ protected $category; /** @var string which hostname the change happened on */ protected $hostname; /** @var int which user made the change */ protected $userId; // ... getters, setters, etc. }
生产者 - ChangeProducer
- 将将变更发布到 change
交换机。生产者具有 beforeMethod
设置 - 生产者上的一个方法,在消息序列化和发送到代理之前被调用。我们将预处理消息并设置 $hostname
和 $userId
。
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorage; /** * @Producer( * exchange="change", * beforeMethod="preProcessMessage", * meta="ChangeMeta" * ) */ class ChangeProducer extends AbstractProducer { /** * @var TokenStorage * * @Autowired */ public $tokenStorage; public function preProcessMessage(Change $change) { $change ->setHostname(gethostname()) ->setUserId($this->tokenStorage->getToken()->getUser()->getId()); } }
meta
指向 *Meta
类,该类将用于序列化消息。
您可以从命令行测试生产者
$ ./console b:p --help Usage: bunny:producer producer-name message [routing-key] Arguments: producer-name Name of the producer. message Message JSON string. routing-key Message's routing key. $ ./console bunny:producer Change '{"product":{"id":121,"title":"Razor blades"}}' change.product.test
编写消费者
当使用 BunnyBundle
编写消费者时,请考虑以下内容:消费者可能会失败 - 失败的消费者的消息是否应该被重新投递?如果是这样,您应该在 services.yml
中创建队列并从中消费。如果不是,您应该在 @Consumer
注解中指定 exchange
- 消费者启动时会创建匿名队列。
我们希望消息被重新投递,因此创建了 product_categorize
队列,消费者将从该队列中消费。
use Bunny\Client; use Bunny\Message; /** * @Consumer( * queue="product_categorize", * meta="ChangeMeta", * maxMessages=1000, * maxSeconds=3600.0, * prefetchCount=1 * ) */ class ProductCategorizeConsumer { public function handleMessage(Change $change, Message $message, Channel $channel) { // ... expensive product categorization algorithm ... $channel->ack($message); } }
maxMessages
和maxSeconds
- 您应该始终在某种管理程序下运行您的消费者,例如 supervisord。PHP 可能会泄漏内存,在处理指定数量的消息或运行指定时间后,消费者将进行清洁关闭(刷新所有消息,从 RabbitMQ 断开连接)并退出代码0
- 管理程序应该自动重新启动它。prefetchCount
- 如果您有多个消费者进程并行从同一队列中消费,设置prefetchCount=1
以在消费者之间平均分配工作
已知限制
- 如果消息的处理时间超过心跳超时时间,RabbitMQ 将断开客户端连接,消费者进程将崩溃。这更多的是 PHP 的限制(没有线程)。心跳必须设置得足够高。
许可证
MIT 许可证。见 LICENSE
文件。