afrihost/swarm-process

类似于Process包,但这个会运行一个进程列表,限制并发数量,并跟踪它们

v1.3.2 2024-02-15 15:36 UTC

README

Latest Stable Version Total Downloads Latest Unstable Version License Build Status Coverage Status

是什么以及为什么

一个进程处理器,运行并发进程,同时考虑到最大值,并在进程列表完成前重用槽位。

示例: 你有50个任务需要运行,尽可能快地运行。一个解决方案是让一个脚本同时运行所有50个。当然,这可能在服务器足够强大时适用于10个、50个甚至100个任务 - 但如果你有10,000个任务要运行呢?你不可能同时运行10,000个任务。如果所有这些任务都需要数据库连接,你会在大约150个任务时耗尽大多数数据库,即使那样,你正在进行的任务可能也会对数据库造成太大的压力。

解决方案: 同时只运行最大数量的并发任务 - 但不要只是在第一个组完成后运行10个任务,然后再运行另外10个任务 - 运行10个任务,一旦一个“槽位”变得空闲,就运行第11个任务,然后是第12个任务,最终是第10,000个任务 - 这意味着减少了浪费的“等待时间”。

为什么用PHP编写一个“并发cron”解决方案?

答案很简单:你的大部分代码都是用PHP编写的,所以为什么你的作业管理器要用Python或Node或任何当月的流行语言呢?用你已知的语言编写(假设这是PHP,这也是你在这里的原因)有助于故障排除和调整。

安装

应该就像composer-install一样简单,然后你就可以使用它了

composer require afrihost/swarm-process

使用方法

有两种使用方式,完全运行或使用tick进行交互模式。

$swarm->run()

use Afrihost\SwarmProcess\SwarmProcess;

$swarm = new SwarmProcess(); // you may provide a Psr/Logger here if you want to
$swarm->setMaxRunStackSize(20); // default is 10 (cannot make it <= 0)

// Just some mock things for it to do:
for ($i = 0; $i < 10000; $i++) {
    $swarm->pushNativeCommandOnQueue('echo "test"');

    // Some examples of how to use it - note the new Process way!
    // $swarm->pushNativeCommandOnQueue('ls');
    // $swarm->pushNativeCommandOnQueue('sleep 10');
    // $swarm->pushProcessOnQueue(new Process('ls'));
}

// Now we tell it to go run all 10k things, but to adhere to the 20 concurrent rule set above:
$swarm->run();

上面的代码应该相当直观。我想指出的是,当你调用$swarm->run();时,你必须等待它完成10k周期,然后你的应用程序的其他部分才能继续。然而,如果你想继续做其他事情,那么这就是$swarm->tick();的作用...

$swarm->tick()

比如说,你有一个场景,你既想在等待10k个进程在后台运行的同时在应用程序中做其他事情,或者更常见的是,你可能想添加更多的事情(或者你只是担心将10k或100k或10亿件事情添加到要运行的数组中会消耗太多内存)。这就是$swarm->tick();派上用场的地方。

在内部,$swarm->run();方法只是一个while循环,并运行$this->tick()直到没有更多事情要做。返回值的决定是:“如果还有要运行的命令在队列中<强>或如果还有正在运行的事情,那么返回<强>true,否则返回<强>false

因此,你可以用以下内容替换上面的最后一部分代码,即$swarm->run();

do {
	// do nothing
} while ($swarm->tick());

这将与$swarm->run()函数完全一样。

如果你想说,比如,检查你的数据库是否还有更多事情可以添加到swarm,那么你可能做些像这样的事情

do {
	if ($db->hasMoreStuffToAddDummyFunction()) {
		$swarm->pushProcessOnQueue(new Process($db->getSomeCommandToAddToTheQueue()));
	}
} while ($swarm->tick());

关于大数组的说明:当你将新的命令/过程推入数组时,从数组开头移除元素的方法是使用array_shift。虽然在PHP的后续版本中它不那么显眼,但由于PHP在每次array_shift后都需要重新索引数组,因此在处理成千上万的条目时,大数组仍然会有轻微的性能损耗。因此,如果你因此遇到了性能问题,建议尝试使用上面的$swarm->tick()方法和逐步向系统中添加内容。

闭包/回调

从版本1.1开始,我们现在为->run()方法提供了两个回调参数。

