baohan/swoole-gearman

基于Swoole和Gearman的多进程工作框架

1.8.0 2022-09-09 02:54 UTC

README

基于Swoole的多进程工作框架。

安装

$ composer require baohan/swoole-gearman

但首先你应该使用指定的版本安装运行时基础设施 swoolegearmanphp

通过Docker运行

> docker pull baohanddd/swoole-gearman:1.6.1
> docker pull artefactual/gearmand
> docker network create swoole-gearman
> docker run --rm --name=gearman --network=swoole-gearman artefactual/gearmand
> cd your_project/
> docker run --rm --name=worker --network=swoole-gearman -v=$(pwd):/data -w=/data baohanddd/swoole-gearman:1.6.1 php server.php

如何

快速开始

这是一个简单的服务器,我们运行服务器并启动一些等待下一个任务的工人。

use baohan\SwooleGearman\Collection;
use baohan\SwooleGearman\Server;
use Monolog\Logger;

require('vendor/autoload.php');

try {
    $s = new Server(Logger::INFO);
    $s->worker_num = 1;
    $s->task_worker_num = 19;
    $s->addCallback('timestamp::print', function () {
        return new class() extends \baohan\SwooleGearman\Job {
            public function execute(Collection $payload, int $workerId): bool {
                sleep(1);
                echo "worker[{$workerId}] => ".$payload['message'].PHP_EOL;
                return true;
            }
        };
    });
    $s->start();
} catch (Throwable $e) {
    echo $e->getMessage();
}

服务器将启动。

[2021-08-20 05:52:59] swoole-gearman.INFO: server starting... {"host":"127.0.0.1","port":9500,"worker_num":1,"task_worker_num":19,"task_max_request":500} []

让我们写一个简单的客户端

use Swoole\Client;

$client = new Client(SWOOLE_SOCK_TCP);
if (!$client->connect('127.0.0.1', 9500, 0.5))
{
    echo "connect failed. Error: {$client->errCode}\n";
}
$data = [
    'name' => 'timestamp::print',
    'data' => [
        'message' => ''
    ]
];
for ($i = 0; $i < 1024; $i++) {
    $data['data']['message'] = 'query on '.$i;
    $client->send(json_encode($data));
    echo $client->recv();
}
$client->close();

结果

......
worker[11] => query on 1019
worker[15] => query on 1023
worker[14] => query on 1022
worker[9] => query on 1017
worker[5] => query on 1013
worker[8] => query on 1016
worker[4] => query on 1012
worker[2] => query on 1010
worker[13] => query on 1021
worker[12] => query on 1020
worker[7] => query on 1015
......

现在我们尝试将任务发布到Gearman作业服务器。

为此,只需将 \baohan\SwooleGearman\Server 替换为 \baohan\SwooleGearman\Gearman。请查看,目前还没有需要 task_worker_num 选项,hostport 与 gearman-job-server 相同。

use baohan\SwooleGearman\Collection;
use Monolog\Logger;

require('vendor/autoload.php');

try {
    $s = new \baohan\SwooleGearman\Gearman(Logger::INFO);
    $s->worker_num = 10;
    $s->host = 'gearman';
    $s->port = '4730';
    $s->addCallback('timestamp::print', function () {
        return new class() extends \baohan\SwooleGearman\Job {
            public function execute(Collection $payload, int $workerId): bool {
                sleep(1);
                echo "worker[{$workerId}] => ".$payload['message'].PHP_EOL;
                return true;
            }
        };
    });
    $s->start();
} catch (Throwable $e) {
    echo $e->getMessage();
}

在客户端,我们通过gearman客户端将任务发布到gearman服务器而不是swoole服务器。

<?php
$gmc = new GearmanClient();
$gmc->addServer('gearman', '4730');
$data = [
    'name' => 'timestamp::print',
    'data' => [
        'message' => ''
    ]
];
for ($i = 0; $i < 1024; $i++) {
    $data['data']['message'] = $i + 1;
    $gmc->doBackground('timestamp::print', json_encode($data));
    if ($gmc->returnCode() != GEARMAN_SUCCESS) {
        echo "bad return code".PHP_EOL;
        exit;
    }
}

在这种模式下,巨大的优势是即使工作服务器崩溃也不会丢失任务,所有任务都存储在Gearman中。另一方面,所有客户端只需要知道gearman服务器的地址和端口,我们可以更容易地在不同的机器上部署多个Swoole服务器实例。

就是这样。