ptrofimov/zeroevents

进程间的事件。基于 Illuminate/Events 和 ZeroMQ 构建

3.2 2015-02-20 06:10 UTC

This package is not auto-updated.

Last update: 2024-09-14 15:16:33 UTC


README

Latest Stable Version Total Downloads License Build Status

进程间的事件。基于 Illuminate/Events 和 ZeroMQ 构建。

可用于构建分布式 PHP 应用程序和异步 事件 处理系统,其中使用 ZeroMQ 作为消息传输。

安装

使用 composer 安装包

composer require ptrofimov/zeroevents:3.*

快速入门

    1. 在第一个进程中:订阅所需的事件 EventListener。
Event::listen('my.events.*', new EventListener(['connect' => 'ipc://my.ipc']));

在 EventListener 构造函数中,可以传递选项数组(将与默认选项合并)或字符串(Config::get() 的选项路径)

    1. 在第二个进程中:定义事件监听器。然后运行事件服务,该服务将监听套接字上的消息并触发传入的事件。
Event::listen('my.events.*', function () {
    echo 'Catched event:', Event::firing(), PHP_EOL;
});

(new EventService)
    ->listen(new EventListener(['bind' => 'ipc://my.ipc']))
    ->run();

就是这样。每次第一个进程触发事件时,该事件将被传输到第二个进程并在那里触发。

EventSocket 类

EventSocket PHP 类继承自 ZMQSocket。它支持所有原生方法,如 connect 和 send,并添加了使用 ZeroMQ 发送和接收事件的额外方法。

方法

  • encode(string event, array payload) - 在发送前将事件和有效负载序列化为帧数组
  • 事件作为消息的第一个帧中的字符串传递
  • 有效负载序列化为 JSON(默认)并传递到消息的后续帧中
  • encode 方法用于 push 方法
  • decode(array frames) - 从接收到的帧数组中反序列化事件和有效负载
  • 返回数组 [event, payload]
  • 事件应作为消息的第一个帧中的字符串传递
  • 有效负载应作为消息后续帧中的序列化 JSON 传递
  • decode 方法用于 pull 方法

连接套接字

您可以使用 ZMQSocket 构造函数的常规方式连接到 ZeroMQ 套接字,或者更好的是使用 EventListener,它根据配置选项为您提供连接的套接字。

use ZeroEvents\EventSocket;

$socket = new EventSocket(new ZMQContext, ZMQ::SOCKET_PUSH);
$socket->connect('ipc:///var/tmp/test.ipc');

推送事件

建议监听消息发送过程中发生的事件(例如,ttl 过期、连接失败)以处理它们。

Event::listen('zeroevents.push.error', function () {
    // logging or something else
});

$socket->push('event', ['payload']);

EventListener 类

EventListener PHP 类应传递给事件调度器作为监听器回调。它具有魔法方法 __invoke,当事件被触发时调用。EventListener 会按需创建 EventSocket 实例(延迟连接)并调用 EventSocket::push 方法。

Event::listen('my.events.*', new EventListener('my.socket.config.key'));

构造函数接受字符串(配置键)或选项数组。

连接选项

$options = [

    'example.connection' => [

        /*
         * Number of io-threads in context, default = 1
         */

        'threads' => 1,

        /*
         * Persistent context is stored over multiple requests, default = false
         */

        'is_persistent' => false,

        /*
         * Socket type
         *
         * Full list of available types https://php.ac.cn/manual/en/class.zmq.php
         * Description of sockets http://zguide.zeromq.org/page:all#toc11
         */

        'socket_type' => ZMQ::SOCKET_PUSH,

        /*
         * Default options for ZeroMQ socket
         */

        'socket_options' => [
            ZMQ::SOCKOPT_LINGER => 2000, // wait before disconnect (ms)
            ZMQ::SOCKOPT_SNDTIMEO => 2000, // send message timeout (ms)
            ZMQ::SOCKOPT_RCVTIMEO => 2000, // receive message timeout (ms)
        ],

        /*
         * Addresses to bind. Only one process can bind address
         *
         * About available transports (inproc, ipc, tcp) http://zguide.zeromq.org/page:all#toc13
         */

        'bind' => [
            'tcp://127.0.0.1:5555',
        ],

        /*
         * Addresses of sockets, the same time can be connected multiple addresses
         *
         * About available transports (inproc, ipc, tcp) http://zguide.zeromq.org/page:all#toc13
         */

        'connect' => [
            'tcp://127.0.0.1:5555',
        ],

        /*
         * Type of events to subscribe. Events masks (*) are not here supported.
         *
         * Only useful for SOCKET_SUB socket type
         */

        'subscribe' => 'my.events',

        /*
         * Send/wait confirmation after sending/receiving message
         */

        'confirmed' => false,

        /*
        * Serializer Interface through which message payload is serialized before sending
        */

        'serializer' => 'ZeroEvents\Serializers\JsonSerializer'
    ],
];

EventService 类

该类用于监听传入的事件并触发它们。

轮询套接字

EventService 类能够同时监听多个套接字。

(new EventService)
    ->listen(new EventListener(['connect' => 'ipc://first.ipc']))
    ->listen(new EventListener(['connect' => 'ipc://second.ipc']))
    ->run();

空闲事件

您可以指定轮询时间 - 服务等待传入事件的最大时间,如果没有此类事件,它将触发 zeroevents.service.idle 事件,您可以在没有工作执行服务时处理该事件并执行自己的代码。

系统信号

EventService 类具有默认的 POSIX 系统信号处理器。它忽略 SIGHUP,并在 SIGTERMSIGINT 信号上优雅地停止。当然,您也可以定义自己的系统信号处理器。

停止服务

通常,服务是通过系统信号停止的。但您也可以通过触发 zeroevents.service.stop 事件轻松地停止服务。

许可证

版权所有(c)2015 彼得·特罗菲莫夫

MIT 许可协议

特此授予任何人免费获得此软件及其相关文档文件(以下简称“软件”)副本的权利,以不受限制地处理该软件,包括但不限于使用、复制、修改、合并、发布、分发、许可和/或销售软件副本,并允许向软件提供者提供软件的人执行上述操作,前提是遵守以下条件

上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。

软件按“原样”提供,不提供任何形式的保证,无论是明示的还是隐含的,包括但不限于适销性、特定用途的适用性和非侵权性保证。在任何情况下,作者或版权所有者不对任何索赔、损害或其他责任负责,无论是基于合同、侵权或其他原因,无论该索赔、损害或其他责任源于、因之或与软件或软件的使用或其他操作有关。