wapmorgan/threadable

易于使用的线程库,提供所有基本功能,以便在并行模式下运行您的代码

0.0.6 2018-07-23 18:13 UTC

This package is auto-updated.

Last update: 2024-09-05 04:07:06 UTC


README

易于使用的线程库,提供所有基本功能以在后台模式下执行工作。

您需要安装的软件

  • pcntl
  • posix
  • sockets

此库也可以在模拟模式下工作,其中不执行实际的分叉。所有工作都在一个主线程中完成。如果pnctl扩展不可用或您在Worker构造函数中指定它,则启用此模式。

Latest Stable Version Latest Unstable Version License

  1. 结构
    • 什么是Worker
      • 如何创建您的Worker
    • 什么是WorkersPool
  2. 简单用法
  3. 它是如何工作的
    • 一个worker
    • 使用WorkersPool的少量worker
  4. API
    • Worker API
    • WorkersPool API
  5. 预定义worker
    • DownloadWorker
  6. 用例

结构

什么是Worker

Worker - 是任何worker的基本类。它由两部分组成(在物理上,存储在一个类中,但提供不同的功能)

  1. Worker - 一个单独的线程,执行所有后台工作。
  2. Worker管理器 - worker线程的操纵器。

如何创建您的Worker

您需要扩展wapmorgan\Threadable\Worker类并重新实现onPayload($data)公共方法。

例如

use wapmorgan\Threadable\Worker;
class SleepingWorker extends Worker
{
    public function onPayload($data)
    {
        echo 'I have started at '.date('r').PHP_EOL;
        sleep(3);
        echo 'I have ended at '.date('r').PHP_EOL;
        
        return true;
    }
}

什么是WorkersPool

WorkersPool (wapmorgan\Threadable\WorkersPool) - 是worker的容器,用于处理类似任务。它负责所有维护、负载分配和worker的生命周期。允许您动态更改池的大小和其他有用的功能。

简单用法

例如,您想只进行后台下载工作。让我们使用wapmorgan\Threadable\BackgroundWork类在后台执行它并显示进度(或存储在数据库中/...)。

您需要执行的操作

  1. DownloadWorker准备负载
  2. 分别使用BackgroundWork::doInBackground()BackgroundWork::doInBackgroundParallel()启动一个线程或多个线程。

第一阶段. 准备负载

DownloadWorker需要一个包含sourcetarget元素的数组。准备它

use wapmorgan\Threadable\BackgroundWork;
use wapmorgan\Threadable\DownloadWorker;
use wapmorgan\Threadable\Worker;

$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];
$files = [];
foreach ($file_sources as $file_to_download) {
    $files[] = [
        'source' => $file_to_download,
        'size' => DownloadWorker::getRemoteFileSize($file_to_download),
        'target' => tempnam(sys_get_temp_dir(), 'thrd_test'),
    ];
}

第二阶段. 在后台启动

单线程worker

使用doInBackground函数在一个线程中运行它。签名如下

doInBackground(Worker $worker, array $payloads, callable $payloadHandlingCallback = null, callable $onPayloadFinishCallback = null, $sleepMicroTime = 1000)

  • $worker - worker的实例。
  • $payloads - 所有负载的数组。
  • $payloadHandlingCallback - 一个回调,每隔$sleepMicrotime微秒调用一次,带有有关当前正在运行的负载的信息。回调的签名:(Worker $worker, int $payloadI, $payloadData)
  • $onPayloadFinishCallback - 当worker完成一个负载时调用的回调。回调的签名:(Worker $worker, int $payloadI, $payloadData, $payloadResult)

因此,收集所有信息来运行它

$result = BackgroundWork::doInBackground(new DownloadWorker(), $files,
    function (Worker $worker, $payloadI, $payloadData) {
        clearstatcache(true, $payloadData['target']);
        echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' downloading ' . round(filesize($payloadData['target']) * 100 / $payloadData['size'], 2) . '%';
    },
    function (Worker $worker, $payloadI, $payloadData, $payloadResult) {
        echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' successfully downloaded' . PHP_EOL;
        return true;
    }
);
if ($result)
    echo 'All files downloaded successfully'.PHP_EOL;

