popphp/pop-queue

Pop PHP 框架的队列组件

2.1.0 2024-04-02 00:00 UTC

This package is auto-updated.

Last update: 2024-09-02 00:48:19 UTC


README

Build Status Coverage Status

Join the chat at https://popphp.slack.com Join the chat at https://discord.gg/TZjgT74U7E

概述

pop-queue 是一个作业队列组件,它提供了将可执行作业或任务传递给队列以在稍后日期和时间处理的特性。队列可以处理作业或计划任务。作业或任务将与可用的队列存储适配器一起存储,直到它们被调用执行。队列组件的可用的存储适配器包括

  • Redis
  • 数据库
  • 文件
  • AWS SQS*

作业和任务之间的区别在于,作业是“一次性的”(除非失败)并在完成后从队列中弹出。任务则是持久的,并且会保留在队列中以重复运行,或直到它们过期。

* - SQS 适配器不支持任务。

pop-queuePop PHP 框架 的一个组件。

顶部

安装

使用 Composer 安装 pop-queue

composer require popphp/pop-queue

或者,在 composer.json 文件中添加它

"require": {
    "popphp/pop-queue" : "^2.0.0"
}

顶部

快速入门

创建一个作业并将其推送到队列

简单地将作业添加到队列就会将其推送到队列存储适配器。

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Job;

// Create a job and add it to a queue
$job = Job::create(function() {
    echo 'This is job' . PHP_EOL;
});

$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$queue->addJob($job);

调用队列以处理作业

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;

// Call up the queue and pass it to a worker object
$queue  = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$worker = Worker::create($queue);

// Trigger the worker to work the next job across its queues
$worker->workAll();

如果作业有效,它将运行。在这种情况下,它将产生以下输出

This is a job

创建一个计划任务并将其推送到队列

创建一个任务,设置计划并添加到队列。

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Task;

$task = Task::create(function() {
    echo 'This is a scheduled task' . PHP_EOL;
})->every30Minutes();

// Add to a queue
$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$queue->addTask($task);

调用队列以运行计划任务

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;

// Call up the queue and pass it to a worker object
$queue  = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$worker = Worker::create($queue);

// Trigger the worker to run the next scheduled task across its queues
$worker->runAll();

如果任务有效,它将运行。在这种情况下,它将产生以下输出

This is a scheduled task

顶部

作业

作业对象是 pop-queue 组件的核心。它们是可以执行可调用对象、应用程序命令或基于 CLI 的命令(如果环境已设置以允许这样做)的对象。

作业默认会分配一个 ID 哈希以供引用。

var_dump($job->hasJobId());
$id = $job->getJobId();

当作业被选中执行时,有一系列方法可以帮助在作业的生命周期中跟踪其状态

var_dump($job->hasStarted()); // Has a started timestamp
var_dump($job->hasNotRun());  // No started timestamp and no completed timestamp
var_dump($job->isRunning());  // Has a started timestamp, but not a completed/failed
var_dump($job->isComplete()); // Has a completed timestamp
var_dump($job->hasFailed());  // Has a failed timestamp
var_dump($job->getStarted());
var_dump($job->getCompleted());
var_dump($job->getFailed());

顶部

可调用对象

可以将任何可调用对象传递给作业对象

use Pop\Queue\Process\Job;

// Create a job from a closure
$job1 = Job::create(function() {
    echo 'This is job #1' . PHP_EOL;
});

// Create a job from a static class method
$job2 = Job::create('MyApp\Service\SomeService::doSomething');

// Create a job with parameters
$closure = function($num) {
    echo 'This is job ' . $num . PHP_EOL;
};

// The second argument passed is the callable's parameter(s) 
$job3 = Job::create($closure, 1);

如果可调用对象需要访问主应用程序对象,可以将它传递给队列对象,它将被添加到可调用对象的参数之前

use Pop\Queue\Queue;
use Pop\Queue\Worker;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Job;

// Create a job that needs the application object
$job = Job::create(function($application) {
    // Do something with the application
});

$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$queue->addJob($job);

一旦可调用对象被添加到队列中,工作进程需要知道应用程序对象以便将其传递给作业

