jenwachter / php-redis-queue
Requires
- php: >=8.0
- composer-runtime-api: ^2.2
- predis/predis: ^2.1
- psr/log: ^1|^2|^3
- symfony/console: ^4|^5|^6
README
这是一个使用Redis和PHP实现的简单后台服务器队列。
需求
- PHP >= 8.0
- Redis >= 6.0
安装
要安装库,您需要在项目中使用Composer。
composer require jenwachter/php-redis-queue
工作原理
一个 工作进程 定义了一个队列,监听作业被推入队列,并执行工作以完成队列中的每个作业。单个队列可以处理多种类型的作业,由不同的回调函数指定。例如,您可以使用 upload
和 delete
回调函数创建一个处理文件上传的队列,既可以上传文件也可以删除文件。理想情况下,工作进程作为服务器上的进程运行。
一个 客户端 将作业推送到队列中,可选地包含工作进程完成作业所需的数据。例如,对于上传文件的作业,传递文件的路径将很有帮助。
当客户端将作业推送到队列中时,它将在队列中等待,直到它到达顶部。一旦到达顶部,工作进程
- 从队列中删除作业并将其添加到处理队列中。
- 确定是否有可用于此作业的回调。如果没有,则认为作业失败。
- 如果已定义,则调用作业类型的 before 回调。
- 调用作业类型的主回调。
- 如果回调没有抛出异常,则认为它成功。如果回调抛出异常,则认为它失败。
- 作业从处理队列中删除并添加到失败或成功列表中。
- 如果已定义,则调用作业类型的 after 回调。
- 如果作业是作业组的一部分,并且组内的所有作业都已完成,则工作进程将调用
group_after
回调,如果已定义。 - 队列继续处理下一个作业或等待另一个作业被添加。
快速示例
在这个快速示例中,我们将设置一个处理上传和删除文件所需工作的作业进程。然后,我们将创建一个客户端向作业进程发送作业。
创建工作进程
如果使用 work()
方法的默认设置(阻塞),则单个文件中只能运行一个工作进程(队列);但是,单个工作进程可以处理多种类型的作业。例如,这里我们创建了一个名为 files
的工作进程,可以处理文件上传和删除时需要处理的工作
files.php
$predis = new Predis\Client(); $worker = new PhpRedisQueue\QueueWorker($predis, 'files'); $worker->addCallback('upload', function (array $data) { // create various crop sizes // optimize // ... }); $worker->addCallback('delete', function (array $data) { // delete all created crop sizes // ... }); $worker->work();
运行工作进程
要运行工作进程,您可以在服务器上手动运行它
$ php files.php
但是一旦脚本退出或连接关闭,工作进程也将停止运行。这在开发测试期间很有用,但在生产环境中则不那么有用。
为确保您的作业进程始终运行,请使用系统(如 Supervisor)作为进程运行作业进程。
创建客户端
继续我们的文件示例,以下客户端代码将在系统中的文件上传或删除后执行。
// trigger: user uploads a file... $predis = new Predis\Client(); $client = new PhpRedisQueue\Client($predis); // gather data required by worker upload callback $data = [ 'path' => '/path/to/uploaded/file', ]; // push the job to the files queue $client->push('files', 'upload', $data);
作业组
您还可以将任务分组到任务组中,这使您能够在组内所有任务完成后使用 group_after
回调。添加到任务组的任务可以分配给任何队列。有关详细信息,请参阅任务组文档。
client.php
$predis = new Predis\Client(); $client = new PhpRedisQueue\Client($predis); // create a job group $group = $client->createJobGroup(); // add jobs to the group $group->push('queuename', 'jobname'); $group->push('another-queuename', 'jobname'); $group->push('queuename', 'jobname'); $group->push('queuename', 'jobname'); // add jobs in the group to the queue $group->queue();
worker.php
$predis = new Predis\Client(); $worker = new PhpRedisQueue\QueueWorker($predis, 'queuename'); $worker->addCallback('jobname', fn (array $data) true); $worker->addCallback('group_after', function ($group, $success) { // respond to group completion }); $worker->work();
文档
工作进程
初始化
$predis = new Predis\Client(); $worker = new PhpRedisQueue\QueueWorker($predis, 'queuename');
注意:queuename 不能是整数。
配置
可以通过发送一个选项数组作为第三个参数来自定义工作的一些部分。
$predis = new Predis\Client(); $worker = new PhpRedisQueue\QueueWorker($predis, 'queuename', [ 'logger' => new Logger(), ]);
可用选项
- default_socket_timeout:工作者的超时时间(秒),如果使用默认阻塞功能。默认值:-1(无超时)
- logger:实现
Psr\Log\LoggerInterface
的记录器。默认值:null - wait:在处理任务之间的等待时间(秒)。默认值:1
方法
addCallback(string $name, callable $callable)
将回调附加到工作者。可用回调
<jobName>
:运行任务。示例:upload
<jobName>_before
:在任务开始之前运行。示例:upload_before
<jobName>_after
:在任务完成后运行。示例:upload_after
group_after
:在完成一组任务后运行。
返回:Null。
参数
$name
:与任务处理三个阶段之一对应的钩子名称。请参阅上面的格式。$callable
:要附加到给定钩子的函数。参数如下<jobName>(array $data)
$data
:客户端传递给任务的数组
<jobName>_before(array $data)
$data
:客户端传递给任务的数组
<jobName>_after(array $data, bool $success)
$data
:客户端传递给任务的数组。失败任务的异常数据可在$data['context']
中找到$success
:任务状态;成功(true
)或失败(false
)
group_after(JobGroup $group, bool $success)
$group
:任务组模型$success
:组状态;组内所有任务都成功完成(true
)或组内有一个或多个任务失败(false
)
work(bool $block = true)
指示工作者开始监听队列。
返回:Null。
参数
$block
:工作方法应该是阻塞的吗?
客户端
初始化
$predis = new Predis\Client(); $client = new \PhpRedisQueue\Client($predis);
配置
可以通过发送一个选项数组作为第二个参数来自定义工作的一些部分。
$predis = new Predis\Client(); $worker = new PhpRedisQueue\QueueWorker($predis, 'queuename', [ 'logger' => new Logger() ]);
可用选项
- logger:实现
Psr\Log\LoggerInterface
的记录器。默认值:null
方法
push(string $queue, string $jobName = 'default', array $jobData = [])
将任务推送到队列的末尾。
返回:整数。任务的 ID。
参数
$queue
:队列名称。$jobName
:处理工作的任务名称。$data
:传递给工作者的数据。
pushToFront(string $queue, string $jobName = 'default', array $jobData = [])
将任务推送到队列的前端。
返回:整数。任务的 ID。
参数
$queue
:队列名称。$jobName
:处理工作的任务名称。$data
:传递给工作者的数据。
pull(int $id)
从队列中提取任务。
返回:布尔值。如果成功移除任务,则为 true
;否则为 false
。
参数
$id
:要提取的任务的 ID。
rerun(int $id)
重新运行之前失败的任务。如果任务已经成功或找不到,则抛出异常。
返回:布尔值。如果任务成功重新添加到队列中,则为 TRUE。
参数
$id
:失败任务的 ID。
createJobGroup(int total = null, $data = [])
创建一个任务组,允许您将任务链接在一起。使用 group_after
回调在组内所有任务完成后执行工作。
返回:PhpRedisQueue\models\JobGroup 对象
参数
$total
:任务总数。$data
:与任务组一起存储的数据数组。
任务组
初始化
通过 Client::createJobGroup
创建一个作业组,然后返回 JobGroup 模型。
$predis = new Predis\Client(); $client = new \PhpRedisQueue\Client($predis); $group = $group->createJobGroup($total = null, $data = []);
返回: JobGroup 模型
参数
$total
: 初始化时已知的作业总数。$data
: 要附加到组的数组,在group_after
回调中可能有用。
JobGroup 模型方法
push(string $queue, string $jobName = 'default', array $jobData = [])
将作业推送到作业组。注意:如果作业组已经被排队,则不能向其添加作业。
返回:整数。任务的 ID。
参数
$queue
:队列名称。$jobName
:处理工作的任务名称。$data
:传递给工作者的数据。
setTotal(int total)
告诉组期望多少个作业。使作业组在总数达到时自动将作业添加到队列中。或者,可以使用 JobGroup::queue()
手动排队。
返回: 布尔值。如果总数成功设置,则为 TRUE。
queue()
将组的作业添加到队列中。只有当作业组在通过 JobGroup::setTotal()
初始化时不知道期望的作业总数时,才使用此方法。
返回: 布尔值
removeFromQueue()
从队列中删除组中剩余的作业。
返回: 布尔值
getJobs()
获取与组关联的作业数组。
返回: Job 模型数组。
getUserData()
获取在初始化时分配给组的数。
返回: 数组
命令行界面
通过运行
./vendor/bin/prq
队列命令
prq queues:list
获取有关所有队列的信息。通过查找活动工作者,检查挂起和正在处理的作业列表以及已处理的作业数量来发现队列。
示例输出
$ ./vendor/bin/prq queues:list +-------------------+----------------+--------------+----------------+ | Queue name | Active workers | Pending jobs | Processed jobs | +-------------------+----------------+--------------+----------------+ | files_queue | 1 | 2 | 16 | | another_queue | 0 | 10 | 3 | +-------------------+----------------+--------------+----------------+
prq queues:jobs <queuename>
列出与给定队列关联的作业。
参数
queuename
: 队列名称。
示例输出
$ ./vendor/bin/prq queues:jobs files_queue +----+----------------------+------------+ | ID | Datetime initialized | Job name | +----+----------------------+------------+ | 8 | 2023-09-21T10:38:34 | upload | | 7 | 2023-09-21T10:37:45 | upload | | 6 | 2023-09-21T10:36:02 | delete | | 5 | 2023-09-21T10:35:53 | delete | | 4 | 2023-09-21T10:35:09 | upload | | 3 | 2023-09-21T10:34:21 | upload | | 2 | 2023-09-21T10:32:03 | upload | | 1 | 2023-09-21T10:29:46 | upload | +----+----------------------+------------+
组命令
prq group:info <id>
获取有关组的信息。
参数
id
: 组的 ID。
示例输出
$ ./vendor/bin/prq group:info 1 +----------------------+------------+ Group #1 ----+-----------------+-------------+ | Datetime initialized | Total jobs | Pending jobs | Successful jobs | Failed jobs | +----------------------+------------+--------------+-----------------+-------------+ | 2023-12-20T15:06:17 | 30 | 30 | 0 | 0 | +----------------------+------------+--------------+-----------------+-------------+ ##### __`prq group:jobs <id>`__ List jobs associated with the given group. Arguments: * `id`: ID of the group. Example output: ```bash $ ./vendor/bin/prq group:jobs files_queue +-----+----------------------+----------+---------+ | ID | Datetime initialized | Job name | Status | +-----+----------------------+----------+---------+ | 121 | 2023-12-20T16:13:06 | upload | success | | 122 | 2023-12-20T16:13:06 | upload | success | | 123 | 2023-12-20T16:13:06 | upload | success | | 124 | 2023-12-20T16:13:06 | upload | pending | +-----+----------------------+----------+---------+
作业命令
prq job:info <id>
获取有关指定作业的信息。
参数
id
: 作业的 ID。
示例输出
$ ./vendor/bin/prq job:info 2 +----------------------+ Job #2 ----+---------+---------+ | Datetime initialized | Job name | Status | Context | +----------------------+------------+---------+---------+ | 2023-09-21T13:51:42 | invalidate | success | n/a | +----------------------+------------+---------+---------+
prq job:rerun <id>
重新运行失败的作业。
参数
id
: 失败作业的 ID。
示例输出
$ ./vendor/bin/prq job:rerun 2
Successfully added job #2 to the back of the files_queue queue.