yiisoft/yii-queue

此包已被弃用且不再维护。作者建议使用yiisoft/queue包。

支持DB、Redis、RabbitMQ、Beanstalk、SQS和Gearman的队列扩展

dev-master / 3.0.x-dev 2024-01-09 05:23 UTC

README

993323

Yii Queue Extension


一个用于通过队列异步运行任务的扩展。

文档位于 docs/guide/README.md

Latest Stable Version Total Downloads Build status Scrutinizer Code Quality Code Coverage Mutation testing badge static analysis type-coverage

安装

安装此扩展的首选方式是通过 composer

运行以下命令:

composer require yiisoft/queue

或者将以下内容添加到您的 composer.json 文件的 require 部分:

"yiisoft/queue": "~3.0"

(此处省略了具体的代码示例,因为原文中并未提供)

yiisoft/config 集成

如果您正在使用 yiisoft/config,您会发现此包在 commonparams 配置中提供了一些默认值,以节省您的时间。以下是您需要更改以开始使用队列的内容

  • 可选:定义默认的 \Yiisoft\Queue\Adapter\AdapterInterface 实现。
  • 并在 channel-definitions 参数键中定义特定通道的 AdapterInterface 实现,以便与 队列工厂 一起使用。
  • handlers 参数键中定义 消息处理器,以便与 QueueWorker 一起使用。
  • 解决其他 \Yiisoft\Queue\Queue 依赖项(psr-compliant 事件调度器)。

与 yii2-queue 的区别

如果您有使用 yiisoft/yii2-queue 的经验,您会发现这个包非常相似。尽管如此,还有一些关键的区别,这些区别在 "从 yii2-queue 迁移" 文章中有描述。

基本用法

每个队列任务由两部分组成

  1. 消息是一个实现 MessageInterface 的类。对于简单的情况,您可以使用默认实现,Yiisoft\Queue\Message\Message。对于更复杂的情况,您应该通过自己的方式实现接口。
  2. 消息处理器是由 Yiisoft\Queue\Worker\Worker 调用的可调用对象。处理器处理每个队列消息。

例如,如果您需要下载并保存文件,您的消息可能如下所示

$data = [
    'url' => $url,
    'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);

然后您应该将其推送到队列

$queue->push($message);

其处理器可能如下所示

class FileDownloader
{
    private string $absolutePath;

    public function __construct(string $absolutePath) 
    {
        $this->absolutePath = $absolutePath;
    }

    public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
    {
        $fileName = $downloadMessage->getData()['destinationFile'];
        $path = "$this->absolutePath/$fileName"; 
        file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
    }
}

最后,我们需要为 Yiisoft\Queue\Worker\Worker 创建一个配置

$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
    $handlers, // Here it is
    $logger,
    $injector,
    $container
);

运行队列中所有已存在的消息,然后退出

$queue->run(); // this will execute all the existing messages
$queue->run(10); // while this will execute only 10 messages as a maximum before exit

如果您不想脚本立即退出,可以使用listen方法

$queue->listen();

您还可以检查已推送消息的状态(您所使用的队列适配器必须支持此功能)

$queue->push($message);
$id = $message->getId();

// Get status of the job
$status = $queue->status($id);

// Check whether the job is waiting for execution.
$status->isWaiting();

// Check whether a worker got the job from the queue and executes it.
$status->isReserved();

// Check whether a worker has executed the job.
$status->isDone();

不同的队列通道

通常,我们需要使用单个应用程序将消息推送到不同的队列通道。存在QueueFactory类,用于为不同的通道创建不同的Queue对象。使用此工厂,通道特定的Queue创建与以下操作一样简单:

$queue = $factory->get('channel-name');

主要使用策略是与通道特定的适配器显式定义。定义传递给工厂的$definitions构造函数参数,其中键是通道名称,值是Yiisoft\Factory\Factory的定义。以下是一些示例:

use Yiisoft\Queue\Adapter\SynchronousAdapter;

[
    'channel1' => new SynchronousAdapter(),
    'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
    'channel3' => [
        'class' => SynchronousAdapter::class,
        '__constructor' => ['channel' => 'channel3'],
    ],
]

有关可用定义格式的更多信息,请参阅工厂文档。

另一种队列工厂使用策略是通过调用withChannel()方法隐式创建适配器。要使用此方法,您应该传递一些特定的构造函数参数:

  • true传递给$enableRuntimeChannelDefinition
  • 将默认的AdapterInterface实现传递给$defaultAdapter

在这种情况下,当$definitions中没有显式适配器定义时,将调用$factory->get('channel-name'),该调用将被转换为$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))

警告:此策略不推荐使用,因为它不会提供对通道名称中打字错误和错误的任何保护。

控制台执行

任务执行的准确方式取决于所使用的适配器。大多数适配器可以通过控制台命令运行,这些命令组件会自动注册到您的应用程序中。

以下命令会循环获取并执行队列中的任务,直到队列为空:

yii queue:run

以下命令启动一个守护进程,无限期地查询队列

yii queue:listen

有关适配器特定控制台命令及其选项的更多详细信息,请参阅文档。

组件还具有跟踪已推送到队列中的作业状态的Ability。

有关更多详细信息,请参阅指南

中间件管道

