evaneos/burrow

AMQP 消息事件组件

v4.6.0 2021-10-05 17:08 UTC

README

Build Status Scrutinizer Code Quality Packagist Version Code Coverage

Evaneos AMQP 库,可以同时使用 php-amqplibpecl amqp C 库

安装

composer require evaneos/burrow

用法

更详细的信息请参阅示例目录
要测试它,您可以使用 rabbitmq 容器,这个容器非常适用

docker run -d -p 5672:5672 rabbitmq

使用 RabbitMQ 声明一个交换机并绑定一个队列

$admin = DriverFactory::getDriver([
    'host' => 'localhost',
    'port' => '5672',
    'user' => 'guest',
    'pwd' => 'guest'
]);
$admin->declareExchange('exchange');
$admin->declareAndBindQueue('exchange', 'my_queue');

异步管理

使用 RabbitMQ 分发异步消息

$driver = DriverFactory::getDriver([
    'host' => 'localhost',
    'port' => '5672',
    'user' => 'guest',
    'pwd' => 'guest'
]);
$publisher = new AsyncPublisher($driver, 'exchange');
$publisher->publish('message', 'routing_key', [ 'meta' => 'data' ]);

编写一个守护程序从 RabbitMQ 消费异步消息

$driver = DriverFactory::getDriver([
    'host' => 'default',
    'port' => '5672',
    'user' => 'guest',
    'pwd' => 'guest'
]);
$handlerBuilder = new HandlerBuilder($driver);
$handler = $handlerBuilder->async()->build(new EchoConsumer());
$daemon = new QueueHandlingDaemon($driver, $handler, 'test');
$worker = new Worker($daemon);
$worker->run();

在命令行中,从不同的终端启动这两个脚本,消息 'my_message' 应该在工作终端中显示。

同步管理

使用 RabbitMQ 分发异步消息

$driver = DriverFactory::getDriver([
   'host' => 'default',
   'port' => '5672',
   'user' => 'guest',
   'pwd' => 'guest'
]);
$publisher = new SyncPublisher($driver, 'xchange');
$publisher->publish('my_message', 'routing_key', [ 'meta' => 'data' ]);

编写一个守护程序从 RabbitMQ 消费异步消息

$driver = DriverFactory::getDriver([
   'host' => 'default',
   'port' => '5672',
   'user' => 'guest',
   'pwd' => 'guest'
]);

$handlerBuilder = new HandlerBuilder($driver);
$handler = $handlerBuilder->sync()->build(new ReturnConsumer());
$daemon = new QueueHandlingDaemon($driver, $handler, 'test');
$worker = new Worker($daemon);
$worker->run();

在命令行中,从不同的终端启动这两个脚本,消息 'my_message' 应该在发布者终端中显示。

事件

您可以添加您的发射器来订阅事件

$emitter = new League\Event\Emitter();
$emitter->addListener(new MyListener());

new QueueHandlingDaemon([..], $emitter);

度量

基于事件,您可以订阅一个内置的度量发布者,它将发送这些度量

  • daemon.started(递增)
  • daemon.stopped(递增)
  • daemon.message_received(递增)
  • daemon.message_consumed(递增)
  • daemon.message_processing_time(计时)

StatsD 和 DogStatsD 有相应的实现。

$config = ['host' => 'host', 'port' => 'port'];

// StatsD
$metricService = MetricServiceFactory::create('statsd', $config);
// DogStatsD
$tags = ['service' => 'myService']; // This tags will be sent with all the metrics
$metricService = MetricServiceFactory::create('dogstats', $config, $tags);

$emitter = new League\Event\Emitter();
$emitter->useListenerProvider(new SendMetricListenerProvider($metricService));

new QueueHandlingDaemon([..], $emitter);

示例

所有这些示例也都在 example 目录中提供。

处理器

现在您可以使用处理器来修改消费行为。出于向后兼容性的原因,已创建了一个 ContinueOnFailureHandler 以重现之前的默认行为。请不要再使用它,因为它允许工作进程在接收到错误时继续执行,这相当危险。

为了简化处理器的使用,请使用 HandlerBuilder 构建处理器堆栈。