luciansabo/async-command

Uses a Worker Pool to help parallelize PHP code execution in command-line scripts

v1.0.0 2020-01-13 14:06 UTC

Last update: 2024-09-14 02:53:19 UTC


README

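This package lets you parallelize PHP code execution in command-line scripts. The Worker Pool concept uses a fixed number of processes to execute tasks. It is similar to multithreading, but uses processes instead of threads.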
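To make the Worker Pool idea concrete, here is a minimal sketch in plain PHP. It is not this package's implementation: the function name runInPool and the idea of passing tasks as PHP code strings are assumptions made only for illustration. It keeps at most $poolSize child processes alive and starts the next task whenever a slot frees up.

```php
<?php

/**
 * Hypothetical helper (not part of luciansabo/async-command).
 * Runs each task in its own PHP child process, with at most $poolSize running at once.
 * Assumes PHP 7.4+ (array form of proc_open) and small outputs that fit in the pipe buffer.
 *
 * @param string[] $tasks    PHP code snippets; each one echoes its result
 * @param int      $poolSize maximum number of concurrent child processes
 * @return array<int|string, string> captured stdout, keyed like $tasks
 */
function runInPool(array $tasks, int $poolSize): array
{
    $pending = $tasks; // tasks that have not been started yet
    $running = [];     // task key => [process handle, stdout pipe]
    $results = [];

    while ($pending || $running) {
        // Fill every free slot in the pool with a pending task.
        while ($pending && count($running) < $poolSize) {
            $key = array_key_first($pending);
            $code = $pending[$key];
            unset($pending[$key]);

            $process = proc_open([PHP_BINARY, '-r', $code], [1 => ['pipe', 'w']], $pipes);
            if (!is_resource($process)) {
                throw new RuntimeException("Could not start worker for task $key");
            }
            $running[$key] = [$process, $pipes[1]];
        }

        // Collect the output of workers that have exited, freeing their slots.
        foreach ($running as $key => [$process, $stdout]) {
            if (!proc_get_status($process)['running']) {
                $results[$key] = stream_get_contents($stdout);
                fclose($stdout);
                proc_close($process);
                unset($running[$key]);
            }
        }

        usleep(10000); // poll every 10 ms instead of busy-waiting
    }

    ksort($results);

    return $results;
}
```

For example, runInPool(['echo 1 + 1;', 'echo 2 * 3;', 'echo 10 - 3;'], 2) runs the first two snippets concurrently and starts the third once one of them finishes. The package wraps this kind of bookkeeping behind WorkerPool and reports results through promises instead of a return value.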

Quick start

This example shows how to run 4 HTTP GET requests in parallel:

<?php

use Lucian\AsyncCommand\EntryPoint\PhpScriptEntryPoint;
use Lucian\AsyncCommand\WorkerPool;

// include composer autoloader
require_once 'vendor/autoload.php';

$urls = [
    'https://google.com',
    'https://bing.com',
    'https://yahoo.com',
    'https://amazon.com',
];

$workerPool = new WorkerPool(null, 4);

$entryPoint = new PhpScriptEntryPoint(__FILE__, 'workerCode');

foreach ($urls as $key => $url) {
    $promise = $workerPool->runAsync($entryPoint, $url, $key);
    $promise->then(
        function ($value) {
            // we receive what was returned from workerCode ([$url, strlen($contents)])
            var_dump($value);
            return $value;
        },
        function ($reason) {
            echo "\n\nErrors:\n$reason\n";
            return $reason;
        }
    );
}

$workerPool->wait();

function workerCode(string $url)
{
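    $contents = file_get_contents($url);
    if ($contents === false) {
        // report a failed download as an exception instead of returning a bogus length
        throw new \RuntimeException("Could not fetch $url");
    }
    // exceptions or fatal errors should result in a fail
    //throw new \Exception($url . ' failed');
    //trigger_error("Warning", E_USER_WARNING);
    //trigger_error("Fatal error", E_USER_ERROR);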

    return [$url, strlen($contents)];
}

runAsync() returns a Promises/A+ compatible promise. The promise implementation used comes from Guzzle.

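A promise represents the eventual result of an asynchronous operation. The primary way of interacting with a promise is through its then method, which registers callbacks to receive either the promise's eventual value or the reason why the promise cannot be fulfilled.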

use GuzzleHttp\Promise\PromiseInterface;

/** @var PromiseInterface $promise */

//$promise = new Promise();
$promise->then(
    // $onFulfilled
    function ($value) {
        echo 'The promise was fulfilled.';
    },
    // $onRejected
    function ($reason) {
        echo 'The promise was rejected.';
    }
);

Running worker code from the same class

To define an entry point in the current class, pass this:<method> as the method argument of PhpScriptEntryPoint. Don't forget to construct the WorkerPool with $this as the parent argument.

<?php

use Lucian\AsyncCommand\EntryPoint\PhpScriptEntryPoint;
use Lucian\AsyncCommand\WorkerPool;

require_once 'vendor/autoload.php';

class SampleObjectMethodEntrypoint
{
    private $workerPool;

    public function __construct()
    {
        $this->workerPool = new WorkerPool($this, 4);
    }

    public function execute()
    {
        $entryPoint = new PhpScriptEntryPoint(__FILE__, 'this:workerCode');

        $param2 = 'test';
        for ($i = 0; $i < 10; $i++) {
            $promise = $this->workerPool->runAsync($entryPoint, $i, $param2);
            $promise->then(
                function ($value) {
                    // do something with the value
                    echo "$value\n";
                    return $value;
                }
            );
        }

        $this->workerPool->wait();
    }

    public function workerCode(int $counter, string $param2)
    {
        return $param2 . $counter;
    }
}

$app = new SampleObjectMethodEntrypoint();
$app->execute();

workerCode() is a method of the current class.

Running worker code in a Symfony console command

To define an entry point in the current Symfony command, use SymfonyCommandEntryPoint:

use Lucian\AsyncCommand\EntryPoint\SymfonyCommandEntryPoint;
use Lucian\AsyncCommand\WorkerPool;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestAsyncCommand extends Command
{
    /**
     * @var WorkerPool
     */
    private $workerPool;

    public function __construct()
    {
        $this->workerPool = new WorkerPool($this, 4);

        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $entryPoint = new SymfonyCommandEntryPoint($this, 'workerCode');

        $param2 = 'test';
        for ($i = 0; $i < 10; $i++) {
            $promise = $this->workerPool->runAsync($entryPoint, $i, $param2);
            $promise->then(
                function ($value) {
                    // do something with the value
                    echo "$value\n";
                    return $value;
                }
            );
        }

        $this->workerPool->wait();

        // Symfony expects execute() to return an integer exit code (0 = success)
        return 0;
    }

    public function workerCode(int $counter, string $param2)
    {
        sleep(1); // simulate time consuming task

        return $param2 . $counter;
    }
}