astepin / rabbitmq-module
集成php-amqplib与Zend Framework 3和RabbitMq
2.0.2
2017-10-25 14:04 UTC
Requires
- php: ^7.0
- ext-amqp: *
- php-amqplib/php-amqplib: ~2.6
- zendframework/zend-console: ~2.6
- zendframework/zend-modulemanager: ~2.7
- zendframework/zend-mvc-console: ~1.1.10
- zendframework/zend-router: ~3.0
- zendframework/zend-serializer: ~2.8
- zendframework/zend-servicemanager: ~3.1
- zendframework/zend-stdlib: ~3.0
Requires (Dev)
- empi89/php-amqp-stubs: dev-master
- friendsofphp/php-cs-fixer: @stable
- mockery/mockery: ~1.0
- phpmd/phpmd: ~2.4
- phpunit/phpunit: >=6.0
- squizlabs/php_codesniffer: ~3.1
- zendframework/zend-test: ~3.0
Suggests
- ext-pcntl: Allows signal handling and process termination
README
集成php-amqplib与Zend Framework 3和RabbitMq。
灵感来源于RabbitMqBundle for Symfony 2
使用
连接
您可以在配置中配置多个连接
return [ 'rabbitmq_module' => [ 'connection' => [ // connection name 'default' => [ // default values 'type' => 'stream', // Available: stream, socket, ssl, lazy 'host' => 'localhost', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'insist' => false, 'read_write_timeout' => 2, 'keep_alive' => false, 'connection_timeout' => 3, 'heartbeat' => 0 ] ] ] ]
选项类
您可以在此处找到所有可用选项
获取服务
您可以从服务定位器中获取连接
// Getting the 'default' connection /** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/ $connection = $serviceLocator->get('rabbitmq.connection.default');
生产者
您可以在配置中配置多个生产者
return [ 'rabbitmq_module' => [ 'producer' => [ 'producer_name' => [ 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name', 'durable' => true, // (default) 'auto_delete' => false, // (default) 'internal' => false, // (default) 'no_wait' => false, // (default) 'declare' => true, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'exchange_binds' => [] // (default) ], 'queue' => [ // optional queue 'name' => 'queue-name' // can be an empty string, 'type' => null, // (default) 'passive' => false, // (default) 'durable' => true, // (default) 'auto_delete' => false, // (default) 'exclusive' => false, // (default) 'no_wait' => false, // (default) 'arguments' => [], // (default) 'ticket' => 0, // (default) 'routing_keys' => [] // (default) ], 'auto_setup_fabric_enabled' => true // auto-setup exchanges and queues ] ] ] ]
选项类
您可以在此处找到所有可用选项
获取服务
您可以从服务定位器中获取生产者
// Getting a producer /** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\ProducerInterface $producer **/ $producer = $serviceLocator->get('rabbitmq.producer.producer_name'); // Sending a message $producer->publish(json_encode(['foo' => 'bar']));
消费者
您可以在配置中配置多个消费者
return [ 'rabbitmq_module' => [ 'consumer' => [ 'consumer_name' => [ 'description' => 'Consumer description', 'connection' => 'default', // the connection name 'exchange' => [ 'type' => 'direct', 'name' => 'exchange-name' ], 'queue' => [ 'name' => 'queue-name' // can be an empty string, 'routing_keys' => [ // optional routing keys ] ], 'auto_setup_fabric_enabled' => true, // auto-setup exchanges and queues 'qos' => [ // optional QOS options for RabbitMQ 'prefetch_size' => 0, 'prefetch_count' => 1, 'global' => false ], 'callback' => 'my-service-name', ] ] ] ]
选项类
您可以在此处找到所有可用选项
回调
callback
键必须包含以下之一
- 一个
callable
:一个闭包或可调用的对象,接收一个PhpAmqpLib\Message\AMQPMessage
对象。 RabbitMqModule\\ConsumerInterface
的一个实例。- 服务定位器中的字符串服务名称(可以是任何
callable
或RabbitMqModule\\ConsumerInterface
的实例)。
请查看RabbitMqModule\\ConsumerInterface
类常量以获取可用返回值。
如果您的回调返回false
,则消息将被拒绝并重新入队。
如果您的回调返回任何其他不同于false
的值,并且是ConsumerInterface
常量之一,则默认响应类似于MSG_ACK
常量。
获取服务
您可以从服务定位器中获取消费者
// Getting a consumer /** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/ /** @var \RabbitMqModule\Consumer $consumer **/ $consumer = $serviceLocator->get('rabbitmq.consumer.consumer_name'); // Start consumer $consumer->consume();
有一个控制台命令可用,用于列出和启动消费者。请参见以下内容。
消费者示例
use PhpAmqpLib\Message\AMQPMessage; use RabbitMqModule\ConsumerInterface; class FetchProposalsConsumer implements ConsumerInterface { /** * @param AMQPMessage $message * * @return int */ public function execute(AMQPMessage $message) { $data = json_decode($message->body, true); try { // do something... } catch (\PDOException $e) { return ConsumerInterface::MSG_REJECT_REQUEUE; } catch (\Exception $e) { return ConsumerInterface::MSG_REJECT; } return ConsumerInterface::MSG_ACK; } }
交换2交换绑定
您可以在生产者或消费者中配置exchange2exchange绑定。示例
return [ 'rabbitmq_module' => [ 'consumer' => [ 'consumer_name' => [ // ... 'exchange' => [ 'type' => 'fanout', 'name' => 'exchange_to_bind_to', 'exchange_binds' => [ [ 'exchange' => [ 'type' => 'fanout', 'name' => 'main_exchange' ], 'routing_keys' => [ '#' ] ] ] ], ] ] ] ]
控制台使用
有一些控制台命令可用
rabbitmq-module setup-fabric
:为每个服务设置fabric,声明交换和队列rabbitmq-module list consumers
:列出可用消费者rabbitmq-module consumer <name> [--without-signals|-w]
:通过名称启动消费者rabbitmq-module rpc_server <name> [--without-signals|-w]
:通过名称启动rpc服务器rabbitmq-module stdin-producer <name> [--route=] <msg>
:使用生产者发送消息