adt/background-queue

Nette 框架的后台队列

v4.24.1 2024-09-10 12:11 UTC

This package is auto-updated.

Last update: 2024-09-16 13:53:29 UTC


README

该组件允许使用 cron 或 AMQP 代理(例如 RabbitMQ)处理后台任务。适用于长时间运行请求、与 API 通信或发送 webhook 或电子邮件。

组件使用自己的 doctrine 实体管理器来将记录保存到队列中。因此,组件的运行不受应用程序实体管理器的影响,反之亦然。

1. 安装和配置

1.1 安装

composer require adt/background-queue

1.2 注册和配置

BackgroundQueue 接受以下参数

$connection = [
	'serverVersion' => '8.0',
	'driver' => 'pdo_mysql',
	'host' => $_ENV['DB_HOST'],
	'port' => $_ENV['DB_PORT'],
	'user' => $_ENV['DB_USER'],
	'password' => $_ENV['DB_PASSWORD'],
	'dbname' => $_ENV['DB_DBNAME'],
];

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	'callbacks' => [
		'processEmail' => [$mailer, 'process'],
		'processEmail2' => [ // možnost specifikace jiné fronty pro tento callback
			'callback' => [$mailer, 'process'],
			'queue' => 'general',
		],
	]
	'notifyOnNumberOfAttempts' => 5, // počet pokusů o zpracování záznamu před zalogováním
	'tempDir' => $tempDir, // cesta pro uložení zámku proti vícenásobnému spuštění commandu
	'connection' => $connection, // pole parametru predavane do Doctrine\Dbal\Connection nebo DSN
	'queue' => $_ENV['PROJECT_NAME'], // název fronty, do které se ukládají a ze které se vybírají záznamy
	'bulkSize' => 1, // velikost dávky při vkládání více záznamů najednou
	'tableName' => 'background_job', // nepovinné, název tabulky, do které se budou ukládat jednotlivé joby
	'logger' => $logger, // nepovinné, musí implementovat psr/log LoggerInterface
	'onBeforeProcess' => function(array $parameters) {...}, // nepovinné
	'onError' => function(Throwable $e, array $parameters) {...},  // nepovinné
	'onAfterProcess' => function(array $parameters) {...}, // nepovinné
	'onProcessingGetMetadata' => function(array $parameters): ?array {...}, // nepovinné
	'parametersFormat' => \ADT\BackgroundQueue\Entity\BackgroundJob::PARAMETERS_FORMAT_SERIALIZE, // nepovinné, určuje v jakém formátu budou do DB ukládána data v `background_job.parameters` (@see \ADT\BackgroundQueue\Entity\BackgroundJob::setParameters)
]);

所需的数据库模式将在第一次使用队列时自动创建,如果需要也会自动更新。

1.3 代理(可选)

您可以使用任何消息代理使用此软件包。您的生产者或消费者只需实现 ADT\BackgroundQueue\Broker\ProducerADT\BackgroundQueue\Broker\Consumer

或者您可以使用 php-amqplib/php-amqplib,这个库已经为它准备好了可用的实现。

1.3.1 php-amqplib 安装

由于使用 php-amqplib/php-amqplib 是可选的,它不会检查您安装的版本与与此软件包一起测试的版本是否一致。因此,建议将以下内容添加到您的 composer

{
  "conflict": {
    "php-amqplib/php-amqplib": "<3.0.0 || >=4.0.0"
  }
}

此版本的 php-amqplib/php-amqplib 还需要 ext-sockets。您可以像这样将其添加到您的 Dockerfile 中

docker-php-ext-install sockets

然后运行

composer require php-amqplib/php-amqplib

这确保了在将来升级 php-amqplib/php-amqplib 时避免 BC 不兼容。

1.3.1 php-amqplib 配置

$connectionParams = [
    'host' => $_ENV['RABBITMQ_HOST'],
    'user' => $_ENV['RABBITMQ_USER'],
    'password' => $_ENV['RABBITMQ_PASSWORD']
];
$queueParams = [
    'arguments' => ['x-queue-type' => ['S', 'quorum']]
];

$manager = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager($connectionParams, $queueParams);
$producer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer();
$consumer = new \ADT\BackgroundQueue\Broker\PhpAmqpLib\Consumer();

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'producer' => $producer,
	'waitingJobExpiration' => 1000, // nepovinné, délka v ms, po které se job pokusí znovu provést, když čeká na dokončení předchozího
]);

