nilportugues / eventbus-queue
事件总线队列库。允许实现异步事件总线。
Requires
- php: >=7
- nilportugues/messagebus: ^1.1
- nilportugues/uuid: ^1.0
Requires (Dev)
- ext-redis: *
- doctrine/dbal: 2.5.*
- fabpot/php-cs-fixer: ~1.9
- mongodb/mongodb: 1.0.*
- pda/pheanstalk: 3.1.*
- php-amqplib/php-amqplib: 2.6.*
- phpunit/phpunit: 5.*
- predis/predis: 1.0.*
Suggests
- ext-redis: To use the RedisQueue Adapter in the ProducerEventBus & EventBusWorker
- doctrine/dbal: To use the PdoQueue Adapter in the ProducerEventBus & EventBusWorker
- mongodb/mongodb: to use the MongoQueue Adapter in the ProducerEventBus & EventBusWorker
- pda/pheanstalk: To use the BeanstalkdQueue Adapter in the ProducerEventBus & EventBusWorker
- php-amqplib/php-amqplib: To use the AmqpQueue Adapter in the ProducerEventBus & EventBusWorker
- predis/predis: To use the Predis Adapter in the ProducerEventBus & EventBusWorker
This package is not auto-updated.
Last update: 2024-09-14 18:47:42 UTC
README
本包是nilportugues/messagebus的扩展库,为事件总线实现添加队列。
本包将为构建以下内容提供必要的类:
- 生产者:将事件序列化并发送到队列的代码。这是同步发生的。
- 消费者:在后台异步读取的代码,因此读取并反序列化队列中的事件,并将其传递给事件总线进行处理。
为什么?
一切都关于决定哪些命令逻辑可以被延迟或隐藏起来,以使其更快。这正是我们想要做的。
你永远不会移除瓶颈,你只是将它移动。缺点是我们可能不得不假设可能的延迟。
安装
为了开始使用此项目,您需要使用Composer进行安装。
composer require nilportugues/eventbus-queue
用法
此包将为您提供一个新的中间件:ProducerEventBusMiddleware
。
此中间件需要一个序列化器和存储,这取决于所使用的队列适配器。支持的适配器有:
- PDOQueue:使用Doctrine的DBAL构建的基于SQL数据库的队列。
- MongoDBQueue:使用MongoDB库构建的队列。
- RedisQueue:使用Redis PHP扩展的队列。
- PredisQueue:使用Predis库的队列。
- FileSystemQueue:使用本地文件系统构建的队列。
- AmqpQueue:使用RabbitMQ或任何实现Amqp协议的队列。
- BeanstalkdQueue:使用Beanstalk作为队列。
要设置它,将ProducerEventBusMiddleware
注册到事件总线中。因为我们还需要定义第二个事件总线(消费者),我们将称之为ProducerEventBus
。
ProducerEventBus
<?php $container['LoggerEventBusMiddleware'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware( $container['Monolog'] ); }; //Definition of the Serializer $container['NativeSerializer'] = function() use ($container) { return new \NilPortugues\MessageBus\Serializer\NativeSerializer(); }; //Definition of the Queue driver $container['RabbitMQ'] = function() use ($container) { return new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); }; //Definition of the Event Bus Queue. For instance RabbitMQ. $container['EventBusQueueAdapter'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBusQueue\Adapters\AmqpQueue( $container['NativeSerializer'], $container['RabbitMQ'], 'myEventBusQueue' //queue Name ); }; //Definition of the Producer. $container['ProducerEventBusMiddleware'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBusQueue\ProducerEventBusMiddleware( $container['EventBusQueueAdapter'] ); }; //This is our ProducerEventBus. $container['ProducerEventBus'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBus\EventBus([ $container['LoggerEventBusMiddleware'], $container['ProducerEventBusMiddleware'] ]); };
ProducerEventBus的消费者
消费者需要一个脚本,该脚本读取事件总线定义和已订阅的事件,以运行直到处理完所有事件。为此,我们需要注册一个新的EventBus
,我们将称之为ConsumerEventBus
。
我们还想存储无法处理或引发异常的事件。因此需要一个新队列。例如,让我们将错误存储在MongoDB数据库中。
这可能如下所示:
<?php //This is our ConsumerEventBus. $container['ConsumerEventBus'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBus\EventBus([ $container['LoggerEventBusMiddleware'], $container['EventBusMiddleware'], ]); }; $container['MongoDB'] = function() use ($container) { return new \MongoDB\Client(); }; //This is an error Queue. $container['ErrorQueue'] = function() use ($container) { return new \NilPortugues\MessageBus\EventBusQueue\Adapters\MongoQueue( $container['NativeSerializer'], $container['MongoDB'], 'error_queues', 'myEventBusErrorQueue' ); };
EventBusWorker
最后,我们必须调用一个消费者。此包已提供完整的消费者实现:EventBusWorker
。
使用方法如下:
<?php //... your $container should be available here. $consumer = \NilPortugues\MessageBus\EventBusQueue\EventBusWorker(); $consumer->consume( $container->get('EventBusQueueAdapter'), $container->get('ErrorQueue'), $container->get('ConsumerEventBus') );
消费者类将运行consume
方法直到所有事件被消费。然后它将退出。这是最优的,以确保它不会泄漏内存。
如果您需要使消费者永久运行,请使用服务器脚本,如Supervisor。
Supervisor配置
Supervisor 是 Linux 操作系统的进程监控工具,当您的进程失败时会自动重启。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:
sudo apt-get install supervisor
Supervisor 的配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,以指导如何监控您的进程。
例如,让我们创建 /etc/supervisor/conf.d/my_worker.conf
,以便启动并监控名为 my_worker.php
的进程脚本。
[program:my_worker] process_name=%(program_name)s_%(process_num)02d command=php my_worker.php autostart=true autorestart=true user=www-data numprocs=20 redirect_stderr=true stdout_logfile=/var/log/my_worker.log
在此文件中,我们告诉 Supervisor 我们希望始终运行 20 个实例。如果 my_worker.php
结束或失败,它将启动一个新的实例。
为了使此任务永久运行,您需要输入以下命令:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start my_worker
适配器配置
PDOQueue
为此,您需要在数据库中创建一个表。
例如,sqlite 语言的表创建如下:
CREATE TABLE testAdapterQueue ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_data TEXT NOT NULL, event_status CHAR(50), created_at INTEGER NOT NULL );
MongoDBQueue
为了使用它,您需要安装 PHP 7 的 mongodb 扩展。
sudo pecl install mongodb
RedisQueue
为了使用它,您需要安装 PHP 7 的 phpredis 扩展。
# Build Redis PHP module
git clone -b php7 https://github.com/phpredis/phpredis.git
sudo mv phpredis/ /etc/ && \
cd /etc/phpredis \
phpize \
./configure \
make && make install \
touch /etc/php/7.0/mods-available/redis.ini \
echo 'extension=redis.so' > /etc/php/7.0/mods-available/redis.ini
PredisQueue
无操作,但如果有 phpredis 扩展则性能更好。
FileSystemQueue
无操作。
AmqpQueue
除了访问 amqp 服务器外,无操作。
Beanstalkd
除了访问 beanstalkd 服务器外,无操作。
贡献
对该软件包的贡献总是受欢迎的!
支持
您可以通过以下方式之一与我联系:
- 通过电子邮件 contact@nilportugues.com 发送给我
- 打开一个 问题
作者
许可
代码库采用 MIT 许可证。