hds-solutions / parallel-sdk
SDK to implement the parallel PHP extension
Requires
- php: >=8.0
Requires (Dev)
- phpunit/phpunit: ^9.6
- roave/security-advisories: dev-latest
- symfony/console: ^6.0
Suggests
- ext-parallel: Allows to run multi-threaded processes
- symfony/console: Allows usage of a shared ProgressBar between the Workers
README
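An implementation of the krakjoe/parallel PHP extension.
This library works even when the parallel extension is not installed. In that case, tasks are executed sequentially. This lets you deploy your code in any environment, and you get the benefit of parallel processing wherever parallel is enabled.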
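The fallback described above can be pictured with a minimal sketch. This is not the library's implementation: `runEverywhere()` is a hypothetical helper, and the threaded branch assumes the extension's functional API (`parallel\run()` returning a `Future`). It only illustrates how the same worker can run on threads when the extension is loaded and one task at a time otherwise.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of parallel-sdk): runs $worker once per input,
 * on threads when ext-parallel is loaded, sequentially otherwise.
 */
function runEverywhere(Closure $worker, array $inputs): array
{
    if (!extension_loaded('parallel')) {
        // Sequential fallback: same results, one task at a time.
        return array_map($worker, $inputs);
    }

    // Threaded path: start one task per input, then collect the results.
    $futures = [];
    foreach ($inputs as $key => $input) {
        $futures[$key] = \parallel\run($worker, [$input]);
    }

    return array_map(static fn ($future) => $future->value(), $futures);
}

$squares = runEverywhere(
    static function (int $n): int {
        return $n * $n;
    },
    [1, 2, 3],
);
```

Either branch leaves `$squares` with one result per input, which is why calling code does not need to know whether the extension is present.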
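Installation
Dependencies
You need these dependencies to execute tasks in parallel.
- PHP >= 8.0 with ZTS enabled
- parallel PECL extension
The documentation for the parallel extension is available at https://php.ac.cn/parallel.
Via composer
composer require hds-solutions/parallel-sdk
Usage
You should set the bootstrap file for the parallel threads. Pointing it at composer's autoloader is enough.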

    // check if extension is loaded to allow deploying even in environments where parallel isn't installed
    if (extension_loaded('parallel')) {
        // set the path to composer's autoloader
        parallel\bootstrap(__DIR__.'/vendor/autoload.php');
    }

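Behind the scenes, the parallel extension creates an empty Runtime (thread) in which the tasks are executed. Every Runtime is a clean, empty, isolated environment without any preloaded classes, functions, or autoloaders from the parent thread/process. This isolation ensures that each Runtime starts with a minimal footprint. See References #1 and #2 for more information.
Then you define a Worker that will process the tasks. There are two options:
- Using an anonymous function as the Worker.
- Creating a class that extends ParallelWorker and implements the process() method.
Then you can schedule tasks to run in parallel using the Scheduler::runTask() method.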
Bootstrapping a Laravel app
Since ZTS is only available on the cli, you should set the bootstrap file for the parallel threads in the artisan file.

    #!/usr/bin/env php
    <?php

    define('LARAVEL_START', microtime(true));

    require __DIR__.'/vendor/autoload.php';

    $app = require_once __DIR__.'/bootstrap/app.php';

    + // check if parallel extension is loaded
    + if (extension_loaded('parallel')) {
    +     // and register the bootstrap file for the threads
    +     parallel\bootstrap(__DIR__.'/bootstrap/parallel.php');
    + }

Then, in the bootstrap file for the parallel threads, you only need to get the app instance and bootstrap the Laravel Kernel. This way all Laravel service providers are registered inside each thread.
bootstrap/parallel.php:

    <?php declare(strict_types=1);

    require __DIR__.'/../vendor/autoload.php';

    $app = require_once __DIR__.'/app.php';

    $kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);

    // bootstrap the Kernel
    $kernel->bootstrap();

Anonymous worker
Define an anonymous function as the Worker that will process the tasks.

    use HDSSolutions\Console\Parallel\Scheduler;

    Scheduler::using(static function(int $number): int {
        // here you do some work with the received data
        // this portion of code will run on a separate thread

        // example process
        $milliseconds = random_int(100, 500);
        echo sprintf("AnonymousWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
        usleep($milliseconds * 1000); // usleep() expects microseconds
        // end example process

        // the data returned will be available later
        return $number;
    });

Worker instance
Create a class that extends the ParallelWorker class. This is useful for complex processes and helps keep your code clean.
ExampleWorker.php:

    use HDSSolutions\Console\Parallel\ParallelWorker;

    final class ExampleWorker extends ParallelWorker {

        protected function process(int $number = 0): int {
            // example process
            $milliseconds = random_int(100, 500);
            echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
            usleep($milliseconds * 1000); // usleep() expects microseconds
            // end example process

            return $number;
        }

    }

Then register the class as the Worker:

    use HDSSolutions\Console\Parallel\Scheduler;

    Scheduler::using(ExampleWorker::class);

You can also pass arguments to the Worker's constructor.

    use HDSSolutions\Console\Parallel\ParallelWorker;

    final class ExampleWorker extends ParallelWorker {

        public function __construct(
            private array $multipliers,
        ) {}

        // process() is omitted here for brevity

    }

    use HDSSolutions\Console\Parallel\Scheduler;

    Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);

Schedule tasks
After defining a Worker, you can schedule tasks that will run in parallel.

    use HDSSolutions\Console\Parallel\Scheduler;

    foreach (range(1, 100) as $task_data) {
        try {
            // tasks will start as soon as a thread is available
            Scheduler::runTask($task_data);

        } catch (Throwable) {
            // if no Worker was defined, a RuntimeException will be thrown
            // also, Workers have some limitations, see Reference #3 for more info
        }
    }

Check Tasks state
Every task has a state. There are also helper methods to check the current state of a Task.

    use HDSSolutions\Console\Parallel\Scheduler;
    use HDSSolutions\Console\Parallel\Task;

    do {
        $all_processed = true;
        foreach (Scheduler::getTasks() as $task) {
            switch (true) {
                case $task->isPending():
                    $all_processed = false;
                    break;

                case $task->isBeingProcessed():
                    $all_processed = false;
                    break;

                case $task->wasProcessed():
                    $result = $task->getOutput();
                    break;
            }
        }
    } while (!$all_processed);

等待任务完成
而不是检查每个任务状态,您可以在继续代码执行之前等待所有任务完成。
use HDSSolutions\Console\Parallel\Scheduler; // This will pause execution until all tasks are processed Scheduler::awaitTasksCompletion();
您还可以指定等待时间限制。进程将在所有任务处理完毕或达到最大时间限制之前暂停,以先到者为准。
use HDSSolutions\Console\Parallel\Scheduler; // Pause until all tasks are processed or until 15 minutes pass Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));
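To make the "whichever comes first" rule concrete, here is a minimal sketch of waiting with a time limit. It is an illustration only, not how the Scheduler is implemented: `waitUntilDone()`, its `$isDone` callback, and the polling interval are all hypothetical names and assumptions.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: polls $isDone until it returns true or $limit elapses.
 * Returns true when the work finished in time, false on timeout.
 */
function waitUntilDone(callable $isDone, DateInterval $limit, int $pollMicroseconds = 10_000): bool
{
    // Turn the relative interval into an absolute deadline once, up front.
    $deadline = (new DateTimeImmutable())->add($limit);

    while (!$isDone()) {
        if (new DateTimeImmutable() >= $deadline) {
            return false; // time limit reached first
        }
        usleep($pollMicroseconds);
    }

    return true; // work finished first
}

// The work "finishes" on the third check, well inside the 2 second limit.
$checks = 0;
$finished = waitUntilDone(
    static function () use (&$checks): bool {
        return ++$checks >= 3;
    },
    new DateInterval('PT2S'),
);

// Work that never finishes hits a zero-length limit immediately.
$timedOut = !waitUntilDone(static fn (): bool => false, new DateInterval('PT0S'));
```

Computing the deadline once keeps the limit fixed even when individual checks are slow.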
Get processed tasks result

    use HDSSolutions\Console\Parallel\Scheduler;
    use HDSSolutions\Console\Parallel\Task;

    foreach (Scheduler::getTasks() as $task) {
        // you have access to the Worker class that was used to process the task
        $worker = $task->getWorkerClass();
        // and the result of the task processed
        $result = $task->getOutput();
    }

Remove pending tasks
If your process needs to stop early, you can stop processing the tasks that are still in the queue.

    use HDSSolutions\Console\Parallel\Scheduler;
    use HDSSolutions\Console\Parallel\Task;

    // this will remove tasks from the pending queue
    Scheduler::removePendingTasks();

    // after cleaning the queue, you should wait for tasks that are currently being processed to finish
    Scheduler::awaitTasksCompletion();

    $results = [];
    $unprocessed_tasks = [];
    foreach (Scheduler::getTasks() as $task) {
        if ($task->wasProcessed()) {
            $results[] = $task->getOutput();
        } else {
            // tasks that were not processed will remain in the Pending state
            $unprocessed_tasks[] = $task;
        }
    }

Remove a pending/running task
If needed, you can remove a specific task from the processing queue.

    use HDSSolutions\Console\Parallel\Scheduler;
    use HDSSolutions\Console\Parallel\Task;

    foreach (Scheduler::getTasks() as $task) {
        // if for some reason you want to remove a task, or just want to free memory when a task finishes
        if (someValidation($task) || $task->wasProcessed()) {
            // this will remove the task from the processing queue
            // IMPORTANT: if the task is already running, it will be stopped
            Scheduler::removeTask($task);
        }
    }

Stop processing all tasks immediately
If you need to stop everything right away, call the Scheduler::stop() method. This stops the processing of all tasks immediately.

    use HDSSolutions\Console\Parallel\Scheduler;
    use HDSSolutions\Console\Parallel\Task;

    // this will stop processing tasks immediately
    Scheduler::stop();

    // at this point, each Task should be in one of the following 3 states
    foreach (Scheduler::getTasks() as $task) {
        switch (true) {
            case $task->isPending():
                // Task was never processed
                break;

            case $task->wasProcessed():
                // Task was processed by the Worker
                break;

            case $task->wasCancelled():
                // Task was cancelled while it was being processed
                break;
        }
    }

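Specifying the number of CPU cores
You can limit the CPU cores to use, either as a maximum count or as a percentage of the total, by calling the following methods:

    use HDSSolutions\Console\Parallel\Scheduler;

    Scheduler::setMaxCpuCountUsage(2);        // Use at max two CPU cores
    Scheduler::setMaxCpuPercentageUsage(0.5); // Use at max 50% of the total of CPU cores
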
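As a rough illustration of how a percentage limit relates to a core count, the sketch below converts a fraction of the available cores into a whole number of threads. `coresForPercentage()` is a hypothetical helper, and the rounding rule (round down, never below one) is an assumption; the library may round differently.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: converts a fraction of the available cores into a thread count.
 * Assumption: round down, but always keep at least one thread.
 */
function coresForPercentage(int $totalCores, float $percentage): int
{
    if ($totalCores < 1 || $percentage <= 0.0 || $percentage > 1.0) {
        throw new InvalidArgumentException('Expected at least 1 core and a percentage in (0, 1].');
    }

    return max(1, (int) floor($totalCores * $percentage));
}

$half   = coresForPercentage(8, 0.5);   // half of an 8-core machine
$single = coresForPercentage(1, 0.5);   // a single core is never reduced to zero
```

Under these assumptions, 0.5 on an 8-core machine is equivalent to a count limit of 4.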
ProgressBar
Requirements
- symfony/console package
Enable a ProgressBar for the worker by calling the withProgress() method.

    use HDSSolutions\Console\Parallel\Scheduler;

    $tasks = range(1, 10);

    Scheduler::using(ExampleWorker::class)
        ->withProgress(steps: count($tasks));

Usage from the Worker
The available methods are:
- setMessage(string $message)
- advance(int $steps)
- setProgress(int $step)
- display()
- clear()

    use HDSSolutions\Console\Parallel\ParallelWorker;

    final class ExampleWorker extends ParallelWorker {

        protected function process(int $number = 0): int {
            // example process
            $milliseconds = random_int(100, 500);
            $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $milliseconds));
            usleep($milliseconds * 1000); // usleep() expects microseconds
            $this->advance();
            // end example process

            return $number;
        }

    }

Example output

     28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms
     [===========================================>------------------------------------]  53%
     elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s
     memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB

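References
Security Vulnerabilities
If you encounter any security-related issue, feel free to raise a ticket on the issue tracker.
Contributing
Contributions are welcome! If you find any issues or would like to add new features or improvements, please feel free to submit a pull request.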