第一个可调用参数用于你在运行过程中添加更多工作到队列中。想想看,它就像是上面提到的例子中的do-while循环内部。

第二个可调用参数用于覆盖循环的结束。例如,你可能不希望在队列为空时结束循环,而只是在5分钟的不活动之后。然后你可以将这个放在第二个回调中。内部逻辑是:“如果tick()返回true或者回调返回true,循环仍然继续!”

下面是一个示例,展示它会是什么样子

$swarm->run(
    function() {
        // do a check to see if we should have more commands added to the queue
        return new Process('sleep 5');
    },
    function () {
        // check if the loop should still continue, if so return true
        return true;
    }
);

完成回调

从版本1.2开始,你将能够提供在每个过程完成后被调用的回调。例如,目的是用它来确定退出码。一个用例是在失败的情况下重新安排过程,或者记录失败供人类调查。

使用SwarmProcess/Configuration对象将回调传入系统。

以下是使用它的方法

$logger = new Logger('swarm_logger');

$closure = function(Process $process) use ($logger) {
    $logger->warning('Do something, like checking the exit codes: '.$process->getExitCode().' ['.$process->getExitCodeText().']');
};

$swarmProcess = new SwarmProcess($logger, (new Configuration())->setCompletedCallback($closure));

超时强制执行

当使用Process时,默认超时设置为60秒。在版本1.2之前,SwarmProcess没有检查和强制执行这一点。现在有一个配置选项可以让你打开强制执行。默认情况下它是关闭的,所以当前在版本1.*下使用SwarmProcess的用法不会受到影响。

下面是如何实现这一点的示例

$logger = new Logger('swarm_logger');

$configuration = (new Configuration())
    ->setEnforceProcessTimeouts(true);

$swarmProcess = new SwarmProcess($logger, $configuration);

// two Processes, both set to timeout at 5 seconds
$swarmProcess->pushProcessOnQueue(new Process('sleep 9', null, null, null, 5));
$swarmProcess->pushProcessOnQueue(new Process('sleep 2', null, null, null, 5));

$swarmProcess->setMaxRunStackSize(4);

$swarmProcess->run();

上述代码将同时运行两个过程。它们的时间限制都设置为5秒(你可以显然为所有设置单个时间限制)。因此,一旦执行了内部的tick()方法,sleep 9过程就会超时,但sleep 2过程将自然运行。以这种方式调用超时,SwarmProcess会将警告传递给传入的记录器。你可以使用上面解释的回调结构来通过查看退出码来程序化地注意到这一点并进行超时处理。

tick延迟

这个特性是为了使内部循环对CPU的负担更轻。没有必要不断以尽可能快的速度运行循环。在大多数情况下,每0.1秒检查一次就足够了。为了向后兼容,这是一个可选的配置。然而,一个合理的低值0.01秒(10000微秒)已被设置为默认值。以下是它是如何被激活的

$logger = new Logger('swarm_logger');

$configuration = (new Configuration())
    ->setTickLoopDelayMicroseconds(100000); // uses usleep, so microseconds.

$swarmProcess = new SwarmProcess($logger, $configuration);

示例

你也可以看看examples文件夹中提供的示例。使用以下命令运行它们:

php examples/simple-run.php
php examples/simple-run-process.php
php examples/simple-tick.php
php examples/simple-run-with-callbacks.php
php examples/simple-run-enforcing-timeouts.php
php examples/simple-run-with-completion-callback.php

需要帮助?

在GitHub上打开一个问题,让我们从这里开始

贡献

  • 将仓库Fork
  • 将你的仓库本地Clone
  • composer install
  • 运行测试以确保一切正常:./vendor/bin/phpunit
  • 创建一个分支,命名你正在做的更改
  • 做你的事情 :)
  • 按照.travis.yml文件中的说明运行测试(./vendor/bin/phpunit
  • 如果你准备好了:提交并推送到你的仓库,然后发送给我一个pull request

如果你想讨论它,我很乐意在github.com上的问题中聊天 ;)

TODO

  • 完成README.md - 然而,这应该等到项目代码更完善一些再进行
  • 创建交互模式(->tick()
  • 创建公共方法以询问队列中剩余多少个事物,以及当前正在运行多少个事物
  • ->run()创建闭包回调,以使用户无需编写自己的while循环就能获得更多控制权