2 使用

2.1 将记录添加到队列并处理它

namespace App\Presenters;

use ADT\BackgroundQueue\BackgroundQueue;

class Mailer
{
    private BackgroundQueue $backgroundQueue

    public function __construct(BackgroundQueue $backgroundQueue)
    {
        $this->backgroundQueue = $backgroundQueue;
    }

	public function send(Invoice $invoice) 
	{
		$callbackName = 'processEmail';
		$parameters = [
			'to' => 'hello@appsdevteam.com',
			'subject' => 'Background queue test'
			'text' => 'Anything you want.'
		];
		$serialGroup = 'invoice-' . $invoice->getId();
		$identifier = 'sendEmail-' . $invoice->getId();
		$isUnique = true; // always set to true if a callback on an entity should be performed only once, regardless of how it can happen that it is added to your queue twice
		$availableAt = new \DateTimeImmutable('+1 hour'); // earliest time when the record should be processed

		$this->backgroundQueue->publish($callbackName, $parameters, $serialGroup, $identifier, $isUnique, $availableAt);
	}
	
	public function process(string $to, string $subject, string $text) 
	{
	    // own implementation
	}
}

记录将被存储在 READY 状态中。

参数 $parameters 可以接受任何常见类型(数组、对象、字符串等)或它们的组合(对象数组),甚至二进制数据。

参数 $serialGroup 是可选的 - 通过设置它,您可以确保具有相同 serialGroup 的所有作业将顺序执行。

参数 $identifier 是可选的 - 您可以使用它来标记作业并使用方法 getUnfinishedJobIdentifiers(array $identifiers = []) 来确定哪些尚未执行。

如果回调抛出 ADT\BackgroundQueue\Exception\PermanentErrorException,记录将被存储在 PERMANENTLY_FAILED 状态中,需要手动处理。

如果回调抛出 ADT\BackgroundQueue\Exception\WaitingException,记录将被存储在 WAITING 状态中,并在下一次启动 background-queue:process 命令时尝试处理(见下文)。尝试计数器不会增加。

如果回调抛出 ADT\BackgroundQueue\Exception\DieException,将根据 ->getPrevious() 中的异常处理一切(如果没有异常,则类似于 ADT\BackgroundQueue\Exception\PermanentErrorException)。然后(在下一迭代之前),消费者将被终止。这可以用于在应用程序中关闭 Doctrine Entity manager 时在 onError 中使用,如果消费者再次尝试迭代会再次失败。

public function onError(\Throwable $exception) {

	// Příklad 1: Bude to neopakovatelná chyba a konzumer se před další iterací ukončí.
	if (!$this->entityManager->isOpen()) {
		throw new \ADT\BackgroundQueue\Exception\DieException('EM is closed.');
	}

	// Příklad 2: Bude se zpracovávat dle toho, co je v $exception, tedy pokud je $exception instanceof TemporaryErrorException, tak to bude opakovatelná chyba, ale konzumer se také před další iterací ukončí. Používá se například pri deadlocku.
	if (!$this->entityManager->isOpen()) {
		throw new \ADT\BackgroundQueue\DieException('EM is closed. Reason: ' . $exception->getMessage(), $exception->getCode(), $exception);
	}
}

如果回调抛出任何其他错误或实现 Throwable 的异常,记录将被存储在 TEMPORARILY_FAILED 状态中,并在下一次启动 background-queue:process 命令时尝试处理(见下文)。在 notifyOnNumberOfAttempts 后发送通知。每次重复尝试之间的延迟将增加两倍时间,但最多为 16 分钟。

在其他所有情况下,记录将作为成功完成的状态 STATE_FINISHED 存储。

2.2 命令

background-queue:process 不使用代理的情况下,将处理所有处于 READYTEMPORARILY_FAILEDWAITINGBROKER_FAILED 状态的记录。如果使用代理,则处理处于 STATE_BACK_TO_BROKERTEMPORARILY_FAILEDWAITING 状态的记录。命令是理想的,可以每分钟通过 cron 启动一次。如果使用代理,处于 STATE_BACK_TO_BROKERTEMPORARILY_FAILEDWAITING 状态的记录将被重新加入到代理中,并状态更改为 READY。通常情况下,将 STATE_BACK_TO_BROKER 状态手动设置在数据库中,以允许重新处理这些记录。

