giudicelli / distributed-architecture-queue
PHP 分布式架构队列
该软件包的官方仓库似乎已不存在,因此软件包已被冻结。
0.6.0
2020-06-17 07:14 UTC
Requires
- php: >=7.2.5
- giudicelli/distributed-architecture: >=0.8.0
- psr/log: ~1.0
Requires (Dev)
- phpunit/phpunit: ^9
README
PHP 分布式架构队列是一个库,它扩展了 分布式架构。它实现了一个喂入/消费者系统,以便在分布式架构中易于快速使用。
安装
$ composer require giudicelli/distributed-architecture-queue
使用
要运行您的分布式架构队列,您主要需要使用两个类 Master\LauncherQueue 和 Slave\HandlerQueue。
主进程
以下是一个启动主进程的简单示例。
use giudicelli\DistributedArchitecture\Master\Handlers\GroupConfig; use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Local\Config as LocalConsumerConfig; use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Remote\Config as RemoteConsumerConfig; use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Local\Config as LocalFeederConfig; use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Remote\Config as RemoteFeederConfig; use giudicelli\DistributedArchitectureQueue\Master\LauncherQueue; use Psr\Log\AbstractLogger; class Logger extends AbstractLogger { public function log($level, $message, array $context = []) { foreach ($context as $key => $value) { $message = str_replace('{'.$key.'}', $value, $message); } echo "{$level} - {$message}\n"; flush(); } } $logger = new Logger(); $groupConfigs = [ (new GroupConfig()) ->setName('First Group') ->setCommand('script.php') ->setProcessConfigs([ (new LocalFeederConfig()) ->setBindTo('192.168.0.1') ->setPort(9999), (new RemoteConsumerConfig()) ->setHost('192.168.0.1') ->setPort(9999) ->setHosts(['remote-server1', 'remote-server2']) ->setInstancesCount(3), ]), ]; (new LauncherQueue($logger)) ->setMaxRunningTime(3600) ->run($groupConfigs);
上述代码创建了一个名为 "First Group" 的组,并将运行 "script.php"
- 1 个喂入实例在本地机器上启动,它将监听 192.168.0.1:9999,
- 3 个消费者实例在 "remote-server1" 机器上,
- 3 个消费者实例在 "remote-server2" 机器上。
所有 6 个消费者实例都将连接到监听 192.168.0.1:9999 的喂入实例。
"Master\LauncherQueue" 实例将在 1 小时后停止所有实例并返回。通常,在一段时间后重新启动主进程是一个好主意,以启动一个全新的环境。
请注意,"Master\LauncherQueue" 实例将无限期运行,除非您使用 SIGTERM 杀死它。
从进程
从进程必须使用 "Slave\HandlerQueue" 类,因为主进程将发送需要处理的命令。它还允许您的脚本在主进程请求时进行干净退出。单个脚本执行两种类型的任务,即喂入或消费者。
使用上述示例,以下是 "script.php" 的可能实现。
use giudicelli\DistributedArchitecture\Slave\HandlerInterface; use giudicelli\DistributedArchitectureQueue\Slave\HandlerQueue; use giudicelli\DistributedArchitectureQueue\Slave\Queue\Feeder\FeederInterface; use Psr\Log\LoggerInterface; if (empty($_SERVER['argv'][1])) { echo "Empty params\n"; die(); } /** * The is an example of a serializable job implementation. */ class Job implements \JsonSerializable { public $id = 0; public $type = ''; public function jsonSerialize() { return [ 'id' => $this->id, 'type' => $this->type, ]; } } /** * The is an example of a feeder queue implementation. It's returns the jobs that will be sent to the consumers. */ class Feeder implements FeederInterface { private $items = []; private $successes = []; private $errors = []; public function __construct() { $item = new Job(); $item->id = 1; $item->type = 'MyType'; $this->items[] = $item; $item = new Job(); $item->id = 2; $item->type = 'MyType'; $this->items[] = $item; $item = new Job(); $item->id = 3; $item->type = 'MyType'; $this->items[] = $item; } public function empty(): bool { return empty($this->items); } public function get(): ?\JsonSerializable { if ($this->empty()) { return null; } $item = $this->items[0]; array_splice($this->items, 0, 1); return $item; } public function success(\JsonSerializable $item): void { $this->successes[] = $item; } public function error(\JsonSerializable $item): void { $this->errors[] = $item; } } $handler = new HandlerQueue($_SERVER['argv'][1]); $handler->runQueue( // The consumer callback function (HandlerInterface $handler, array $item) { // Anything echoed here will be considered log level "info" by the master process. // If you want another level for certain messages, use $handler->getLogger(). // echo "Hello world!\n" is the same as $handler->getLogger()->info('Hello world!') // I received a job to handle, the job is an array form of the Job class. echo $item['type'].':'.$item['id']."\n"; }, // The feeder accesses the jobs queue new Feeder() );