adt / background-queue
Nette 框架的后台队列
Requires
- php: ^7.4|^8.0
- ext-json: *
- adt/command-lock: ^1.1
- adt/utils: ^2.10
- doctrine/dbal: ^2.0|^3.0
- psr/log: ^1.0|^2.0|^3.0
- symfony/console: ^4.0|^5.0|^6.0
Requires (Dev)
- ext-bcmath: *
- ext-sockets: *
- adt/docker-binaries: ^3.6
- codeception/assert-throws: ^1.3
- codeception/codeception: ^5.0
- codeception/module-asserts: ^2.0
- php-amqplib/php-amqplib: ^2.8
- dev-master
- v4.24.1
- v4.24
- v4.23.5
- v4.23.4
- v4.23.3
- v4.23.2
- v4.23.1
- v4.23
- v4.22.3
- v4.22.2
- v4.22.1
- v4.22
- v4.21
- v4.20
- v4.19.1
- v4.19
- v4.18.2
- v4.18.1
- v4.18
- v4.17.11
- v4.17.10
- v4.17.9
- v4.17.8
- v4.17.7
- v4.17.6
- v4.17.5
- v4.17.4
- v4.17.3
- v4.17.2
- v4.17.1
- v4.17.0
- v4.16.18
- v4.16.17
- v4.16.16
- v4.16.15
- v4.16.14
- v4.16.13
- v4.16.12
- v4.16.11
- v4.16.10
- v4.16.9
- v4.16.8
- v4.16.7
- v4.16.6
- v4.16.5
- v4.16.4
- v4.16.3
- v4.16.2
- v4.16.1
- v4.16
- v4.15.2
- v4.15.1
- v4.15
- v4.14
- v4.13.4
- v4.13.3
- v4.13.2
- v4.13.1
- v4.13
- v4.12.7
- v4.12.6
- v4.12.5
- v4.12.4
- v4.12.3
- v4.12.2
- v4.12.1
- v4.12.0
- v4.11.9
- v4.11.8
- v4.11.7
- v4.11.6
- v4.11.5
- v4.11.4
- v4.11.3
- v4.11.2
- v4.11.1
- v4.11.0
- v4.10.4
- v4.10.3
- v4.10.2
- v4.10.1
- v4.10
- v4.9.2
- v4.9.1
- v4.9
- v4.8
- v4.7.3
- v4.7.2
- v4.7.1
- v4.7
- v4.6.1
- v4.6
- v4.5.4
- v4.5.3
- v4.5.2
- v4.5.1
- v4.5
- v4.4.3
- v4.4.2
- v4.4.1
- v4.4
- v4.3
- v4.2.3
- v4.2.2
- v4.2.1
- v4.2
- v4.1.1
- v4.1
- v4.0.1
- v4.0
- v3.20.7
- v3.20.6
- v3.20.5
- v3.20.4
- v3.20.3
- v3.20.2
- v3.20.1
- v3.20
- v3.19.3
- v3.19.2
- v3.19.1
- v3.19
- v3.18
- v3.17
- v3.16
- v3.15
- v3.14.1
- v3.14
- v3.13
- v3.12.0
- v3.11.1
- v3.11.0
- v3.10.1
- v3.10.0
- v3.9.1
- v3.9
- v3.8.4
- v3.8.3
- v3.8.1
- v3.8
- v3.7.3
- v3.7.2
- v3.7.1
- v3.7
- v3.6.2
- v3.6.1
- v3.6
- v3.5
- v3.4
- v3.3
- v3.2
- v3.1
- v3.0
- v2.2
- v2.1.1
- v2.1
- v2.0
- v1.5
- v1.4
- v1.3
- v1.2
- v1.1
- v1.0
- dev-consumer-id
- dev-die-consumer
- dev-channel-connection-close-correct
- dev-optimizations
- dev-fix-consumer-create-queue
- dev-fixes
- dev-entity-extend
- dev-priorities
- dev-manually-back-to-broker
- dev-extend-monitoring
- dev-allow-descendants-of-exceptions
- dev-fix-type-hinting
This package is auto-updated.
Last update: 2024-09-16 13:53:29 UTC
README
该组件允许使用 cron 或 AMQP 代理(例如 RabbitMQ)处理后台任务。适用于长时间运行请求、与 API 通信或发送 webhook 或电子邮件。
组件使用自己的 doctrine 实体管理器来将记录保存到队列中。因此,组件的运行不受应用程序实体管理器的影响,反之亦然。
1. 安装和配置
1.1 安装
composer require adt/background-queue
1.2 注册和配置
BackgroundQueue 接受以下参数
$connection = [ 'serverVersion' => '8.0', 'driver' => 'pdo_mysql', 'host' => $_ENV['DB_HOST'], 'port' => $_ENV['DB_PORT'], 'user' => $_ENV['DB_USER'], 'password' => $_ENV['DB_PASSWORD'], 'dbname' => $_ENV['DB_DBNAME'], ]; $backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([ 'callbacks' => [ 'processEmail' => [$mailer, 'process'], 'processEmail2' => [ // možnost specifikace jiné fronty pro tento callback 'callback' => [$mailer, 'process'], 'queue' => 'general', ], ] 'notifyOnNumberOfAttempts' => 5, // počet pokusů o zpracování záznamu před zalogováním 'tempDir' => $tempDir, // cesta pro uložení zámku proti vícenásobnému spuštění commandu 'connection' => $connection, // pole parametru predavane do Doctrine\Dbal\Connection nebo DSN 'queue' => $_ENV['PROJECT_NAME'], // název fronty, do které se ukládají a ze které se vybírají záznamy 'bulkSize' => 1, // velikost dávky při vkládání více záznamů najednou 'tableName' => 'background_job', // nepovinné, název tabulky, do které se budou ukládat jednotlivé joby 'logger' => $logger, // nepovinné, musí implementovat psr/log LoggerInterface 'onBeforeProcess' => function(array $parameters) {...}, // nepovinné 'onError' => function(Throwable $e, array $parameters) {...}, // nepovinné 'onAfterProcess' => function(array $parameters) {...}, // nepovinné 'onProcessingGetMetadata' => function(array $parameters): ?array {...}, // nepovinné 'parametersFormat' => \ADT\BackgroundQueue\Entity\BackgroundJob::PARAMETERS_FORMAT_SERIALIZE, // nepovinné, určuje v jakém formátu budou do DB ukládána data v `background_job.parameters` (@see \ADT\BackgroundQueue\Entity\BackgroundJob::setParameters) ]);
所需的数据库模式将在第一次使用队列时自动创建,如果需要也会自动更新。
1.3 代理(可选)
您可以使用任何消息代理使用此软件包。您的生产者或消费者只需实现 ADT\BackgroundQueue\Broker\Producer
或 ADT\BackgroundQueue\Broker\Consumer
。
或者您可以使用 php-amqplib/php-amqplib
,这个库已经为它准备好了可用的实现。
1.3.1 php-amqplib 安装
由于使用 php-amqplib/php-amqplib
是可选的,它不会检查您安装的版本与与此软件包一起测试的版本是否一致。因此,建议将以下内容添加到您的 composer
{ "conflict": { "php-amqplib/php-amqplib": "<3.0.0 || >=4.0.0" } }
此版本的 php-amqplib/php-amqplib
还需要 ext-sockets
。您可以像这样将其添加到您的 Dockerfile 中
docker-php-ext-install sockets
然后运行
composer require php-amqplib/php-amqplib
这确保了在将来升级 php-amqplib/php-amqplib
时避免 BC 不兼容。
1.3.1 php-amqplib 配置
$connectionParams = [ 'host' => $_ENV['RABBITMQ_HOST'], 'user' => $_ENV['RABBITMQ_USER'], 'password' => $_ENV['RABBITMQ_PASSWORD'] ]; $queueParams = [ 'arguments' => ['x-queue-type' => ['S', 'quorum']] ]; $manager = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager($connectionParams, $queueParams); $producer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer(); $consumer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Consumer(); $backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([ ... 'producer' => $producer, 'waitingJobExpiration' => 1000, // nepovinné, délka v ms, po které se job pokusí znovu provést, když čeká na dokončení předchozího ]);
2 使用
2.1 将记录添加到队列并处理它
namespace App\Presenters; use ADT\BackgroundQueue\BackgroundQueue; class Mailer { private BackgroundQueue $backgroundQueue public function __construct(BackgroundQueue $backgroundQueue) { $this->backgroundQueue = $backgroundQueue; } public function send(Invoice $invoice) { $callbackName = 'processEmail'; $parameters = [ 'to' => 'hello@appsdevteam.com', 'subject' => 'Background queue test' 'text' => 'Anything you want.' ]; $serialGroup = 'invoice-' . $invoice->getId(); $identifier = 'sendEmail-' . $invoice->getId(); $isUnique = true; // always set to true if a callback on an entity should be performed only once, regardless of how it can happen that it is added to your queue twice $availableAt = new \DateTimeImmutable('+1 hour'); // earliest time when the record should be processed $this->backgroundQueue->publish($callbackName, $parameters, $serialGroup, $identifier, $isUnique, $availableAt); } public function process(string $to, string $subject, string $text) { // own implementation } }
记录将被存储在 READY
状态中。
参数 $parameters
可以接受任何常见类型(数组、对象、字符串等)或它们的组合(对象数组),甚至二进制数据。
参数 $serialGroup
是可选的 - 通过设置它,您可以确保具有相同 serialGroup 的所有作业将顺序执行。
参数 $identifier
是可选的 - 您可以使用它来标记作业并使用方法 getUnfinishedJobIdentifiers(array $identifiers = [])
来确定哪些尚未执行。
如果回调抛出 ADT\BackgroundQueue\Exception\PermanentErrorException
,记录将被存储在 PERMANENTLY_FAILED
状态中,需要手动处理。
如果回调抛出 ADT\BackgroundQueue\Exception\WaitingException
,记录将被存储在 WAITING
状态中,并在下一次启动 background-queue:process
命令时尝试处理(见下文)。尝试计数器不会增加。
如果回调抛出 ADT\BackgroundQueue\Exception\DieException
,将根据 ->getPrevious()
中的异常处理一切(如果没有异常,则类似于 ADT\BackgroundQueue\Exception\PermanentErrorException
)。然后(在下一迭代之前),消费者将被终止。这可以用于在应用程序中关闭 Doctrine Entity manager 时在 onError
中使用,如果消费者再次尝试迭代会再次失败。
public function onError(\Throwable $exception) { // Příklad 1: Bude to neopakovatelná chyba a konzumer se před další iterací ukončí. if (!$this->entityManager->isOpen()) { throw new \ADT\BackgroundQueue\Exception\DieException('EM is closed.'); } // Příklad 2: Bude se zpracovávat dle toho, co je v $exception, tedy pokud je $exception instanceof TemporaryErrorException, tak to bude opakovatelná chyba, ale konzumer se také před další iterací ukončí. Používá se například pri deadlocku. if (!$this->entityManager->isOpen()) { throw new \ADT\BackgroundQueue\DieException('EM is closed. Reason: ' . $exception->getMessage(), $exception->getCode(), $exception); } }
如果回调抛出任何其他错误或实现 Throwable
的异常,记录将被存储在 TEMPORARILY_FAILED
状态中,并在下一次启动 background-queue:process
命令时尝试处理(见下文)。在 notifyOnNumberOfAttempts
后发送通知。每次重复尝试之间的延迟将增加两倍时间,但最多为 16 分钟。
在其他所有情况下,记录将作为成功完成的状态 STATE_FINISHED
存储。
2.2 命令
background-queue:process
不使用代理的情况下,将处理所有处于 READY
、TEMPORARILY_FAILED
、WAITING
和 BROKER_FAILED
状态的记录。如果使用代理,则处理处于 STATE_BACK_TO_BROKER
、TEMPORARILY_FAILED
和 WAITING
状态的记录。命令是理想的,可以每分钟通过 cron 启动一次。如果使用代理,处于 STATE_BACK_TO_BROKER
、TEMPORARILY_FAILED
和 WAITING
状态的记录将被重新加入到代理中,并状态更改为 READY
。通常情况下,将 STATE_BACK_TO_BROKER
状态手动设置在数据库中,以允许重新处理这些记录。
background-queue:clear-finished
删除所有已成功处理的记录。
background-queue:clear-finished 14
删除所有超过 14 天的已成功处理的记录。
background-queue:reload-consumers QUEUE NUMBER
重新加载指定 QUEUE 的 NUMBER 个消费者。
background-queue:update-schema
如果需要,则更新数据库模式。
所有命令都受到防止多次启动的保护。
2.3 回调
您还可以使用 2 个回调 onBeforeProcess
和 onAfterProcess
,在其中您可以执行例如切换数据库的操作。
3 监控
在启动消费者时,当前进程的 PID 将被存储在 background_job
表的 pid
列中。这不是系统视角的 PID,而是在 Docker 容器内部的 PID。
在回调完成时,将在 background_job
表的 memory
列中存储完成前后的内存使用信息。如果在 background-queue:consume
命令中使用 jobs
参数,从 PHP 8.2 版本开始,在每次单个处理之前将重置 "memory peak"(使用 memory_reset_peak_usage()
方法)。
'notRealActual' => memory_get_usage(),
'realActual' => memory_get_usage(true),
'notRealPeak' => memory_get_peak_usage(),
'realPeak' => memory_get_peak_usage(true),
4 批量插入
如果需要插入大量记录,BackgroundQueue 可以通过使用 INSERT INTO table () VALUES (...), (...), ...
的方式,以批量方式更有效地将记录插入到数据库中。批量大小通过 bulkSize
参数设置。通过 startBulk
和 endBulk
方法指定批量插入的开始和结束。如果没有使用 startBulk
方法开始批量插入,则批量大小的默认值为 1,无论 bulkSize
参数设置为何值。
$this->backgroundQueueService->startBulk(); foreach ($data as $oneJobData) { $this->backgroundQueue->publish(...); } $this->backgroundQueueService->endBulk();
5 记录优先级
可以为插入的记录指定其优先级。较晚插入且优先级较高的记录将在处理时优先于较早插入的记录。
在设置中可以指定要使用的优先级。该参数是可选的,默认值为 [1]
。
$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
...
'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
...
]);
可以直接在处理记录的各个回调中指定它们的优先级。如果回调没有指定优先级,则将使用最高可用优先级。
例如,我们有以下类型的工作
- ACL 重新计算
- 从第三方 API 下载数据
- 发送电子邮件(例如注册电子邮件)
通常从 API 下载数据,这可能是一个耗时的任务。但是,如果我们需要重新计算 ACL,我们不希望它被 API 数据下载阻塞。更优先的是,我们希望在注册时发送偶尔的电子邮件。
$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
...
'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
'callbacks' => [
'email' => [$mailer, 'process'], // záznamy budou mít prioritu 10
'aclRecalculation' => [
'callback' => [$aclService, 'process'],
'priority' => 20,
],
'dataImporting' => [
'callback' => [$apiService, 'process'],
'priority' => 30,
],
],
...
]);
我们可以通过 background-queue:consume
命令的参数 -p
设置优先级范围来处理(例如 -p 10
、-p 20-40
、-p 25-
、-p"-20"
等)。例如,我们可以为发送注册电子邮件保留一个消费者(background-queue:consume -p 10
),并为所有其他任务保留其他消费者(background-queue:consume
)。这样可以确保快速发送注册电子邮件不会等待耗时的任务,因为第一个消费者将首先处理它。但是,如果出现多个发送电子邮件的请求,经过一段时间,所有消费者都将开始处理这些请求。
我们还可以在 publish
方法中为插入的记录设置优先级。例如,我们知道这是发送时事通讯。因此,这是发送电子邮件,但处理优先级较低。
$priority = null; // aplikuje se priorita 10 z nastavení pro callback
if ($isNewsletter) {
$priority = 25;
}
$this->backgroundQueue->publish('email', $parameters, $serialGroup, $identifier, $isUnique, $availableAt, $priority);