giudicelli/distributed-architecture-queue

PHP 分布式架构队列

该软件包的官方仓库似乎已不存在,因此软件包已被冻结。

0.6.0 2020-06-17 07:14 UTC

This package is auto-updated.

Last update: 2022-03-01 00:35:39 UTC


README

PHP 分布式架构队列是一个库,它扩展了 分布式架构。它实现了一个喂入/消费者系统,以便在分布式架构中易于快速使用。

安装

$ composer require giudicelli/distributed-architecture-queue

使用

要运行您的分布式架构队列,您主要需要使用两个类 Master\LauncherQueue 和 Slave\HandlerQueue。

主进程

以下是一个启动主进程的简单示例。

use giudicelli\DistributedArchitecture\Master\Handlers\GroupConfig;
use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Local\Config as LocalConsumerConfig;
use giudicelli\DistributedArchitectureQueue\Master\Handlers\Consumer\Remote\Config as RemoteConsumerConfig;
use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Local\Config as LocalFeederConfig;
use giudicelli\DistributedArchitectureQueue\Master\Handlers\Feeder\Remote\Config as RemoteFeederConfig;
use giudicelli\DistributedArchitectureQueue\Master\LauncherQueue;
use Psr\Log\AbstractLogger;

class Logger extends AbstractLogger
{
    public function log($level, $message, array $context = [])
    {
        foreach ($context as $key => $value) {
            $message = str_replace('{'.$key.'}', $value, $message);
        }
        echo "{$level} - {$message}\n";
        flush();
    }
}

$logger = new Logger();

$groupConfigs = [
    (new GroupConfig())
        ->setName('First Group')
        ->setCommand('script.php')
        ->setProcessConfigs([
            (new LocalFeederConfig())
                ->setBindTo('192.168.0.1')
                ->setPort(9999),
            (new RemoteConsumerConfig())
                ->setHost('192.168.0.1')
                ->setPort(9999)
                ->setHosts(['remote-server1', 'remote-server2'])
                ->setInstancesCount(3),
        ]),
];

(new LauncherQueue($logger))
    ->setMaxRunningTime(3600)
    ->run($groupConfigs);

上述代码创建了一个名为 "First Group" 的组,并将运行 "script.php"

  • 1 个喂入实例在本地机器上启动,它将监听 192.168.0.1:9999,
  • 3 个消费者实例在 "remote-server1" 机器上,
  • 3 个消费者实例在 "remote-server2" 机器上。

所有 6 个消费者实例都将连接到监听 192.168.0.1:9999 的喂入实例。

"Master\LauncherQueue" 实例将在 1 小时后停止所有实例并返回。通常,在一段时间后重新启动主进程是一个好主意,以启动一个全新的环境。

请注意,"Master\LauncherQueue" 实例将无限期运行,除非您使用 SIGTERM 杀死它。

从进程

从进程必须使用 "Slave\HandlerQueue" 类,因为主进程将发送需要处理的命令。它还允许您的脚本在主进程请求时进行干净退出。单个脚本执行两种类型的任务,即喂入或消费者。

使用上述示例,以下是 "script.php" 的可能实现。

use giudicelli\DistributedArchitecture\Slave\HandlerInterface;
use giudicelli\DistributedArchitectureQueue\Slave\HandlerQueue;
use giudicelli\DistributedArchitectureQueue\Slave\Queue\Feeder\FeederInterface;
use Psr\Log\LoggerInterface;

if (empty($_SERVER['argv'][1])) {
    echo "Empty params\n";
    die();
}
/**
 * The is an example of a serializable job implementation.
 */
class Job implements \JsonSerializable
{
    public $id = 0;
    public $type = '';

    public function jsonSerialize()
    {
        return [
            'id' => $this->id,
            'type' => $this->type,
        ];
    }
}

/**
 * The is an example of a feeder queue implementation. It's returns the jobs that will be sent to the consumers.
 */
class Feeder implements FeederInterface
{
    private $items = [];
    private $successes = [];
    private $errors = [];

    public function __construct()
    {
        $item = new Job();
        $item->id = 1;
        $item->type = 'MyType';
        $this->items[] = $item;

        $item = new Job();
        $item->id = 2;
        $item->type = 'MyType';
        $this->items[] = $item;

        $item = new Job();
        $item->id = 3;
        $item->type = 'MyType';
        $this->items[] = $item;
    }

    public function empty(): bool
    {
        return empty($this->items);
    }

    public function get(): ?\JsonSerializable
    {
        if ($this->empty()) {
            return null;
        }

        $item = $this->items[0];
        array_splice($this->items, 0, 1);

        return $item;
    }

    public function success(\JsonSerializable $item): void
    {
        $this->successes[] = $item;
    }

    public function error(\JsonSerializable $item): void
    {
        $this->errors[] = $item;
    }
}

$handler = new HandlerQueue($_SERVER['argv'][1]);
$handler->runQueue(
    // The consumer callback
    function (HandlerInterface $handler, array $item) {

        // Anything echoed here will be considered log level "info" by the master process.
        // If you want another level for certain messages, use $handler->getLogger().
        // echo "Hello world!\n" is the same as $handler->getLogger()->info('Hello world!')

        // I received a job to handle, the job is an array form of the Job class.
        echo $item['type'].':'.$item['id']."\n";
    },
    // The feeder accesses the jobs queue
    new Feeder()
);