yiisoft / queue
支持数据库、Redis、RabbitMQ、Beanstalk、SQS 和 Gearman 的队列扩展
Requires
- php: ^8.1
- psr/container: ^1.0|^2.0
- psr/log: ^2.0|^3.0
- symfony/console: ^5.4|^6.0
- yiisoft/definitions: ^1.0|^2.0|^3.0
- yiisoft/friendly-exception: ^1.0
- yiisoft/injector: ^1.0
Requires (Dev)
- maglnet/composer-require-checker: ^4.7
- phpunit/phpunit: ^10.5
- rector/rector: ^1.0.0
- roave/infection-static-analysis-plugin: ^1.34
- spatie/phpunit-watcher: ^1.23
- vimeo/psalm: ^5.20
- yiisoft/test-support: ^3.0
- yiisoft/yii-debug: dev-master
Suggests
- ext-pcntl: Need for process signals
- dev-master / 3.0.x-dev
- dev-initial-benchmarks
- dev-allow-handler-classes
- dev-simplify-envelopes
- dev-improve-envelope-obtaining
- dev-restore-message-class
- dev-new
- dev-new-stack
- dev-refactor-handlers
- dev-middlewares
- dev-use-yii-console-optional
- dev-psalm_level_1
- dev-revert-137-middleware-factory-improvement
This package is auto-updated.
Last update: 2024-09-21 12:12:33 UTC
README
Yii Queue
一个用于通过队列异步执行任务的扩展。
要求
- PHP 8.1 或更高版本。
安装
可以使用 Composer 安装此包
composer require yiisoft/queue
准备使用 yiisoft/config
如果您正在使用 yiisoft/config,您会发现此包在 common
和 params
配置中提供了一些默认设置,这样可以节省您的时间。以下是您应该更改以开始使用队列的内容
- 可选:定义默认
\Yiisoft\Queue\Adapter\AdapterInterface
实现。 - 并且/或者定义特定通道的
AdapterInterface
实现在channel-definitions
参数键中,以用于 队列工厂。 - 在
handlers
参数键中定义 消息处理器,以用于QueueWorker
。 - 解决其他
\Yiisoft\Queue\Queue
依赖(psr-compliant 事件调度器)。
yii2-queue 的差异
如果您有使用 yiisoft/yii2-queue
的经验,您会发现这个包很相似。不过,还有一些关键差异,在 "从 yii2-queue 迁移" 文章中进行了描述。
一般用法
每个队列任务由两部分组成
- 消息是一个实现
MessageInterface
的类。对于简单情况,您可以使用默认实现,Yiisoft\Queue\Message\Message
。对于更复杂的情况,您应该自己实现该接口。 - 消息处理器是一个由
Yiisoft\Queue\Worker\Worker
调用的可调用对象。处理器处理每个队列消息。
例如,如果您需要下载并保存文件,您的消息可能如下所示
$data = [ 'url' => $url, 'destinationFile' => $filename, ]; $message = new \Yiisoft\Queue\Message\Message('file-download', $data);
然后您应该将其推送到队列
$queue->push($message);
其处理器可能如下所示
class FileDownloader { private string $absolutePath; public function __construct(string $absolutePath) { $this->absolutePath = $absolutePath; } public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void { $fileName = $downloadMessage->getData()['destinationFile']; $path = "$this->absolutePath/$fileName"; file_put_contents($path, file_get_contents($downloadMessage->getData()['url'])); } }
最后,我们需要为 Yiisoft\Queue\Worker\Worker
创建配置
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']]; $worker = new \Yiisoft\Queue\Worker\Worker( $handlers, // Here it is $logger, $injector, $container );
有一种方法可以运行队列中的所有消息,然后退出
$queue->run(); // this will execute all the existing messages $queue->run(10); // while this will execute only 10 messages as a maximum before exit
如果您不希望脚本立即退出,您可以使用 listen
方法
$queue->listen();
您还可以检查推送消息的状态(您使用的队列适配器必须支持此功能)
$queue->push($message); $id = $message->getId(); // Get status of the job $status = $queue->status($id); // Check whether the job is waiting for execution. $status->isWaiting(); // Check whether a worker got the job from the queue and executes it. $status->isReserved(); // Check whether a worker has executed the job. $status->isDone();
不同的队列通道
我们通常需要将消息推送到不同的队列通道,而只有一个应用程序。存在一个 QueueFactory
类来为不同的通道创建不同的 Queue
对象。使用此工厂,特定通道的 Queue
创建与以下方式一样简单
$queue = $factory->get('channel-name');
主要使用策略是使用显式定义通道特定适配器。定义传递给工厂的 $definitions
构造函数参数中,键是通道名称,值是 Yiisoft\Factory\Factory
的定义。以下是一些示例
use Yiisoft\Queue\Adapter\SynchronousAdapter; [ 'channel1' => new SynchronousAdapter(), 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), 'channel3' => [ 'class' => SynchronousAdapter::class, '__constructor' => ['channel' => 'channel3'], ], ]
有关可用定义格式的更多信息,请参阅 工厂 文档。
另一种队列工厂使用策略是通过调用 withChannel()
方法隐式创建适配器。要使用这种方法,您应传递一些特定的构造函数参数
- 将
true
传递给$enableRuntimeChannelDefinition
- 将默认的
AdapterInterface
实现传递给$defaultAdapter
。
在这种情况下,如果没有在 $definitions
中显式适配器定义,则 $factory->get('channel-name')
调用将被转换为 $this->queue->withAdapter($this->defaultAdapter->withChannel($channel))
。
警告:此策略不建议使用,因为它不会对通道名称中的拼写错误和错误提供任何保护。
控制台执行
任务执行的确切方式取决于所使用的适配器。大多数适配器可以使用控制台命令运行,该组件会自动在您的应用程序中注册这些命令。
以下命令循环获取并执行队列中的任务,直到队列为空
yii queue:run
以下命令启动一个守护进程,该守护进程无限期地查询队列
yii queue:listen
有关特定于适配器的控制台命令及其选项的详细信息,请参阅文档。
组件还具有跟踪已推送到队列中的作业状态的 ability。
有关详细信息,请参阅 指南。
中间件管道
任何推送到队列或从中消费的消息都会通过两个不同的中间件管道:一个在消息推送时,另一个在消息消费时。这个过程与 HTTP 请求相同,但对于队列消息会执行两次。这意味着您可以通过配置两个类:分别的 PushMiddlewareDispatcher
和 ConsumeMiddlewareDispatcher
来在消息推送和消费时添加额外功能。
您可以使用以下任何格式来定义中间件
- 一个可用的中间件对象:
new FooMiddleware()
。它必须实现MiddlewarePushInterface
、MiddlewareConsumeInterface
或MiddlewareFailureInterface
,具体取决于您使用它的位置。 - 一个格式为 yiisoft/definitions 的数组。**仅当您使用 yiisoft/definitions 和 yiisoft/di**。
- 一个
callable
:fn() => // do stuff
、$object->foo(...)
等。它将通过 yiisoft/injector 执行,因此您的可调用程序的所有依赖项都将得到解决。 - 一个字符串,用于您的 DI 容器解析中间件,例如
FooMiddleware::class
中间件将按照它们定义的顺序向前执行。如果您像以下这样定义它:[$middleware1, $midleware2]
,则执行将如下所示
推送管道
当您推送消息时,您可以使用中间件修改消息和队列适配器。使用消息修改,您可以添加额外数据、混淆数据、收集度量等。
使用队列适配器修改,您可以重新定向消息到另一个队列、延迟消息消费等。
要使用此功能,您必须创建一个实现 MiddlewarePushInterface
的中间件类,并从 processPush
方法返回一个修改后的 PushRequest
对象。
return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter);
使用推送中间件,您可以在运行时定义一个适配器对象,而不是在Queue
构造函数中。有一个限制:在所有中间件按照正向顺序执行完毕后,适配器必须在PushRequest
对象中指定。如果没有指定,您将得到一个AdapterNotConfiguredException
异常。
您有三个地方可以定义推送中间件
PushMiddlewareDispatcher
。您可以将它传递给构造函数,或者传递给withMiddlewares()
方法,该方法
创建一个全新的派发器对象,该对象仅包含作为参数传递的中间件。如果您使用yiisoft/config,您可以在params
中添加中间件到yiisoft/queue
数组中的middlewares-push
键。- 将中间件传递给
Queue::withMiddlewares()
或Queue::withMiddlewaresAdded()
方法。两者的区别在于前者将完全替换现有的中间件栈,而后者将传递的中间件添加到现有栈的末尾。这些中间件将在通用的中间件之后执行,直接传递给PushMiddlewareDispatcher
。当定义队列通道时很有用。这两种方法都返回Queue
类的新实例。 - 将中间件放入
Queue::push()
方法中,例如:$queue->push($message, ...$middlewares)
。这些中间件具有最低的优先级,将在PushMiddlewareDispatcher
中的那些以及传递给Queue::withMiddlewares()
和Queue::withMiddlewaresAdded()
的中间件之后执行,并且仅针对与它们一起传递的消息。
消费管道
您可以为从队列服务器中消费消息时设置一个中间件管道。这有助于收集指标、修改消息数据等。与推送中间件配合使用,您可以在队列中删除重复消息、计算从推送到消费的时间、处理错误(再次将消息推送到队列、将失败的消息重定向到另一个队列、发送通知等)。除非是推送管道,否则您只有一个地方可以定义中间件栈:在ConsumeMiddlewareDispatcher
中,无论是在构造函数中还是在withMiddlewares()
方法中。如果您使用yiisoft/config,您可以在params
中添加中间件到yiisoft/queue
数组中的middlewares-consume
键。
错误处理管道
当某些作业失败时,我们通常希望执行其执行几次或将其重定向到另一个队列通道。这可以通过yiisoft/queue
中的失败中间件管道来完成。它们会在通过消费中间件管道处理消息的任何时候被任何Throwable
中断时触发。与前两个管道的主要区别
- 您应该为每个队列通道单独设置中间件管道。这意味着格式应该是
['channel-name' => [FooMiddleware::class]]
,而不是像其他两个管道那样是[FooMiddleware::class]
。还有一个默认键,它将用于没有自己键的通道:FailureMiddlewareDispatcher::DEFAULT_PIPELINE
。 - 最后一个中间件将抛出带有
FailureHandlingRequest
对象的异常。如果您不希望抛出异常,您的中间件应该return
一个不带调用$handler->handleFailure()
的请求。
您可以在FailureMiddlewareDispatcher
中声明错误处理中间件管道,无论是在构造函数中还是在withMiddlewares()
方法中。如果您使用yiisoft/config,您可以在params
中添加中间件到yiisoft/queue
数组中的middlewares-fail
键。
有关详细信息,请参阅错误处理文档。
文档
如果您需要帮助或有任何疑问,可以访问Yii 论坛,那里是一个寻求帮助的好地方。您还可以查看其他Yii 社区资源。
许可
Yii 队列是免费的软件。它根据BSD许可证发布。有关更多信息,请参阅LICENSE
。
由Yii 软件维护。