mle86/wq

一个简单的用于工作队列和任务处理的库。

v0.21.1 2022-05-24 09:37 UTC

README

Build Status Coverage Status Latest Stable Version PHP 8 License

此软件包提供了一种简单的方法将任何类型的PHP任务放入工作队列(如Beanstalkd或Redis),以便稍后执行。

这是 版本 0.21.1

安装和依赖关系

$ composer require mle86/wq

它需要PHP 8.0+,并且没有其他依赖(除了PHPUnit/Coveralls用于开发和PSR-3接口)。

适配器实现

您还需要安装至少一个包含 WorkServerAdapter 实现的软件包,例如

基本概念

  • 工作 是应该恰好执行一次的事情。可能它是发送电子邮件,可能是类似webhook的外部API调用,也可能是某种慢速清理过程。无论如何,我们谈论的是一个可以立即执行但为了应用程序的性能更好,最好将其放入 工作队列 中的工作单元。

  • 工作队列 是一个应该在某个时间点执行的作业列表。它们存储在某种 工作服务器 中。在PHP世界中,Beanstalkd 是一个知名的工作服务器。它可以存储任意数量的工作队列,尽管它称之为“管道”。

不同的工作队列或管道通常用于分离作业类型。例如,同一个工作服务器可能有一个“mail”队列用于发送外部邮件,一个“cleanup”队列用于各种清理作业,以及一个“webhook”队列用于发送外部webhook调用。

此软件包提供了一些有用的类来设置简单的工作队列系统。

快速入门

这是我们的作业实现。它代表可以发送的电子邮件。

<?php

use mle86\WQ\Job\AbstractJob;

class EMail extends AbstractJob
{
    protected $recipient;
    protected $subject;
    protected $message;
    
    public function __construct(string $recipient, string $subject, string $message)
    {
        $this->recipient = $recipient;
        $this->subject   = $subject;
        $this->message   = $message;
    }
    
    public function send()
    {
        if (mail($this->recipient, $this->subject, $this->message)) {
            // ok, has been sent!
        } else {
            throw new \RuntimeException ("mail() failed");
        }
    }
}

我们有一些使用该电子邮件类的代码

<?php

use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;

$mailJob = new EMail("test@myproject.xyz", "Hello?", "This is a test mail.");

$workServer = BeanstalkdWorkServer::connect("localhost");
$workServer->storeJob("mail", $mailJob);

最后,我们有一个后台工作脚本,它定期检查工作服务器以获取新的电子邮件作业

<?php

use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;
use mle86\WQ\WorkProcessor;

$queue = "mail";
printf("%s worker %d starting.\n", $queue, getmypid());

$processor  = new WorkProcessor(BeanstalkdWorkServer::connect("localhost"));
$fn_handler = function(EMail $mailJob) {
    $mailJob->send();
    // don't catch exceptions here, or the WorkProcessor won't see them.
};

while (true) {
    try {
        $processor->processNextJob($queue, $fn_handler);
    } catch (\Throwable $e) {
        echo $e . "\n";  // TODO: add some real logging here
    }
}

文档

  1. 实现作业类
  2. 执行或排队
  3. 处理队列
  4. 错误处理
  5. 使用示例

类参考