xmlshop / queue-monitor
Laravel 数据库作业队列的队列监控
Requires
- php: ^8.0
- ext-json: *
- ext-mbstring: *
- ext-posix: *
- gpressutto5/laravel-slack: ^2.0
- illuminate/database: ^5.5|^6.0|^7.0|^8.0|^9.0
- illuminate/queue: ^5.5|^6.0|^7.0|^8.0|^9.0
- illuminate/support: ^5.5|^6.0|^7.0|^8.0|^9.0
- lorisleiva/cron-translator: ^0.4
- nesbot/carbon: ^2.0
- webpatser/laravel-uuid: ^4.0
Requires (Dev)
- laravel/framework: ^5.5|^6.0|^7.0|^8.0|^9.0
- mockery/mockery: ^1.3.2
- orchestra/testbench: ^3.8|^4.0|^5.0|^6.0
- phpstan/phpstan: ^0.12.99|^1.0
- phpunit/phpunit: ^8.0|^9.0
README
Laravel 框架
此包提供数据库队列的监控功能,类似于 Laravel Horizon。
此包最初是从 Laravel-Queue-Monitor 分支出来的。
功能
- 监控任何队列的作业,类似于 Laravel Horizon。
- 处理失败的作业并存储异常
- 监控作业进度
- 获取作业剩余时间的估计值
- 为作业监控存储额外的数据
安装
composer require xmlshop/queue-monitor
配置
将配置和迁移复制到您的项目
php artisan vendor:publish --provider="xmlshop\QueueMonitor\Providers\MonitorProvider" --tag=config --tag=migrations
迁移队列监控表。表名可以在配置文件或通过发布的迁移中配置。
php artisan migrate
调度器
class Kernel extends ConsoleKernel
{
#...
protected function schedule(Schedule $schedule)
{
#...
$schedule->command('queue-monitor:aggregate-queues-sizes')->everyMinute();
$schedule->command('queue-monitor:clean-up')->dailyAt('01:23');
$schedule->command('queue-monitor:listener')->everyMinute();
$schedule->command('monitor:sync-scheduler')->daily();
$schedule->command('monitor:pid-checker')->everyMinute(); #for each node, if >1
#...
}
}
监听器将自动启动 queue-monitor:listener
。它可能在配置或命令中被禁用。
#php artisan queue-monitor:listener disable {hours} php artisan queue-monitor:listener disable 24 #disables alert-launcher for a day. By default 1 hour php artisan queue-monitor:listener enable #enables that back
需要在您的应用程序中替换应用程序错误处理器。请添加自定义错误处理器
final class Handler extends \Illuminate\Foundation\Exceptions\Handler implements \Illuminate\Contracts\Debug\ExceptionHandler { /** * Render an exception to the console. * * @param \Symfony\Component\Console\Output\OutputInterface $output * @param \Throwable $e * @return void */ final public function renderForConsole($output, Throwable $e) { $command_name = null; foreach (request()->server('argv') as $arg) { if (Str::contains($arg, ':')) { $command_name = $arg; break; } } if (null !== $command_name && class_exists('xmlshop\QueueMonitor\Services\CLIFailureHandler')) { app('xmlshop\QueueMonitor\Services\CLIFailureHandler')->handle($command_name, $e); } parent::renderForConsole($output, $e); } }
并在应用程序提供者中注册它
$this->app->bind(\Illuminate\Contracts\Debug\ExceptionHandler::class, \App\Exceptions\Handler::class);
警报功能
- 监听器检查数据库中的
x_queue_monitoring_queue_sizes
表,并将当前数量与字段alert_threshold
中提到的数量进行比较。如果超出,则发出警报。 - 监听器检查数据库中的
x_queue_monitoring
表,并将多个指标(待处理时间、执行时间等)进行比较。 - 您可以管理每个作业的异常。包括忽略警报。
使用方法
要监控作业,只需将 xmlshop\QueueMonitor\Traits\IsMonitored
特性添加到作业类。
use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use xmlshop\QueueMonitor\Traits\IsMonitored; // <--- class ExampleJob implements ShouldQueue { use Dispatchable; use InteractsWithQueue; use Queueable; use SerializesModels; use IsMonitored; // <--- }
重要!您需要在作业类中实现 Illuminate\Contracts\Queue\ShouldQueue
接口。否则,Laravel 框架不会派发任何包含状态信息的作业事件,用于监控作业。
用户界面
您可以通过在路由文件中调用 Route::queueMonitor()
启用可选的用户界面路由,类似于官方的 ui 框架。
Route::prefix('monitor')->group(function () { Route::queueMonitor(); });
路由
有关更多信息,请参阅 配置文件。
扩展使用
进度
您可以设置一个 进度值(0-100),以获取作业进度的估计。
use Illuminate\Contracts\Queue\ShouldQueue; use xmlshop\QueueMonitor\Traits\IsMonitored; class ExampleJob implements ShouldQueue { use IsMonitored; public function handle() { $this->queueProgress(0); // Do something... $this->queueProgress(50); // Do something... $this->queueProgress(100); } }
块进度
作业的一个常见场景是遍历大量集合。
此示例作业遍历大量用户,并在每个块迭代中更新其进度值。
use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Database\Eloquent\Collection; use xmlshop\QueueMonitor\Traits\IsMonitored; class ChunkJob implements ShouldQueue { use IsMonitored; public function handle() { $usersCount = User::count(); $perChunk = 50; User::query() ->chunk($perChunk, function (Collection $users) use ($perChunk, $usersCount) { $this->queueProgressChunk($usersCount‚ $perChunk); foreach ($users as $user) { // ... } }); } }
进度冷却
为了避免快速重复的更新查询使数据库过载,您可以覆盖 progressCooldown
方法并指定等待秒数,然后每个进度更新才写入数据库。注意,对于值 0、25、50、75 和 100,冷却将始终被忽略。
use Illuminate\Contracts\Queue\ShouldQueue; use xmlshop\QueueMonitor\Traits\IsMonitored; class LazyJob implements ShouldQueue { use IsMonitored; public function progressCooldown(): int { return 10; // Wait 10 seconds between each progress update } }
自定义数据
此包还允许在监控模型上以数组语法设置自定义数据。
use Illuminate\Contracts\Queue\ShouldQueue; use xmlshop\QueueMonitor\Traits\IsMonitored; class CustomDataJob implements ShouldQueue { use IsMonitored; public function handle() { $this->queueData(['foo' => 'Bar']); // WARNING! This is overriding the monitoring data $this->queueData(['bar' => 'Foo']); // To preserve previous data and merge the given payload, set the $merge parameter true $this->queueData(['bar' => 'Foo'], true); } }
为了在 UI 上显示自定义数据,您需要在 config/monitor.php
下添加此行
'ui' => [ ... 'show_custom_data' => true, ... ]
仅保留失败的作业
您可以覆盖 keepMonitorOnSuccess()
方法,以仅存储执行作业的失败监控条目。如果您只想保留频繁执行的作业的失败监控,这很有用。或者,您可以使用 Laravel 内置的 failed_jobs
表。
use Illuminate\Contracts\Queue\ShouldQueue; use xmlshop\QueueMonitor\Traits\IsMonitored; class FrequentSucceedingJob implements ShouldQueue { use IsMonitored; public static function keepMonitorOnSuccess(): bool { return false; } }
检索处理过的作业
use xmlshop\QueueMonitor\Models\MonitorQueue; $job = MonitorQueue::query()->first(); // Check the current state of a job $job->isFinished(); $job->hasFailed(); $job->hasSucceeded(); // If the job is still running, get the estimated seconds remaining // Notice: This requires a progress to be set $job->getRemainingSeconds(); $job->getRemainingInterval(); // Carbon\CarbonInterval // Retrieve any data that has been set while execution $job->getData(); // Get the base name of the executed job $job->getBasename();
模型作用域
use xmlshop\QueueMonitor\Models\MonitorQueue; // Filter by Status MonitorQueue::failed(); MonitorQueue::succeeded(); // Filter by Date MonitorQueue::lastHour(); MonitorQueue::today(); // Chain Scopes MonitorQueue::today()->failed();