thomasvargiu / rabbitmq-module
将php-amqplib与Zend Framework 2和RabbitMq集成
5.0.0
2023-01-13 10:33 UTC
Requires
- php: ^7.4 || ^8.0
- laminas/laminas-serializer: ^2.10.0
- laminas/laminas-servicemanager: ^3.11
- laminas/laminas-stdlib: ^3.3.0
- php-amqplib/php-amqplib: ^3.0.0
- psr/container: ^1.0 || ^2.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.11
- laminas/laminas-cli: ^1.0
- phpspec/prophecy: ^1.15.0
- phpspec/prophecy-phpunit: ^2.0.1
- phpunit/phpunit: ^9.5.24
- vimeo/psalm: ^4.27
This package is auto-updated.
Last update: 2024-09-13 14:26:52 UTC
README
将php-amqplib与Laminas/Mezzio框架和RabbitMq集成。
受RabbitMqBundle for Symfony 2的启发
用法
连接
您可以在配置中配置多个连接
return [ 'rabbitmq' => [ 'connection' => [ // connection name 'default' => [ // default values 'type' => 'stream', // Available: stream, socket, ssl, lazy 'host' => 'localhost', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'insist' => false, 'read_write_timeout' => 2, 'keep_alive' => false, 'connection_timeout' => 3, 'heartbeat' => 0 ] ] ] ]
选项类
您可以在这里找到所有可用的选项
检索服务
您可以从服务定位器检索连接
// Getting the 'default' connection /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ $connection = $serviceLocator->get('rabbitmq.connection.default');
生产者
您可以在配置中配置多个生产者
return [ 'rabbitmq' => [ 'producer' => [ 'producer_name' => [ 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name', 'durable' => true, // (default) 'auto_delete' => false, // (default) 'internal' => false, // (default) 'declare' => true, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'exchange_binds' => [] // (default) ], 'queue' => [ // optional queue 'name' => 'queue-name', // can be an empty string, 'passive' => false, // (default) 'durable' => true, // (default) 'auto_delete' => false, // (default) 'exclusive' => false, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'routing_keys' => [] // (default) ], 'auto_setup_fabric_enabled' => true // auto-setup exchanges and queues ] ] ] ]
选项类
您可以在这里找到所有可用的选项
检索服务
您可以从服务定位器检索生产者
// Getting a producer /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\ProducerInterface $producer **/ $producer = $serviceLocator->get('rabbitmq.producer.producer_name'); // Sending a message $producer->publish(json_encode(['foo' => 'bar']));
消费者
您可以在配置中配置多个消费者
return [ 'rabbitmq' => [ 'consumer' => [ 'consumer_name' => [ 'description' => 'Consumer description', 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name' ], 'queue' => [ 'name' => 'queue-name', // can be an empty string, 'routing_keys' => [ // optional routing keys ] ], 'auto_setup_fabric_enabled' => true, // auto-setup exchanges and queues 'qos' => [ // optional QOS options for RabbitMQ 'prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false ], 'callback' => 'my-service-name', ] ] ] ]
选项类
您可以在这里找到所有可用的选项
回调
callback
键必须包含以下之一
- 一个
callable
:一个闭包或可调用的对象,它接收一个PhpAmqpLib\Message\AMQPMessage
对象。 RabbitMqModule\\ConsumerInterface
的一个实例。- 服务定位器中的一个字符串服务名称(可以是任何
callable
或RabbitMqModule\\ConsumerInterface
的实例)。
请查看RabbitMqModule\\ConsumerInterface
类常量以获取可用的返回值。
如果您的回调返回false
,则消息将被拒绝并重新入队。
如果您的回调返回任何其他不同于false
且与ConsumerInterface
常量之一不同的值,则默认响应类似于MSG_ACK
常量。
检索服务
您可以从服务定位器检索消费者
// Getting a consumer /** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\Consumer $consumer **/ $consumer = $serviceLocator->get('rabbitmq.consumer.consumer_name'); // Start consumer $consumer->consume();
有一个控制台命令可用于列出并启动消费者。见下文。
消费者示例
use PhpAmqpLib\Message\AMQPMessage; use RabbitMqModule\ConsumerInterface; class FetchProposalsConsumer implements ConsumerInterface { /** * @param AMQPMessage $message * * @return int */ public function execute(AMQPMessage $message) { $data = json_decode($message->body, true); try { // do something... } catch (\PDOException $e) { return ConsumerInterface::MSG_REJECT_REQUEUE; } catch (\Exception $e) { return ConsumerInterface::MSG_REJECT; } return ConsumerInterface::MSG_ACK; } }
交换2交换绑定
您可以在生产者或消费者中配置exchange2exchange绑定。示例
return [ 'rabbitmq' => [ 'consumer' => [ 'consumer_name' => [ // ... 'exchange' => [ 'type' => 'fanout', 'name' => 'exchange_to_bind_to', 'exchange_binds' => [ [ 'exchange' => [ 'type' => 'fanout', 'name' => 'main_exchange' ], 'routing_keys' => [ '#' ] ] ] ], ] ] ] ]
控制台使用
有一些控制台命令可用
rabbitmq:fabric:setup
:为每个服务设置fabric,声明交换和队列rabbitmq:consumers:list
:列出可用的消费者rabbitmq:consumers:start <name> [--without-signals|-w]
:按名称启动消费者rabbitmq:rpc-server:start <name> [--without-signals|-w]
:按名称启动rpc服务器rabbitmq:producer:publish <name> [--route=] <msg>
:使用生产者发送消息
示例
vendor/bin/laminas rabbitmq:producer:publish my_producer "Hello world!"