arus / amqp-bridge
AMQP extension bridge for PHP 7.1+ (including PHP 8), with support for annotations and JSON Schema
v1.0.2
2021-09-07 11:49 UTC
Requires
- php: ^7.1|^8.0
- ext-amqp: *
- doctrine/annotations: ^1.6
- justinrainbow/json-schema: ^5.0
- psr/log: ^1.0
Requires (Dev)
- phpunit/phpunit: 7.5.20|9.5.0
- sunrise/coding-standard: 1.0.0
This package is auto-updated.
Last update: 2024-09-07 18:26:16 UTC
README
Installation
composer require 'arus/amqp-bridge'
Quick start
Queue message handler
<?php

declare(strict_types=1);

namespace App\QueueMessageHandler;

use Arus\AMQP\Bridge\PayloadDecoder\JsonDecoder;
use Arus\AMQP\Bridge\MessageHandlerInterface;
use Arus\AMQP\Bridge\MessageInterface;

use const JSON_OBJECT_AS_ARRAY;

// Note: the @JsonSchemaReference annotation class also needs a `use` import
// (or a fully qualified name); its namespace is not shown in this README.

/**
 * @JsonSchemaReference("config/json-schemas/SomeQueueMessage.json")
 */
final class SomeQueueMessageHandler implements MessageHandlerInterface
{

    /**
     * {@inheritDoc}
     */
    public function handle(MessageInterface $message) : void
    {
        // decode the JSON payload into an associative array...
        $data = (new JsonDecoder)->decode($message, JSON_OBJECT_AS_ARRAY);

        // some code...
    }
}
Message queue consumer
<?php

use App\QueueMessageHandler\SomeQueueMessageHandler;
use Arus\AMQP\Bridge\Consumer;

$connection = new AMQPConnection();
$connection->setHost('localhost');
$connection->setPort(5672);
$connection->setVhost('/');
$connection->setLogin('guest');
$connection->setPassword('guest');
$connection->connect();

$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(100);

$queue = new AMQPQueue($channel);
$queue->setName('queue.name');

// init the message queue consumer...
$consumer = new Consumer(new SomeQueueMessageHandler());

// [optional] set a logger based on PSR-3...
$consumer->setLogger($logger);

// [optional] set a custom payload validator...
$consumer->setPayloadValidator($payloadValidator);

// [optional] set a custom annotation reader...
$consumer->setAnnotationReader($annotationReader);

// [optional] use a JSON schema validator for queue messages...
$consumer->useJsonSchemaValidator();

// [optional] set a callback that will be called when a queue message is received...
$consumer->setMessageReceivedCallback(function ($message) {
    // here you can, for example, re-open doctrine entity managers...
});

// [optional] set a callback that will be called when a queue message is handled...
$consumer->setMessageHandledCallback(function ($message) {
    // here you can, for example, clear doctrine entity managers...
});

try {
    $queue->consume($consumer);
} catch (Throwable $e) {
    // close the connection before re-throwing the error...
    $connection->disconnect();

    throw $e;
}
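Ack, reject and requeue commands
- If a queue message is handled without errors, it is automatically acknowledged;
- If a queue message contains an undecodable or invalid payload, it is automatically rejected;
- If an unexpected error occurs while a queue message is being handled, it is automatically requeued;
- If you need to reject a queue message from your own code, throw an exception that implements Arus\AMQP\Bridge\Exception\UnacknowledgableQueueMessageExceptionInterface.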
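The rules above can be pictured with a small, self-contained sketch. It is not the library's implementation: `RejectableExample`, `OrderAlreadyProcessedExample` and `resolveCommand()` are hypothetical names invented for illustration, with `RejectableExample` standing in for `UnacknowledgableQueueMessageExceptionInterface`. The undecodable or invalid payload case is left out, since the README does not show how the library detects it.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the library's
// UnacknowledgableQueueMessageExceptionInterface (an assumption, not the real type).
interface RejectableExample extends Throwable
{
}

// Hypothetical exception that a handler could throw to reject a message on purpose.
final class OrderAlreadyProcessedExample extends RuntimeException implements RejectableExample
{
}

/**
 * Runs a handler and maps its outcome to 'ack', 'reject' or 'requeue'.
 */
function resolveCommand(callable $handler): string
{
    try {
        $handler();
    } catch (RejectableExample $e) {
        // the handler asked for the message to be rejected...
        return 'reject';
    } catch (Throwable $e) {
        // any other error is treated as unexpected...
        return 'requeue';
    }

    // no error at all...
    return 'ack';
}

echo resolveCommand(function (): void {
    // handled without errors...
}), PHP_EOL; // ack

echo resolveCommand(function (): void {
    throw new OrderAlreadyProcessedExample('already processed');
}), PHP_EOL; // reject

echo resolveCommand(function (): void {
    throw new LogicException('unexpected failure');
}), PHP_EOL; // requeue
```

In a real handler, the equivalent would presumably be an application exception class that implements the library's interface and is thrown from `handle()`.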
Test run
composer test