yiisoft/queue

支持数据库、Redis、RabbitMQ、Beanstalk、SQS 和 Gearman 的队列扩展

dev-master / 3.0.x-dev 2024-09-15 17:34 UTC

README

Yii

Yii Queue


Latest Stable Version Total Downloads Build status Scrutinizer Code Quality Code Coverage Mutation testing badge static analysis type-coverage

一个用于通过队列异步执行任务的扩展。

要求

  • PHP 8.1 或更高版本。

安装

可以使用 Composer 安装此包

composer require yiisoft/queue

准备使用 yiisoft/config

如果您正在使用 yiisoft/config,您会发现此包在 commonparams 配置中提供了一些默认设置,这样可以节省您的时间。以下是您应该更改以开始使用队列的内容

  • 可选:定义默认 \Yiisoft\Queue\Adapter\AdapterInterface 实现。
  • 并且/或者定义特定通道的 AdapterInterface 实现在 channel-definitions 参数键中,以用于 队列工厂
  • handlers 参数键中定义 消息处理器,以用于 QueueWorker
  • 解决其他 \Yiisoft\Queue\Queue 依赖(psr-compliant 事件调度器)。

yii2-queue 的差异

如果您有使用 yiisoft/yii2-queue 的经验,您会发现这个包很相似。不过,还有一些关键差异,在 "从 yii2-queue 迁移" 文章中进行了描述。

一般用法

每个队列任务由两部分组成

  1. 消息是一个实现 MessageInterface 的类。对于简单情况,您可以使用默认实现,Yiisoft\Queue\Message\Message。对于更复杂的情况,您应该自己实现该接口。
  2. 消息处理器是一个由 Yiisoft\Queue\Worker\Worker 调用的可调用对象。处理器处理每个队列消息。

例如,如果您需要下载并保存文件,您的消息可能如下所示

$data = [
    'url' => $url,
    'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);

然后您应该将其推送到队列

$queue->push($message);

其处理器可能如下所示

class FileDownloader
{
    private string $absolutePath;

    public function __construct(string $absolutePath) 
    {
        $this->absolutePath = $absolutePath;
    }

    public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
    {
        $fileName = $downloadMessage->getData()['destinationFile'];
        $path = "$this->absolutePath/$fileName"; 
        file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
    }
}

最后,我们需要为 Yiisoft\Queue\Worker\Worker 创建配置

$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
    $handlers, // Here it is
    $logger,
    $injector,
    $container
);

有一种方法可以运行队列中的所有消息,然后退出

$queue->run(); // this will execute all the existing messages
$queue->run(10); // while this will execute only 10 messages as a maximum before exit

如果您不希望脚本立即退出,您可以使用 listen 方法

$queue->listen();

您还可以检查推送消息的状态(您使用的队列适配器必须支持此功能)

$queue->push($message);
$id = $message->getId();

// Get status of the job
$status = $queue->status($id);

// Check whether the job is waiting for execution.
$status->isWaiting();

// Check whether a worker got the job from the queue and executes it.
$status->isReserved();

// Check whether a worker has executed the job.
$status->isDone();

不同的队列通道

我们通常需要将消息推送到不同的队列通道,而只有一个应用程序。存在一个 QueueFactory 类来为不同的通道创建不同的 Queue 对象。使用此工厂,特定通道的 Queue 创建与以下方式一样简单

$queue = $factory->get('channel-name');

主要使用策略是使用显式定义通道特定适配器。定义传递给工厂的 $definitions 构造函数参数中,键是通道名称,值是 Yiisoft\Factory\Factory 的定义。以下是一些示例

use Yiisoft\Queue\Adapter\SynchronousAdapter;

[
    'channel1' => new SynchronousAdapter(),
    'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
    'channel3' => [
        'class' => SynchronousAdapter::class,
        '__constructor' => ['channel' => 'channel3'],
    ],
]

有关可用定义格式的更多信息,请参阅 工厂 文档。

另一种队列工厂使用策略是通过调用 withChannel() 方法隐式创建适配器。要使用这种方法,您应传递一些特定的构造函数参数

  • true 传递给 $enableRuntimeChannelDefinition
  • 将默认的 AdapterInterface 实现传递给 $defaultAdapter

在这种情况下,如果没有在 $definitions 中显式适配器定义,则 $factory->get('channel-name') 调用将被转换为 $this->queue->withAdapter($this->defaultAdapter->withChannel($channel))

警告:此策略不建议使用,因为它不会对通道名称中的拼写错误和错误提供任何保护。

控制台执行

任务执行的确切方式取决于所使用的适配器。大多数适配器可以使用控制台命令运行,该组件会自动在您的应用程序中注册这些命令。

以下命令循环获取并执行队列中的任务,直到队列为空

yii queue:run

以下命令启动一个守护进程,该守护进程无限期地查询队列

yii queue:listen

有关特定于适配器的控制台命令及其选项的详细信息,请参阅文档。

组件还具有跟踪已推送到队列中的作业状态的 ability。

有关详细信息,请参阅 指南

