m6web/amqp-bundle

本包最新版(v6.0.0)没有可用的许可信息。

AMQP Bundle

v6.0.0 2024-03-12 10:07 UTC

README

Build Status Total Downloads License PHP Version Require

配置和文档灵感来源于videlalvaro/RabbitMqBundle

将Amqp客户端作为Symfony服务使用

AmqpBundle通过使用php-amqp 扩展将消息集成到您的应用程序中。它可以与任何AMQP spec 0-9-1兼容的服务器(如RabbitMQ、OpenAMQP和Qpid)进行通信,让您能够向任何交换机发布消息并从任何队列中消费消息。

从Symfony控制器向AMQP服务器发布消息就像这样

$msg = ["key" => "value"];
$this->myProducer->publishMessage(serialize($msg)); // where "myProducer" refers to a configured producer (see producers documentation below)

当您想从一个队列中消费消息时

$msg = $this->myConsumer->getMessage(); // where "myConsumer" refers to a configured consumer (see consumers documentation below)

AmqpBundle不提供守护进程模式来运行AMQP消费者,也不会这样做。您可以使用M6Web/DaemonBundle来实现。

安装

使用composer

composer require m6web/amqp-bundle

然后确保在您的应用程序中注册了此捆绑包

// config/bundles.php

return [
    M6Web\Bundle\AmqpBundle\M6WebAmqpBundle::class => ['all' => true],
];

用法

在您的配置文件中添加m6_web_amqp部分。

默认情况下,Symfony事件调度器会在每个命令上抛出一个事件(事件包含AMQP命令和执行它所用的时间)。要禁用此功能以及其他事件的分发

m6_web_amqp:
   event_dispatcher: false

以下是一个配置示例

m6_web_amqp:
    sandbox:
        enabled: false #optional - default false
    connections:
        default:
            host:     'localhost'  # optional - default 'localhost'
            port:     5672         # optional - default 5672
            timeout:  10           # optional - default 10 - in seconds
            login:     'guest'     # optional - default 'guest'
            password: 'guest'      # optional - default 'guest'
            vhost:    '/'          # optional - default '/'
            lazy:     false        # optional - default false
    producers:
        myproducer:
            class: "My\\Provider\\Class"                           # optional - to overload the default provider class
            connection: myconnection                               # require
            queue_options:
                name: 'my-queue'                                   # optional
                passive: bool                                      # optional - defaut false
                durable: bool                                      # optional - defaut true
                auto_delete: bool                                  # optional - defaut false
            exchange_options:
                name: 'myexchange'                                 # require
                type: direct/fanout/headers/topic                  # require
                passive: bool                                      # optional - defaut false
                durable: bool                                      # optional - defaut true
                auto_delete: bool                                  # optional - defaut false
                arguments: { key: value }                          # optional - default { } - Please refer to the documentation of your broker for information about the arguments.
                routing_keys: ['routingkey', 'routingkey2']        # optional - default { }
                publish_attributes: { key: value }                 # optional - default { } - possible keys: content_type, content_encoding, message_id, user_id, app_id, delivery_mode,
                                                                   #                          priority, timestamp, expiration, type, reply_to, headers.

    consumers:
        myconsumer:
            class: "My\\Provider\\Class"                      # optional - to overload the default consumer class
            connection: default
            exchange_options:
                name: 'myexchange'                            # require
            queue_options:
                name: 'myqueue'                               # require
                passive: bool                                 # optional - defaut false
                durable: bool                                 # optional - defaut true
                exclusive: bool                               # optional - defaut false
                auto_delete: bool                             # optional - defaut false
                arguments: { key: value }                     # optional - default { } - Please refer to the documentation of your broker for information about the arguments.
                                                              #                          RabbitMQ ex: {'x-ha-policy': 'all', 'x-dead-letter-routing-key': 'async.dead',
                                                              #                                      'x-dead-letter-exchange': 'async_dead', 'x-message-ttl': 20000}
                routing_keys: ['routingkey', 'routingkey2']   # optional - default { }
            qos_options:
                prefetch_size: integer                        # optional - default 0
                prefetch_count: integer                       # optional - default 0

在此,我们配置了应用程序将拥有的连接服务和消息端点。

可以通过使用m6_web_amqp.locator通过getConsumer和getProducer检索生产者和消费者服务。

在此示例中,您的服务容器将包含服务m6_web_amqp.producer.myproducerm6_web_amqp.consumer.myconsumer

