skrz/bunny-bundle

Symfony 扩展,让使用 RabbitMQ 变得简单。

v2.0.3 2020-01-13 10:17 UTC

README

Build Status Downloads this Month Latest stable

从 RabbitMQ 队列生成和生产类型安全的消息

安装

作为 Composer 依赖添加

$ composer require skrz/bunny-bundle

然后将 BunnyBundle 添加到 Symfony Kernel

use Skrz\Bundle\BunnyBundle\SkrzBunnyBundle;

class AppKernel
{

    public function registerBundles()
    {
        return [
            ...
            new SkrzBunnyBundle()
            ...
        ];
    }

}

用法

BunnyBundle 连接 Skrz\MetaSkrz\Bundle\AutowiringBundle,使您可以从 RabbitMQ 生成和消费类型安全的消息。

BunnyBundle 创建了 2 个新的类型(见 AutowiringBundle 的描述

  • @Consumer - 消费者开始监听指定队列/交换机上的消息。每当消息到达时,都会调用 handleMessage 方法。
  • @Producer - 生产者必须继承自 Skrz\Bundle\BunnyBundle\AbstractProducer。它们将类型安全的消息发布到指定的交换机。

BunnyBundle 添加到 Symfony kernel 时,它会注册 3 个命令

  • bunny:setup - 根据配置创建交换机、队列以及它们之间的绑定。
  • bunny:consumer - 启动指定的消费者。
  • bunny:producer - 一个用于发送 JSON 序列化消息、路由键并使用指定生产者发送它们的实用命令。这对于调试很有用。

services.yml 中设置

BunnyBundle 使用 bunny 容器扩展密钥。

bunny:
  host: %bunny.host%          # default: 127.0.0.1
  port: %bunny.port%          # default: 5672
  vhost: %bunny.vhost%        # default: /
  user: %bunny.user%          # default: guest
  password: %bunny.password%  # default: guest 
  
  # make heartbeat as long as longest message processing time in any consumer might take
  heartbeat: 120 # in seconds = 2 minutes, default: 60 seconds

  exchanges:
    change:
      durable: true  # durable means exchange won't be deleted on broker restart
      type: topic    # topic exchanges route messages by given routing key
                     # see https://rabbitmq.cn/tutorials/amqp-concepts.html#exchange-topic
                     # other possible types: direct, fanout, headers
      
    change_done:
      durable: true
      type: topic
      bindings:
        - exchange: change  # RabbitMQ-specific functionality = exchange-to-exchange bindings
          routing_key: "#" 

  queues:
    product_categorize:
      durable: true
      bindings:
        - exchange: change
          routing_key: "change.product.#"

配置所有交换机、队列以及它们之间的绑定后,运行 bunny:setup

$ ./console bunny:setup

代理实体应按配置创建。

注意,bunny:setup 不会尝试解决任何冲突的声明,例如,一次将队列声明为持久性,第二次不声明为持久性,您必须自己解决这些问题。

编写生产者

我们的示例将是数据变更的异步处理。假设您有产品和分类,并希望根据产品标题和分类标题自动将产品分类。但是,分类算法相当昂贵,因此必须异步执行。我们将任何产品或分类的变更发布到 change 交换机。

从数据模型开始

class Product
{

    /** @var int */
    protected $id;
    
    /** @var string */
    protected $title;
    
    // ... getters, setters, etc.
    
}

class Category
{

    /** @var int */
    protected $id;
    
    /** @var string */
    protected $title;
    
    // ... getters, setters, etc.

}

class Change
{

    /** @var Product  change in product */
    protected $product;
    
    /** @var Category  change in category */
    protected $category;
    
    /** @var string  which hostname the change happened on */
    protected $hostname;
    
    /** @var int  which user made the change */
    protected $userId;
    
    // ... getters, setters, etc.
    
}

生产者 - ChangeProducer - 将将变更发布到 change 交换机。生产者具有 beforeMethod 设置 - 生产者上的一个方法,在消息序列化和发送到代理之前被调用。我们将预处理消息并设置 $hostname$userId

use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorage;

/**
 * @Producer(
 *     exchange="change",
 *     beforeMethod="preProcessMessage",
 *     meta="ChangeMeta"
 * )
 */
class ChangeProducer extends AbstractProducer
{

	/**
	 * @var TokenStorage
	 *
	 * @Autowired
	 */
	public $tokenStorage;

	public function preProcessMessage(Change $change)
	{
		$change
			->setHostname(gethostname())
			->setUserId($this->tokenStorage->getToken()->getUser()->getId());
	}

}

meta 指向 *Meta 类,该类将用于序列化消息。

您可以从命令行测试生产者

$ ./console b:p --help
Usage:
 bunny:producer producer-name message [routing-key]

Arguments:
 producer-name         Name of the producer.
 message               Message JSON string.
 routing-key           Message's routing key.
$ ./console bunny:producer Change '{"product":{"id":121,"title":"Razor blades"}}' change.product.test

编写消费者

当使用 BunnyBundle 编写消费者时,请考虑以下内容:消费者可能会失败 - 失败的消费者的消息是否应该被重新投递?如果是这样,您应该在 services.yml 中创建队列并从中消费。如果不是,您应该在 @Consumer 注解中指定 exchange - 消费者启动时会创建匿名队列。

我们希望消息被重新投递,因此创建了 product_categorize 队列,消费者将从该队列中消费。

use Bunny\Client;
use Bunny\Message;

/**
 * @Consumer(
 *     queue="product_categorize",
 *     meta="ChangeMeta",
 *     maxMessages=1000,
 *     maxSeconds=3600.0,
 *     prefetchCount=1
 * )
 */
class ProductCategorizeConsumer
{
    
    public function handleMessage(Change $change, Message $message, Channel $channel)
    {
        // ... expensive product categorization algorithm ...
        
        $channel->ack($message);
    }
    
}
  • maxMessagesmaxSeconds - 您应该始终在某种管理程序下运行您的消费者,例如 supervisord。PHP 可能会泄漏内存,在处理指定数量的消息或运行指定时间后,消费者将进行清洁关闭(刷新所有消息,从 RabbitMQ 断开连接)并退出代码 0 - 管理程序应该自动重新启动它。
  • prefetchCount - 如果您有多个消费者进程并行从同一队列中消费,设置 prefetchCount=1 以在消费者之间平均分配工作

已知限制

  • 如果消息的处理时间超过心跳超时时间,RabbitMQ 将断开客户端连接,消费者进程将崩溃。这更多的是 PHP 的限制(没有线程)。心跳必须设置得足够高。

许可证

MIT 许可证。见 LICENSE 文件。