中间件管道

任何推送到队列或从中消费的消息都会通过两个不同的中间件管道:一个在消息推送时,另一个在消息消费时。这个过程与 HTTP 请求相同,但对于队列消息会执行两次。这意味着您可以通过配置两个类:分别的 PushMiddlewareDispatcherConsumeMiddlewareDispatcher 来在消息推送和消费时添加额外功能。

您可以使用以下任何格式来定义中间件

  • 一个可用的中间件对象:new FooMiddleware()。它必须实现 MiddlewarePushInterfaceMiddlewareConsumeInterfaceMiddlewareFailureInterface,具体取决于您使用它的位置。
  • 一个格式为 yiisoft/definitions 的数组。**仅当您使用 yiisoft/definitions 和 yiisoft/di**。
  • 一个 callablefn() => // do stuff$object->foo(...) 等。它将通过 yiisoft/injector 执行,因此您的可调用程序的所有依赖项都将得到解决。
  • 一个字符串,用于您的 DI 容器解析中间件,例如 FooMiddleware::class

中间件将按照它们定义的顺序向前执行。如果您像以下这样定义它:[$middleware1, $midleware2],则执行将如下所示

推送管道

当您推送消息时,您可以使用中间件修改消息和队列适配器。使用消息修改,您可以添加额外数据、混淆数据、收集度量等。
使用队列适配器修改,您可以重新定向消息到另一个队列、延迟消息消费等。

要使用此功能,您必须创建一个实现 MiddlewarePushInterface 的中间件类,并从 processPush 方法返回一个修改后的 PushRequest 对象。

return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);

使用推送中间件,您可以在运行时定义一个适配器对象,而不是在Queue构造函数中。有一个限制:在所有中间件按照正向顺序执行完毕后,适配器必须在PushRequest对象中指定。如果没有指定,您将得到一个AdapterNotConfiguredException异常。

您有三个地方可以定义推送中间件

  1. PushMiddlewareDispatcher。您可以将它传递给构造函数,或者传递给withMiddlewares()方法,该方法
    创建一个全新的派发器对象,该对象仅包含作为参数传递的中间件。如果您使用yiisoft/config,您可以在params中添加中间件到yiisoft/queue数组中的middlewares-push键。
  2. 将中间件传递给Queue::withMiddlewares()Queue::withMiddlewaresAdded()方法。两者的区别在于前者将完全替换现有的中间件栈,而后者将传递的中间件添加到现有栈的末尾。这些中间件将在通用的中间件之后执行,直接传递给PushMiddlewareDispatcher。当定义队列通道时很有用。这两种方法都返回Queue类的新实例。
  3. 将中间件放入Queue::push()方法中,例如:$queue->push($message, ...$middlewares)。这些中间件具有最低的优先级,将在PushMiddlewareDispatcher中的那些以及传递给Queue::withMiddlewares()Queue::withMiddlewaresAdded()的中间件之后执行,并且仅针对与它们一起传递的消息。

消费管道

您可以为从队列服务器中消费消息时设置一个中间件管道。这有助于收集指标、修改消息数据等。与推送中间件配合使用,您可以在队列中删除重复消息、计算从推送到消费的时间、处理错误(再次将消息推送到队列、将失败的消息重定向到另一个队列、发送通知等)。除非是推送管道,否则您只有一个地方可以定义中间件栈:在ConsumeMiddlewareDispatcher中,无论是在构造函数中还是在withMiddlewares()方法中。如果您使用yiisoft/config,您可以在params中添加中间件到yiisoft/queue数组中的middlewares-consume键。

错误处理管道

当某些作业失败时,我们通常希望执行其执行几次或将其重定向到另一个队列通道。这可以通过yiisoft/queue中的失败中间件管道来完成。它们会在通过消费中间件管道处理消息的任何时候被任何Throwable中断时触发。与前两个管道的主要区别

  • 您应该为每个队列通道单独设置中间件管道。这意味着格式应该是['channel-name' => [FooMiddleware::class]],而不是像其他两个管道那样是[FooMiddleware::class]。还有一个默认键,它将用于没有自己键的通道:FailureMiddlewareDispatcher::DEFAULT_PIPELINE
  • 最后一个中间件将抛出带有FailureHandlingRequest对象的异常。如果您不希望抛出异常,您的中间件应该return一个不带调用$handler->handleFailure()的请求。

您可以在FailureMiddlewareDispatcher中声明错误处理中间件管道,无论是在构造函数中还是在withMiddlewares()方法中。如果您使用yiisoft/config,您可以在params中添加中间件到yiisoft/queue数组中的middlewares-fail键。

有关详细信息,请参阅错误处理文档

文档

如果您需要帮助或有任何疑问,可以访问Yii 论坛,那里是一个寻求帮助的好地方。您还可以查看其他Yii 社区资源

许可

Yii 队列是免费的软件。它根据BSD许可证发布。有关更多信息,请参阅LICENSE

Yii 软件维护。

支持项目

Open Collective

关注更新

Official website Twitter Telegram Facebook Slack