nilportugues/eventbus-queue

事件总线队列库。允许实现异步事件总线。

1.1.3 2017-04-16 17:52 UTC

This package is not auto-updated.

Last update: 2024-09-14 18:47:42 UTC


README

Build Status Scrutinizer Code Quality SensioLabsInsight Latest Stable Version Total Downloads License Donate

本包是nilportugues/messagebus的扩展库,为事件总线实现添加队列。

本包将为构建以下内容提供必要的类:

  • 生产者:将事件序列化并发送到队列的代码。这是同步发生的。
  • 消费者:在后台异步读取的代码,因此读取并反序列化队列中的事件,并将其传递给事件总线进行处理。

为什么?

一切都关于决定哪些命令逻辑可以被延迟或隐藏起来,以使其更快。这正是我们想要做的。

你永远不会移除瓶颈,你只是将它移动。缺点是我们可能不得不假设可能的延迟。

安装

为了开始使用此项目,您需要使用Composer进行安装。

composer require nilportugues/eventbus-queue

用法

此包将为您提供一个新的中间件:ProducerEventBusMiddleware

此中间件需要一个序列化器和存储,这取决于所使用的队列适配器。支持的适配器有:

  • PDOQueue:使用Doctrine的DBAL构建的基于SQL数据库的队列。
  • MongoDBQueue:使用MongoDB库构建的队列。
  • RedisQueue:使用Redis PHP扩展的队列。
  • PredisQueue:使用Predis库的队列。
  • FileSystemQueue:使用本地文件系统构建的队列。
  • AmqpQueue:使用RabbitMQ或任何实现Amqp协议的队列。
  • BeanstalkdQueue:使用Beanstalk作为队列。

要设置它,将ProducerEventBusMiddleware注册到事件总线中。因为我们还需要定义第二个事件总线(消费者),我们将称之为ProducerEventBus

ProducerEventBus

<?php
$container['LoggerEventBusMiddleware'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBus\LoggerEventBusMiddleware(
        $container['Monolog']
    );
};

//Definition of the Serializer
$container['NativeSerializer'] = function() use ($container) {
    return new \NilPortugues\MessageBus\Serializer\NativeSerializer();
};

//Definition of the Queue driver
$container['RabbitMQ'] = function() use ($container) {
    return new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
};

//Definition of the Event Bus Queue. For instance RabbitMQ.
$container['EventBusQueueAdapter'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBusQueue\Adapters\AmqpQueue(
        $container['NativeSerializer'],
        $container['RabbitMQ'],
        'myEventBusQueue' //queue Name
    );
};

//Definition of the Producer.
$container['ProducerEventBusMiddleware'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBusQueue\ProducerEventBusMiddleware(
        $container['EventBusQueueAdapter']
    );
};

//This is our ProducerEventBus. 
$container['ProducerEventBus'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBus\EventBus([
        $container['LoggerEventBusMiddleware'],
        $container['ProducerEventBusMiddleware']
    ]);
};

ProducerEventBus的消费者

消费者需要一个脚本,该脚本读取事件总线定义和已订阅的事件,以运行直到处理完所有事件。为此,我们需要注册一个新的EventBus,我们将称之为ConsumerEventBus

我们还想存储无法处理或引发异常的事件。因此需要一个新队列。例如,让我们将错误存储在MongoDB数据库中。

这可能如下所示:

<?php
//This is our ConsumerEventBus. 
$container['ConsumerEventBus'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBus\EventBus([
        $container['LoggerEventBusMiddleware'],
        $container['EventBusMiddleware'],
    ]);
};

$container['MongoDB'] = function() use ($container) {
    return new \MongoDB\Client();
};

//This is an error Queue.
$container['ErrorQueue'] = function() use ($container) {
    return new \NilPortugues\MessageBus\EventBusQueue\Adapters\MongoQueue(
        $container['NativeSerializer'],
        $container['MongoDB'], 
        'error_queues', 
        'myEventBusErrorQueue'
     );
};

EventBusWorker

最后,我们必须调用一个消费者。此包已提供完整的消费者实现:EventBusWorker

使用方法如下:

<?php
//... your $container should be available here.

$consumer = \NilPortugues\MessageBus\EventBusQueue\EventBusWorker();
$consumer->consume(
    $container->get('EventBusQueueAdapter'), 
    $container->get('ErrorQueue'), 
    $container->get('ConsumerEventBus')
);

消费者类将运行consume方法直到所有事件被消费。然后它将退出。这是最优的,以确保它不会泄漏内存。

如果您需要使消费者永久运行,请使用服务器脚本,如Supervisor

Supervisor配置

Supervisor 是 Linux 操作系统的进程监控工具,当您的进程失败时会自动重启。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:

sudo apt-get install supervisor

Supervisor 的配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,以指导如何监控您的进程。

例如,让我们创建 /etc/supervisor/conf.d/my_worker.conf,以便启动并监控名为 my_worker.php 的进程脚本。

[program:my_worker]
process_name=%(program_name)s_%(process_num)02d
command=php my_worker.php
autostart=true
autorestart=true
user=www-data
numprocs=20
redirect_stderr=true
stdout_logfile=/var/log/my_worker.log

在此文件中,我们告诉 Supervisor 我们希望始终运行 20 个实例。如果 my_worker.php 结束或失败,它将启动一个新的实例。

为了使此任务永久运行,您需要输入以下命令:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start my_worker

适配器配置

PDOQueue

为此,您需要在数据库中创建一个表。

例如,sqlite 语言的表创建如下:

CREATE TABLE testAdapterQueue (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  event_data TEXT NOT NULL,
  event_status CHAR(50),
  created_at INTEGER NOT NULL
);

MongoDBQueue

为了使用它,您需要安装 PHP 7 的 mongodb 扩展。

sudo pecl install mongodb

RedisQueue

为了使用它,您需要安装 PHP 7 的 phpredis 扩展。

# Build Redis PHP module
git clone -b php7 https://github.com/phpredis/phpredis.git
sudo mv phpredis/ /etc/ && \
cd /etc/phpredis \
phpize \
./configure \
make && make install \
touch /etc/php/7.0/mods-available/redis.ini \
echo 'extension=redis.so' > /etc/php/7.0/mods-available/redis.ini

PredisQueue

无操作,但如果有 phpredis 扩展则性能更好。

FileSystemQueue

无操作。

AmqpQueue

除了访问 amqp 服务器外,无操作。

Beanstalkd

除了访问 beanstalkd 服务器外,无操作。

贡献

对该软件包的贡献总是受欢迎的!

  • 问题跟踪器 上报告您发现的任何错误或问题。
  • 您可以在软件包的 Git 仓库 中获取源代码。

支持

您可以通过以下方式之一与我联系:

作者

许可

代码库采用 MIT 许可证