ryaremenko / amqp
AMQP 封装库,用于发布和消费消息
v0.0.5
2021-11-30 11:54 UTC
Requires
- php: >=7.3
- ext-json: *
- php-amqplib/php-amqplib: ^2.9|^3.1
Requires (Dev)
- mockery/mockery: ^1.4@dev
- phpunit/phpunit: 9.5.x-dev
README
AMQP 封装库,用于发布和消费消息,尤其适用于 RabbitMQ。
安装
Composer
运行以下命令,Composer 会将该包添加到 composer.json 文件的 require 部分:
$ php composer require ryaremenko/amqp
集成
Lumen
创建一个继承自 AmqpConnection 的连接类
根据您的需求调整属性。
/**
 * Application-specific AMQP connection.
 *
 * Provides the connection settings by defining setConnectionOptions().
 * The host, port and credentials below are sample values; replace them
 * with the ones for your own broker.
 */
class BaseConnection extends AmqpConnection
{
    /**
     * Builds the options object carrying host, port, login and password.
     *
     * @return AmqpConnectionOptions The configured connection options.
     */
    protected function setConnectionOptions(): AmqpConnectionOptions
    {
        $options = (new AmqpConnectionOptions())
            ->setHost('127.0.0.1')
            ->setPort(5672)
            ->setLogin('guest')
            ->setPassword('guest');

        return $options;
    }
}
将连接类注册为单例
/*
|--------------------------------------------------------------------------
| Laravel example
|--------------------------------------------------------------------------
|
| Each statement must stay on its own line: a `//` comment runs to the end
| of the line, so placing code after `//...` on the same line disables it.
|
*/

// ...

// Register BaseConnection in the container as a singleton.
$this->app->singleton(BaseConnection::class);

// Resolve AMQPConnectionInterface to BaseConnection.
$this->app->bind(AMQPConnectionInterface::class, BaseConnection::class);

// ...
发布消息
// Publish the payload ['data'] using 'queue_name' as the target name.
(new AmqpProducer())
    ->publish(['data'], 'queue_name');
消费消息
/**
 * Looks up the handler class configured for a queue and starts consuming
 * that queue, passing every message body to the handler.
 */
class AMQPHandlersService
{
    /** Map of queue name => handler class, resolved through app(). */
    private const HANDLERS = [
        'queue_name' => TestHandler::class,
    ];

    /** Queue names that are consumed with the 'priority' property set to true. */
    private const PRIORITY_HANDLERS = [
        'queue_name',
    ];

    /** @var AmqpConsumer Consumer used to subscribe to queues. */
    private $amqpConsumer;

    /**
     * @param AmqpConsumer $amqpConsumer Consumer used to subscribe to queues.
     */
    public function __construct(AmqpConsumer $amqpConsumer)
    {
        $this->amqpConsumer = $amqpConsumer;
    }

    /**
     * Starts consuming the given queue with its configured handler.
     *
     * @param string $queueName Name of the queue; must be a key of self::HANDLERS.
     *
     * @throws \InvalidArgumentException When no handler is configured for $queueName.
     */
    public function handle(string $queueName)
    {
        // Fail early with a clear message instead of reading an undefined
        // array key and handing null to app().
        if (!isset(self::HANDLERS[$queueName])) {
            throw new \InvalidArgumentException(
                sprintf('No handler registered for queue "%s".', $queueName)
            );
        }

        $properties = [];
        if (in_array($queueName, self::PRIORITY_HANDLERS, true)) {
            $properties['priority'] = true;
        }

        $handler = app(self::HANDLERS[$queueName]);

        $this->amqpConsumer->consume($queueName, function ($message) use ($handler) {
            try {
                $handler->handle($message->body);
            } catch (\Exception $exception) {
                // Placeholder: log or otherwise handle the failure here.
                // Left empty, a failing handler is silently ignored.
            }
        }, $properties);
    }
}