示例在bin/example_file_downloading_easy文件中。

多线程worker

要使用doInBackgroundParallel在多个线程中运行它。它几乎与单线程函数具有相同的签名

doInBackgroundParallel(Worker $worker, array $payloads, callable $payloadHandlingCallback = null, callable $onPayloadFinishCallback = null, $sleepMicroTime = 1000, $poolSize = self::BY_CPU_NUMBER)

通过调整$poolSize,您可以选择应使用的worker数量。

示例在bin/example_file_downloading_pool_easy文件中。

它是如何工作的

一个worker

如果您只需要并行执行一些工作并在另一个线程中完成它,您可以仅使用Worker类,无需任何其他依赖项。

要正确使用它,您需要了解工作线程的生命周期。

  1. 工作线程在另一个线程中启动。为此,请调用start()
  2. 工作线程接受新的负载并开始处理它。为此,请调用sendPayload(array $data)。实际上,工作线程管理器通过本地套接字发送负载。工作线程开始处理它,并在完成后通过相同的套接字返回工作结果。
  3. 工作线程管理器检查工作线程是否完成,并读取工作结果。为此,请调用checkForFinish()
  4. 工作线程可以通过stop()kill()方法分别停止或被杀死。
  5. 工作线程管理器检查工作线程是否完成,并标记自身为已终止。为此,请调用checkForTermination()

后台工作分为两步,其中工作线程运行具有实际负载的类的onPayload($data)方法。

总之,这是一个在另一个线程中下载文件并实时显示进度的示例。

设置和结构

// Implement class-downloader
class DownloadWorker extends Worker
{
    public function onPayload($data)
    {
        echo 'Started '.$data[0].' into '.$data[2].PHP_EOL;
        copy($data[0], $data[2]);
    }
}

// supplementary function, just to avoid hand-writing of file sizes
function remote_filesize($path)
{
    $fp = fopen($path, 'r');
    $inf = stream_get_meta_data($fp);
    fclose($fp);
    foreach($inf["wrapper_data"] as $v) {
        if (stristr($v,"content-length")) {
            $v = explode(":",$v);
            return (int)trim($v[1]);
        }
    }
}

// our function to print actual status of downloads
function show_status(&$files)
{
    foreach ($files as $i => $file) {
        if (file_exists($file[2])) {
            clearstatcache(true, $file[2]);
            $downloaded_size = filesize($file[2]);
            if ($downloaded_size == $file[1]) {
                echo $file[0].' downloaded'.PHP_EOL;
                unset($files[$i]);
                unlink($file[2]);
            } else if ($downloaded_size === 0) {
                // echo $file[0].' in queue'.PHP_EOL;
            } else  {
                echo $file[0].' downloading '.round($downloaded_size * 100 / $file[1], 2).'%'.PHP_EOL;
            }
        }
    }
}

// list of files to be downloaded
$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];
// process of remote file size detection and creation temp local file for this downloading
$files = [];
foreach ($file_sources as $file_to_download) {
    $file_size = remote_filesize($file_to_download);
    $output = tempnam(sys_get_temp_dir(), 'thrd_test');
    $files[] = [$file_to_download, $file_size, $output];
}

实际工作

// construct and start new worker
$worker = new DownloadWorker();
// or if you want to simulate forking
$worker = new DownloadWorker(true);

// add files to work queue
foreach ($files as $file) {
    echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL;
    $worker->sendPayload([$file]);
}

// main worker thread loop
while ($worker->state !== Worker::TERMINATED) {
    // Worker::RUNNING state indicates that worker thread is still working over some payload
    if ($worker->state == Worker::RUNNING) {

        // prints status of all files
        show_status($files);
        // call check for finishing all tasks
        $worker->checkForFinish();
        usleep(500000);
    }
    // Worker::IDLE state indicates that worker thread does not have any work right now
    else if ($worker->state == Worker::IDLE) {
        echo 'Ended. Stopping worker...'.PHP_EOL;
        // we don't need worker anymore, just stop it
        $worker->stop();
        usleep(500000);
    }
    // Worker::TERMINATING state indicates that worker thread is going to be stopped and can't be used to process data
    else if ($worker->state == Worker::TERMINATING) {
        echo 'Wait for terminating ...'.PHP_EOL;
        // just to set Worker::TERMINATED state
        $worker->checkForTermination();
        usleep(500000);
    }
}

