webit/message-bus-amqp

1.0.0 2018-01-01 11:04 UTC


Last update: 2024-09-20 22:23:05 UTC



AMQP infrastructure for Message Bus

Installation

composer require webit/message-bus-amqp=^1.0.0

Usage

Connection Pool

Create one with ConnectionPoolBuilder:

use Webit\MessageBus\Infrastructure\Amqp\Connection\Pool\ConnectionPoolBuilder;
use Webit\MessageBus\Infrastructure\Amqp\Connection\ConnectionParams;


$builder = ConnectionPoolBuilder::create();

// optionally set connection factory (LazyConnectionFactory used by default)
$builder->setConnectionFactory(
    new \Webit\MessageBus\Infrastructure\Amqp\Connection\InstantConnectionFactory()
);

// optionally add logger (use a smarter one in real life)
$logger = new \Psr\Log\NullLogger();
$builder->setLogger($logger);

// register at least one connection
$builder->registerConnection(
    new ConnectionParams(
        'rabbitmq.host',
        '5672', // port
        'my-username',
        'my-password'
    ),
    'connection-1'
);

$connectionPool = $builder->build();
 

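The connection pool hands you a current connection. If that connection turns out to be faulty, dispose of it and ask the pool for the next one (if any remain).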

try {
    $connection = $connectionPool->current();
    $channel = $connection->getChannel();
} catch (\Exception $e) {
    $connectionPool->disposeCurrent();
    // fall back to the next registered connection, if any
    $connection = $connectionPool->current();
    $channel = $connection->getChannel();
}
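
The snippet above retries only once. With several registered connections you may want to keep disposing and retrying until one works. The following is a minimal sketch of that loop, not the package's own code: InMemoryPool is a hypothetical stand-in that mimics only the current() and disposeCurrent() calls shown above, and firstWorkingConnection() is an illustrative helper name. How the real pool behaves once it runs out of connections is an assumption here (the stand-in throws).

```php
<?php
// Hypothetical stand-in for the pool. Only current() and disposeCurrent()
// mirror the README; the rest exists so the sketch runs on its own.
final class InMemoryPool
{
    /** @var callable[] */
    private $factories;

    public function __construct(array $factories)
    {
        $this->factories = array_values($factories);
    }

    public function current()
    {
        if (!$this->factories) {
            // Assumption: an exhausted pool signals failure with an exception.
            throw new RuntimeException('No connections left in the pool.');
        }
        return ($this->factories[0])();
    }

    public function disposeCurrent()
    {
        array_shift($this->factories);
    }
}

/**
 * Returns the first connection that can be obtained, disposing of each
 * failing one, and gives up after $maxAttempts tries.
 */
function firstWorkingConnection($pool, $maxAttempts)
{
    $lastError = null;
    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        try {
            return $pool->current();
        } catch (Exception $e) {
            $lastError = $e;
            $pool->disposeCurrent();
        }
    }
    throw new RuntimeException('All connections failed.', 0, $lastError);
}

$pool = new InMemoryPool([
    function () { throw new RuntimeException('connection-1 is down'); },
    function () { return 'connection-2'; },
]);

$connection = firstWorkingConnection($pool, 2);
echo $connection, PHP_EOL;
```

With the real pool, pass $connectionPool in place of the stand-in and set $maxAttempts to the number of connections you registered.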

Publisher integration

To publish a message over AMQP, use AmqpPublisher:

use Webit\MessageBus\Infrastructure\Amqp\Connection\Channel\NewChannelConnectionAwareChannelFactory;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\ExchangePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\QueuePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\AmqpPublisher;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\Routing\FromMessageTypeRoutingKeyResolver;
use Webit\MessageBus\Message;

$channelFactory = new NewChannelConnectionAwareChannelFactory($connectionPool);

$publicationTarget = new ExchangePublicationTarget(
    $channelFactory,
    new FromMessageTypeRoutingKeyResolver(), // you can provide your implementation
    'exchange-name'
);

// or

$publicationTarget = new QueuePublicationTarget(
    $channelFactory,
    'queueName'
);

$publisher = new AmqpPublisher($publicationTarget);

$message = new Message('my-type', 'message_content');
$publisher->publish($message);
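
The comment on FromMessageTypeRoutingKeyResolver says you can provide your own implementation. As a rough sketch of the kind of logic such a resolver might hold, the class below derives a routing key from a message type and a fixed prefix. The class name, the resolve() method and its string argument are all assumptions made for illustration; the package's actual resolver contract is not shown in this README, so check it before adapting this.

```php
<?php
// Hypothetical resolver: the method name and signature are assumptions,
// not the package's actual resolver contract.
final class PrefixedTypeRoutingKeyResolver
{
    private $prefix;

    public function __construct($prefix)
    {
        // Drop a trailing dot so "billing" and "billing." behave the same.
        $this->prefix = rtrim($prefix, '.');
    }

    // Builds "<prefix>.<type>", with the type lower-cased and every run of
    // non-alphanumeric characters collapsed into a single dot.
    public function resolve($messageType)
    {
        $normalised = strtolower(preg_replace('/[^A-Za-z0-9]+/', '.', $messageType));
        return $this->prefix . '.' . trim($normalised, '.');
    }
}

$resolver = new PrefixedTypeRoutingKeyResolver('billing.');
echo $resolver->resolve('Invoice_Paid'), PHP_EOL;
```

Dotted keys of this shape fit topic exchanges, where bindings such as billing.# can select a whole family of message types.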

Consuming messages

To listen for messages coming from AMQP and consume them:

  1. Implement your consumer
use Webit\MessageBus\Consumer;
use Webit\MessageBus\Message;

class MyConsumer implements Consumer
{
    public function consume(Message $message)
    {
        // do your stuff here
    }
}
  2. Build the AmqpConsumer
use Psr\Log\NullLogger;
use Webit\MessageBus\Infrastructure\Amqp\Listener\Message\MessageFactory;
use Webit\MessageBus\Infrastructure\Amqp\Listener\AmqpConsumerBuilder;

$builder = AmqpConsumerBuilder::create();
$builder->setConsumer(new \MyConsumer());
$builder->setLogger(new NullLogger()); // optional
$builder->shouldSendFeedback(false); // if you don't want to acknowledge messages, set this to false (true by default)
$builder->setMessageFactory(new SimpleMessageFactory()); // optionally set your MessageFactory

$amqpConsumer = $builder->build();
  3. Start listening for AMQP messages
$listener = new SimpleAmqpListener(
    $channelFactory,
    $amqpConsumer,
    'queue-name'
);

// start listening (continuous process)
$listener->listen();
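
Step 1 above leaves the consumer body empty. One possible way to fill it in is to dispatch on the message type and decode the content as JSON, as sketched below. The sketch declares its own minimal Consumer and Message stand-ins so it runs without the package installed; in a real project you would import Webit\MessageBus\Consumer and Webit\MessageBus\Message instead. The type() and content() accessors, the TypeDispatchingConsumer class and its on() method are assumptions for illustration, so verify the real Message API before relying on them.

```php
<?php
// Stand-ins so the sketch is self-contained. The constructor arguments and
// consume() come from the README; type() and content() are assumed accessors.
interface Consumer
{
    public function consume(Message $message);
}

final class Message
{
    private $type;
    private $content;

    public function __construct($type, $content)
    {
        $this->type = $type;
        $this->content = $content;
    }

    public function type() { return $this->type; }
    public function content() { return $this->content; }
}

// Hypothetical consumer that routes each message to the handler
// registered for its type and passes it the decoded JSON payload.
final class TypeDispatchingConsumer implements Consumer
{
    private $handlers = [];

    public function on($type, callable $handler)
    {
        $this->handlers[$type] = $handler;
    }

    public function consume(Message $message)
    {
        $type = $message->type();
        if (!isset($this->handlers[$type])) {
            throw new InvalidArgumentException("No handler for message type \"$type\".");
        }
        $payload = json_decode($message->content(), true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new InvalidArgumentException('Message content is not valid JSON.');
        }
        call_user_func($this->handlers[$type], $payload);
    }
}

$received = [];
$consumer = new TypeDispatchingConsumer();
$consumer->on('order.created', function (array $payload) use (&$received) {
    $received[] = $payload['id'];
});
$consumer->consume(new Message('order.created', '{"id": 42}'));
```

A consumer like this would be passed to $builder->setConsumer() in step 2 in place of MyConsumer.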

Running tests

Install dependencies with composer

docker-compose run --rm composer

Unit tests

docker-compose run --rm unit-tests

Integration tests

docker-compose run --rm integration-tests