use Pop\Queue\Queue;
use Pop\Queue\Worker;
use Pop\Queue\Adapter\File;
use Pop\Application;

$application = new Application();

$queue  = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$worker = Worker::create($queue, $application);

// When the worker works the job, it will push the application object to the job
$worker->workAll();

顶部

应用程序命令

可以将应用程序命令与作业对象一起注册。您将注册命令的“路由”部分。例如,如果以下应用程序命令路由存在

$ ./app hello world

您将像这样将命令注册到作业对象中

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Job;

// Create a job from an application command and add to the queue
$job   = Job::command('hello world');
$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$queue->addJob($job);

同样,工作进程对象需要知道应用程序对象,以便将其推送到需要它的作业对象

$queue  = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$worker = Worker::create($queue, $application);

顶部

CLI 命令

如果环境设置允许在 PHP 中执行可执行命令,您可以将基于 CLI 的命令像这样注册到作业对象中

use Pop\Queue\Process\Job;

// Create a job from an executable CLI command
$job = Job::exec('ls -la');

出于安全原因,使用此功能时应谨慎。

顶部

尝试次数

默认情况下,作业只运行一次。但是,如果作业失败,它将被推回队列。但是,您可以通过设置作业的最大尝试次数来限制这种情况的发生。

use Pop\Queue\Process\Job;

$job = Job::create(function() {
    echo 'This is job #1' . PHP_EOL;
});
$job->setMaxAttempts(10);

如果您希望作业在失败后不注销并继续尝试执行,可以将最大尝试次数设置为 0

$job->setMaxAttempts(0);

您还可以这样检查尝试次数与最大尝试次数

var_dump($job->hasExceededMaxAttempts());

isValid() 方法也可用,并检查最大尝试次数和“运行直到”设置(通常用于任务对象,见下文)。

注意:“运行直到”可以在非计划任务中强制执行,而“最大尝试次数”可以在计划任务中强制执行。

顶部

任务

任务对象是具有调度功能的作业对象的扩展。它有一个 Cron 对象,并支持类似 cron 的调度格式。然而,与 cron 不同,它还可以支持精确到秒的子分钟调度。

以下是一个示例任务对象,其中计划设置为每 5 分钟一次

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Task;

// Create a scheduled task and add to the queue
$task = Task::create(function() {
    echo 'This is job #1' . PHP_EOL;
})->every5Minutes();

$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'));
$queue->addTask($task);

顶部

调度

以下是帮助设置常见计划的可用方法列表

  • everySecond()
  • every5Seconds()
  • every10Seconds()
  • every15Seconds()
  • every20Seconds()
  • every30Seconds()
  • seconds(mixed $seconds)
  • everyMinute()
  • every5Minutes()
  • every10Minutes()
  • every15Minutes()
  • every20Minutes()
  • every30Minutes()
  • minutes(mixed $minutes)
  • hours(mixed $hours, mixed $minutes = null)
  • hourly(mixed $minutes = null)
  • daily(mixed $hours, mixed $minutes = null)
  • dailyAt(string $time)
  • weekly(mixed $day, mixed $hours = null, mixed $minutes = null)
  • monthly(mixed $day, mixed $hours = null, mixed $minutes = null)
  • quarterly(mixed $hours = null, mixed $minutes = null)
  • yearly(bool $endOfYear = false, mixed $hours = null, mixed $minutes = null)
  • weekdays()
  • weekends()
  • sundays()
  • mondays()
  • tuesdays()
  • wednesdays()
  • thursdays()
  • fridays()
  • saturdays()
  • between(int $start, int $end)

如果需要更自定义的计划值,可以直接使用 cron 格式的字符串进行计划

use Pop\Queue\Process\Task;

$task = Task::create(function() {
    echo 'This is job #1' . PHP_EOL;
});

// Submit a cron-formatted schedule string
$task->schedule('* */2 1,15 1-4 *')

或者,您可以使用非标准格式,在字符串前面添加一个“秒”值

// Submit a non-standard cron-formatted schedule string
// that includes a prepended "seconds" value 
$task->schedule('*/10 * */2 1,15 1-4 *')

标准的 cron 字符串支持以下 5 个值

  • 分钟
  • 小时
  • 月份中的日期
  • 月份
  • 星期的日期

