ptrofimov / zeroevents
进程间的事件。基于 Illuminate/Events 和 ZeroMQ 构建
Requires
- php: >=5.4.0
- illuminate/events: >=4.2
Requires (Dev)
- moriony/php-zmq-stubs: dev-master
- phpunit/phpunit: 4.5.*
This package is not auto-updated.
Last update: 2024-09-14 15:16:33 UTC
README
进程间的事件。基于 Illuminate/Events 和 ZeroMQ 构建。
可用于构建分布式 PHP 应用程序和异步 事件 处理系统,其中使用 ZeroMQ 作为消息传输。
安装
使用 composer 安装包
composer require ptrofimov/zeroevents:3.*
快速入门
-
- 在第一个进程中:订阅所需的事件 EventListener。
Event::listen('my.events.*', new EventListener(['connect' => 'ipc://my.ipc']));
在 EventListener 构造函数中,可以传递选项数组(将与默认选项合并)或字符串(Config::get() 的选项路径)
-
- 在第二个进程中:定义事件监听器。然后运行事件服务,该服务将监听套接字上的消息并触发传入的事件。
Event::listen('my.events.*', function () { echo 'Catched event:', Event::firing(), PHP_EOL; }); (new EventService) ->listen(new EventListener(['bind' => 'ipc://my.ipc'])) ->run();
就是这样。每次第一个进程触发事件时,该事件将被传输到第二个进程并在那里触发。
EventSocket 类
EventSocket PHP 类继承自 ZMQSocket。它支持所有原生方法,如 connect 和 send,并添加了使用 ZeroMQ 发送和接收事件的额外方法。
方法
- encode(string event, array payload) - 在发送前将事件和有效负载序列化为帧数组
- 事件作为消息的第一个帧中的字符串传递
- 有效负载序列化为 JSON(默认)并传递到消息的后续帧中
- encode 方法用于 push 方法
- decode(array frames) - 从接收到的帧数组中反序列化事件和有效负载
- 返回数组 [event, payload]
- 事件应作为消息的第一个帧中的字符串传递
- 有效负载应作为消息后续帧中的序列化 JSON 传递
- decode 方法用于 pull 方法
连接套接字
您可以使用 ZMQSocket 构造函数的常规方式连接到 ZeroMQ 套接字,或者更好的是使用 EventListener,它根据配置选项为您提供连接的套接字。
use ZeroEvents\EventSocket; $socket = new EventSocket(new ZMQContext, ZMQ::SOCKET_PUSH); $socket->connect('ipc:///var/tmp/test.ipc');
推送事件
建议监听消息发送过程中发生的事件(例如,ttl 过期、连接失败)以处理它们。
Event::listen('zeroevents.push.error', function () { // logging or something else }); $socket->push('event', ['payload']);
EventListener 类
EventListener PHP 类应传递给事件调度器作为监听器回调。它具有魔法方法 __invoke,当事件被触发时调用。EventListener 会按需创建 EventSocket 实例(延迟连接)并调用 EventSocket::push 方法。
Event::listen('my.events.*', new EventListener('my.socket.config.key'));
构造函数接受字符串(配置键)或选项数组。
连接选项
$options = [ 'example.connection' => [ /* * Number of io-threads in context, default = 1 */ 'threads' => 1, /* * Persistent context is stored over multiple requests, default = false */ 'is_persistent' => false, /* * Socket type * * Full list of available types https://php.ac.cn/manual/en/class.zmq.php * Description of sockets http://zguide.zeromq.org/page:all#toc11 */ 'socket_type' => ZMQ::SOCKET_PUSH, /* * Default options for ZeroMQ socket */ 'socket_options' => [ ZMQ::SOCKOPT_LINGER => 2000, // wait before disconnect (ms) ZMQ::SOCKOPT_SNDTIMEO => 2000, // send message timeout (ms) ZMQ::SOCKOPT_RCVTIMEO => 2000, // receive message timeout (ms) ], /* * Addresses to bind. Only one process can bind address * * About available transports (inproc, ipc, tcp) http://zguide.zeromq.org/page:all#toc13 */ 'bind' => [ 'tcp://127.0.0.1:5555', ], /* * Addresses of sockets, the same time can be connected multiple addresses * * About available transports (inproc, ipc, tcp) http://zguide.zeromq.org/page:all#toc13 */ 'connect' => [ 'tcp://127.0.0.1:5555', ], /* * Type of events to subscribe. Events masks (*) are not here supported. * * Only useful for SOCKET_SUB socket type */ 'subscribe' => 'my.events', /* * Send/wait confirmation after sending/receiving message */ 'confirmed' => false, /* * Serializer Interface through which message payload is serialized before sending */ 'serializer' => 'ZeroEvents\Serializers\JsonSerializer' ], ];
EventService 类
该类用于监听传入的事件并触发它们。
轮询套接字
EventService 类能够同时监听多个套接字。
(new EventService) ->listen(new EventListener(['connect' => 'ipc://first.ipc'])) ->listen(new EventListener(['connect' => 'ipc://second.ipc'])) ->run();
空闲事件
您可以指定轮询时间 - 服务等待传入事件的最大时间,如果没有此类事件,它将触发 zeroevents.service.idle 事件,您可以在没有工作执行服务时处理该事件并执行自己的代码。
系统信号
EventService 类具有默认的 POSIX 系统信号处理器。它忽略 SIGHUP,并在 SIGTERM 和 SIGINT 信号上优雅地停止。当然,您也可以定义自己的系统信号处理器。
停止服务
通常,服务是通过系统信号停止的。但您也可以通过触发 zeroevents.service.stop 事件轻松地停止服务。
许可证
版权所有(c)2015 彼得·特罗菲莫夫
MIT 许可协议
特此授予任何人免费获得此软件及其相关文档文件(以下简称“软件”)副本的权利,以不受限制地处理该软件,包括但不限于使用、复制、修改、合并、发布、分发、许可和/或销售软件副本,并允许向软件提供者提供软件的人执行上述操作,前提是遵守以下条件
上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。
软件按“原样”提供,不提供任何形式的保证,无论是明示的还是隐含的,包括但不限于适销性、特定用途的适用性和非侵权性保证。在任何情况下,作者或版权所有者不对任何索赔、损害或其他责任负责,无论是基于合同、侵权或其他原因,无论该索赔、损害或其他责任源于、因之或与软件或软件的使用或其他操作有关。