thomasvargiu/rabbitmq-module

将php-amqplib与Zend Framework 2和RabbitMq集成

5.0.0 2023-01-13 10:33 UTC

README

将php-amqplib与Laminas/Mezzio框架和RabbitMq集成。

RabbitMqBundle for Symfony 2的启发

用法

连接

您可以在配置中配置多个连接

return [
    'rabbitmq' => [
        'connection' => [
            // connection name
            'default' => [ // default values
                'type' => 'stream', // Available: stream, socket, ssl, lazy
                'host' => 'localhost',
                'port' => 5672,
                'username' => 'guest',
                'password' => 'guest',
                'vhost' => '/',
                'insist' => false,
                'read_write_timeout' => 2,
                'keep_alive' => false,
                'connection_timeout' => 3,
                'heartbeat' => 0
            ]
        ]
    ]
]

选项类

您可以在这里找到所有可用的选项

检索服务

您可以从服务定位器检索连接

// Getting the 'default' connection
/** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/
$connection = $serviceLocator->get('rabbitmq.connection.default');

生产者

您可以在配置中配置多个生产者

return [
    'rabbitmq' => [
        'producer' => [
            'producer_name' => [
                'connection' => 'default', // the connection name
                'exchange' => [
                    'type' => 'direct',
                    'name' => 'exchange-name',
                    'durable' => true,      // (default)
                    'auto_delete' => false, // (default)
                    'internal' => false,    // (default)
                    'declare' => true,      // (default)
                    'arguments' => [],      // (default)
                    'ticket' => 0,          // (default)
                    'exchange_binds' => []  // (default)
                ],
                'queue' => [ // optional queue
                    'name' => 'queue-name', // can be an empty string,
                    'passive' => false,     // (default)
                    'durable' => true,      // (default)
                    'auto_delete' => false, // (default)
                    'exclusive' => false,   // (default)
                    'arguments' => [],      // (default)
                    'ticket' => 0,          // (default)
                    'routing_keys' => []    // (default)
                ],
                'auto_setup_fabric_enabled' => true // auto-setup exchanges and queues 
            ]
        ]
    ]
]

选项类

您可以在这里找到所有可用的选项

检索服务

您可以从服务定位器检索生产者

// Getting a producer
/** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/
/** @var \RabbitMqModule\ProducerInterface $producer **/
$producer = $serviceLocator->get('rabbitmq.producer.producer_name');

// Sending a message
$producer->publish(json_encode(['foo' => 'bar']));

消费者

您可以在配置中配置多个消费者

return [
    'rabbitmq' => [
        'consumer' => [
            'consumer_name' => [
                'description' => 'Consumer description',
                'connection' => 'default', // the connection name
                'exchange' => [
                    'type' => 'direct',
                    'name' => 'exchange-name'
                ],
                'queue' => [
                    'name' => 'queue-name', // can be an empty string,
                    'routing_keys' => [
                        // optional routing keys
                    ]
                ],
                'auto_setup_fabric_enabled' => true, // auto-setup exchanges and queues
                'qos' => [
                    // optional QOS options for RabbitMQ
                    'prefetch_size' => 0,
                    'prefetch_count' => 1,
                    'global' => false
                ],
                'callback' => 'my-service-name',
            ]
        ]
    ]
]

选项类

您可以在这里找到所有可用的选项

回调

callback键必须包含以下之一

  • 一个callable:一个闭包或可调用的对象,它接收一个PhpAmqpLib\Message\AMQPMessage对象。
  • RabbitMqModule\\ConsumerInterface的一个实例。
  • 服务定位器中的一个字符串服务名称(可以是任何callableRabbitMqModule\\ConsumerInterface的实例)。

请查看RabbitMqModule\\ConsumerInterface类常量以获取可用的返回值。

如果您的回调返回false,则消息将被拒绝并重新入队。

如果您的回调返回任何其他不同于false且与ConsumerInterface常量之一不同的值,则默认响应类似于MSG_ACK常量。

检索服务

您可以从服务定位器检索消费者

// Getting a consumer
/** @var \Laminas\ServiceManager\ServiceLocatorInterface $serviceLocator **/
/** @var \RabbitMqModule\Consumer $consumer **/
$consumer = $serviceLocator->get('rabbitmq.consumer.consumer_name');

// Start consumer
$consumer->consume();

有一个控制台命令可用于列出并启动消费者。见下文。

消费者示例

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMqModule\ConsumerInterface;

class FetchProposalsConsumer implements ConsumerInterface
{
    /**
     * @param AMQPMessage $message
     *
     * @return int
     */
    public function execute(AMQPMessage $message)
    {
        $data = json_decode($message->body, true);

        try {
            // do something...
        } catch (\PDOException $e) {
            return ConsumerInterface::MSG_REJECT_REQUEUE;
        } catch (\Exception $e) {
            return ConsumerInterface::MSG_REJECT;
        }

        return ConsumerInterface::MSG_ACK;
    }
}

交换2交换绑定

您可以在生产者或消费者中配置exchange2exchange绑定。示例

return [
    'rabbitmq' => [
        'consumer' => [
            'consumer_name' => [
                // ...
                'exchange' => [
                    'type' => 'fanout',
                    'name' => 'exchange_to_bind_to',
                    'exchange_binds' => [
                        [
                            'exchange' => [
                                'type' => 'fanout',
                                'name' => 'main_exchange'
                            ],
                            'routing_keys' => [
                                '#'
                            ]
                        ]
                    ]
                ],
            ]
        ]
    ]
]

控制台使用

有一些控制台命令可用

  • rabbitmq:fabric:setup:为每个服务设置fabric,声明交换和队列
  • rabbitmq:consumers:list:列出可用的消费者
  • rabbitmq:consumers:start <name> [--without-signals|-w]:按名称启动消费者
  • rabbitmq:rpc-server:start <name> [--without-signals|-w]:按名称启动rpc服务器
  • rabbitmq:producer:publish <name> [--route=] <msg>:使用生产者发送消息

示例

vendor/bin/laminas rabbitmq:producer:publish my_producer "Hello world!"