prooph/psb-zeromq-producer

Prooph 服务总线 ZeroMQ 消息生产者

v0.3 2015-11-23 17:04 UTC

This package is auto-updated.

Last update: 2024-09-17 01:21:43 UTC


README

Build Status Coverage Status Gitter

使用 ZeroMQ 作为 Prooph 服务总线 的消息生产者。与 CommandBus、EventBus 等总线类型协同工作。

重要

此库将支持至 2019 年 12 月 31 日,之后将被弃用。

有关更多信息,请参阅官方公告:[链接](https://www.sasaprolic.com/2018/08/the-future-of-prooph-components.html)

要求

PHP 不自带对 ZeroMQ 的支持,但有一个名为 ext-zmq 的扩展,ZMQ 网站上有 PHP 绑定的说明。

https://zeromq.cn/bindings:php

安装

在您的服务器/开发机器上安装 ext-zmq 之后,您就准备好进行下一步了!Composer 可以在几秒钟内(甚至更快)安装 prooph zeromq producer。运行以下命令通过 composer 安装。

composer require prooph/psb-zeromq-producer:~0.2

命令/事件总线 (PUB/SUB)

要构建您的命令/事件总线,您需要一个运行 ZMQ 服务器的服务器,并具有 ZMQ::SOCKET_SUB,这样它就可以从生产者接收消息。

有关 PUB/SUB 的基本教程:[链接](https://zguide.zeromq.cn/page:all#Getting-the-Message-Out)

使用示例

$container = new Container;
$container['config'] = [
    'prooph' => [
        'zeromq_producer' => [
            'dsn' => 'tcp://127.0.0.1:5555', // ZMQ Server Address.
            'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
            'rpc' => false, // Use as Query Bus.
        ]
    ]
];

$factory = Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory;
$zmqProducer = $factory($container);

// Setup complete, now to add it to the prooph service bus.

$commandBus = new Prooph\ServiceBus\CommandBus();
$router = new Prooph\ServiceBus\Plugin\Router\CommandRouter();
$router->route('ExampleCommand')
    ->to($zmqProducer);

$commandBus->utilize($router);
$echoText = new ExampleCommand('It works');
$commandBus->dispatch($echoText);

// Now check your server to make sure it received this command.

查询总线 (REQ/REP)

要构建您的查询总线,您需要一个运行 ZMQ 服务器的服务器,并具有 ZMQ::SOCKET_REP,这样它就可以从生产者接收消息,并且必须根据 REQ/REP 规范进行回复。

有关 REQ/REP 的基本教程:[链接](https://zguide.zeromq.cn/page:all#Ask-and-Ye-Shall-Receive)

使用示例

// file: CLIENT.php

$container = new Container;
$container['config'] = [
    'prooph' => [
        'zeromq_producer' => [
            'dsn' => 'tcp://127.0.0.1:5556', // ZMQ Server Address.
            'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
            'rpc' => true, // Use as Query Bus.
        ]
    ]
];

$factory = Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory;
$zmqProducer = $factory($container);

// Setup complete, now to add it to the prooph service bus.

$queryBus = new Prooph\ServiceBus\QueryBus();
$router = new Prooph\ServiceBus\Plugin\Router\QueryRouter();
$router->route('ExampleQuery')
    ->to($zmqProducer);

$queryBus->utilize($router);
$getText = new ExampleQuery('Hello Server.');
$promise = $queryBus->dispatch($getText);

$promise->then(function ($response) {
    var_dump($response); // string "Hello Client."
});

exit(0);

// file: SERVER.php
<?php

$context = new ZMQContext;
$socket = new ZMQSocket($context, ZMQ::SOCKET_REP);
$socket->bind('tcp://127.0.0.1:5556');

echo "ZMQ Stub Server Started.";

while ($message = $socket->recv()) {
    if ('Hello Server.' === $message) {
        $socket->send('Hello Client.');
    }
}


支持

贡献

请随意分叉并扩展现有功能或添加新功能,并将您的更改以 pull request 的形式发送!为了确保代码质量一致,请为所有更改提供单元测试,并可能更新文档。

许可证

New BSD 许可证 下发布。