m6web / amqp-bundle
AMQP Bundle
Requires
- php: ^8.0
- ext-amqp: >=2.0
- symfony/dependency-injection: ^5.4 || ^6.4 || ^7.0
- symfony/framework-bundle: ^5.4 || ^6.4 || ^7.0
- symfony/http-kernel: ^5.4 || ^6.4 || ^7.0
- symfony/yaml: ^5.4 || 6.4 || ^7.0
- twig/twig: ^2.13 || ^3.0
Requires (Dev)
- atoum/atoum: ~4.0
- m6web/php-cs-fixer-config: ^3.2
- phpstan/phpstan: ^1.10
Suggests
- ocramius/proxy-manager: Required for lazy connections
- v6.0.0
- v5.0.0
- v4.0.0
- dev-master / 3.x-dev
- v3.2.0
- v3.1.2
- v3.1.1
- v3.1.0
- v3.0.4
- v3.0.3
- v3.0.2
- v3.0.1
- v3.0.0
- v2.4.0
- v2.3.1
- v2.3.0
- v2.2.1
- v2.2.0
- v2.1.0
- v2.0.0
- v1.10.2
- v1.10.1
- v1.10.0
- v1.9.1
- v1.9.0
- v1.8.1
- v1.8.0
- v1.7.1
- v1.7.0
- v1.6.1
- v1.6.0
- v1.5.1
- v1.5.0
- v1.4.0
- v1.3.1
- v1.3.0
- v1.2.1
- v1.2.0
- v1.1.1
- v1.1.0
- v1.0.0
This package is auto-updated.
Last update: 2024-09-12 11:18:03 UTC
README
配置和文档灵感来源于videlalvaro/RabbitMqBundle。
将Amqp客户端作为Symfony服务使用
AmqpBundle通过使用php-amqp 扩展将消息集成到您的应用程序中。它可以与任何AMQP spec 0-9-1兼容的服务器(如RabbitMQ、OpenAMQP和Qpid)进行通信,让您能够向任何交换机发布消息并从任何队列中消费消息。
从Symfony控制器向AMQP服务器发布消息就像这样
$msg = ["key" => "value"]; $this->myProducer->publishMessage(serialize($msg)); // where "myProducer" refers to a configured producer (see producers documentation below)
当您想从一个队列中消费消息时
$msg = $this->myConsumer->getMessage(); // where "myConsumer" refers to a configured consumer (see consumers documentation below)
AmqpBundle不提供守护进程模式来运行AMQP消费者,也不会这样做。您可以使用M6Web/DaemonBundle来实现。
安装
使用composer
composer require m6web/amqp-bundle
然后确保在您的应用程序中注册了此捆绑包
// config/bundles.php return [ M6Web\Bundle\AmqpBundle\M6WebAmqpBundle::class => ['all' => true], ];
用法
在您的配置文件中添加m6_web_amqp
部分。
默认情况下,Symfony事件调度器会在每个命令上抛出一个事件(事件包含AMQP命令和执行它所用的时间)。要禁用此功能以及其他事件的分发
m6_web_amqp: event_dispatcher: false
以下是一个配置示例
m6_web_amqp: sandbox: enabled: false #optional - default false connections: default: host: 'localhost' # optional - default 'localhost' port: 5672 # optional - default 5672 timeout: 10 # optional - default 10 - in seconds login: 'guest' # optional - default 'guest' password: 'guest' # optional - default 'guest' vhost: '/' # optional - default '/' lazy: false # optional - default false producers: myproducer: class: "My\\Provider\\Class" # optional - to overload the default provider class connection: myconnection # require queue_options: name: 'my-queue' # optional passive: bool # optional - defaut false durable: bool # optional - defaut true auto_delete: bool # optional - defaut false exchange_options: name: 'myexchange' # require type: direct/fanout/headers/topic # require passive: bool # optional - defaut false durable: bool # optional - defaut true auto_delete: bool # optional - defaut false arguments: { key: value } # optional - default { } - Please refer to the documentation of your broker for information about the arguments. routing_keys: ['routingkey', 'routingkey2'] # optional - default { } publish_attributes: { key: value } # optional - default { } - possible keys: content_type, content_encoding, message_id, user_id, app_id, delivery_mode, # priority, timestamp, expiration, type, reply_to, headers. consumers: myconsumer: class: "My\\Provider\\Class" # optional - to overload the default consumer class connection: default exchange_options: name: 'myexchange' # require queue_options: name: 'myqueue' # require passive: bool # optional - defaut false durable: bool # optional - defaut true exclusive: bool # optional - defaut false auto_delete: bool # optional - defaut false arguments: { key: value } # optional - default { } - Please refer to the documentation of your broker for information about the arguments. # RabbitMQ ex: {'x-ha-policy': 'all', 'x-dead-letter-routing-key': 'async.dead', # 'x-dead-letter-exchange': 'async_dead', 'x-message-ttl': 20000} routing_keys: ['routingkey', 'routingkey2'] # optional - default { } qos_options: prefetch_size: integer # optional - default 0 prefetch_count: integer # optional - default 0
在此,我们配置了应用程序将拥有的连接服务和消息端点。
可以通过使用m6_web_amqp.locator
通过getConsumer和getProducer检索生产者和消费者服务。
在此示例中,您的服务容器将包含服务m6_web_amqp.producer.myproducer
和m6_web_amqp.consumer.myconsumer
。
生产者
将使用生产者将消息发送到服务器。
假设您想发布一条消息,并且已经配置了一个名为myproducer
的生产者(就像上面那样),那么您只需在需要使用它的任何地方注入m6_web_amqp.producer.myproducer
服务即可
App\TheClassWhereIWantToInjectMyProducer: arguments: ['@m6_web_amqp.producer.myproducer']
private $myProducer; public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Producer $myProducer) { $this->myProducer = $myProducer; } public function myFunction() { $msg = ["key" => "value"]; $this->myProducer->publishMessage(serialize($msg)); }
否则,您可以使用m6_web_amqp.locator
App\TheClassWhereIWantToRetriveMyConsumer: arguments: ['@m6_web_amqp.locator']
private $myProducer; public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) { $this->locator = $locator; } public function myFunction() { $this->locator->getProducer('m6_web_amqp.produer.myproducer'); }
在AMQP模型中,消息被发送到交换机,这意味着在生产者的配置中,您必须指定连接选项以及exchange_options
。
如果您需要为每条消息添加默认发布属性,publish_attributes
选项可以像这样
publish_attributes: { 'content_type': 'application/json', 'delivery_mode': 'persistent', 'priority': 8, 'expiration': '3200'}
如果您不想使用配置来定义路由键(例如,如果它应该为每条消息计算),您可以在调用publishMessage()
时定义它
$routingKey = $this->computeRoutingKey($message); $this->get('m6_web_amqp.producer.myproducer')->publishMessage($message, AMQP_NOPARAM, [], [$routingKey]);
消费者
将使用消费者从队列中获取消息。
假设您想消费一条消息,并且已经配置了一个名为myconsumer
的消费者(就像上面那样),那么您只需在需要使用它的任何地方注入m6_web_amqp.consumer.myconsumer
服务即可
App\TheClassWhereIWantToInjectMyConsumer: arguments: ['@m6_web_amqp.consumer.myconsumer']
private $myConsumer; public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Consumer $myConsumer) { $this->myConsumer = $myConsumer; } public function myFunction() { $this->myConsumer->getMessage(); }
否则,您可以使用m6_web_amqp.locator
App\TheClassWhereIWantToRetriveMyConsumer: arguments: ['@m6_web_amqp.locator']
private $myConsumer; public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) { $this->locator = $locator; } public function myFunction() { $this->locator->getConsumer('m6_web_amqp.consumer.myconsumer'); }
消费者不会等待消息:如果不可用,getMessage将立即返回null,如果可以消费,则返回AMQPEnvelope对象。getMessage的“flags”参数接受AMQP_AUTOACK(默认自动确认)或AMQP_NOPARAM(手动确认)。
要手动确认消息,请使用消费者的ackMessage/nackMessage方法,并使用从AMQPEnvelope对象中获取的delivery_tag参数的值。如果您选择不确认消息,nackMessage的第二个参数接受AMQP_REQUEUE来重新排队消息或AMQP_NOPARAM来忘记它。
小心处理qos参数,你应该知道它可能会影响你的性能。请阅读这篇文章。此外,目前PHP的amqp
扩展中没有任何global
参数可用。
懒连接
强烈建议在配置文件中将所有连接设置为lazy: true
。这将防止应用程序在每次请求时连接到RabbitMQ。
如果您想使用懒连接,必须在composer.json文件中添加"ocramius/proxy-manager": "~1.0"
,并且(如前所述)将您的连接设置为lazy: true
。
数据收集器
如果kernel.debug
为true
,则默认启用数据收集器。通常在开发环境中。
Docker
如果您有一个多容器应用程序,我们提供了一个包含rabbitmq-server的容器Dockerfile。这个容器仅用于测试。
docker-compose.yml示例
web:
build: .
volumes:
- .:/var/www
links:
- rabbitmq:rabbitmq.local
rabbitmq:
build: vendor/m6web/amqp-bundle/
ports:
- "15672:15672"
- "5672:5672"
测试
如果您在应用程序测试中使用这个库,您将需要一个正在运行的rabbitmq实例。如果您不想测试rabbitmq的生产者和消费者,您可以使用沙盒模式。
m6_web_amqp: sandbox: enabled: true
在此模式下,将不会建立与rabbitmq服务器的连接。生产者默默地接受消息,消费者默默地假设没有消息可用。