赵文阿加科夫/speed-hunter

从php-cgi下管理php-cli进程,并等待最后一个进程的响应。(仅适用于支持共享内存的*nix系统)。

v0.4 2020-09-13 17:08 UTC

README

您可以从https://packagist.org.cn下载此包

composer require pavel_agarkov/speed-hunter

在服务器处理请求时启动多任务处理。此包是可嵌入的。仅适用于Unix类似的操作系统。此包使用Unix的共享内存。

此包提供了3种工作模式

- запуск одиночного процесса, отвязывающегося от основного потока вывода(т.е. не требует ожидания,
    в дальнейшем "асинхронным");
- запуск множества асинхронных процессов;
- запуск множества процессов, удерживающих поток вывода за управляющим процессом (необходимо для получения данных, 
    записанных каждым отдельным процессом в разделяемую память).

列出的工作模式适用于cgi和cli模式。初始化应用程序的方式相同,除了传递的参数。应用程序通过配置参数的一个参数接受php文件(其中需要放置此过程的逻辑)相对于初始化文件的路径。

初始化和启动单个异步进程

示例:

Starting::singleAsyncProcess(
    array(
        "jobName" => 'jobs/async_1',
        "shSizeForOneJob" => 300,
        "data" => array(1, 2, 3)
    )
);

Starting::singleAsyncProcess接受一个数组,其中

"jobName"            - относительный пусть до файла процесса,
"shSizeForOneJob"    - объем разделяемой памяти в байтах для процесса,
"data"               - массив с данными передаваемыми в процесс.

进程逻辑"jobName" => 'jobs/async_1'位于文件jobs/async_1.php中

require __DIR__ . './vendor/autoload.php';

Job::runSingleAsyncJob(
    $argv,
    function (&$Job, $read) {
        sleep(1);
        $id = posix_getpid();
        $fp = fopen("t{$id}.txt", "w");
        $str = implode(',', $read);
        fwrite($fp, " {$str} \r\n");
        fclose($fp);
    }
);

其中

$argv                       - массив переданных в процесс параметров(является необходимым),
function (&$Job, $read) {}  - анонимная функция содержащую логику работы процесса c параметрами:
    &$Job - ссылка на объект обработчика задания,
    $read - передаваемые данные из основного процесса.

所示示例的工作方式如下

1. В основном процессе инициализируется объект для работы с параллельными заданиями.
2. Для указанного процесса резервируется разделяемая память.
3. Поток вывода процесса перенаправляется, освобождая поток вывода для основного процесса.
4. В создавшемся процессе выполняется анонимная функция переданная в Job::runSingleAsyncJob().
5. По окончанию выполнения основной логики очищается ячейка разделяемой памяти для процесса.

初始化和启动多个异步进程

示例:

Starting::multipleAsyncProcesses(
    array(
        array(
            "jobName" => 'jobs/async_1',
            "numberJobs" => 3,
            "shSizeForOneJob" => 300,
            "dataPartitioning" => array(
                "flagPartitioning" => 1,
                "dataToPartitioning" => array(1, 2, 3)
            )
        ),
        array(
            "jobName" => 'jobs/async_2',
            "numberJobs" => 1,
            "shSizeForOneJob" => 300,
            "dataPartitioning" => array(
                "flagPartitioning" => 0,
                "dataToPartitioning" => array('Hi')
            )
        )
    )
);

Starting::multipleAsyncProcesses()接受一个数组,该数组是特定进程集配置数组的数组,其中

"jobName" - относительный пусть до файла процесса,
"numberJobs" - количество одинаковых процессов "jobName" для запуска,
"shSizeForOneJob"    - объем разделяемой памяти в байтах для каждого процесса,
"dataPartitioning" - массив определяющий передаваемые данных в каждый процесс:
    параметр "flagPartitioning" - 0 или 1, если "flagPartitioning" = 0, то данные
    поставляются в каждый процесс в указанном виде, иначе если "flagPartitioning" = 1,
    то данные указанные разделяются для количества процессов поровну,
    т.е. если указано "numberJobs" => 3 и "flagPartitioning" => 1,
    то "dataToPartitioning" => array(1, 2, 3) разделится на 3 массива,
    в каждом из которых будет по 1 элементу по порядку и массива array(1, 2, 3).
    Число элементов массива не может быть меньше чем количество процессов.

初始化和启动多个进程,它们保持控制台输出在主进程之后

示例:

$parallel =
    Starting::parallel(
        array(
            array(
                "jobName" => 'jobs/job_1',
                "numberJobs" => 1,
                "shSizeForOneJob" => 300,
            ),
            array(
                "jobName" => 'jobs/job_2',
                "numberJobs" => 5,
                "shSizeForOneJob" => 90000,
                "dataPartitioning" => array(
                    "flagPartitioning" => 0,
                    "dataToPartitioning" => ['commit', 'sin']
                )
            ),
            array(
                "jobName" => 'jobs/job_4',
                "numberJobs" => 2,
                "shSizeForOneJob" => 300,
                "dataPartitioning" => array(
                    "flagPartitioning" => 1,
                    "dataToPartitioning" => ['commit', 'sin']
                )
            )
        )
    );

$output = $parallel->getOutput();

Starting::parallel()使用与Starting::multipleAsyncProcesses()相同的参数集进行初始化。

Starting::parallel()与Starting::multipleAsyncProcesses()的主要区别在于它保持对主进程的控制,等待子进程执行,这允许将“一个进程”的工作分配给“更多进程”,然后从每个并行进程获取数据。主进程的数据获取在行中发生

 $output = $parallel->getOutput();

调试

在支持多个调试器同时连接的IDE中调试并行工作进程更有效。

建议

在Debian系统中,可以使用标准工具(如ipcsps或任何具有类似功能)来控制共享内存和打开的进程。在cgi模式下使用时,最好有一个后台进程监视共享内存的填充,以及一个进程用于监控打开的进程 - 以防止内存溢出。