yidas / worker-dispatcher
PHP 多进程任务分发器,用于管理工作进程
1.0.0
2020-07-27 07:35 UTC
Requires
- php: >=5.4
This package is auto-updated.
Last update: 2024-08-24 03:47:38 UTC
README
PHP Worker Dispatcher
PHP 多进程任务分发器,用于管理工作进程
功能
-
多进程 在原生 PHP-CLI 上实现
-
任务分发 到每个工作进程
-
优雅的接口 用于设置和使用
概要
演示
使用多进程根据 CPU 核心数生成工作进程来分发任务
\yidas\WorkerDispatcher::run([ 'tasks' => ["R4NEJ1", "F5KH83", "..."], 'callbacks' => [ // The callback is for each forked process with decentralized tasks 'task' => function ($config, $workderId, $task) { // $task is one of the `tasks` assigned to each worker, ex. "F5KH83" for $workderId is 2 $token = $task; $result = file_get_contents("https://example/v1/register-by-token/{$token}"); }, ], ]);
使用多进程从队列中处理作业
\yidas\WorkerDispatcher::run([ 'tasks' => false, 'callbacks' => [ // The callback is for each forked process 'process' => function ($config, $workderId, $task) { // Get and handle each job from queue in inifite loop (You need to define your own function) while (true) { $result = handleOneJobFromQueue(); if ($result === null) { break; } } }, ], ]);
介绍
这个库是通过 PHP PCNTL 控制实现的,它提供了一个主要的 PHP-CLI 来 fork 多个子进程以共享任务,甚至可以使用无限循环设置来用于高并发应用。
由于 PHP 没有内置的共享变量或队列机制,如果没有外部作业队列,这个库提供了一个任务平均分发器,以简单地解决核心分布式处理问题。
要求
这个库需要以下内容
- PHP PCNTL
- PHP CLI 5.4.0+
安装
在项目中运行 Composer
composer require yidas/worker-dispatcher ~1.0.0
然后你可以在 PHP 项目中加载 Composer 后使用这个类
require __DIR__ . '/vendor/autoload.php'; use yidas\WorkerDispatcher;
使用
通过静态调用具有参数的 run()
方法,WorkerDispatcher 将开始分发任务(如果有),然后根据环境或设置 fork 指定数量的工作进程,并等待所有 forked 进程完成或终止主进程。
以下是一个设置示例,包含所有选项
\yidas\WorkerDispatcher::run([ 'debug' => true, 'workers' => 4, 'config' => ['uri' => "/v1/resource"], 'tasks' => ["R4NEJ1", "F5KH83", "..."], 'callbacks' => [ 'process' => function ($config, $workerId, $tasks) { echo "The number of tasks in forked process - {$workerId}: " . count($tasks[$workerId - 1]) . "\n"; }, 'task' => function ($config, $workerId, $task) { echo "Forked process - {$workerId}: Request to {$config['uri']} with token {$task}\n"; }, ], ]);
选项
callbacks.process
创建每个 forked 进程后调用的回调函数
function (multitype $config, integer $workerId, array $tasks)
callbacks.task
每个 forked 进程的任务循环中调用的回调函数
function (multitype $config, integer $workerId, multitype $task)