格式为

min  hour  dom  month  dow
 *    *     *     *     *

为了保持该格式并支持非标准的“秒”值,该值将添加到字符串中,创建 6 个值

sec  min  hour  dom  month  dow
 *    *    *     *     *     *

如果任务使用秒进行计划,它将触发工作器以子分钟级别处理任务。

顶部

运行直到

默认情况下,任务设置为无限次数的尝试,并预期将继续在计划时间执行。但是,可以使用任务对象设置“运行直到”值以给它一个“到期”日期

use Pop\Queue\Process\Task;

$task = Task::create(function() {
    echo 'This is job #1' . PHP_EOL;
});
// Using a valid date/time string
$task->every30Minutes()->runUntil('2023-11-30 23:59:59');

它还可以接受时间戳

// Using a valid UNIX timestamp
$task->every30Minutes()->runUntil(1701410399);

isExpired() 方法将评估作业是否超过了“运行直到”值。此外,isValid() 方法将评估“运行直到”和最大尝试次数设置。

注意:“运行直到”可以在非计划任务中强制执行,而“最大尝试次数”可以在计划任务中强制执行。

顶部

缓冲区

默认情况下,计划任务的时评估是严格的,在大多数情况下意味着执行时间将在时间戳的 00 秒发生。如果由于某种原因,存在对任务执行可能会延迟的担忧或可能性 - 并且不会在 00 秒的时间戳上评估 - 您可以设置一个时间缓冲区以“软化”计划时间评估的严格性。

以下示例提供了 10 秒的“缓冲”时间,以确保如果存在任何处理延迟,则任务的计划时间评估应在评估时间戳的 0-10 秒窗口中评估为 true

use Pop\Queue\Process\Task;

$task = Task::create(function() {
    echo 'This is job #1' . PHP_EOL;
});
$task->every30Minutes()
$task->setBuffer(10);

如果您想设置任务无论如何都会运行,只要评估的时间戳在或超过计划时间,可以将缓冲区设置为 -1

use Pop\Queue\Process\Task;

$task = Task::create(function() {
    echo 'This is job #1' . PHP_EOL;
});
$task->every30Minutes()
$task->setBuffer(-1);

顶部

适配器

默认情况下,有四个可用的适配器,但只要它们实现 Pop\Queue\Adapter\AdapterInterface 并扩展 Pop\Queue\Adapter\AbstractAdapter,就可以创建额外的适配器。

Redis

Redis适配器要求Redis在服务器上正确配置并运行,同时需要安装与PHP配合使用的redis扩展。

use Pop\Queue\Adapter\Redis;

$adapter = new Redis();

Redis适配器默认使用localhost和端口6379。它通过键前缀来管理Redis服务器上的作业。默认前缀设置为pop-queue。如果您希望使用其他值,可以在构造函数中传递。

$adapter = new Redis('my.redis.server.com', 6380, 'my-queue');

顶部

数据库

数据库适配器需要使用pop-db组件以及该组件中的数据库适配器。

use Pop\Queue\Adapter\Database;
use Pop\Db\Db;

$db = Db::mysqlConnect([
    'database' => 'DATABASE',
    'username' => 'DB_USER',
    'password' => 'DB_PASS'
]);

$adapter = new Database($db); 

数据库中用于管理作业的表默认为pop_queue。如果您希望使用其他名称,可以在构造函数中传递。

$adapter = new Database($db, 'my_queue_jobs'); 

顶部

文件

文件适配器只需要指定队列数据存储在磁盘上的位置。

use Pop\Queue\Adapter\File;

$adapter = new File(__DIR__ . '/queues'); 

顶部

AWS SQS

Amazon AWS SQS适配器与AWS SQS服务交互,并需要从AWS管理控制台中获取以下凭证和访问信息:

  • AWS密钥
  • AWS密钥
  • AWS区域
  • AWS版本(通常是latest
  • AWS队列URL

确保授予尝试访问SQS服务的用户角色正确的权限。

use Pop\Queue\Adapter\Sqs;
use Aws\Sqs\SqsClient;

$client = new SqsClient([
        'key'    => 'AWS_KEY',
        'secret' => 'AWS_SECRET',
    ],
    'region'  => 'AWS_REGION',
    'version' => 'AWS_VERSION'
]);

