hds-solutions/parallel-sdk

SDK for implementing the parallel PHP extension

v2.1.4 2024-03-27 17:42 UTC


Last update: 2024-09-09 19:39:14 UTC



An implementation of the krakjoe/parallel PHP extension.


PHP 8.0 PHP 8.1 PHP 8.2 PHP 8.3

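This library works even if the parallel extension isn't installed. In that case, tasks are executed sequentially. This lets you deploy your code in any environment, and wherever parallel is enabled you get the benefit of parallel processing.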
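The "parallel when available, sequential otherwise" behaviour can be pictured with the sketch below. It is not how parallel-sdk is implemented internally. runAll() is a hypothetical helper; only parallel\Runtime::run() and parallel\Future::value() come from the parallel extension itself. For simplicity it creates one Runtime per task, with no pooling.

```php
<?php declare(strict_types=1);

// Hypothetical helper (not part of parallel-sdk): runs $worker for every input,
// in parallel Runtimes when the extension is loaded, sequentially otherwise.
function runAll(Closure $worker, array $inputs): array
{
    if (!extension_loaded('parallel')) {
        // fallback: execute every task on the current thread, one after another
        return array_map($worker, $inputs);
    }

    // keep a reference to every Runtime so none is destroyed before its task ends
    $runtimes = [];
    $futures = [];
    foreach ($inputs as $key => $input) {
        $runtimes[$key] = new \parallel\Runtime();
        $futures[$key] = $runtimes[$key]->run($worker, [$input]);
    }

    // Future::value() blocks until the corresponding task has returned
    return array_map(static fn(\parallel\Future $future) => $future->value(), $futures);
}

$results = runAll(static fn(int $n): int => $n * $n, [1, 2, 3]);
print_r($results);
```

The same input gives the same result in both modes. Only the execution strategy changes.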

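Installation

Dependencies

You need these dependencies to execute tasks in parallel.

  • PHP >= 8.0 with ZTS enabled
  • parallel PECL extension

The parallel extension documentation can be found at https://php.ac.cn/parallel.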
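The snippet below is a quick way to see which of these dependencies are missing on a given machine. missingParallelRequirements() is a hypothetical helper, not part of the library. It relies on PHP's predefined constants PHP_VERSION_ID and PHP_ZTS, and on extension_loaded().

```php
<?php declare(strict_types=1);

// Hypothetical helper: lists the requirements for real parallel execution
// that the current PHP runtime does not meet.
function missingParallelRequirements(): array
{
    $missing = [];
    // PHP_VERSION_ID is 80000 for PHP 8.0.0
    if (PHP_VERSION_ID < 80000) {
        $missing[] = 'PHP >= 8.0';
    }
    // PHP_ZTS is 1 on thread-safe (ZTS) builds and 0 otherwise
    if (!PHP_ZTS) {
        $missing[] = 'ZTS build';
    }
    if (!extension_loaded('parallel')) {
        $missing[] = 'parallel extension';
    }
    return $missing;
}

$missing = missingParallelRequirements();
echo $missing === []
    ? "Tasks can run in parallel\n"
    : 'Tasks will run sequentially, missing: ' . implode(', ', $missing) . "\n";
```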

Through composer

composer require hds-solutions/parallel-sdk

Usage

You should set the bootstrap file for the parallel threads. Setting composer's autoloader is enough.

// check if extension is loaded to allow deploying even in environments where parallel isn't installed
if (extension_loaded('parallel')) {
    // set the path to composer's autoloader
    parallel\bootstrap(__DIR__.'/vendor/autoload.php');
}

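Behind the scenes, the parallel extension creates an empty Runtime (thread) where the tasks are executed. Each Runtime is a clean, empty, isolated environment without any preloaded classes, functions, or autoloaders from the parent thread/process. This isolation ensures that each Runtime starts with a minimal footprint. See references #1 and #2 for more info.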

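Then you define a Worker that will process the tasks. There are two options:

  1. Use an anonymous function as the Worker.
  2. Create a class that extends ParallelWorker and implements the process() method.

Then you can schedule tasks to run in parallel using the Scheduler::runTask() method.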
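Both Worker styles can be driven the same way, as the plain-PHP stand-ins below illustrate. SketchWorker, DoublingWorker and runTaskSequentially() are hypothetical names, not parallel-sdk classes. The sketch runs each task on the current thread, like the sequential fallback described earlier.

```php
<?php declare(strict_types=1);

// Hypothetical base class standing in for ParallelWorker
abstract class SketchWorker
{
    abstract protected function process(int $number): int;

    // entry point a scheduler could call for class-based Workers
    public function handle(int $number): int
    {
        return $this->process($number);
    }
}

// Option 2: a class-based Worker implementing process()
final class DoublingWorker extends SketchWorker
{
    protected function process(int $number): int
    {
        return $number * 2;
    }
}

// Accepts either Worker style and runs one task with it
function runTaskSequentially(Closure|SketchWorker $worker, int $data): int
{
    return $worker instanceof Closure ? $worker($data) : $worker->handle($data);
}

// Option 1: an anonymous function as the Worker
echo runTaskSequentially(static fn(int $n): int => $n + 1, 1), "\n"; // 2
echo runTaskSequentially(new DoublingWorker(), 21), "\n";            // 42
```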

Bootstrapping a Laravel app

Since ZTS is only available in the CLI, you should set the bootstrap file for the parallel threads in the artisan file.

#!/usr/bin/env php
<?php

define('LARAVEL_START', microtime(true));

require __DIR__.'/vendor/autoload.php';

$app = require_once __DIR__.'/bootstrap/app.php';

+ // check if parallel extension is loaded
+ if (extension_loaded('parallel')) {
+     // and register the bootstrap file for the threads
+     parallel\bootstrap(__DIR__.'/parallel.php');
+ }

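Then, in the bootstrap file for the parallel threads, you just need to get an instance of the app and bootstrap the Laravel kernel. This way you will have all the Laravel service providers registered.

bootstrap/parallel.php: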

<?php declare(strict_types=1);

require __DIR__.'/../vendor/autoload.php';

$app = require_once __DIR__.'/app.php';

$kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);

// bootstrap the Kernel
$kernel->bootstrap();

Anonymous worker

Define an anonymous function as a Worker to process the tasks.

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(static function(int $number): int {
    // here you do some work with the received data
    // this portion of code will run on a separate thread
    
    // example process
    $milliseconds = random_int(100, 500);
    echo sprintf("AnonymousWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
    usleep($milliseconds * 1000);
    // end example process
    
    // the data returned will be available later
    return $number;
});

Worker instance

Create a class that extends the ParallelWorker class. This can be useful for complex processes and keeps your code organized.

ExampleWorker.php:

use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    protected function process(int $number = 0): int {
        // example process
        $milliseconds = random_int(100, 500);
        echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
        usleep($milliseconds * 1000);
        // end example process

        return $number;
    }

}

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(ExampleWorker::class);

You can also pass parameters to the Worker's constructor.

use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    public function __construct(
        private array $multipliers,
    ) {}

    // process() as in the previous example; it can read $this->multipliers

}

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);

Schedule tasks

After defining a Worker, you can schedule tasks that will run in parallel.

use HDSSolutions\Console\Parallel\Scheduler;

foreach (range(1, 100) as $task_data) {
    try {
        // tasks will start as soon as a thread is available
        Scheduler::runTask($task_data);

    } catch (Throwable) {
        // if no Worker was defined, a RuntimeException will be thrown
        // also, Workers have some limitations, see Reference #3 for more info
    }
}

Check tasks state

Every task has a state. There are also helper methods to check the current state of a task.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

do {
    $all_processed = true;
    foreach (Scheduler::getTasks() as $task) {
        switch (true) {
            case $task->isPending():
                $all_processed = false;
                break;
    
            case $task->isBeingProcessed():
                $all_processed = false;
                break;
    
            case $task->wasProcessed():
                $result = $task->getOutput();
                break;
        }
    }
} while ($all_processed == false);
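The state helpers used in this README are isPending(), isBeingProcessed(), wasProcessed() and wasCancelled(). Together they imply a small lifecycle, modelled below. SketchTask is a hypothetical class and not the library's Task. The real class may store and change its state differently; the transitions here only follow what this README describes, where stopping cancels running tasks and leaves pending ones pending.

```php
<?php declare(strict_types=1);

// Hypothetical model of a task's lifecycle (not HDSSolutions\Console\Parallel\Task)
final class SketchTask
{
    private const PENDING = 'pending';
    private const PROCESSING = 'processing';
    private const PROCESSED = 'processed';
    private const CANCELLED = 'cancelled';

    private string $state = self::PENDING;
    private mixed $output = null;

    // a free thread picked the task up
    public function start(): void
    {
        if ($this->state === self::PENDING) {
            $this->state = self::PROCESSING;
        }
    }

    // the Worker returned a value
    public function finish(mixed $output): void
    {
        if ($this->state === self::PROCESSING) {
            $this->output = $output;
            $this->state = self::PROCESSED;
        }
    }

    // stopping only cancels running tasks; pending ones stay pending
    public function cancel(): void
    {
        if ($this->state === self::PROCESSING) {
            $this->state = self::CANCELLED;
        }
    }

    public function isPending(): bool { return $this->state === self::PENDING; }
    public function isBeingProcessed(): bool { return $this->state === self::PROCESSING; }
    public function wasProcessed(): bool { return $this->state === self::PROCESSED; }
    public function wasCancelled(): bool { return $this->state === self::CANCELLED; }
    public function getOutput(): mixed { return $this->output; }
}

$done = new SketchTask();
$done->start();
$done->finish(42);

$stopped = new SketchTask();
$stopped->start();
$stopped->cancel();

$waiting = new SketchTask();
$waiting->cancel(); // no effect: the task never started

var_dump($done->wasProcessed(), $done->getOutput(), $stopped->wasCancelled(), $waiting->isPending());
```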

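Wait for tasks completion

Instead of checking the state of every task, you can wait for all tasks to be processed before continuing your code execution.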

use HDSSolutions\Console\Parallel\Scheduler;

// This will pause execution until all tasks are processed
Scheduler::awaitTasksCompletion();

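You can also specify a time limit for waiting. The process will pause until all tasks are processed or until the maximum time limit is reached, whichever comes first.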

use HDSSolutions\Console\Parallel\Scheduler;

// Pause until all tasks are processed or until 15 minutes pass
Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));
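The "whichever comes first" rule can be illustrated with a plain polling loop. awaitUntil() and its $pollMs parameter are hypothetical, and this is not how Scheduler::awaitTasksCompletion() is implemented. It is only a sketch of the same contract, with a callback standing in for "all tasks processed".

```php
<?php declare(strict_types=1);

// Hypothetical helper: returns true if $isDone() became true before the
// DateInterval elapsed, false if the time limit was reached first.
function awaitUntil(Closure $isDone, DateInterval $waitUntil, int $pollMs = 50): bool
{
    $deadline = (new DateTimeImmutable())->add($waitUntil);

    while (!$isDone()) {
        if (new DateTimeImmutable() >= $deadline) {
            return false; // time limit reached first
        }
        usleep($pollMs * 1000);
    }

    return true; // everything finished before the deadline
}

// simulated work that "completes" after roughly 200 ms
$start = microtime(true);
$finished = awaitUntil(static fn(): bool => microtime(true) - $start > 0.2, new DateInterval('PT15M'));
var_dump($finished);
```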

Get the output of processed tasks

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

foreach (Scheduler::getTasks() as $task) {
    // you have access to the Worker class that was used to process the task
    $worker = $task->getWorkerClass();
    // and the result of the task processed
    $result = $task->getOutput();
}

Remove pending tasks

If your process needs to stop early, you can stop processing the tasks still waiting in the queue.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will remove tasks from the pending queue
Scheduler::removePendingTasks();

// after cleaning the queue, you should wait for tasks that are currently being processed to finish
Scheduler::awaitTasksCompletion();

$results = [];
$unprocessed_tasks = [];
foreach (Scheduler::getTasks() as $task) {
    if ($task->wasProcessed()) {
        $results[] = $task->getOutput();
    } else {
        // tasks that were not processed, will remain in the Pending state
        $unprocessed_tasks[] = $task;
    }
}

Remove a pending/running task

If needed, you can remove specific tasks from the processing queue.

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

foreach (Scheduler::getTasks() as $task) {
    // if for some reason you want to remove a task, or just want to free memory when a task finishes
    // (someValidation() is a placeholder for your own check)
    if (someValidation($task) || $task->wasProcessed()) {
        // this will remove the task from the processing queue
        // IMPORTANT: if the task is already running, it will be stopped
        Scheduler::removeTask($task);
    }
}

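Stop processing all tasks immediately

If you need to stop everything immediately, you can call the Scheduler::stop() method. This stops processing all tasks at once.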

use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will stop processing tasks immediately
Scheduler::stop();

// in this state, each Task should be in one of the following 3 states
foreach (Scheduler::getTasks() as $task) {
    switch (true) {
        case $task->isPending():
            // Task was never processed
            break;

        case $task->wasProcessed():
            // Task was processed by the Worker
            break;

        case $task->wasCancelled():
            // Task was cancelled while it was being processed
            break;
    }
}

Specifying the number of CPU cores

You can control the maximum percentage or number of CPU cores to use by calling the following methods:

use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::setMaxCpuCountUsage(2);        // Use at max two CPU cores
Scheduler::setMaxCpuPercentageUsage(0.5); // Use at max 50% of the total of CPU cores
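The sketch below shows how a core count and a core percentage could translate into a thread limit. threadLimit() is a hypothetical helper. The real Scheduler may round these settings differently, or treat each setter as overriding the other, so this sketch simply applies both caps and keeps at least one thread.

```php
<?php declare(strict_types=1);

// Hypothetical helper: derives a thread limit from the available cores,
// an optional maximum core count and an optional maximum core percentage.
function threadLimit(int $availableCores, ?int $maxCount = null, ?float $maxPercentage = null): int
{
    $limit = $availableCores;
    if ($maxCount !== null) {
        $limit = min($limit, $maxCount);
    }
    if ($maxPercentage !== null) {
        // round down so the percentage is never exceeded
        $limit = min($limit, (int) floor($availableCores * $maxPercentage));
    }
    // always keep at least one thread so tasks still make progress
    return max(1, $limit);
}

echo threadLimit(8, maxCount: 2), "\n";        // 2
echo threadLimit(8, maxPercentage: 0.5), "\n"; // 4
echo threadLimit(3, maxPercentage: 0.5), "\n"; // 1
```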

Progress bar

Requirements

  • symfony/console

Enable the progress bar for the process that invokes the workers by calling the withProgress() method.

use HDSSolutions\Console\Parallel\Scheduler;

$tasks = range(1, 10);

Scheduler::using(ExampleWorker::class)
    ->withProgress(steps: count($tasks));

Usage from within the Worker

Available methods are:

  • setMessage(string $message)
  • advance(int $steps)
  • setProgress(int $step)
  • display()
  • clear()

use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

    protected function process(int $number = 0): int {
        // example process
        $milliseconds = random_int(100, 500);
        $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $milliseconds));
        usleep($milliseconds * 1000);
        $this->advance();
        // end example process

        return $number;
    }

}

Example output

 28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms
 [===========================================>------------------------------------]  53%
 elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s
 memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB
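The "N of M" counter and the percentage in this output relate as sketched below. renderProgress() is a hypothetical plain-PHP stand-in, not symfony/console's ProgressBar or anything in parallel-sdk. It floors the ratio, which is consistent with 28 of 52 steps being shown as 53% above.

```php
<?php declare(strict_types=1);

// Hypothetical stand-in for a one-line progress bar: "current of max",
// a bar of '=' and '-' characters, and a floored percentage.
function renderProgress(int $current, int $max, int $width = 20): string
{
    $ratio = $max > 0 ? min(1, $current / $max) : 0;
    $filled = (int) floor($ratio * $width);
    $bar = str_repeat('=', $filled) . str_repeat('-', $width - $filled);
    return sprintf('%d of %d [%s] %3d%%', $current, $max, $bar, (int) floor($ratio * 100));
}

echo renderProgress(28, 52), "\n";
```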

References

  1. parallel\bootstrap()
  2. parallel\Runtime
  3. Parallel\Runtime::run() Task Characteristics

Security Vulnerabilities

If you encounter any security-related issue, feel free to raise a ticket on the issue tracker.

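Contributing

Contributions are welcome! If you find any issue or would like to add new features or improvements, feel free to submit a pull request.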

Contributors

Licence

This library is open-source software licensed under the MIT License. Please see the License File for more information.