webit / message-bus-amqp
1.0.0
2018-01-01 11:04 UTC
Requires
- php: >=7.0
- doctrine/cache: ^1.6.0
- php-amqplib/php-amqplib: ^2.7.0
- psr/log: ^1.0.0
- webit/message-bus: ^1.0.0
Requires (Dev)
- phpunit/phpunit: ^6.0
- symfony/console: ^3.0|^4.0
Suggests
- symfony/console: To use console commands for Utils and Listeners
This package is auto-updated.
Last update: 2024-09-20 22:23:05 UTC
README
AMQP infrastructure for Message Bus
Installation
composer require webit/message-bus-amqp=^1.0.0
Usage
Connection Pool
Use ConnectionPoolBuilder to create one:
use Webit\MessageBus\Infrastructure\Amqp\Connection\Pool\ConnectionPoolBuilder;
use Webit\MessageBus\Infrastructure\Amqp\Connection\ConnectionParams;

$builder = ConnectionPoolBuilder::create();

// optionally set connection factory (LazyConnectionFactory used by default)
$builder->setConnectionFactory(
    new \Webit\MessageBus\Infrastructure\Amqp\Connection\InstantConnectionFactory()
);

// optionally add logger (use a smarter one in real life)
$logger = new \Psr\Log\NullLogger();
$builder->setLogger($logger);

// register at least one connection
$builder->registerConnection(
    new ConnectionParams(
        'rabbitmq.host',
        '5672', // port
        'my-username',
        'my-password'
    ),
    'connection-1'
);

$connectionPool = $builder->build();
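The connection pool gives you the current connection. If you find that the current connection is faulty, you can dispose of it and ask the pool for the next one (if more are registered).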
try {
    $connection = $connectionPool->current();
    $channel = $connection->getChannel();
} catch (\Exception $e) {
    // the current connection is unusable: drop it and take the next one
    $connectionPool->disposeCurrent();
    $connection = $connectionPool->current();
}
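The dispose-and-retry idea above can be wrapped in a small loop so that more than one fallback connection is tried. The following is only a minimal sketch: `InMemoryPool` and `withFailover` are hypothetical names that are not part of this package, and the stand-in pool mirrors only the `current()` and `disposeCurrent()` calls shown above. How the real pool behaves once it runs out of connections is not covered here.

```php
<?php
// Hypothetical stand-in for the real pool so the sketch runs on its own.
// Only current() and disposeCurrent() mirror the calls shown above.
final class InMemoryPool
{
    /** @var array connections, tried in order */
    private $connections;

    public function __construct(array $connections)
    {
        $this->connections = array_values($connections);
    }

    public function current()
    {
        if (count($this->connections) === 0) {
            throw new RuntimeException('No connections left in the pool.');
        }
        return $this->connections[0];
    }

    public function disposeCurrent()
    {
        array_shift($this->connections);
    }
}

/**
 * Runs $work against the pool's current connection. Whenever $work (or
 * current() itself) throws, the current connection is disposed and the
 * next one is tried, up to $maxAttempts times.
 */
function withFailover($pool, callable $work, int $maxAttempts)
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            return $work($pool->current());
        } catch (Exception $e) {
            $lastError = $e;
            $pool->disposeCurrent();
        }
    }
    throw new RuntimeException('All connection attempts failed.', 0, $lastError);
}

// Usage: the first connection fails, so the second one is used.
$pool = new InMemoryPool(['connection-1', 'connection-2']);
$used = withFailover($pool, function ($connection) {
    if ($connection === 'connection-1') {
        throw new RuntimeException('connection-1 is unreachable');
    }
    return $connection;
}, 3);
```

With the real pool, `$work` would typically open a channel via `$connection->getChannel()` and do its publishing or consuming there. Capping the attempts keeps a fully broken cluster from causing an endless loop.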
Publisher integration
To publish a Message via AMQP, use AmqpPublisher:
use Webit\MessageBus\Message;
use Webit\MessageBus\Infrastructure\Amqp\Connection\Channel\NewChannelConnectionAwareChannelFactory;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\ExchangePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\QueuePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\AmqpPublisher;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\Routing\FromMessageTypeRoutingKeyResolver;

$channelFactory = new NewChannelConnectionAwareChannelFactory($connectionPool);

// publish to an exchange...
$publicationTarget = new ExchangePublicationTarget(
    $channelFactory,
    new FromMessageTypeRoutingKeyResolver(), // you can provide your implementation
    'exchange-name'
);

// ...or publish directly to a queue
$publicationTarget = new QueuePublicationTarget(
    $channelFactory,
    'queueName'
);

$publisher = new AmqpPublisher($publicationTarget);

$message = new Message('my-type', 'message_content');
$publisher->publish($message);
Message consumption
To listen for messages from AMQP and consume them:
- Implement your Consumer
use Webit\MessageBus\Consumer;
use Webit\MessageBus\Message;

// a class declaration cannot start with a backslash; declare the plain name
class MyConsumer implements Consumer
{
    public function consume(Message $message)
    {
        // do your stuff here
    }
}
- Build the AmqpConsumer
use Psr\Log\NullLogger;
use Webit\MessageBus\Infrastructure\Amqp\Listener\Message\MessageFactory;
use Webit\MessageBus\Infrastructure\Amqp\Listener\AmqpConsumerBuilder;
// SimpleMessageFactory must be imported as well; its namespace is not shown here

$builder = AmqpConsumerBuilder::create();
$builder->setConsumer(new \MyConsumer());
$builder->setLogger(new NullLogger()); // optional

// messages are acknowledged by default (true); pass false to disable feedback
$builder->shouldSendFeedback(false);

// optionally set your own MessageFactory
$builder->setMessageFactory(new SimpleMessageFactory());

$amqpConsumer = $builder->build();
- Start listening for AMQPMessages
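// $channelFactory is the one created in the publisher example;
// SimpleAmqpListener must be imported from this package
$listener = new SimpleAmqpListener(
    $channelFactory,
    $amqpConsumer,
    'queue-name'
);

// start listening (continuous process)
$listener->listen();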
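Because one queue may carry messages of several types, the consumer from the first step often ends up routing on the message type. A minimal sketch of that idea follows. `StubMessage` and `TypeDispatchingConsumer` are hypothetical names, and the `type()` / `content()` accessors are assumptions about `Webit\MessageBus\Message` that should be checked against the real class. In real code the dispatcher would implement `Webit\MessageBus\Consumer` and type-hint the real `Message`.

```php
<?php
// Hypothetical stand-in for Webit\MessageBus\Message so the sketch runs alone.
// The type()/content() accessors are assumptions, not confirmed API.
final class StubMessage
{
    private $type;
    private $content;

    public function __construct(string $type, string $content)
    {
        $this->type = $type;
        $this->content = $content;
    }

    public function type(): string
    {
        return $this->type;
    }

    public function content(): string
    {
        return $this->content;
    }
}

// Routes each message to the handler registered for its type.
final class TypeDispatchingConsumer
{
    /** @var callable[] handlers keyed by message type */
    private $handlers = [];

    public function on(string $type, callable $handler)
    {
        $this->handlers[$type] = $handler;
        return $this;
    }

    public function consume(StubMessage $message)
    {
        $type = $message->type();
        if (!isset($this->handlers[$type])) {
            throw new InvalidArgumentException("No handler for message type '$type'.");
        }
        // hand the raw content to the handler registered for this type
        $this->handlers[$type]($message->content());
    }
}

// Usage: collect the content of every 'my-type' message.
$seen = [];
$consumer = (new TypeDispatchingConsumer())
    ->on('my-type', function (string $content) use (&$seen) {
        $seen[] = $content;
    });
$consumer->consume(new StubMessage('my-type', 'message_content'));
```

Throwing on an unknown type is one possible policy; logging and skipping the message is another, depending on whether unacknowledged messages should be redelivered.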
Running tests
Install dependencies using composer
docker-compose run --rm composer
Unit tests
docker-compose run --rm unit-tests
Integration tests
docker-compose run --rm integration-tests