$adapter = new Sqs($client, 'YOUR_AWS_QUEUE_URL');

SQS适配器在其行为上存在一些限制。它不支持计划任务,只能用于作业。此外,AWS SQS服务提供两种队列类型——标准队列和FIFO队列。FIFO队列强制执行严格的FIFO顺序,在将作业推入和从队列中弹出时提供一致的行为。标准队列不那么严格,在请求频率的不同情况下,队列中作业的顺序和可用性可能会有意外行为。

将适配器注入队列

创建任何适配器对象后,可以将其传递给队列对象。

use Pop\Queue\Queue;

$queue = Queue::create('pop-queue', $adapter); 

顶部

队列

如上例所示,队列对象充当作业和队列存储适配器之间的中介。简单地将作业或任务添加到队列对象中,它们将被推送到存储对象,在那里它们将等待轮到它们的时候。

如以下示例所示,可以添加多个作业和多个任务到同一个队列

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;
use Pop\Queue\Process\Job;
use Pop\Queue\Process\Task;

$job1 = Job::create(function() {
    echo 'This is job #1' . PHP_EOL;
});

$job2 = Job::create(function() {
    echo 'This is job #2' . PHP_EOL;
});

$task1 = Task::create(function() {
    echo 'This is scheduled task #1' . PHP_EOL;
})->every30Minutes();

$task2 = Task::create(function() {
    echo 'This is scheduled task #2' . PHP_EOL;
})->sundays();

$queue = new Queue('pop-queue', new File(__DIR__ . '/queue'), Queue::FILO);
$queue->addJobs([$job1, $job2])
    ->addTasks([$task1, $task2]);

优先级

队列有两种优先级之一

  • FIFO:先进先出(默认)
  • FILO:先进后出

这意味着,在FIFO中,第一个推入的作业将是第一个弹出的作业。在FILO中,第一个推入的作业将是最后一个弹出的作业,因为最近推入的作业将被弹出。

(当您使用SQS FIFO队列时,队列优先级将自动设置为FIFO)

工作进程

工作对象允许您从单个工作对象配置和管理多个队列。一旦您将作业或任务添加到队列或多个队列中,您可以将这些队列对象添加到工作对象中,从那里进行管理。队列被赋予名称,以帮助在工作对象中管理和调用它们。

use Pop\Queue\Queue;
use Pop\Queue\Adapter\File;

// Call up the queue and pass it to a worker object
$queue1  = new Queue('pop-queue1', new File(__DIR__ . '/queue1'));
$queue2  = new Queue('pop-queue2', new File(__DIR__ . '/queue2'));
$worker = Worker::create([$queue1, $queue2]);

然后,您可以使用work()方法触发特定队列的下一个作业

$worker->work('pop-queue1');

或者,您可以触发已注册的所有队列的下一个作业

$worker->workAll();

管理计划任务与run()方法类似

$worker->run('pop-queue1');

或者,触发已注册的所有队列的所有下一个计划任务

$worker->runAll();

清除队列

您可以通过几种不同的方式清除队列

  • $worker->clear(string $queueName) // 清除队列中的完成作业
  • $worker->clearFailed(string $queueName) // 清除队列中的失败作业
  • $worker->clearTasks(string $queueName) // 清除队列中的任务
  • $worker->clearAll() // 清除所有队列中的完成作业
  • $worker->clearAllFailed() // 从所有队列中清除失败的作业
  • $worker->clearAllTasks() // 从所有队列中清除任务

顶部

配置

如果您有一个了解您的队列并能访问它们的CLI应用程序,您可以使用该应用程序作为队列的“管理器”,检查和处理它们。假设您有一个通过类似以下命令处理队列的CLI应用程序

$ ./app manage queue

您可以为该应用程序设置一个cron作业,每分钟触发一次

* * * * * cd /path/to/your/project && ./app manage queue

或者,如果您希望任何输出都路由到 /dev/null

* * * * * cd /path/to/your/project && ./app manage queue >> /dev/null 2>&1

顶部