otsch / ppq
一个非常简单的PHP队列系统,用于运行后台任务。
Requires
- php: ^8.1
- mockery/mockery: ^1.5
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.11
- pestphp/pest: ^1.22
- phpstan/phpstan: ^1.8
README
(Pew Pew Queue 或 PHP 队列)
这是一个非常简单的PHP队列系统,用于运行后台任务。
ℹ️ 目前处于非常早期阶段,所以请不要在生产环境中使用!直到v1.0,任何小版本都可能发生破坏向后兼容性的变化。
特性
- 仅运行在 文件系统上!您不需要任何redis、数据库或其他基础设施依赖(理想情况下可能需要类似supervisor的东西来确保队列始终运行,但这不是必需的)。
- 所有后台任务都在单独的(子)PHP进程中启动。因此,job类中的 代码更改 将立即影响新启动的任务,而无需重新启动任何东西。
- 优雅地停止队列工作者 => 不启动新的任务,等待当前运行的任务完成。
- 定义您想要的 任意数量的队列 以及它们的限制 并行运行的任务数量(最多)。
安装
composer require otsch/ppq
使用
配置
系统需要一个配置文件,至少定义一个 datapath
(只必需的设置)。以下是一个包含所有可用配置选项的配置文件。
<?php use Otsch\Ppq\Drivers\FileDriver; return [ /** * The system in any case needs a datapath, which is a path to a directory on your system, * that it can use to save some things about the state of the queues. */ 'datapath' => __DIR__ . '/../data/queue', /** * Currently, the only driver shipped with the package is the `FileDriver`. * So you only need to define a driver here, if you build another driver yourself. * * Using the FileDriver, the system works with filesystem only, no redis or database * or any other infrastructure dependency. */ 'driver' => FileDriver::class, /** * If you need to bootstrap your app before a job is executed (because e.g. you need some * framework dependencies in your job code), this should be the file path where * bootstrapping is done. */ 'bootstrap_file' => null, /** * Here you can define your queues. */ 'queues' => [ /** * For example, you can use a default queue for most of your background tasks that * occur from time to time. */ 'default' => [ /** * You should define how many jobs the queue should run in parallel at max. */ 'concurrent_jobs' => 2, /** * Optionally define how many past jobs the queue should remember until it forgets (removes) * older past (finished, failed, lost, cancelled) jobs. Default value is 100. */ 'keep_last_x_past_jobs' => 200, /** * Optionally define event listeners for certain events on this queue (more about this further below). */ 'listeners' => [ 'waiting' => WaitingEvent::class, 'running' => RunningEvent::class, 'finished' => [FinishedEventOne::class, FinishedEventTwo::class], ] ], /** * You can make a separate queue for example, if you're having a certain kind of job * that will be run very often and would flood the default queue and maybe cause * other more important jobs to wait. */ 'special_queue' => [ 'concurrent_jobs' => 4, ], ], /** * You can define a Scheduler that will be called when you run the * `php vendor/bin/ppq check-schedule` command. * More about Schedulers further below. */ 'scheduler' => [ 'class' => Scheduler::class, /** * The 'active' setting can be used to run scheduled jobs only in certain environments. */ 'active' => false, ], /** * Provide your own error handler, that will be called with all * uncaught exceptions and PHP errors (and optionally warnings). */ 'error_handler' => [ 'class' => MyErrorHandler::class, /** * Same as in the scheduler setting above. */ 'active' => true, ], /** * Optional. If you cannot reliably control the error_reporting() level for the sub-process * in which the background job is executed, define a level here. It will be passed * as a parameter when starting the process. */ 'error_reporting' => 'E_ALL', ];
配置文件应位于项目根目录下的 /config/ppq.php
。
命令行
您可以通过命令行控制队列。以下是可用的命令
运行/工作所有队列
php vendor/bin/ppq work
使用 work
命令来运行队列。只要您想让队列监听新任务并运行它们,就需要运行此命令。因此,理想情况下您会使用类似supervisor的系统来启动它。
如果您更改了配置或更新了ppq包,则需要重新启动它,以便使用更改后的代码运行。如果只是您的队列任务代码中发生了变化,则不需要重新启动,因为系统在启动队列任务时创建了单独的PHP进程。这意味着您更改任何东西后启动的每个任务都将自动使用更改后的代码库。
work
命令会在新任务启动或完成(失败)时输出消息。当您手动在命令行上启动队列时,您会在那里看到它;当您使用supervisor启动队列时,建议定义一个文件,将stdout写入其中,以便您可以事后查看。
优雅地停止工作队列
php vendor/bin/ppq stop
stop
命令将向正在运行的队列工作者进程发送信号,导致它不会启动任何进一步排队的工作,但在所有当前运行的工作完成后最终退出。这样,当您需要重新启动队列工作者时,不会中断任何正在运行的任务。如果您使用配置为自动重启的supervisor运行队列工作者进程,则无需担心在所有当前运行的任务完成后手动重新启动队列工作者。
列出所有队列及其正在运行/等待的任务
php vendor/bin/ppq list
列出您在配置中定义的所有队列及其当前正在运行和等待的任务。
获取任务日志
php vendor/bin/ppq logs 1a2b3c.456def
默认情况下,logs
命令最多打印最后1000条日志。如果您想获取更多或更少的日志,可以使用 --lines
选项
php vendor/bin/ppq logs 1a2b3c.456def --lines=1500
或者,如果您只想获取所有任务日志
php vendor/bin/ppq logs 1a2b3c.456def --lines=all
通过其ID取消等待或正在运行的任务
php vendor/bin/ppq cancel 1a2b3c.456def
清除队列
如果您不小心将大量工作派发到队列中,只想删除它们,您可以直接清空队列。
php vendor/bin/ppq clear queue_name
您还可以清空所有配置的队列。
php vendor/bin/ppq clear-all
清空队列
如果您不仅要删除等待队列的工作,还要完全清空队列(即使删除所有正在运行、已完成、失败、已取消和丢失的工作),您可以清空队列。
php vendor/bin/ppq flush queue_name
注意: 这不会取消正在运行的工作,它只是从队列中删除所有条目!
您还可以清空所有配置的队列。
php vendor/bin/ppq flush-all
调用调度器以启动待定工作
php vendor/bin/ppq check-schedule
这将调用您在配置中定义为 ['scheduler']['class']
的类中的 checkScheduleAndQueue()
方法。更多内容将在下面介绍。
工作类
您可以派发到队列中的工作类必须实现 Otsch\Ppq\Contracts\QueueableJob
接口。
use Otsch\Ppq\Contracts\QueueableJob; use Otsch\Ppq\Loggers\EchoLogger; class TestJob implements QueueableJob { public function __construct(int $arg = 1) { } public function invoke(): void { (new EchoLogger())->info('hello'); usleep(rand(2000, 500000)); } }
派发工作
对于派发工作到队列,存在 Dispatcher
类。一个简单的例子
use Your\App\TestJob; Dispatcher::queue('default') ->job(TestJob::class) ->dispatch();
这将把 Your\App\TestJob
派发到 default
队列。如果您想跟踪队列工作:dispatch()
方法返回一个包含工作队列中 id
的 QueueRecord
实例。
参数
如果您的作业有一些参数,您可以使用 arguments()
方法将标量值参数传递给作业类实例。假设您的作业类看起来像这样
class MyJob implements QueueableJob { public function __construct( private readonly string $foo, private readonly string $bar, ) { } public function invoke(): void { // Do something with $this->foo and $this->bar } }
然后您可以提供 foo
和 bar
如下
use Your\App\MyJob; Dispatcher::queue('default') ->job(MyJob::class) ->args(['foo' => 'boo', 'bar' => 'far']) ->dispatch();
如上所述,这仅适用于标量值。
只有在作业不在队列中时才派发作业
use Your\App\MyJob; Dispatcher::queue('default') ->job(MyJob::class) ->args() ->dispatchIfNotYetInQueue();
这将仅在作业尚未在默认队列等待或运行时启动作业。如果您的作业有参数,则只有当另一个作业当前正在等待或使用完全相同的参数运行时,它才会不派发作业。如果您只想使它依赖于一个参数,您可以这样做
use Your\App\MyJob; Dispatcher::queue('default') ->job(MyJob::class) ->args(['foo' => 'boo', 'bar' => 'far']) ->dispatchIfNotYetInQueue(['foo']);
这将不会派发作业,如果另一个作业当前正在等待或使用参数 foo
为 boo
运行。它不会关心 bar
参数是否不同。
队列事件
您可以通过配置注册队列事件的监听器。可用的事件有
等待
waiting
事件的监听器在将新工作派发到它所监听的队列时被调用。
运行
running
事件的监听器在队列中的作业开始时被调用。
完成
finished
事件的监听器在作业成功完成时被调用。
失败
failed
事件的监听器在作业由于某些原因失败时被调用。
丢失
lost
事件的监听器在队列以某种方式丢失作业进程时被调用。这可以发生在工作进程被杀死或出错,并且作业进程在工作进程重新启动之前死亡、完成或失败的情况下。
取消
cancelled
事件的监听器在作业被手动取消时被调用。
要添加事件监听器,您需要创建一个实现 QueueEventListener
接口的类
use Otsch\Ppq\Contracts\QueueEventListener; use Otsch\Ppq\Entities\QueueRecord; class RunningEventListener implements QueueEventListener { public function invoke(QueueRecord $queueRecord): void { // Whatever you want to do when this event occurs. } }
并将它添加到配置中,添加到它应该监听的队列
use Otsch\Ppq\Drivers\FileDriver; return [ 'datapath' => __DIR__ . '/../data/queue', 'queues' => [ 'default' => [ 'concurrent_jobs' => 3, 'listeners' => [ 'running' => RunningEventListener::class, ] ], 'other_queue' => [ 'concurrent_jobs' => 2, ], ], ];
调度
您在配置文件中定义的调度器类必须实现 Otsch\Ppq\Contracts\Scheduler
接口。系统目前对调度的支持不多。您可以完全自己实现它,系统唯一做的事情是在运行 php vendor/bin/ppq check-schedule
命令时调用您的调度器类中的 checkScheduleAndQueue()
方法。所以这里有一个非常简单的例子,如果您想每小时15分钟后调度一些工作运行
use Your\App\TestJob; use Otsch\Ppq\Contracts\Scheduler as SchedulerInterface; use Otsch\Ppq\Dispatcher; class Scheduler implements SchedulerInterface { public function __construct() { } public function checkScheduleAndQueue(): void { if (date('i') === '15') { Dispatcher::queue('default') ->job(TestJob::class) ->dispatch(); } } }
因此,如果使用php vendor/bin/ppq check-schedule
命令正好在整点后的15分钟运行,它将把Your\App\TestJob
任务调度到default
队列。这意味着您需要定期运行check-schedule
命令。为此,您可以在系统中添加一个crontab,例如:
* * * * * php /path/to/your/project/vendor/bin/ppq check-schedule
错误处理
为了捕获和处理在您的PPQ后台作业中发生的错误(异常和PHP错误/警告),您可以在配置中定义一个error_handler
(请参阅上面的示例配置)。您的错误处理器类必须扩展抽象类Otsch\Ppq\AbstractErrorHandler
,并在boot()
方法中注册您的处理器。
use Otsch\Ppq\AbstractErrorHandler; class MyErrorHandler extends AbstractErrorHandler { public function boot(): void { $this->registerHandler( function (Throwable $exception) { if ($exception instanceof ErrorException) { // This is a PHP error. You can get the error level via: if ($exception->getSeverity() === E_ERROR) { // Handle PHP error. } elseif ($exception->getSeverity() === E_WARNING) { // Handle PHP warning. // If you don't want this error handler to be called // with PHP warnings at all, you can provide bool false // as the second argument in the registerHandler() call // (see further below). } else { // Handle other PHP error (E_PARSE or others). } } else { // Handle an uncaught exception thrown somewhere // in your application code. } }, // true, // If you want to completely ignore PHP warnings (as mentioned above). ); } }