推送到队列或从中消耗的消息都会通过两个不同的中间件管道:一个在消息推送时,另一个在消息消费时。这个过程与HTTP请求相同,但对于队列消息会执行两次。这意味着您可以通过配置两个类:分别配置PushMiddlewareDispatcherConsumeMiddlewareDispatcher来在消息推送和消费时添加额外的功能。

您可以使用以下任何格式来定义中间件:

  • 一个可用的中间件对象:new FooMiddleware()。它必须根据您使用它的位置实现MiddlewarePushInterfaceMiddlewareConsumeInterfaceMiddlewareFailureInterface
  • 一个格式为yiisoft/definitions的数组。**仅当您使用yiisoft/definitions和yiisoft/di时**。
  • 一个callablefn() => // do stuff$object->foo(...)等。它将通过yiisoft/injector执行,因此所有可调用项的依赖项都将得到解决。
  • 一个字符串,用于您的DI容器解析中间件,例如FooMiddleware::class

中间件将按照它们定义的顺序正向执行。如果您像以下这样定义:[$middleware1, $midleware2],则执行顺序将如下所示

graph LR
    StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue)
    -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1]
    PushMiddleware1[$middleware1] -.-> EndPush((End))
    

    StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle)
    -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1]
    ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))

推送管道

当您推送消息时,可以使用中间件修改消息和队列适配器。通过消息修改,您可以添加额外数据、混淆数据、收集指标等。
通过修改队列适配器,您可以重定向消息到另一个队列、延迟消息消费等。

要使用此功能,您必须创建一个中间件类,该类实现了MiddlewarePushInterface接口,并从processPush方法返回一个修改后的PushRequest对象。

return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);

使用推送中间件,您可以在运行时定义适配器对象,而不是在Queue构造函数中。有一个限制:在所有中间件按正向顺序执行完毕后,适配器必须在PushRequest对象中指定。如果没有指定,您将获得一个AdapterNotConfiguredException异常。

您有三个地方可以定义推送中间件

  1. PushMiddlewareDispatcher。您可以将其传递给构造函数,或者传递给withMiddlewares()方法,该方法
    创建一个完全新的派发器对象,其中只包含作为参数传递的中间件。如果您使用yiisoft/config,您可以将中间件添加到paramsyiisoft/queue数组中的middlewares-push键。
  2. 将中间件传递给Queue::withMiddlewares()Queue::withMiddlewaresAdded()方法。两者的区别在于前者将完全替换现有的中间件堆栈,而后者将传递的中间件添加到现有堆栈的末尾。这些中间件将在直接传递给PushMiddlewareDispatcher的中间件之后执行。当定义队列通道时很有用。这两种方法都返回Queue类的新实例。
  3. 将中间件放入Queue::push()方法中,如下所示:$queue->push($message, ...$middlewares)。这些中间件优先级最低,将在PushMiddlewareDispatcher中的中间件、传递给Queue::withMiddlewares()Queue::withMiddlewaresAdded()的中间件之后执行,并且仅针对与它们一起传递的消息。

消费管道

当从队列服务器消费消息时,您可以为此消息设置一个中间件管道。这很有用,可以收集指标、修改消息数据等。与推送中间件一起使用时,您可以在队列中删除重复的消息、计算从推送到消费的时间、处理错误(将失败的任务推送到队列、将失败的消息重定向到另一个队列、发送通知等)。除非推送管道,否则您只有一个地方可以定义中间件堆栈:在ConsumeMiddlewareDispatcher中,无论是在构造函数中还是在withMiddlewares()方法中。如果您使用yiisoft/config,您可以将中间件添加到paramsyiisoft/queue数组中的middlewares-consume键。

错误处理管道

当某些任务失败时,我们通常希望再次尝试执行它或将其重定向到另一个队列通道。这可以在yiisoft/queue中使用失败中间件管道完成。它们在通过消费中间件管道处理消息时每次遇到任何Throwable时都会触发。与前两个管道的主要区别是

  • 您应分别为每个队列通道设置独立的中间件管道。这意味着,格式应该是 ['channel-name' => [FooMiddleware::class]],而不是像其他两个管道那样使用 [FooMiddleware::class]。还有一个默认键,它将用于没有自己键的通道:FailureMiddlewareDispatcher::DEFAULT_PIPELINE
  • 最后一个中间件将抛出异常,该异常将包含 FailureHandlingRequest 对象。如果您不想抛出异常,您的中间件应该 return 一个不带调用 $handler->handleFailure() 的请求。

您可以在 FailureMiddlewareDispatcher 中声明错误处理中间件管道,无论是在构造函数中还是在 withMiddlewares() 方法中。如果您使用 yiisoft/config,您可以在 params 中将中间件添加到 yiisoft/queue 数组的 middlewares-fail 键。

有关详细信息,请参阅 错误处理文档

额外信息

单元测试

该软件包使用 PHPUnit 进行测试。要运行测试

./vendor/bin/phpunit

突变测试

该软件包的测试使用 Infection 突变框架进行检测。要运行它

./vendor/bin/infection

静态分析

代码使用 Psalm 进行静态分析。要运行静态分析

./vendor/bin/psalm

支持项目

Open Collective

关注更新

Official website Twitter Telegram Facebook Slack

许可证

Yii Queue 扩展是免费软件。它根据 BSD 许可证的条款发布。有关更多信息,请参阅 LICENSE

Yii 软件 维护。