background-queue:clear-finished 删除所有已成功处理的记录。

background-queue:clear-finished 14 删除所有超过 14 天的已成功处理的记录。

background-queue:reload-consumers QUEUE NUMBER 重新加载指定 QUEUE 的 NUMBER 个消费者。

background-queue:update-schema 如果需要,则更新数据库模式。

所有命令都受到防止多次启动的保护。

2.3 回调

您还可以使用 2 个回调 onBeforeProcessonAfterProcess,在其中您可以执行例如切换数据库的操作。

3 监控

在启动消费者时,当前进程的 PID 将被存储在 background_job 表的 pid 列中。这不是系统视角的 PID,而是在 Docker 容器内部的 PID。

在回调完成时,将在 background_job 表的 memory 列中存储完成前后的内存使用信息。如果在 background-queue:consume 命令中使用 jobs 参数,从 PHP 8.2 版本开始,在每次单个处理之前将重置 "memory peak"(使用 memory_reset_peak_usage() 方法)。

'notRealActual' => memory_get_usage(),
'realActual' => memory_get_usage(true),
'notRealPeak' => memory_get_peak_usage(),
'realPeak' => memory_get_peak_usage(true),

4 批量插入

如果需要插入大量记录,BackgroundQueue 可以通过使用 INSERT INTO table () VALUES (...), (...), ... 的方式,以批量方式更有效地将记录插入到数据库中。批量大小通过 bulkSize 参数设置。通过 startBulkendBulk 方法指定批量插入的开始和结束。如果没有使用 startBulk 方法开始批量插入,则批量大小的默认值为 1,无论 bulkSize 参数设置为何值。

$this->backgroundQueueService->startBulk();
foreach ($data as $oneJobData) {
    $this->backgroundQueue->publish(...);
}
$this->backgroundQueueService->endBulk();

5 记录优先级

可以为插入的记录指定其优先级。较晚插入且优先级较高的记录将在处理时优先于较早插入的记录。

在设置中可以指定要使用的优先级。该参数是可选的,默认值为 [1]

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
	...
]);

可以直接在处理记录的各个回调中指定它们的优先级。如果回调没有指定优先级,则将使用最高可用优先级。

例如,我们有以下类型的工作

  • ACL 重新计算
  • 从第三方 API 下载数据
  • 发送电子邮件(例如注册电子邮件)

通常从 API 下载数据,这可能是一个耗时的任务。但是,如果我们需要重新计算 ACL,我们不希望它被 API 数据下载阻塞。更优先的是,我们希望在注册时发送偶尔的电子邮件。

$backgroundQueue = new \ADT\BackgroundQueue\BackgroundQueue([
	...
	'priorities' => [10, 15, 20, 25, 30, 35, 40, 45, 50],
	'callbacks' => [
		'email' => [$mailer, 'process'], // záznamy budou mít prioritu 10
		'aclRecalculation' => [
			'callback' => [$aclService, 'process'],
			'priority' => 20,
		],
		'dataImporting' => [
			'callback' => [$apiService, 'process'],
			'priority' => 30,
		],
	],
	...
]);

我们可以通过 background-queue:consume 命令的参数 -p 设置优先级范围来处理(例如 -p 10-p 20-40-p 25--p"-20" 等)。例如,我们可以为发送注册电子邮件保留一个消费者(background-queue:consume -p 10),并为所有其他任务保留其他消费者(background-queue:consume)。这样可以确保快速发送注册电子邮件不会等待耗时的任务,因为第一个消费者将首先处理它。但是,如果出现多个发送电子邮件的请求,经过一段时间,所有消费者都将开始处理这些请求。

我们还可以在 publish 方法中为插入的记录设置优先级。例如,我们知道这是发送时事通讯。因此,这是发送电子邮件,但处理优先级较低。

$priority = null; // aplikuje se priorita 10 z nastavení pro callback
if ($isNewsletter) {
	$priority = 25;
}
$this->backgroundQueue->publish('email', $parameters, $serialGroup, $identifier, $isUnique, $availableAt, $priority);

6 集成到框架中