wapmorgan / threadable
易于使用的线程库,提供所有基本功能,以便在并行模式下运行您的代码
Requires (Dev)
- phpunit/phpunit: ~4.8
Suggests
- ext-pcntl: For real multi-threading
- ext-posix: For real multi-threading
- ext-sockets: For real multi-threading
README
易于使用的线程库,提供所有基本功能以在后台模式下执行工作。
您需要安装的软件
- pcntl
- posix
- sockets
此库也可以在模拟模式下工作,其中不执行实际的分叉。所有工作都在一个主线程中完成。如果pnctl扩展不可用或您在Worker构造函数中指定它,则启用此模式。
- 结构
- 什么是
Worker
?- 如何创建您的Worker
- 什么是
WorkersPool
?
- 什么是
- 简单用法
- 它是如何工作的
- 一个worker
- 使用
WorkersPool
的少量worker
- API
Worker
APIWorkersPool
API
- 预定义worker
DownloadWorker
- 用例
结构
什么是Worker
?
Worker - 是任何worker的基本类。它由两部分组成(在物理上,存储在一个类中,但提供不同的功能)
Worker
- 一个单独的线程,执行所有后台工作。Worker
管理器 - worker线程的操纵器。
如何创建您的Worker
您需要扩展wapmorgan\Threadable\Worker
类并重新实现onPayload($data)
公共方法。
例如
use wapmorgan\Threadable\Worker; class SleepingWorker extends Worker { public function onPayload($data) { echo 'I have started at '.date('r').PHP_EOL; sleep(3); echo 'I have ended at '.date('r').PHP_EOL; return true; } }
什么是WorkersPool
?
WorkersPool (wapmorgan\Threadable\WorkersPool) - 是worker的容器,用于处理类似任务。它负责所有维护、负载分配和worker的生命周期。允许您动态更改池的大小和其他有用的功能。
简单用法
例如,您想只进行后台下载工作。让我们使用wapmorgan\Threadable\BackgroundWork
类在后台执行它并显示进度(或存储在数据库中/...)。
您需要执行的操作
- 为
DownloadWorker
准备负载 - 分别使用
BackgroundWork::doInBackground()
或BackgroundWork::doInBackgroundParallel()
启动一个线程或多个线程。
第一阶段. 准备负载
DownloadWorker
需要一个包含source
和target
元素的数组。准备它
use wapmorgan\Threadable\BackgroundWork; use wapmorgan\Threadable\DownloadWorker; use wapmorgan\Threadable\Worker; $file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip']; $files = []; foreach ($file_sources as $file_to_download) { $files[] = [ 'source' => $file_to_download, 'size' => DownloadWorker::getRemoteFileSize($file_to_download), 'target' => tempnam(sys_get_temp_dir(), 'thrd_test'), ]; }
第二阶段. 在后台启动
单线程worker
使用doInBackground
函数在一个线程中运行它。签名如下
doInBackground(Worker $worker, array $payloads, callable $payloadHandlingCallback = null, callable $onPayloadFinishCallback = null, $sleepMicroTime = 1000)
$worker
- worker的实例。$payloads
- 所有负载的数组。$payloadHandlingCallback
- 一个回调,每隔$sleepMicrotime
微秒调用一次,带有有关当前正在运行的负载的信息。回调的签名:(Worker $worker, int $payloadI, $payloadData)
$onPayloadFinishCallback
- 当worker完成一个负载时调用的回调。回调的签名:(Worker $worker, int $payloadI, $payloadData, $payloadResult)
因此,收集所有信息来运行它
$result = BackgroundWork::doInBackground(new DownloadWorker(), $files, function (Worker $worker, $payloadI, $payloadData) { clearstatcache(true, $payloadData['target']); echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' downloading ' . round(filesize($payloadData['target']) * 100 / $payloadData['size'], 2) . '%'; }, function (Worker $worker, $payloadI, $payloadData, $payloadResult) { echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' successfully downloaded' . PHP_EOL; return true; } ); if ($result) echo 'All files downloaded successfully'.PHP_EOL;
示例在bin/example_file_downloading_easy
文件中。
多线程worker
要使用doInBackgroundParallel
在多个线程中运行它。它几乎与单线程函数具有相同的签名
doInBackgroundParallel(Worker $worker, array $payloads, callable $payloadHandlingCallback = null, callable $onPayloadFinishCallback = null, $sleepMicroTime = 1000, $poolSize = self::BY_CPU_NUMBER)
通过调整$poolSize
,您可以选择应使用的worker数量。
示例在bin/example_file_downloading_pool_easy
文件中。
它是如何工作的
一个worker
如果您只需要并行执行一些工作并在另一个线程中完成它,您可以仅使用Worker
类,无需任何其他依赖项。
要正确使用它,您需要了解工作线程的生命周期。
- 工作线程在另一个线程中启动。为此,请调用
start()
。 - 工作线程接受新的负载并开始处理它。为此,请调用
sendPayload(array $data)
。实际上,工作线程管理器通过本地套接字发送负载。工作线程开始处理它,并在完成后通过相同的套接字返回工作结果。 - 工作线程管理器检查工作线程是否完成,并读取工作结果。为此,请调用
checkForFinish()
。 - 工作线程可以通过
stop()
或kill()
方法分别停止或被杀死。 - 工作线程管理器检查工作线程是否完成,并标记自身为已终止。为此,请调用
checkForTermination()
。
后台工作分为两步,其中工作线程运行具有实际负载的类的onPayload($data)
方法。
总之,这是一个在另一个线程中下载文件并实时显示进度的示例。
设置和结构
// Implement class-downloader class DownloadWorker extends Worker { public function onPayload($data) { echo 'Started '.$data[0].' into '.$data[2].PHP_EOL; copy($data[0], $data[2]); } } // supplementary function, just to avoid hand-writing of file sizes function remote_filesize($path) { $fp = fopen($path, 'r'); $inf = stream_get_meta_data($fp); fclose($fp); foreach($inf["wrapper_data"] as $v) { if (stristr($v,"content-length")) { $v = explode(":",$v); return (int)trim($v[1]); } } } // our function to print actual status of downloads function show_status(&$files) { foreach ($files as $i => $file) { if (file_exists($file[2])) { clearstatcache(true, $file[2]); $downloaded_size = filesize($file[2]); if ($downloaded_size == $file[1]) { echo $file[0].' downloaded'.PHP_EOL; unset($files[$i]); unlink($file[2]); } else if ($downloaded_size === 0) { // echo $file[0].' in queue'.PHP_EOL; } else { echo $file[0].' downloading '.round($downloaded_size * 100 / $file[1], 2).'%'.PHP_EOL; } } } } // list of files to be downloaded $file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip']; // process of remote file size detection and creation temp local file for this downloading $files = []; foreach ($file_sources as $file_to_download) { $file_size = remote_filesize($file_to_download); $output = tempnam(sys_get_temp_dir(), 'thrd_test'); $files[] = [$file_to_download, $file_size, $output]; }
实际工作
// construct and start new worker $worker = new DownloadWorker(); // or if you want to simulate forking $worker = new DownloadWorker(true); // add files to work queue foreach ($files as $file) { echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL; $worker->sendPayload([$file]); } // main worker thread loop while ($worker->state !== Worker::TERMINATED) { // Worker::RUNNING state indicates that worker thread is still working over some payload if ($worker->state == Worker::RUNNING) { // prints status of all files show_status($files); // call check for finishing all tasks $worker->checkForFinish(); usleep(500000); } // Worker::IDLE state indicates that worker thread does not have any work right now else if ($worker->state == Worker::IDLE) { echo 'Ended. Stopping worker...'.PHP_EOL; // we don't need worker anymore, just stop it $worker->stop(); usleep(500000); } // Worker::TERMINATING state indicates that worker thread is going to be stopped and can't be used to process data else if ($worker->state == Worker::TERMINATING) { echo 'Wait for terminating ...'.PHP_EOL; // just to set Worker::TERMINATED state $worker->checkForTermination(); usleep(500000); } }
结果
Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103 Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477 Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_test0Y3i3k Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testrwwYiE https://yandex.ru/images/today?size=1920x1080 downloaded http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 28.89% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 66.06% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded Ended. Stopping worker... Wait for terminating ...
此代码带有大量注释,但如果您不需要在所有工作完成后重用工作线程,可以简化此示例。您可以将这个巨大的循环替换为更小的循环。
// loops works only when worker is running. // just to show information about downloaded files while ($worker->state == Worker::RUNNING) { show_status($files); $worker->checkForFinish(); usleep(500000); } // when thread is in idle state, just stop right now (`true` as 1st argument forces it to send stop command and wait it termination). $worker->stop(true);
结果
Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103 Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477 Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_testbGsRBp Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testv0E5Qy https://yandex.ru/images/today?size=1920x1080 downloaded http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 17.4% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 36.82% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 55.95% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 76% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 95.05% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
使用WorkersPool的几个工作线程
但如果有几个作业需要同时执行呢?您可以创建几个工作线程的实例,但这将非常痛苦,并且难以操作和同步它们。在这种情况下,您可以使用WorkersPool
,它会处理以下内容:
- 在开始时启动新工作线程。
- 当您调用
sendData
并将数据发送到任何空闲工作线程时,分发您的负载。 - 在更改
poolSize
时创建新工作线程或删除冗余工作线程。 - 当工作线程完成并标记为空闲时,接受工作线程的结果。
- 监控所有工作线程,并计算空闲、运行、活跃(空闲或运行)的工作线程数量。提供获取此信息的接口。
- 当
WorkersPool
对象被销毁(通过unset()
或脚本执行结束时),停止所有工作线程。 - *可以工作在dataOverhead-模式。此模式允许在工作线程正在处理任何任务时发送额外的负载。如果在此模式下向工作线程发送了多个负载,则它不会切换到
Worker::IDLE
状态,直到所有传递的负载都已处理。 - 提供接口,指定进度跟踪器并定期运行它们,直到所有线程都变为
Worker::IDLE
状态。
功能丰富,对吧?让我们用2个线程重写我们的下载器以加快下载速度。
代码的Settings and structures
块保持不变,但为了演示目的,让我们使用两个大文件。
// ... $file_sources = ['http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip', 'http://soft.eurodir.ru/test-speed-100Mb.bin']; // ...
我们只需要更新与线程相关的代码。
// create pool with downloading-workers $pool = new WorkersPool('DownloadWorker'); /** * Also, you can create pool out of object: * $pool = new WorkersPool(new DownloadWorker()); * This is useful, when you open shared sources within worker constructor so all workers can use them. */ // use only 2 workers (this is enough for our work) $pool->setPoolSize(2); // dispatch payload to workers. Notice! WorkersPool uses sendData() method instead of sendPayload(). foreach ($files as $file) { echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL; $pool->sendData($file); } // register tracker, which should be launched every 0.5 seconds. // This method will hold the execution until all workers finish their work and go in Worker::IDLE state $pool->waitToFinish([ '0.5' => function ($pool) use (&$files) { show_status($files); }] );
结果
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477 Enqueuing http://soft.eurodir.ru/test-speed-100Mb.bin with size 102854656 Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testchcHBK Started http://soft.eurodir.ru/test-speed-100Mb.bin into /tmp/thrd_testt6dyJa http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 23.26% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 1.3% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 47.08% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 3.08% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 72.62% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 5.66% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 98.7% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 8.05% http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded http://soft.eurodir.ru/test-speed-100Mb.bin downloading 19.15% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 31.31% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 43.69% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 56.87% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 71.95% http://soft.eurodir.ru/test-speed-100Mb.bin downloading 87.56%
如您所见,我们获得了一些改进。
- 我们的代码变得更小、更清晰。
- 我们可以运行所需数量的工作线程。
- 我们不再需要担心工作线程的终止。让WorkersPool为我们工作。
API
工作线程API
sendPayload($data): int
- 将负载发送到工作线程,并返回负载的序列号。checkForFinish(): array|null
- 检查工作线程是否发送了负载的结果,如果是,则返回它。checkForTermination(): boolean|null
- 如果工作进程已死亡,则返回true。stop($wait = false): boolean
- 向工作线程发送停止命令。它使用SIGTERM信号来允许工作线程正确完成工作,并避免数据丢失。如果$wait = true
,则保持执行,直到工作线程停止。kill($wait = false): boolean
- 向工作线程发送停止命令。它使用SIGKILL信号,除特殊情况外不建议使用,因为它会直接杀死工作线程,并且会丢失此时正在处理的所有数据。如果$wait = true
,则保持执行直到工作线程关闭。
信息
isActive(): boolean
- 如果工作线程处于Worker::RUNNING
或Worker::IDLE
状态,则为true。isRunning(): boolean
- 如果工作线程处于Worker::RUNNING
状态,则为true。isIdle(): boolean
- 如果工作线程处于Worker::IDLE
状态,则为true。getPid(): int
- 返回工作进程ID。getCurrentPayload(): int
- 返回最后一个完成的负载的序列号。
关于工作线程复用的警告!您不能重新启动已经终止的工作线程(使用stop()
或kill()
),您需要创建新的工作线程并使用start()
启动它。
WorkersPool API
-
countIdleWorkers(): integer
- 返回处于Worker::IDLE
状态的工作线程数量。 -
countRunningWorkers(): integer
- 返回处于Worker::RUNNING
状态的工作线程数量。 -
countActiveWorkers(): integer
- 返回处于Worker::RUNNING
或Worker::IDLE
状态的工作线程数量。 -
getRunningWorkers(): Worker[]
- 返回处于Worker::RUNNING
状态的工作线程。 -
enableDataOverhead()
/disableDataOverhead()
- 启用/禁用dataOverhead-模式。 -
sendData($data, $wait = false): null|boolean
- 将负载分配给任何空闲工作线程。行为取决于dataOverhead功能状态。- 当dataOverhead被禁用且
$wait = false
(默认情况),如果没有空闲工作线程,此方法返回null
或返回表示分配状态的boolean
(true/false
)。 - 当dataOverhead被禁用(默认情况)且
$wait = true
,此方法将保持脚本的执行,直到任何工作线程空闲,将您的负载分配给它,并返回分配状态(true/false
)。 - 当dataOverhead被启用时,此方法将您的负载分配给任何空闲工作线程。如果没有空闲工作线程,它将新任务放入工作线程的内部队列中,这些任务将被处理。此方法在所有工作线程之间使用公平分配(因此您可以确信24个任务将分配给6个工作线程,每个工作线程4个)。
- 当dataOverhead被禁用且
-
waitToFinish(array $trackers = null)
- 保持脚本执行,直到所有工作线程进入IDLE
状态。
预定义worker
DownloadWorker
如您在示例中所见,我们创建了一个下载工作线程。但是,这并不是必需的,我们可以使用预定义的DownloadWorker
,它执行相同的操作。
- 完整路径:
wapmorgan\Threadable\DownloadWorker
- 描述:下载远程文件并将其保存到本地服务器。
- 负载(数组)
source
- 远程文件URLtarget
- 本地文件路径
ExtractWorker
Zip归档提取器。
- 完整路径:
wapmorgan\Threadable\ExtractWorker
- 描述:将指定的zip归档提取到文件夹中。
- 负载(数组)
archive
- 归档文件名output
- 输出目录
用例
可以使用Threadable
构建的程序示例
- 媒体转换器/编码器
- 数据导入器/导出器
- 社交网络/即时通讯机器人
- 解析器/扫描器/分析器
- 服务器(除非您想重新发明轮子,否则不建议使用)
- ...