实现 Pub/Sub 模式的库


README

该库实现了应用程序的事件驱动架构(Event-Driven Architecture)。它支持 Phalcon 3.x 框架,但也可以轻松适配其他框架。

工作示例请参考这里:https://github.com/chocofamilyme/pubsub/tree/master/examples

功能

  • ORM 模型的事务性保存和事件发布
  • 无事务发布事件
  • 订阅事件
  • 在需要时将事件重新发送到同一队列
  • 保存所有未处理和过期的消息到公共队列。可以从这个队列中将消息保存到数据库中,然后单独处理

要求

  • Phalcon 3.x+
  • PHP 7.0+

安装

composer require chocofamilyme/pubsub

配置

目前该库仅支持 RabbitMQ,但可以添加其他支持。

配置配置文件

return [
    'eventsource' => [
        'default' => 'rabbitmq',
        
        'drivers' => [
            'rabbitmq' => [
                'adapter'    => 'RabbitMQ',
                'hosts' => [
                    [
                        'host'     => 'host',
                        'port'     => 5672,
                        'user'     => 'user',
                        'password' => 'password',
                        'vhost'    => '/',
                    ],
                ],
    
                // Не объязательные параметры
                'heartbeat'          => 60,
                'read_write_timeout' => 60,
                'connection_timeout' => 60,
                'wait_timeout'       => 0,
            ],
        ],
    ],
];

完整列表请查看 - https://github.com/php-amqplib/php-amqplib

将代理添加到 DI 容器中

$di = \Phalcon\Di::getDefault();
$config      = $di->get('config')->eventsource;
$config      = $config->drivers[$config->default];

$serviceName = $di->get('config')->domain;
$cache       = $di->get('cache');

$di->setShared('eventsource',
    function () use ($config, $serviceName, $cache) {
        $adapter = $config->adapter;
        $config  = array_merge($config->toArray(), ['app_id' => $serviceName]);
        $class   = 'Chocofamily\PubSub\Provider\\'.$adapter;

        $repeater = new Repeater($cache);
        return $class::getInstance($config, $repeater);
    }
);

这里 $cache 是一个实现 Phalcon\Cache\BackendInterface 接口的对象。缓存用于计算特定消息的重试次数。

参数设置表

使用

发布

可以使用 Chocofamily\PubSub\Publisher 类发布消息。以下是一个最小的工作示例

$publisher = new Publisher($di->getShared('eventsource'));

$payload = [
	'event_id' => 11995,
	'name' => 'docx',
	'age' => 25
];

$routeKey = 'order.created';

$publisher->send($payload, $routeKey);

对于 RabbitMQ,变量 $routeKey 至少由两个部分组成,用点 . 分隔。例如 order.created。Exchange 的名称将包含第一个部分,即 order。之后,如果进入 rabbitmq 的管理员界面,应该会创建一个名为 order 的 exchange。从版本 2.* 开始,可以指定 exchange,例如 $routeKey, 示例

$publisher = new Publisher($di->getShared('eventsource'));

$payload = [
	'event_id' => 11995,
	'name' => 'docx',
	'age' => 25
];

$exchange = 'order';
$routeKey = 'order.created';

$publisher->send($payload, $routeKey, $exchange);

订阅事件

使用 Chocofamily\PubSub\Subscriber 类订阅事件。以下是一个最小的工作示例

$params = [
    'queue_name' => 'restapi_orderx',
];

$taskName = 'your_task_name';

$subscriber = new Subscriber($di->getShared('eventsource'), 'order.created.*', $params, $taskName);

$subscriber->subscribe(function ($headers, $body) {
    echo print_r($headers, 1). PHP_EOL;
    echo print_r($body, 1). PHP_EOL;
});

从版本 2.* 开始,可以指定 exchange 并将其与路由关联。现在可以指定路由数组。示例

$params = [
    'queue_name' => 'restapi_orderx',
];

$taskName = 'your_task_name';

$routeKeys = [
    'order.created',
    'order.payed',
];

$exchange = 'order';

$subscriber = new Subscriber($di->getShared('eventsource'), $routeKeys, $params, $taskName, $exchange);

$subscriber->subscribe(function ($headers, $body) {
    echo print_r($headers, 1). PHP_EOL;
    echo print_r($body, 1). PHP_EOL;
});

要返回消息到队列,需要在回调函数中抛出 Chocofamily\PubSub\Exceptions\RetryException 异常。消息最多可以重试 5 次,之后它会进入死信队列(exchange = DLX)。

可以传递以下设置到订阅者中

durable: bool — сохранять на диск данные
queue: array — настройки самой очереди
prefetch_count: int — количество предзагрузки сообщений
no_ack: — требуется ли подтверждение сообщений
app_id — уникальный ID приложения. Можно использовать для идентификации откуда собите пошло изначально

使用数据库事务发布

此方法对于确保数据库中实体保存和事件发布的原子性是必要的。以下图片很好地说明了它是如何工作的:[图片链接]

为此,需要创建一个 events 表

create table events
(
	id serial not null
		constraint events_pkey
			primary key,
	type smallint not null,
	model_id int not null,
	model_type varchar(100) not null,
	exchange   varchar(100) not null,
	routing_key varchar(100) not null,
	payload json not null,
	status smallint not null,
	created_at timestamp default now() not null,
	updated_at timestamp
);

示例使用

use Chocofamily\PubSub\Services\EventPrepare;

...

$order = new Order([
    'user_id' => 11166541,
    'status'  => 0,
    'total'   => 5852,
]);

$eventSource = $di->get('eventsource');

$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, 'order.created.-5');
	

模型 Order 应该实现 ModelInterface 接口。

从版本 2.* 开始,可以指定 exchange 并将其与路由关联。请参阅

use Chocofamily\PubSub\Services\EventPrepare;

...

$order = new Order([
    'user_id' => 11166541,
    'status'  => 0,
    'total'   => 5852,
]);

$eventSource = $di->get('eventsource');

$routeKey = 'order.created.-5';
$exchange = 'order';

$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, $routeKey, $exchange);
	

up 方法的工作原理

  • db 事务开始
  • order->save();
  • eventModel->save()
  • db 事务提交
  • 事件发布

事件重新发送

用于重新发送事件的类为 Chocofamily\PubSub\Services\EventRepeater。工作示例

use Chocofamily\PubSub\Services\EventRepeater;

...

$dateStart = \DateTime::createFromFormat('Y-m-d', '2018-01-01');

$eventDataProvider = new Chocofamily\PubSub\Provider\Event($di->get('eventsource'), $dateStart);

try {
    $event = new EventRepeater($eventDataProvider);
    $event->retry();
} catch (\Exception $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}

事件日志清理

用于清理事件使用类 Chocofamily\PubSub\Services\EventCleanerclean 方法。

工作示例

use Chocofamily\PubSub\Services\EventCleaner;

...

try {
    $event = new EventCleaner($di->get('modelsManager'));
    $event->clean();
} catch (ModelException $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}

默认删除超过1个月的事件。如果将日期作为构造函数的第二个参数传递,则将删除到指定日期的所有事件

use Chocofamily\PubSub\Services\EventCleaner;

...

$dateTime  = new \DateTime();
$dateTime = $dateTime->modify('-1 day');

try {
    $event = new EventCleaner($di->get('modelsManager'), $dateTime);
    $event->clean();
} catch (ModelException $e) {
    $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
    $di->get('logger')->error($message);
}

@todo

  • 编写事务接口并移除对框架的依赖