misterion/ko-worker

Ko-worker 项目是开发基于 AMQP 的消息派发器的底层框架

v1.3.1 2018-02-20 10:15 UTC

This package is not auto-updated.

Last update: 2024-09-14 15:48:22 UTC


README

Build Status Latest Stable Version Code Coverage Scrutinizer Code Quality Latest Unstable Version License

关于

Ko-worker 项目是一个 PHP 库,旨在抽象 RabbitMQ 上的队列和 RPC 消息模式。另一个目标是简化 RabbitMQ 上的消息派发和 RPC 服务器开发。

$m = new Ko\AmqpBroker($config);
$m->getProducer('upload_picture')->publish($message);

每次您开始使用类似队列的东西时,都应该决定如何处理您的消息。是的,这听起来并不复杂

  • 编写一些控制台应用程序或守护进程
  • 从队列中获取消息并在主进程或子进程中执行它们
  • 如果子进程失败,则重新启动子进程
  • 记录崩溃或其他情况
  • ...

而 Ko-Worker 允许您将此简化为仅两步

  • 创建配置文件
  • 运行 ko 应用程序
$ ./bin/ko --workers 5 --consumer upload_picture

为什么选择 Ko-worker?

  • 实际应用于高负载的 GameNet 项目
  • 接近 100% 的代码覆盖率
  • 简单 - 5 分钟内开始。

要求

PHP >= 5.4
pcntl extension installed
posix extension installed
amqp extension installed

安装

Composer

安装库的推荐方法是 composer。您可以在 Packagist 上查看 软件包信息

{
	"require": {
		"misterion/ko-worker": "*"
	}
}

不使用 composer?

只需克隆存储库并注意命名空间 Ko 的自动加载。

示例

查看 examples 文件夹

未完成

用法

首先,您应该创建您的 RabbitMq 配置文件。例如 config.yaml

connections:
  default:
    host:     'localhost'
    port:     5672
    login:    'guest'
    password: 'guest'
    vhost:    '/'
producers:
  social_activity:
    connection:       default
    exchange_options: {name: 'social_activity_exchange', type: direct, durable: 1, passive: 1}
consumers:
  social_activity:
    connection:    default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: *}
  class: \MyProject\TestAction

在此,我们配置了应用程序将拥有的交换生产者和队列消费者。在此示例中,Ko\AmqpBroker 包含生产者 social_activity 和名为 social_activity 的消费者。

您应该指定客户端的连接。

如果您需要添加可选的队列或交换参数,则选项可能如下所示

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

另一个示例,消息 TTL 为 20 秒

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

参数值必须是数据类型和值的列表。有效的数据类型包括

  • S - 字符串
  • I - 整数
  • D - 小数
  • T - 时间戳
  • F - 表
  • A - 数组

根据您的需求调整 arguments

如果您想将队列绑定到特定的路由键,您可以在生产者或消费者配置中声明它

queue_options:
    binding:
        - {name: "social_activity_exchange", routing_keys: 'social.#.addFriends'}
        - {name: "social_activity_exchange", routing_keys: '*.removeFriends'}

生产者、消费者、AqmBroker?

在消息应用中,向代理发送消息的过程称为 生产者,而接收这些消息的过程称为 消费者。在您的应用程序中,您将拥有多个它们,您可以在配置中的相应条目下列出它们。

生产者

生产者用于向服务器发送消息。在 AMQP 模型中,消息发送到 交换,这意味着在生产者的配置中,您将需要指定连接选项以及交换选项,这通常将是交换的名称和类型。

现在假设您想在后台处理一些社交网络活动。在您添加朋友后,您将使用以下信息向服务器发布消息

public function addFriendAction($name)
{
    $msg = array('user_id' => 1235, 'friend_id' => '67890');
    $this->broker->getProducer('social_activity')->publish(json_encode($msg));
}

除了消息本身外,Ko\RabbitMq\Producer#publish() 方法还接受一个可选的路由键参数和一个可选的附加属性数组。这样,例如,你可以更改应用程序的头部。

下一个问题是要有一个消费者,它会从队列中取出消息并相应地处理。

消费者

消费者将连接到服务器并开始一个等待接收消息以进行处理 循环。根据指定的此类消费者的 ,它将具有相应的行为。让我们回顾一下上面的消费者配置

consumers:
  social_activity:
    connection:    default
    queue_options:
      name: 'social_activity_queue'
      durable: 1
      autodelete: 1
      exclusive: 0
      qos: 5
      binding: {name: social_activity_exchange, routing-keys: *}
  class: \MyProject\TestAction

正如我们所见, 选项引用了 \MyProject\TestAction 类。它应该实现 Ko\Worker\ActionInterface

当消费者从服务器接收到消息时,它会创建并执行此类。如果您需要指定不同的类进行测试或调试,则可以在此处更改。

除了回调之外,我们还指定了要使用的连接方式,与使用 生产者 的方式相同。其余选项是 队列选项。在 队列选项 中,我们将提供一个 队列名称绑定

为什么?

正如我们所说的,AMQP 中的消息是发布到一个 交换机 的。这并不意味着消息已经到达一个 队列。为了实现这一点,我们首先需要创建这样的 队列,然后将其绑定到 交换机

酷的地方在于,你可以将多个 队列 绑定到一个 交换机 上,这样一条消息就可以到达多个目的地。

这种方法的优点是解耦了生产者和消费者。生产者不关心会有多少消费者处理它的消息。

它只需要确保消息到达服务器。这样我们就可以在不更改控制器代码的情况下扩展每次朋友添加时执行的操作。

现在,如何运行消费者?有一个命令可以执行,如下所示

$ ./bin/ko --c ../config_queue.yaml --q social_activity --w 10

这意味着什么?

我们正在执行 social_activity 消费者,告诉它应该使用 10 个子进程进行消费。

每次消费者从服务器接收消息时,它都会执行配置的回调,并将 AMQP 消息作为 AMQPEnvelope 类的实例传递。可以通过调用 $msg->getBody() 来获取消息体。

默认情况下,消费者将以 无限循环 的方式处理消息,其中 无限 是一个定义。

未完成

生产

Ko-worker 随附一个名为 ko-package 的实用工具,允许您从工作项目创建可执行的 phar。

$ ./bin/ko-package

ko-package version 0.0.1
Usage: ko-package [options] [operands]
Options:
  -p, --path <arg>        Path to application code
  -o, --output [<arg>]    Output file name
  -v, --version [<arg>]   Application version
  -n, --name [<arg>]      Application name
  -e, --exclude [<arg>]   Excluded folders like logs, test, etc.

致谢

Ko-worker 是作为 GameNet 项目 的一部分由 Nikolay Bondarenko(misterionkell at gmail.com)和 Vadim Sabirov(pr0head at gmail.com)编写的。我们使用了 Alvaro Videla 的有趣想法 Thumper 以及来自 RabbitMqBundle 的部分文档。

许可

MIT 许可下发布。

链接