chocofamilyme / pubsub
实现 Pub/Sub 模式的库
Requires
- php: >= 7.1
- ext-json: *
- ext-phalcon: >= 3.0.0
- chocofamilyme/pathcorrelation: 0.2.*
- php-amqplib/php-amqplib: ^3.1
Requires (Dev)
- codeception/codeception: ^3.0
- dev-master
- 4.0.0
- 3.5.0
- 3.4.0
- 3.3.2
- 3.3.1
- 3.3
- 3.2.2
- 3.2.1
- 3.2.0
- 3.1.0
- 3.0.6
- 3.0.5
- 3.0.4
- 3.0.3
- 3.0.2
- 3.0.1
- 3.0.0
- 2.2.1
- 2.1.2
- 2.1.1
- 2.1.0
- 2.0.7
- 2.0.6
- 2.0.5
- 2.0.4
- 2.0.3
- 2.0.2
- 2.0.1
- 2.0.0
- 1.1.6
- 1.1.5
- 1.1.4
- 1.1.3
- 1.1.2
- 1.1.1
- 1.1.0
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- 0.1.4
- 0.1.3
- 0.1.2
- 0.1.1
- 0.1.0
- 0.0.5
- 0.0.4
- 0.0.3
- 0.0.2
- dev-input.getmessage
- dev-CT-2605
- dev-kulumbayev
- dev-heartbeat
- dev-CT-1617
- dev-more-options
- dev-reset-conn
- dev-ludovicose-patch-1
- dev-add-modelclass
- dev-CT-1083
- dev-AddRoutingKeyInHeaders
- dev-CT-824
- dev-CT-810
- dev-CT-632
- dev-CT-808
- dev-multi_route
- dev-butch
This package is auto-updated.
Last update: 2024-09-19 15:04:09 UTC
README
该库实现了应用程序的事件驱动架构(Event-Driven Architecture)。它支持 Phalcon 3.x 框架,但也可以轻松适配其他框架。
工作示例请参考这里:https://github.com/chocofamilyme/pubsub/tree/master/examples
功能
- ORM 模型的事务性保存和事件发布
- 无事务发布事件
- 订阅事件
- 在需要时将事件重新发送到同一队列
- 保存所有未处理和过期的消息到公共队列。可以从这个队列中将消息保存到数据库中,然后单独处理
要求
- Phalcon 3.x+
- PHP 7.0+
安装
composer require chocofamilyme/pubsub
配置
目前该库仅支持 RabbitMQ,但可以添加其他支持。
配置配置文件
return [ 'eventsource' => [ 'default' => 'rabbitmq', 'drivers' => [ 'rabbitmq' => [ 'adapter' => 'RabbitMQ', 'hosts' => [ [ 'host' => 'host', 'port' => 5672, 'user' => 'user', 'password' => 'password', 'vhost' => '/', ], ], // Не объязательные параметры 'heartbeat' => 60, 'read_write_timeout' => 60, 'connection_timeout' => 60, 'wait_timeout' => 0, ], ], ], ];
完整列表请查看 - https://github.com/php-amqplib/php-amqplib
将代理添加到 DI 容器中
$di = \Phalcon\Di::getDefault(); $config = $di->get('config')->eventsource; $config = $config->drivers[$config->default]; $serviceName = $di->get('config')->domain; $cache = $di->get('cache'); $di->setShared('eventsource', function () use ($config, $serviceName, $cache) { $adapter = $config->adapter; $config = array_merge($config->toArray(), ['app_id' => $serviceName]); $class = 'Chocofamily\PubSub\Provider\\'.$adapter; $repeater = new Repeater($cache); return $class::getInstance($config, $repeater); } );
这里 $cache
是一个实现 Phalcon\Cache\BackendInterface
接口的对象。缓存用于计算特定消息的重试次数。
参数设置表
使用
发布
可以使用 Chocofamily\PubSub\Publisher
类发布消息。以下是一个最小的工作示例
$publisher = new Publisher($di->getShared('eventsource')); $payload = [ 'event_id' => 11995, 'name' => 'docx', 'age' => 25 ]; $routeKey = 'order.created'; $publisher->send($payload, $routeKey);
对于 RabbitMQ,变量 $routeKey
至少由两个部分组成,用点 .
分隔。例如 order.created
。Exchange 的名称将包含第一个部分,即 order
。之后,如果进入 rabbitmq 的管理员界面,应该会创建一个名为 order
的 exchange。从版本 2.* 开始,可以指定 exchange
,例如 $routeKey, 示例
$publisher = new Publisher($di->getShared('eventsource')); $payload = [ 'event_id' => 11995, 'name' => 'docx', 'age' => 25 ]; $exchange = 'order'; $routeKey = 'order.created'; $publisher->send($payload, $routeKey, $exchange);
订阅事件
使用 Chocofamily\PubSub\Subscriber
类订阅事件。以下是一个最小的工作示例
$params = [ 'queue_name' => 'restapi_orderx', ]; $taskName = 'your_task_name'; $subscriber = new Subscriber($di->getShared('eventsource'), 'order.created.*', $params, $taskName); $subscriber->subscribe(function ($headers, $body) { echo print_r($headers, 1). PHP_EOL; echo print_r($body, 1). PHP_EOL; });
从版本 2.* 开始,可以指定 exchange
并将其与路由关联。现在可以指定路由数组。示例
$params = [ 'queue_name' => 'restapi_orderx', ]; $taskName = 'your_task_name'; $routeKeys = [ 'order.created', 'order.payed', ]; $exchange = 'order'; $subscriber = new Subscriber($di->getShared('eventsource'), $routeKeys, $params, $taskName, $exchange); $subscriber->subscribe(function ($headers, $body) { echo print_r($headers, 1). PHP_EOL; echo print_r($body, 1). PHP_EOL; });
要返回消息到队列,需要在回调函数中抛出 Chocofamily\PubSub\Exceptions\RetryException
异常。消息最多可以重试 5 次,之后它会进入死信队列(exchange = DLX)。
可以传递以下设置到订阅者中
durable: bool — сохранять на диск данные
queue: array — настройки самой очереди
prefetch_count: int — количество предзагрузки сообщений
no_ack: — требуется ли подтверждение сообщений
app_id — уникальный ID приложения. Можно использовать для идентификации откуда собите пошло изначально
使用数据库事务发布
此方法对于确保数据库中实体保存和事件发布的原子性是必要的。以下图片很好地说明了它是如何工作的:[图片链接]
为此,需要创建一个 events 表
create table events ( id serial not null constraint events_pkey primary key, type smallint not null, model_id int not null, model_type varchar(100) not null, exchange varchar(100) not null, routing_key varchar(100) not null, payload json not null, status smallint not null, created_at timestamp default now() not null, updated_at timestamp );
示例使用
use Chocofamily\PubSub\Services\EventPrepare; ... $order = new Order([ 'user_id' => 11166541, 'status' => 0, 'total' => 5852, ]); $eventSource = $di->get('eventsource'); $event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1); $event->up($eventSource, 'order.created.-5');
模型 Order 应该实现 ModelInterface 接口。
从版本 2.* 开始,可以指定 exchange
并将其与路由关联。请参阅
use Chocofamily\PubSub\Services\EventPrepare; ... $order = new Order([ 'user_id' => 11166541, 'status' => 0, 'total' => 5852, ]); $eventSource = $di->get('eventsource'); $routeKey = 'order.created.-5'; $exchange = 'order'; $event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1); $event->up($eventSource, $routeKey, $exchange);
up
方法的工作原理
- db 事务开始
- order->save();
- eventModel->save()
- db 事务提交
- 事件发布
事件重新发送
用于重新发送事件的类为 Chocofamily\PubSub\Services\EventRepeater
。工作示例
use Chocofamily\PubSub\Services\EventRepeater; ... $dateStart = \DateTime::createFromFormat('Y-m-d', '2018-01-01'); $eventDataProvider = new Chocofamily\PubSub\Provider\Event($di->get('eventsource'), $dateStart); try { $event = new EventRepeater($eventDataProvider); $event->retry(); } catch (\Exception $e) { $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine()); $di->get('logger')->error($message); }
事件日志清理
用于清理事件使用类 Chocofamily\PubSub\Services\EventCleaner
的 clean
方法。
工作示例
use Chocofamily\PubSub\Services\EventCleaner; ... try { $event = new EventCleaner($di->get('modelsManager')); $event->clean(); } catch (ModelException $e) { $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine()); $di->get('logger')->error($message); }
默认删除超过1个月的事件。如果将日期作为构造函数的第二个参数传递,则将删除到指定日期的所有事件
use Chocofamily\PubSub\Services\EventCleaner; ... $dateTime = new \DateTime(); $dateTime = $dateTime->modify('-1 day'); try { $event = new EventCleaner($di->get('modelsManager'), $dateTime); $event->clean(); } catch (ModelException $e) { $message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine()); $di->get('logger')->error($message); }
@todo
- 编写事务接口并移除对框架的依赖