dada-amater / nette-rabbitmq

v2.1.0 2018-07-09 08:12 UTC

README

Latest Stable Version License Total Downloads

Nette RabbitMQ

Nette 扩展用于 RabbitMQ (使用 composer 包 jakubkulhan/bunny)

示例配置

下载 composer 包

composer require gamee/nette-rabbitmq

扩展注册

config.neon

extensions:
	rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension

示例配置

services:
	- TestConsumer

rabbitmq:
	connections:
		default:
			user: guest
			password: guest
			host: localhost
			port: 5672

	queues:
		testQueue:
			connection: default

	exchanges:
		testExchange:
			connection: default
			type: fanout
			queueBindings:
				testQueue:
					routingKey: testRoutingKey

	producers:
		testProducer:
			exchange: testExchange
			# queue: testQueue
			contentType: application/json
			deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT

	consumers:
		testConsumer:
			queue: testQueue
			callback: [@TestConsumer, consume]
			qos:
				prefetchSize: 0
				prefetchCount: 5

发布消息

注意:发布第一条消息后,队列将自动创建。

services.neon

services:
	- TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))

TestQueue.php

<?php

declare(strict_types=1);

use Gamee\RabbitMQ\Producer\Producer;

final class TestQueue
{

	/**
	 * @var Producer
	 */
	private $testProducer;


	public function __construct(Producer $testProducer)
	{
		$this->testProducer = $testProducer;
	}


	public function publish(string $message): void
	{
		$json = json_encode(['message' => $message]);
		$headers = [];

		$this->testProducer->publish($json, $headers);
	}

}

消费消息

您的消费者回调必须返回一个确认,表明特定消息已被确认(或不同的状态 - unack, reject)。

TestConsumer.php

<?php

declare(strict_types=1);

use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;

final class TestConsumer implements IConsumer
{

	public function consume(Message $message): int
	{
		$messageData = json_decode($message->content);

		$headers = $message->headers;

		/**
		 * @todo Some logic here...
		 */

		return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
	}

}

通过 CLI 运行消费者

已准备两个消费者命令。 rabbitmq:consumer 将消费指定时间(以秒为单位)的消息。以下命令将消费一小时的消息

php index.php rabbitmq:consumer testConsumer 3600

rabbitmq:staticConsumer 将消费特定数量的消息。以下示例将消费 20 条消息

php index.php rabbitmq:staticConsumer testConsumer 20