生产者

将使用生产者将消息发送到服务器。

假设您想发布一条消息,并且已经配置了一个名为myproducer的生产者(就像上面那样),那么您只需在需要使用它的任何地方注入m6_web_amqp.producer.myproducer服务即可

App\TheClassWhereIWantToInjectMyProducer:
    arguments: ['@m6_web_amqp.producer.myproducer']
private $myProducer;

public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Producer $myProducer) {
    $this->myProducer = $myProducer;
}

public function myFunction() {
    $msg = ["key" => "value"];
    $this->myProducer->publishMessage(serialize($msg));
}

否则,您可以使用m6_web_amqp.locator

App\TheClassWhereIWantToRetriveMyConsumer:
    arguments: ['@m6_web_amqp.locator']
private $myProducer;

public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) {
    $this->locator = $locator;
}

public function myFunction() {
    $this->locator->getProducer('m6_web_amqp.produer.myproducer');
}

在AMQP模型中,消息被发送到交换机,这意味着在生产者的配置中,您必须指定连接选项以及exchange_options

如果您需要为每条消息添加默认发布属性,publish_attributes选项可以像这样

publish_attributes: { 'content_type': 'application/json', 'delivery_mode': 'persistent', 'priority': 8,  'expiration': '3200'}

如果您不想使用配置来定义路由键(例如,如果它应该为每条消息计算),您可以在调用publishMessage()时定义它

$routingKey = $this->computeRoutingKey($message);
$this->get('m6_web_amqp.producer.myproducer')->publishMessage($message, AMQP_NOPARAM, [], [$routingKey]);

消费者

将使用消费者从队列中获取消息。

假设您想消费一条消息,并且已经配置了一个名为myconsumer的消费者(就像上面那样),那么您只需在需要使用它的任何地方注入m6_web_amqp.consumer.myconsumer服务即可

App\TheClassWhereIWantToInjectMyConsumer:
    arguments: ['@m6_web_amqp.consumer.myconsumer']
private $myConsumer;

public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Consumer $myConsumer) {
    $this->myConsumer = $myConsumer;
}

public function myFunction() {
    $this->myConsumer->getMessage();
}

否则,您可以使用m6_web_amqp.locator

App\TheClassWhereIWantToRetriveMyConsumer:
    arguments: ['@m6_web_amqp.locator']
private $myConsumer;

public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) {
    $this->locator = $locator;
}

public function myFunction() {
    $this->locator->getConsumer('m6_web_amqp.consumer.myconsumer');
}

消费者不会等待消息:如果不可用,getMessage将立即返回null,如果可以消费,则返回AMQPEnvelope对象。getMessage的“flags”参数接受AMQP_AUTOACK(默认自动确认)或AMQP_NOPARAM(手动确认)。

要手动确认消息,请使用消费者的ackMessage/nackMessage方法,并使用从AMQPEnvelope对象中获取的delivery_tag参数的值。如果您选择不确认消息,nackMessage的第二个参数接受AMQP_REQUEUE来重新排队消息或AMQP_NOPARAM来忘记它。

小心处理qos参数,你应该知道它可能会影响你的性能。请阅读这篇文章。此外,目前PHP的amqp扩展中没有任何global参数可用。

懒连接

强烈建议在配置文件中将所有连接设置为lazy: true。这将防止应用程序在每次请求时连接到RabbitMQ。

如果您想使用懒连接,必须在composer.json文件中添加"ocramius/proxy-manager": "~1.0",并且(如前所述)将您的连接设置为lazy: true

数据收集器

如果kernel.debugtrue,则默认启用数据收集器。通常在开发环境中。

Docker

如果您有一个多容器应用程序,我们提供了一个包含rabbitmq-server的容器Dockerfile。这个容器仅用于测试。

docker-compose.yml示例

web:
    build: .
    volumes:
        - .:/var/www
    links:
        - rabbitmq:rabbitmq.local

rabbitmq:
    build: vendor/m6web/amqp-bundle/
    ports:
        - "15672:15672"
        - "5672:5672"

测试

如果您在应用程序测试中使用这个库,您将需要一个正在运行的rabbitmq实例。如果您不想测试rabbitmq的生产者和消费者,您可以使用沙盒模式。

m6_web_amqp:
    sandbox:
        enabled: true

在此模式下,将不会建立与rabbitmq服务器的连接。生产者默默地接受消息,消费者默默地假设没有消息可用。