yidas/worker-dispatcher

PHP 多进程任务分发器,用于管理工作进程

1.0.0 2020-07-27 07:35 UTC

This package is auto-updated.

Last update: 2024-08-24 03:47:38 UTC


README

PHP Worker Dispatcher


PHP 多进程任务分发器,用于管理工作进程

Latest Stable Version License

功能

  • 多进程 在原生 PHP-CLI 上实现

  • 任务分发 到每个工作进程

  • 优雅的接口 用于设置和使用

概要

演示

使用多进程根据 CPU 核心数生成工作进程来分发任务

\yidas\WorkerDispatcher::run([
    'tasks' => ["R4NEJ1", "F5KH83", "..."],
    'callbacks' => [
        // The callback is for each forked process with decentralized tasks
        'task' => function ($config, $workderId, $task) {
            // $task is one of the `tasks` assigned to each worker, ex. "F5KH83" for $workderId is 2
            $token = $task;
            $result = file_get_contents("https://example/v1/register-by-token/{$token}");
        },
    ],
]);

使用多进程从队列中处理作业

\yidas\WorkerDispatcher::run([
    'tasks' => false,
    'callbacks' => [
        // The callback is for each forked process
        'process' => function ($config, $workderId, $task) {
            // Get and handle each job from queue in inifite loop (You need to define your own function)
            while (true) {
                $result = handleOneJobFromQueue();
                if ($result === null) {
                    break;
                }
            }
        },
    ],
]);

介绍

这个库是通过 PHP PCNTL 控制实现的,它提供了一个主要的 PHP-CLI 来 fork 多个子进程以共享任务,甚至可以使用无限循环设置来用于高并发应用。

由于 PHP 没有内置的共享变量或队列机制,如果没有外部作业队列,这个库提供了一个任务平均分发器,以简单地解决核心分布式处理问题。

要求

这个库需要以下内容

  • PHP PCNTL
  • PHP CLI 5.4.0+

安装

在项目中运行 Composer

composer require yidas/worker-dispatcher ~1.0.0

然后你可以在 PHP 项目中加载 Composer 后使用这个类

require __DIR__ . '/vendor/autoload.php';

use yidas\WorkerDispatcher;

使用

通过静态调用具有参数的 run() 方法,WorkerDispatcher 将开始分发任务(如果有),然后根据环境或设置 fork 指定数量的工作进程,并等待所有 forked 进程完成或终止主进程。

以下是一个设置示例,包含所有选项

\yidas\WorkerDispatcher::run([
    'debug' => true,
    'workers' => 4,
    'config' => ['uri' => "/v1/resource"],
    'tasks' => ["R4NEJ1", "F5KH83", "..."],
    'callbacks' => [
        'process' => function ($config, $workerId, $tasks) {
            echo "The number of tasks in forked process - {$workerId}: " . count($tasks[$workerId - 1]) . "\n";
        },
        'task' => function ($config, $workerId, $task) {
            echo "Forked process - {$workerId}: Request to {$config['uri']} with token {$task}\n";
        },
    ],
]);

选项

callbacks.process

创建每个 forked 进程后调用的回调函数

function (multitype $config, integer $workerId, array $tasks)

callbacks.task

每个 forked 进程的任务循环中调用的回调函数

function (multitype $config, integer $workerId, multitype $task)