arus / amqp-bridge

适用于 PHP 7.1+(包括 PHP 8)的 AMQP 扩展桥接器,支持注解和 JSON Schema

v1.0.2 2021-09-07 11:49 UTC

This package is auto-updated.

Last update: 2024-09-07 18:26:16 UTC


README

Build Status Code Coverage Scrutinizer Code Quality Total Downloads Latest Stable Version License

安装

composer require 'arus/amqp-bridge'

快速入门

队列消息处理器

declare(strict_types=1);

namespace App\QueueMessageHandler;

use Arus\AMQP\Bridge\PayloadDecoder\JsonDecoder;
use Arus\AMQP\Bridge\MessageHandlerInterface;
use Arus\AMQP\Bridge\MessageInterface;

use const JSON_OBJECT_AS_ARRAY;

/**
 * Example handler for messages taken from a queue.
 *
 * The annotation below references a JSON schema file; presumably it is used to
 * validate incoming payloads when the consumer's JSON schema validator is
 * enabled (confirm against the library's documentation).
 *
 * @JsonSchemaReference("config/json-schemas/SomeQueueMessage.json")
 */
final class SomeQueueMessageHandler implements MessageHandlerInterface
{

    /**
     * Decodes the payload of the received message and processes it.
     *
     * {@inheritDoc}
     */
    public function handle(MessageInterface $message) : void
    {
        // Decode the message payload, passing JSON_OBJECT_AS_ARRAY as the decoder flag.
        $decoder = new JsonDecoder();
        $payload = $decoder->decode($message, JSON_OBJECT_AS_ARRAY);

        // Application-specific processing of $payload goes here...
    }
}

消息队列消费者

use App\QueueMessageHandler\SomeQueueMessageHandler;
use Arus\AMQP\Bridge\Consumer;

// Open a connection to the broker.
$amqpConnection = new AMQPConnection();
$amqpConnection->setHost('localhost');
$amqpConnection->setPort(5672);
$amqpConnection->setVhost('/');
$amqpConnection->setLogin('guest');
$amqpConnection->setPassword('guest');
$amqpConnection->connect();

// Create a channel on that connection and set its prefetch count.
$amqpChannel = new AMQPChannel($amqpConnection);
$amqpChannel->setPrefetchCount(100);

// Select the queue to consume from.
$amqpQueue = new AMQPQueue($amqpChannel);
$amqpQueue->setName('queue.name');

// Wrap the message handler in a consumer. Every setter below is optional.
$messageConsumer = new Consumer(new SomeQueueMessageHandler());

// Optional: PSR-3 logger.
$messageConsumer->setLogger($logger);

// Optional: custom payload validator.
$messageConsumer->setPayloadValidator($payloadValidator);

// Optional: custom annotation reader.
$messageConsumer->setAnnotationReader($annotationReader);

// Optional: validate queue messages with a JSON schema validator.
$messageConsumer->useJsonSchemaValidator();

// Optional: hook invoked when a queue message is received.
$messageConsumer->setMessageReceivedCallback(function ($receivedMessage) {
    // e.g. re-open Doctrine entity managers here...
});

// Optional: hook invoked when a queue message has been handled.
$messageConsumer->setMessageHandledCallback(function ($handledMessage) {
    // e.g. clear Doctrine entity managers here...
});

// Start consuming; on any failure close the connection and rethrow.
try {
    $amqpQueue->consume($messageConsumer);
} catch (Throwable $failure) {
    $amqpConnection->disconnect();

    throw $failure;
}

确认、拒绝和重新入队命令

  • 如果队列消息处理无错误,则该消息将被自动确认
  • 如果队列消息包含无法解码或无效的有效负载,则该消息将被自动拒绝
  • 如果队列消息处理时发生意外错误,则该消息将被自动重新入队
  • 如果您需要在代码中拒绝队列消息,只需抛出一个实现了 Arus\AMQP\Bridge\Exception\UnacknowledgableQueueMessageExceptionInterface 接口的异常

测试运行

composer test

有用链接