结果

Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_test0Y3i3k
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testrwwYiE
https://yandex.ru/images/today?size=1920x1080 downloaded
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 28.89%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 66.06%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
Ended. Stopping worker...
Wait for terminating ...

此代码带有大量注释,但如果您不需要在所有工作完成后重用工作线程,可以简化此示例。您可以将这个巨大的循环替换为更小的循环。

// loops works only when worker is running.
// just to show information about downloaded files
while ($worker->state == Worker::RUNNING) {
    show_status($files);
    $worker->checkForFinish();
    usleep(500000);
}
// when thread is in idle state, just stop right now (`true` as 1st argument forces it to send stop command and wait it termination).
$worker->stop(true);

结果

Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_testbGsRBp
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testv0E5Qy
https://yandex.ru/images/today?size=1920x1080 downloaded
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 17.4%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 36.82%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 55.95%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 76%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 95.05%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded

使用WorkersPool的几个工作线程

但如果有几个作业需要同时执行呢?您可以创建几个工作线程的实例,但这将非常痛苦,并且难以操作和同步它们。在这种情况下,您可以使用WorkersPool,它会处理以下内容:

  1. 在开始时启动新工作线程。
  2. 当您调用sendData并将数据发送到任何空闲工作线程时,分发您的负载。
  3. 在更改poolSize时创建新工作线程或删除冗余工作线程。
  4. 当工作线程完成并标记为空闲时,接受工作线程的结果。
  5. 监控所有工作线程,并计算空闲、运行、活跃(空闲或运行)的工作线程数量。提供获取此信息的接口。
  6. WorkersPool对象被销毁(通过unset()或脚本执行结束时),停止所有工作线程。
  7. *可以工作在dataOverhead-模式。此模式允许在工作线程正在处理任何任务时发送额外的负载。如果在此模式下向工作线程发送了多个负载,则它不会切换到Worker::IDLE状态,直到所有传递的负载都已处理。
  8. 提供接口,指定进度跟踪器并定期运行它们,直到所有线程都变为Worker::IDLE状态。

功能丰富,对吧?让我们用2个线程重写我们的下载器以加快下载速度。

代码的Settings and structures块保持不变,但为了演示目的,让我们使用两个大文件。

// ...
$file_sources = ['http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip', 'http://soft.eurodir.ru/test-speed-100Mb.bin'];
// ...

我们只需要更新与线程相关的代码。

// create pool with downloading-workers
$pool = new WorkersPool('DownloadWorker');
/**
 * Also, you can create pool out of object:
 * $pool = new WorkersPool(new DownloadWorker());
 * This is useful, when you open shared sources within worker constructor so all workers can use them.
 */
// use only 2 workers (this is enough for our work)
$pool->setPoolSize(2);

// dispatch payload to workers. Notice! WorkersPool uses sendData() method instead of sendPayload().
foreach ($files as $file) {
    echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL;
    $pool->sendData($file);
}

// register tracker, which should be launched every 0.5 seconds.
// This method will hold the execution until all workers finish their work and go in Worker::IDLE state
$pool->waitToFinish([
    '0.5' => function ($pool) use (&$files) {
        show_status($files);
    }]
);

结果

Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Enqueuing http://soft.eurodir.ru/test-speed-100Mb.bin with size 102854656
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testchcHBK
Started http://soft.eurodir.ru/test-speed-100Mb.bin into /tmp/thrd_testt6dyJa
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 23.26%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 1.3%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 47.08%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 3.08%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 72.62%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 5.66%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 98.7%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 8.05%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 19.15%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 31.31%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 43.69%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 56.87%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 71.95%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 87.56%

