evaneos / burrow
AMQP 消息事件组件
v4.6.0
2021-10-05 17:08 UTC
Requires
- php: >=5.5.9
- ext-pcntl: *
- beberlei/assert: ^2.6
- beberlei/metrics: ^2.8
- evaneos/daemon: ^2.0
- league/event: ^2.2
- php-amqplib/php-amqplib: ^2.6
- psr/log: ~1.0
- symfony/console: ^2.8|^3.0|^4.0
Requires (Dev)
- fzaninotto/faker: ^1.6
- mockery/mockery: ^0.9.4
- monolog/monolog: ~1.13
- phpunit/phpunit: ~4.0|~5.0
- squizlabs/php_codesniffer: ~2.0
- symfony/process: ^3.2
This package is auto-updated.
Last update: 2024-09-05 23:30:57 UTC
README
Evaneos AMQP 库,可以同时使用 php-amqplib 和 pecl amqp C 库
安装
composer require evaneos/burrow
用法
更详细的信息请参阅示例目录
要测试它,您可以使用 rabbitmq 容器,这个容器非常适用
docker run -d -p 5672:5672 rabbitmq
使用 RabbitMQ 声明一个交换机并绑定一个队列
$admin = DriverFactory::getDriver([ 'host' => 'localhost', 'port' => '5672', 'user' => 'guest', 'pwd' => 'guest' ]); $admin->declareExchange('exchange'); $admin->declareAndBindQueue('exchange', 'my_queue');
异步管理
使用 RabbitMQ 分发异步消息
$driver = DriverFactory::getDriver([ 'host' => 'localhost', 'port' => '5672', 'user' => 'guest', 'pwd' => 'guest' ]); $publisher = new AsyncPublisher($driver, 'exchange'); $publisher->publish('message', 'routing_key', [ 'meta' => 'data' ]);
编写一个守护程序从 RabbitMQ 消费异步消息
$driver = DriverFactory::getDriver([ 'host' => 'default', 'port' => '5672', 'user' => 'guest', 'pwd' => 'guest' ]); $handlerBuilder = new HandlerBuilder($driver); $handler = $handlerBuilder->async()->build(new EchoConsumer()); $daemon = new QueueHandlingDaemon($driver, $handler, 'test'); $worker = new Worker($daemon); $worker->run();
在命令行中,从不同的终端启动这两个脚本,消息 'my_message' 应该在工作终端中显示。
同步管理
使用 RabbitMQ 分发异步消息
$driver = DriverFactory::getDriver([ 'host' => 'default', 'port' => '5672', 'user' => 'guest', 'pwd' => 'guest' ]); $publisher = new SyncPublisher($driver, 'xchange'); $publisher->publish('my_message', 'routing_key', [ 'meta' => 'data' ]);
编写一个守护程序从 RabbitMQ 消费异步消息
$driver = DriverFactory::getDriver([ 'host' => 'default', 'port' => '5672', 'user' => 'guest', 'pwd' => 'guest' ]); $handlerBuilder = new HandlerBuilder($driver); $handler = $handlerBuilder->sync()->build(new ReturnConsumer()); $daemon = new QueueHandlingDaemon($driver, $handler, 'test'); $worker = new Worker($daemon); $worker->run();
在命令行中,从不同的终端启动这两个脚本,消息 'my_message' 应该在发布者终端中显示。
事件
您可以添加您的发射器来订阅事件
$emitter = new League\Event\Emitter(); $emitter->addListener(new MyListener()); new QueueHandlingDaemon([..], $emitter);
度量
基于事件,您可以订阅一个内置的度量发布者,它将发送这些度量
daemon.started
(递增)daemon.stopped
(递增)daemon.message_received
(递增)daemon.message_consumed
(递增)daemon.message_processing_time
(计时)
StatsD 和 DogStatsD 有相应的实现。
$config = ['host' => 'host', 'port' => 'port']; // StatsD $metricService = MetricServiceFactory::create('statsd', $config); // DogStatsD $tags = ['service' => 'myService']; // This tags will be sent with all the metrics $metricService = MetricServiceFactory::create('dogstats', $config, $tags); $emitter = new League\Event\Emitter(); $emitter->useListenerProvider(new SendMetricListenerProvider($metricService)); new QueueHandlingDaemon([..], $emitter);
示例
所有这些示例也都在 example
目录中提供。
处理器
现在您可以使用处理器来修改消费行为。出于向后兼容性的原因,已创建了一个 ContinueOnFailureHandler
以重现之前的默认行为。请不要再使用它,因为它允许工作进程在接收到错误时继续执行,这相当危险。
为了简化处理器的使用,请使用 HandlerBuilder
构建处理器堆栈。