agatanga/中继

在Laravel中创建复杂批处理作业队列的更好方法。

1.2.2 2024-05-12 14:49 UTC

This package is auto-updated.

Last update: 2024-09-12 15:25:54 UTC


README

在Laravel中创建和管理复杂批处理作业队列的更好方法

Relay::chain([new Job1, new Job2])
    ->batch([new Job3_1, new Job3_2])
    ->chain([new Job4, new Job5])
    ->through([new Middleware])
    ->dispatch();

主要特性

  • Relay不会创建/修改数据库表
  • 看起来整洁复杂的批处理
  • 可以为多个作业设置中间件
  • 通过元数据搜索特定批处理
  • 监控批处理进度
  • 搜索失败的作业

安装

composer require agatanga/relay

使用示例

平展嵌套回调

以下是使用Relay编写的平展批处理

use Agatanga\Relay\Facades\Relay;

Relay::chain('Downloading', [
        new DownloadSources($project),
        new DetectSettings($project),
    ])
    ->then('Updating project data', [
        new ReadStringFiles($project),
        new ReadSourceFiles($project),
    ])
    ->then('Updating project data', [
        new FixFalseUnusedStrings($project),
    ])
    ->finally('Cleaning up', [
        new IgnoreKnownStrings($project),
        new RemoveSources($project),
    ])
    ->through(new Middleware($project))
    ->dispatch();
查看不使用Relay编写的相同代码
Bus::batch([
    [
        new DownloadSources($project)->through(new Middleware($project)),
        new DetectSettings($project)->through(new Middleware($project)),
    ],
])->then(function (Batch $batch) use ($project) {
    Bus::batch([
        new ReadStringFiles($project)->through(new Middleware($project)),
        new ReadSourceFiles($project)->through(new Middleware($project)),
    ])->then(function (Batch $batch) use ($project) {
        Bus::batch([
            new FixFalseUnusedStrings($project)->through(new Middleware($project)),
        ])->finally(function (Batch $batch) use ($project) {
            Bus::batch([
                new IgnoreKnownStrings($project)->through(new Middleware($project)),
                new RemoveSources($project)->through(new Middleware($project)),
            ])->name('Cleaning up')->dispatch();
        })->name('Updating project data')->dispatch();
    })->name('Updating project data')->dispatch();
})->name('Downloading')->dispatch();

中间件

Relay允许您为多个作业设置中间件

Relay::chain([new Job1, new Job2])
    ->batch([new Job3_1, new Job3_2])
    ->chain([new Job4, new Job5], [new Middleware1])
    ->chain([new Job6, new Job7])
    ->through([new Middleware2]) // middleware for Job1-3, Job6-7
    ->dispatch();

元数据

在我们开始之前,您可能想知道Relay不会修改job_batches表来存储元数据。所有数据都存储在name列中,限制为255个字符。以下是带有元数据的批处理名称可能看起来像什么

Cleaning up|[project:58][project.update:58][3/3]

现在,让我们看看如何使用meta方法来存储额外的信息

use Agatanga\Relay\Facades\Relay;

Relay::chain([
        new DownloadSources($project),
        new DetectSettings($project),
    ])
    ->then([
        new ReadStringFiles($project),
        new ReadSourceFiles($project),
    ])
    ->finally([
        new IgnoreKnownStrings($project),
        new RemoveSources($project),
    ])
    ->name('Update Project (:current of :total)')
    ->meta('project.update', $project->id)
    ->meta('causer', auth()->user()->id)
    ->dispatch();

然后搜索批处理并检索元数据值或批处理名称

use Agatanga\Relay\Facades\Relay;

Relay::whereMeta('causer', $userId)->all();
Relay::whereMeta('project', $id)->first()->meta('causer');
Relay::whereMeta('project.update', $id)->first()->name; // returns clean name

进度

假设上面的搜索查询返回了第一个批处理(thenfinally回调尚未开始)。Relay会考虑这一点,并将返回的进度在0-33%范围内。

use Agatanga\Relay\Facades\Relay;

Relay::whereMeta('project.update', $id)->first()->progress; // only the last callback can return 100%

失败的作业

当批处理作业失败时,Laravel会将失败的作业记录添加到failed_jobs表。

Relay允许您检索这些失败的作业

use Agatanga\Relay\Facades\Relay;

Relay::whereMeta('project.update', $id)->first()->failedJobs();

// or get the exception string of the last failed job

$batch = Relay::whereMeta('project.update', $id)->first();

if ($batch->failed) {
    echo $batch->exception;
}