agatanga / 中继
在Laravel中创建复杂批处理作业队列的更好方法。
1.2.2
2024-05-12 14:49 UTC
Requires
- php: ^8.0
- illuminate/bus: ^8.0|^9.0|^10.0|^11.0
- illuminate/support: ^7.14|^8.0|^9.0|^10.0|^11.0
- laravel/serializable-closure: ^1.0
README
在Laravel中创建和管理复杂批处理作业队列的更好方法
Relay::chain([new Job1, new Job2]) ->batch([new Job3_1, new Job3_2]) ->chain([new Job4, new Job5]) ->through([new Middleware]) ->dispatch();
主要特性
- Relay不会创建/修改数据库表
- 看起来整洁复杂的批处理
- 可以为多个作业设置中间件
- 通过元数据搜索特定批处理
- 监控批处理进度
- 搜索失败的作业
安装
composer require agatanga/relay
使用示例
平展嵌套回调
以下是使用Relay编写的平展批处理
use Agatanga\Relay\Facades\Relay; Relay::chain('Downloading', [ new DownloadSources($project), new DetectSettings($project), ]) ->then('Updating project data', [ new ReadStringFiles($project), new ReadSourceFiles($project), ]) ->then('Updating project data', [ new FixFalseUnusedStrings($project), ]) ->finally('Cleaning up', [ new IgnoreKnownStrings($project), new RemoveSources($project), ]) ->through(new Middleware($project)) ->dispatch();
查看不使用Relay编写的相同代码
Bus::batch([ [ new DownloadSources($project)->through(new Middleware($project)), new DetectSettings($project)->through(new Middleware($project)), ], ])->then(function (Batch $batch) use ($project) { Bus::batch([ new ReadStringFiles($project)->through(new Middleware($project)), new ReadSourceFiles($project)->through(new Middleware($project)), ])->then(function (Batch $batch) use ($project) { Bus::batch([ new FixFalseUnusedStrings($project)->through(new Middleware($project)), ])->finally(function (Batch $batch) use ($project) { Bus::batch([ new IgnoreKnownStrings($project)->through(new Middleware($project)), new RemoveSources($project)->through(new Middleware($project)), ])->name('Cleaning up')->dispatch(); })->name('Updating project data')->dispatch(); })->name('Updating project data')->dispatch(); })->name('Downloading')->dispatch();
中间件
Relay允许您为多个作业设置中间件
Relay::chain([new Job1, new Job2]) ->batch([new Job3_1, new Job3_2]) ->chain([new Job4, new Job5], [new Middleware1]) ->chain([new Job6, new Job7]) ->through([new Middleware2]) // middleware for Job1-3, Job6-7 ->dispatch();
元数据
在我们开始之前,您可能想知道Relay不会修改job_batches
表来存储元数据。所有数据都存储在name
列中,限制为255个字符。以下是带有元数据的批处理名称可能看起来像什么
Cleaning up|[project:58][project.update:58][3/3]
现在,让我们看看如何使用meta
方法来存储额外的信息
use Agatanga\Relay\Facades\Relay; Relay::chain([ new DownloadSources($project), new DetectSettings($project), ]) ->then([ new ReadStringFiles($project), new ReadSourceFiles($project), ]) ->finally([ new IgnoreKnownStrings($project), new RemoveSources($project), ]) ->name('Update Project (:current of :total)') ->meta('project.update', $project->id) ->meta('causer', auth()->user()->id) ->dispatch();
然后搜索批处理并检索元数据值或批处理名称
use Agatanga\Relay\Facades\Relay; Relay::whereMeta('causer', $userId)->all(); Relay::whereMeta('project', $id)->first()->meta('causer'); Relay::whereMeta('project.update', $id)->first()->name; // returns clean name
进度
假设上面的搜索查询返回了第一个批处理(then
和finally
回调尚未开始)。Relay会考虑这一点,并将返回的进度在0-33%
范围内。
use Agatanga\Relay\Facades\Relay; Relay::whereMeta('project.update', $id)->first()->progress; // only the last callback can return 100%
失败的作业
当批处理作业失败时,Laravel会将失败的作业记录添加到failed_jobs
表。
Relay允许您检索这些失败的作业
use Agatanga\Relay\Facades\Relay; Relay::whereMeta('project.update', $id)->first()->failedJobs(); // or get the exception string of the last failed job $batch = Relay::whereMeta('project.update', $id)->first(); if ($batch->failed) { echo $batch->exception; }