phelixjuma/php-enqueue

这是PHP中一个简单但健壮的基于Redis的作业队列实现。


README

这是PHP中一个简单但健壮的基于Redis的作业队列实现。

为什么还需要另一个作业队列包?因为,我尝试了所有我能找到的顶级选项,但没有一个完全符合:一些顶级建议的选项已经超过5年没有维护,并且它们的依赖项导致我的其他包产生了大量冲突,所以我构建了一个新的包,为了我自己。

此包的后端是redis。

要求

  • PHP >= 7.1
  • vlucas/phpdotenv
  • predis/predis
  • symfony/console
  • amphp/parallel

安装

composer require phelixjuma/php-enqueue

使用说明

1. 运行Worker

php-enqueue是事件驱动的。作业被调度并在redis中安排。必须设置一个Worker以“永远运行”。这个Worker监听任何传入的作业并执行它。作业执行是通过amphp/parallel包并发进行的,这允许并发执行多个作业

要设置Worker,请运行以下命令

./bin/worker --queue=name_of_queue --threaded=1 --concurrency=1 --max_retries=3 --log_path=/path/to/log log_level=100

注意,Worker接受参数,例如

  1. 队列:Worker监听的队列名称。每个Worker只能监听一个队列。这允许你拥有多个Worker处理不同的队列,这为你的作业执行引入了并行化
  2. 线程化:值为1表示作业将以多线程方式运行(非阻塞)。0表示以阻塞方式运行作业。如果你的作业不依赖于任何全局变量,请使用多线程,否则将其设置为0(阻塞执行)
  3. 并发性:定义单个Worker在给定时间可以处理的并发作业数量
  4. 最大重试次数:如果作业失败,通常是通过抛出异常,它将被重试,直到达到这里定义的最大次数。默认情况下,不进行重试
  5. 日志路径:日志应放入的目录路径。指定目录路径而不是日志文件,并确保php有权限写入该目录
  6. 日志级别:按照monolog日志级别

注意

  1. 你可以使用像supervisord这样的服务来运行Worker并监视它们,这样,如果Worker本身失败,它可以自动重新启动。
  2. 当你更新任何依赖Worker的代码库的部分时,请注意,除非重新启动Worker,否则更新不会反映出来

2. 管理作业

您有命令行选项来管理任务。

2.1 查看作业列表

./bin/manager enqueue:list --queue=queue_name

2.2 添加新作业

./bin/manager enqueue:add --queue=queue_name --class=job_class_name --parameters=job_args

2.3 从队列中删除作业

./bin/manager enqueue:remove --queue=queue_name --taskId=task_id

2.4 列出失败的作业

./bin/manager enqueue:failed:list --queue=queue_name

2.5 重新排队失败的作业

./bin/manager enqueue:failed:requeue --queue=queue_name

2.6 删除所有失败的作业

./bin/manager enqueue:failed:purge --queue=queue_name

3. 作业

每个作业类都实现了JobInterface。需要定义的类是

  • setUp() - 在作业执行之前所需的任何设置,
  • perform() - 作业运行的实际逻辑,
  • tearDown() - 要执行的任何后处理任务。

3.1 示例作业类

class EmailJob implements JobInterface
{

    /**
     * @var Logger
     */
    private $logger;

    public function setUp(Task $task)
    {
        $this->logger = new Logger("email_job");
        $this->logger->pushHandler(new StreamHandler('/path/to/log/email_job.log', Logger::DEBUG));
    }

    public function perform(Task $task)
    {
        // Actual logic to send an email goes here
        $this->logger->info("Performing email job. Args: ".json_encode($task->getArgs()));
    }

    public function tearDown(Task $task)
    {
    }
}

3.2 调度作业的脚本

use Phelixjuma\Enqueue\Jobs\EmailJob;
use Phelixjuma\Enqueue\RedisQueue;
use Phelixjuma\Enqueue\Task;
use Predis\Client;

// Some global section where redis and queue are defined
$redis = new Client('tcp://127.0.0.1:6379');
$queue = new RedisQueue($redis);

// Actual section to queue the email job task
$queue
->setName('test_queue')
->enqueue(new Task(new EmailJob(), ['some_arg' => 'some_value']));

注意:更好的方法是将php-enqueue包装在一个enqueue服务中。在这个服务中,你定义redis和队列,使它们可重用。以下是一个此类服务的示例

<?php

namespace \Some\Name\Space\Service;

use Phelixjuma\Enqueue\Jobs\EmailJob;
use Phelixjuma\Enqueue\RedisQueue;
use Phelixjuma\Enqueue\Task;
use Predis\Client;

final class EnqueueService {

    const DEFAULT_QUEUE = 'default';

    private static $instance = null;
    private RedisQueue|null $queue = null;

    /**
     * @return EnqueueService|null
     */
    private static function getInstance(): ?EnqueueService
    {
        if (self::$instance === null) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * @param $job
     * @param $args
     * @param string $queueName
     * @return void
     */
    public static function enqueue($job, $args, string $queueName=self::DEFAULT_QUEUE): void
    {
        self::getInstance()
            ->init()
            ->queue
            ->setName($queueName)
            ->enqueue(new Task($job, $args));;
    }

    /**
     * @return $this
     */
    private function init(): EnqueueService
    {
        if ($this->queue !== null) {
            return $this;
        }

        $redisHost = getenv("REDIS_HOST");
        $redisPort = getenv("REDIS_PORT");

        $redis = new Client("tcp://$redisHost:$redisPort");

        $this->queue = new RedisQueue($redis);

        return $this;
    }

}

// Use the service within your application to add a job to a queue as
EnqueueService::enqueue(new EmailJob(), ['email' => 'test@gmail.com', 'subject' => 'php-enqueue works!', 'message' => 'Thank you for this amazing package']);

4. 关于作业的说明

  • 此包使用Redis来排队作业。
  • 这意味着在排队之前将作业及其参数序列化,并在获取时反序列化
  • 由于PHP处理序列化/反序列化的方式,你的作业和参数不应包含非序列化的实例
  • 不可序列化的一个良好例子是pdo。所以如果你的工作类注入了一个pdo实例到数据库类,那么工作队列将失败
  • 良好的实践是不在Job类中注入任何类,并避免使用类实例作为参数
  • 相反,使用setUp()方法与你的依赖注入容器一起实例化你需要的其他类。

致谢