如您所见,我们获得了一些改进。

  1. 我们的代码变得更小、更清晰。
  2. 我们可以运行所需数量的工作线程。
  3. 我们不再需要担心工作线程的终止。让WorkersPool为我们工作。

API

工作线程API

  • sendPayload($data): int - 将负载发送到工作线程,并返回负载的序列号。
  • checkForFinish(): array|null - 检查工作线程是否发送了负载的结果,如果是,则返回它。
  • checkForTermination(): boolean|null - 如果工作进程已死亡,则返回true。
  • stop($wait = false): boolean - 向工作线程发送停止命令。它使用SIGTERM信号来允许工作线程正确完成工作,并避免数据丢失。如果$wait = true,则保持执行,直到工作线程停止。
  • kill($wait = false): boolean - 向工作线程发送停止命令。它使用SIGKILL信号,除特殊情况外不建议使用,因为它会直接杀死工作线程,并且会丢失此时正在处理的所有数据。如果$wait = true,则保持执行直到工作线程关闭。

信息

  • isActive(): boolean - 如果工作线程处于Worker::RUNNINGWorker::IDLE状态,则为true。
  • isRunning(): boolean - 如果工作线程处于Worker::RUNNING状态,则为true。
  • isIdle(): boolean - 如果工作线程处于Worker::IDLE状态,则为true。
  • getPid(): int - 返回工作进程ID。
  • getCurrentPayload(): int - 返回最后一个完成的负载的序列号。

关于工作线程复用的警告!您不能重新启动已经终止的工作线程(使用stop()kill()),您需要创建新的工作线程并使用start()启动它。

WorkersPool API

  • countIdleWorkers(): integer - 返回处于Worker::IDLE状态的工作线程数量。

  • countRunningWorkers(): integer - 返回处于Worker::RUNNING状态的工作线程数量。

  • countActiveWorkers(): integer - 返回处于Worker::RUNNINGWorker::IDLE状态的工作线程数量。

  • getRunningWorkers(): Worker[] - 返回处于Worker::RUNNING状态的工作线程。

  • enableDataOverhead() / disableDataOverhead() - 启用/禁用dataOverhead-模式。

  • sendData($data, $wait = false): null|boolean - 将负载分配给任何空闲工作线程。行为取决于dataOverhead功能状态。

    • dataOverhead被禁用且$wait = false(默认情况),如果没有空闲工作线程,此方法返回null或返回表示分配状态的booleantrue/false)。
    • dataOverhead被禁用(默认情况)且$wait = true,此方法将保持脚本的执行,直到任何工作线程空闲,将您的负载分配给它,并返回分配状态(true/false)。
    • dataOverhead被启用时,此方法将您的负载分配给任何空闲工作线程。如果没有空闲工作线程,它将新任务放入工作线程的内部队列中,这些任务将被处理。此方法在所有工作线程之间使用公平分配(因此您可以确信24个任务将分配给6个工作线程,每个工作线程4个)。
  • waitToFinish(array $trackers = null) - 保持脚本执行,直到所有工作线程进入IDLE状态。

预定义worker

DownloadWorker

如您在示例中所见,我们创建了一个下载工作线程。但是,这并不是必需的,我们可以使用预定义的DownloadWorker,它执行相同的操作。

  • 完整路径:wapmorgan\Threadable\DownloadWorker
  • 描述:下载远程文件并将其保存到本地服务器。
  • 负载(数组)
    • source - 远程文件URL
    • target - 本地文件路径

ExtractWorker

Zip归档提取器。

  • 完整路径:wapmorgan\Threadable\ExtractWorker
  • 描述:将指定的zip归档提取到文件夹中。
  • 负载(数组)
    • archive - 归档文件名
    • output - 输出目录

用例

可以使用Threadable构建的程序示例

  • 媒体转换器/编码器
  • 数据导入器/导出器
  • 社交网络/即时通讯机器人
  • 解析器/扫描器/分析器
  • 服务器(除非您想重新发明轮子,否则不建议使用)
  • ...