yiisoft / yii-queue
Requires
- php: ^8.0
- psr/container: ^1.0|^2.0
- psr/log: ^2.0|^3.0
- symfony/console: ^5.4|^6.0
- yiisoft/definitions: ^1.0|^2.0|^3.0
- yiisoft/friendly-exception: ^1.0
- yiisoft/injector: ^1.0
Requires (Dev)
- maglnet/composer-require-checker: ^4.2
- phpunit/phpunit: ^9.5
- rector/rector: ^0.18.10
- roave/infection-static-analysis-plugin: ^1.16
- spatie/phpunit-watcher: ^1.23
- vimeo/psalm: ^4.30|^5.8
- yiisoft/test-support: ^3.0
- yiisoft/yii-debug: dev-master
Suggests
- ext-pcntl: Need for process signals
This package is auto-updated.
Last update: 2024-01-09 05:25:09 UTC
README
Yii Queue Extension
一个用于通过队列异步运行任务的扩展。
文档位于 docs/guide/README.md。
安装
安装此扩展的首选方式是通过 composer。
运行以下命令:
composer require yiisoft/queue
或者将以下内容添加到您的 composer.json
文件的 require
部分:
"yiisoft/queue": "~3.0"
(此处省略了具体的代码示例,因为原文中并未提供)
yiisoft/config 集成
如果您正在使用 yiisoft/config,您会发现此包在 common
和 params
配置中提供了一些默认值,以节省您的时间。以下是您需要更改以开始使用队列的内容
- 可选:定义默认的
\Yiisoft\Queue\Adapter\AdapterInterface
实现。 - 并在
channel-definitions
参数键中定义特定通道的AdapterInterface
实现,以便与 队列工厂 一起使用。 - 在
handlers
参数键中定义 消息处理器,以便与QueueWorker
一起使用。 - 解决其他
\Yiisoft\Queue\Queue
依赖项(psr-compliant 事件调度器)。
与 yii2-queue 的区别
如果您有使用 yiisoft/yii2-queue
的经验,您会发现这个包非常相似。尽管如此,还有一些关键的区别,这些区别在 "从 yii2-queue 迁移" 文章中有描述。
基本用法
每个队列任务由两部分组成
- 消息是一个实现
MessageInterface
的类。对于简单的情况,您可以使用默认实现,Yiisoft\Queue\Message\Message
。对于更复杂的情况,您应该通过自己的方式实现接口。 - 消息处理器是由
Yiisoft\Queue\Worker\Worker
调用的可调用对象。处理器处理每个队列消息。
例如,如果您需要下载并保存文件,您的消息可能如下所示
$data = [ 'url' => $url, 'destinationFile' => $filename, ]; $message = new \Yiisoft\Queue\Message\Message('file-download', $data);
然后您应该将其推送到队列
$queue->push($message);
其处理器可能如下所示
class FileDownloader { private string $absolutePath; public function __construct(string $absolutePath) { $this->absolutePath = $absolutePath; } public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void { $fileName = $downloadMessage->getData()['destinationFile']; $path = "$this->absolutePath/$fileName"; file_put_contents($path, file_get_contents($downloadMessage->getData()['url'])); } }
最后,我们需要为 Yiisoft\Queue\Worker\Worker
创建一个配置
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']]; $worker = new \Yiisoft\Queue\Worker\Worker( $handlers, // Here it is $logger, $injector, $container );
运行队列中所有已存在的消息,然后退出
$queue->run(); // this will execute all the existing messages $queue->run(10); // while this will execute only 10 messages as a maximum before exit
如果您不想脚本立即退出,可以使用listen
方法
$queue->listen();
您还可以检查已推送消息的状态(您所使用的队列适配器必须支持此功能)
$queue->push($message); $id = $message->getId(); // Get status of the job $status = $queue->status($id); // Check whether the job is waiting for execution. $status->isWaiting(); // Check whether a worker got the job from the queue and executes it. $status->isReserved(); // Check whether a worker has executed the job. $status->isDone();
不同的队列通道
通常,我们需要使用单个应用程序将消息推送到不同的队列通道。存在QueueFactory
类,用于为不同的通道创建不同的Queue
对象。使用此工厂,通道特定的Queue
创建与以下操作一样简单:
$queue = $factory->get('channel-name');
主要使用策略是与通道特定的适配器显式定义。定义传递给工厂的$definitions
构造函数参数,其中键是通道名称,值是Yiisoft\Factory\Factory
的定义。以下是一些示例:
use Yiisoft\Queue\Adapter\SynchronousAdapter; [ 'channel1' => new SynchronousAdapter(), 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), 'channel3' => [ 'class' => SynchronousAdapter::class, '__constructor' => ['channel' => 'channel3'], ], ]
有关可用定义格式的更多信息,请参阅工厂文档。
另一种队列工厂使用策略是通过调用withChannel()
方法隐式创建适配器。要使用此方法,您应该传递一些特定的构造函数参数:
- 将
true
传递给$enableRuntimeChannelDefinition
- 将默认的
AdapterInterface
实现传递给$defaultAdapter
。
在这种情况下,当$definitions
中没有显式适配器定义时,将调用$factory->get('channel-name')
,该调用将被转换为$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))
。
警告:此策略不推荐使用,因为它不会提供对通道名称中打字错误和错误的任何保护。
控制台执行
任务执行的准确方式取决于所使用的适配器。大多数适配器可以通过控制台命令运行,这些命令组件会自动注册到您的应用程序中。
以下命令会循环获取并执行队列中的任务,直到队列为空:
yii queue:run
以下命令启动一个守护进程,无限期地查询队列
yii queue:listen
有关适配器特定控制台命令及其选项的更多详细信息,请参阅文档。
组件还具有跟踪已推送到队列中的作业状态的Ability。
有关更多详细信息,请参阅指南。
中间件管道
推送到队列或从中消耗的消息都会通过两个不同的中间件管道:一个在消息推送时,另一个在消息消费时。这个过程与HTTP请求相同,但对于队列消息会执行两次。这意味着您可以通过配置两个类:分别配置PushMiddlewareDispatcher
和ConsumeMiddlewareDispatcher
来在消息推送和消费时添加额外的功能。
您可以使用以下任何格式来定义中间件:
- 一个可用的中间件对象:
new FooMiddleware()
。它必须根据您使用它的位置实现MiddlewarePushInterface
、MiddlewareConsumeInterface
或MiddlewareFailureInterface
。 - 一个格式为yiisoft/definitions的数组。**仅当您使用yiisoft/definitions和yiisoft/di时**。
- 一个
callable
:fn() => // do stuff
、$object->foo(...)
等。它将通过yiisoft/injector执行,因此所有可调用项的依赖项都将得到解决。 - 一个字符串,用于您的DI容器解析中间件,例如
FooMiddleware::class
中间件将按照它们定义的顺序正向执行。如果您像以下这样定义:[$middleware1, $midleware2]
,则执行顺序将如下所示
graph LR StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue) -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1] PushMiddleware1[$middleware1] -.-> EndPush((End)) StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle) -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1] ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))
推送管道
当您推送消息时,可以使用中间件修改消息和队列适配器。通过消息修改,您可以添加额外数据、混淆数据、收集指标等。
通过修改队列适配器,您可以重定向消息到另一个队列、延迟消息消费等。
要使用此功能,您必须创建一个中间件类,该类实现了MiddlewarePushInterface
接口,并从processPush
方法返回一个修改后的PushRequest
对象。
return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);
使用推送中间件,您可以在运行时定义适配器对象,而不是在Queue
构造函数中。有一个限制:在所有中间件按正向顺序执行完毕后,适配器必须在PushRequest
对象中指定。如果没有指定,您将获得一个AdapterNotConfiguredException
异常。
您有三个地方可以定义推送中间件
PushMiddlewareDispatcher
。您可以将其传递给构造函数,或者传递给withMiddlewares()
方法,该方法
创建一个完全新的派发器对象,其中只包含作为参数传递的中间件。如果您使用yiisoft/config,您可以将中间件添加到params
中yiisoft/queue
数组中的middlewares-push
键。- 将中间件传递给
Queue::withMiddlewares()
或Queue::withMiddlewaresAdded()
方法。两者的区别在于前者将完全替换现有的中间件堆栈,而后者将传递的中间件添加到现有堆栈的末尾。这些中间件将在直接传递给PushMiddlewareDispatcher
的中间件之后执行。当定义队列通道时很有用。这两种方法都返回Queue
类的新实例。 - 将中间件放入
Queue::push()
方法中,如下所示:$queue->push($message, ...$middlewares)
。这些中间件优先级最低,将在PushMiddlewareDispatcher
中的中间件、传递给Queue::withMiddlewares()
和Queue::withMiddlewaresAdded()
的中间件之后执行,并且仅针对与它们一起传递的消息。
消费管道
当从队列服务器消费消息时,您可以为此消息设置一个中间件管道。这很有用,可以收集指标、修改消息数据等。与推送中间件一起使用时,您可以在队列中删除重复的消息、计算从推送到消费的时间、处理错误(将失败的任务推送到队列、将失败的消息重定向到另一个队列、发送通知等)。除非推送管道,否则您只有一个地方可以定义中间件堆栈:在ConsumeMiddlewareDispatcher
中,无论是在构造函数中还是在withMiddlewares()
方法中。如果您使用yiisoft/config,您可以将中间件添加到params
中yiisoft/queue
数组中的middlewares-consume
键。
错误处理管道
当某些任务失败时,我们通常希望再次尝试执行它或将其重定向到另一个队列通道。这可以在yiisoft/queue
中使用失败中间件管道完成。它们在通过消费中间件管道处理消息时每次遇到任何Throwable
时都会触发。与前两个管道的主要区别是
- 您应分别为每个队列通道设置独立的中间件管道。这意味着,格式应该是
['channel-name' => [FooMiddleware::class]]
,而不是像其他两个管道那样使用[FooMiddleware::class]
。还有一个默认键,它将用于没有自己键的通道:FailureMiddlewareDispatcher::DEFAULT_PIPELINE
。 - 最后一个中间件将抛出异常,该异常将包含
FailureHandlingRequest
对象。如果您不想抛出异常,您的中间件应该return
一个不带调用$handler->handleFailure()
的请求。
您可以在 FailureMiddlewareDispatcher
中声明错误处理中间件管道,无论是在构造函数中还是在 withMiddlewares()
方法中。如果您使用 yiisoft/config,您可以在 params
中将中间件添加到 yiisoft/queue
数组的 middlewares-fail
键。
有关详细信息,请参阅 错误处理文档。
额外信息
单元测试
该软件包使用 PHPUnit 进行测试。要运行测试
./vendor/bin/phpunit
突变测试
该软件包的测试使用 Infection 突变框架进行检测。要运行它
./vendor/bin/infection
静态分析
代码使用 Psalm 进行静态分析。要运行静态分析
./vendor/bin/psalm
支持项目
关注更新
许可证
Yii Queue 扩展是免费软件。它根据 BSD 许可证的条款发布。有关更多信息,请参阅 LICENSE
。
由 Yii 软件 维护。