igorsantos07/phalcon-queue-db

此包已被废弃,不再维护。未建议替代包。

用于数据库存储的 Phalcon 队列接口

v0.9.8 2018-06-26 19:31 UTC

README

StyleCI Build Status Latest Version Software License

此包与 \Phalcon\Queue\Beanstalk 并列,为那些不想安装和维护 Beanstalk 服务器的用户提供作业队列。

这主要适用于作业吞吐量较低的情况。如果您计划同时处理大量作业和工作者,建议使用像 Beanstalk 这样更快的工具,因为这可能会对您的数据库和磁盘 I/O 产生很大压力,从而减慢作业速度。

安装

  1. 将此包添加到您的 composer 安装中: composer require igorsantos07/phalcon-queue-db
  2. 通过导入 sql/ 目录中的一个文件来创建所需的表。您可以将它复制到迁移或您的设置中所需的任何地方,以创建一个新表并使其运行:
  3. 阅读本文档的其余部分以了解如何使用它并获得收益!

使用

与大多数作业队列系统一样,这里的想法是将应用某部分的一些负载移除并在外部运行。因此,有两个部分相互交互:当您 [排队作业][#job-queuing] 时,以及当您 [处理它][#job-processing] 时。

由于此包基于 Phalcon 的原始 Beanstalk 实现,您还可能发现阅读 基础类文档 和一个关于 Beanstalk 的 几乎完整的教程 有用。但要注意,还实现了一些原始类中没有的功能,我们没有遵循 Beanstalk 的严格行为,因为基础类也没有遵循 - 我们在保持向后兼容性的成本下追求额外的功能。

以下示例中考虑以下 uses

use \Phalcon\Queue\Db as DbQueue;
use \Phalcon\Queue\Db\Job as Job;

作业排队

实际的队列

要获取队列对象,只需实例化它。唯一参数是您的数据库连接名称,如 Phalcon DI 中所示 - 默认为 db。您还希望将队列本身设置在您的依赖注入容器中。

$queue = new DbQueue(); //gets a database connection named db from the DI
$outsiderQueue = new DbQueue('weird_db'); another queue, in another db?

向队列中添加内容

因此,您想稍后执行某些操作。假设您需要一次性发送大量电子邮件,如果这在用户请求期间发生,将花费很长时间。我们有“作业管”的概念,即不同的管子接收不同类型的作业,允许您为每种类型的作业拥有专门的工作者。

如果没有指定管子,则默认管子称为...没错,是“默认”。

class ImportantController {

    function veryImportantAction()
    {
        // Do some stuff and ends up with an emails array.
        
        // Instead of sending all those emails from the user request,
        // we are going to hand this job to a worker.
        $queue = new DbQueue();
        $queue->choose('email_notification'); //sets the tube we'll be using
        $queue->put($emailList);

        //tell the user to be happy because stuff went ok
    }
    
}

此阶段的一些有用代码片段是:

  • DbQueue::choose($tube) - 定义将内容放在哪个管道上
  • DbQueue::using() - 告诉您正在使用哪个管道来放置内容
  • DbQueue::put($body, $options) - 将作业存储在队列中,并返回其ID

原始Beanstalk实现的一个区别是:作业体不需要是字符串。只要您为作业提供可序列化的内容,即可。

作业选项

也可以为作业定义一些特定的选项。

我们将在下一节中看到这些选项是如何相互作用的,关于检索作业并在其上工作。

//adds a job on top of the queue
$queue->put($bossEmails, [DbQueue::OPT_PRIORITY => Job::PRIORITY_HIGHEST]);
//adds a job to be ran only later (in seconds)
$queue->put($taskReminder, [DbQueue::OPT_DELAY => 60 * 10]);

Job类中有几个常量定义了其他优先级预设。如果没有给出优先级,则默认为Job::PRIORITY_MEDIUM

作业处理

从您的命令行脚本(以下称为worker),您可以通过在队列上使用peek或reserve方法来处理作业。peek作业建议仅用于验证和维护:实际工作应仅在保留的作业上完成。

$queue->watch('email_notification');
while ($job = $queue->reserve()) {
    $payload = $job->getBody();
    //do stuff with the payload
    if ($worked) {
        $job->delete();
    } else {
        $job->bury();
    }
}

有用的信息

  • DbQueue::watch($tubes, $replace) - 定义要获取作业的哪些管道。每个被观察的管道都进入一个栈,并且在任何被观察的管道中找到的第一个作业将在reserve()或[peek*()调用][#peeking-into-the-queue]时检索。默认的观察管道是“default”。
  • DbQueue::ignore($tube) - 从观察列表中删除管道。请注意,观察列表永远不会为空:如果您尝试忽略最后一个,实际上您的调用将被忽略
  • DbQueue::reserve($timeout) - 一旦有可用的作业,就返回一个作业。$timeout使该方法在等待指定数量的秒后返回false。另一个reserve()调用不会检索已保留的作业,只会检索另一个。因此,此方法是线程安全的 - 您可以同时运行许多工作者,并且没有两个工作者会收到相同的作业。
  • Job::getBody() - 检索在DbQueue::put()上最初提供的作业有效负载。
  • Job::delete() - 当完成对给定作业的工作后,删除它!
  • Job::bury($priority) - 以特殊的“埋藏”状态将作业存储回来,表示作业未能完成。
  • Job::release() - 将作业返回到队列,而不将其标记为失败。

查看队列

对于维护任务,您可以使用各种peeking方法来查看当前队列状态

  • DbQueue::peek($id)返回由特定ID指定的作业
  • DbQueue::peekReady()获取队列中下一个准备就绪的作业,或false
  • DbQueue::peekBuried()获取队列中第一个埋藏的作业,或false
  • DbQueue::peekDelayed()获取队列中第一个延迟的作业,或false

请记住,作业总是按优先级和年龄排序:紧急作业总是首先,然后是较旧的作业返回在较新的作业之前。

将作业踢回

要将埋藏的作业放回正常队列以进行处理,或将延迟的作业推进队列,您可以使用Job::kick()。如果您想一次将多个埋藏的作业踢回队列,还可以使用DbQueue::kick($numberOfJobs)

提示:您可能想在DI中创建一个持久的单独数据库连接。这样,每次调用工作者时就不需要重新启动连接。

维护

最后但同样重要的是,有一系列辅助方法可以帮助您获取有关当前队列状态的信息。

  • Job::stats() 将为您提供作业 ID、年龄、通道、状态以及延迟和优先级详细信息。还有一些类常量与这些信息相对应,例如 Job::PRIORITY_*Job::ST_*注意:您无法从已删除的作业中获取统计信息,好吗?
  • Job::getId()Job::getState() 将为您提供作业的 ID 和状态——后者与 Job::ST_* 常量之一相匹配。
  • DbQueue::stats() 将检索有关所有通道的统计信息,而 statsTube($tube) 将为您提供有关单个通道的信息——默认情况下,是默认使用的通道。这些统计信息将包括作业总数以及埋藏、延迟、紧急和就绪作业的数量——以及通道名称。
  • DbQueue::watching()DbQueue::using()/chosen()——这些将回答作业将从中获取的通道以及它们将被放置的位置。
  • DbQueue::listTubes() 将告诉您队列中所有当前活动的通道。之前使用过但当前没有作业排队等待的通道将不会显示。

额外奖励:作业模型

由于这是一个数据库库,我们确实使用模型与实际数据库表交互。但话虽如此,您几乎不需要这样做。

好吧,如果您确实需要,您可以使用 Job::getModel() 获取与作业相关的模型,或者直接使用 Db\Model 类。

与原始实现的主要差异

这是一个不完整的列表,列出了原始 Beanstalk 客户端、Phalcon 的一个以及我们最终实现的酷队列系统之间的差异。

从基本的 Beanstalk 客户端

这里的实现参考是 Python 的 Beanstalkc

  • 不需要将主体作为字符串。它在存储时进行序列化,在检索前进行反序列化,保持其精确的结构和类型;
  • 作业工作流程并不严格遵循,因为 Phalcon 本身也没有严格遵循,我们希望至少保持一些向后兼容性,以便人们可以从一个迁移到另一个,而不会遇到太多麻烦。例如,可以删除未保留的作业,因为这正是当前 Phalcon 文档所建议的 😑
  • 仍然没有 TTR/touch() 实现。请参阅 问题 #2

从 Phalcon 的 Queue\Beanstalk

添加了一些功能,一方面是为了符合原始 Beanstalk 客户端,另一方面是为了与其他 PHP 中找到的队列系统(如 Laravel 的)保持一致。有一个特定的测试(tests/unit/OriginalTest.php),它模仿了 Phalcon 代码库中的队列测试。

请记住,这里的一些内容也可能与原始客户端相比是新增功能。

  • 依赖于默认的 DI 容器以获取数据库连接。因此,相关的 connect() 和 `disconnect()` 方法已经更改:前者直接在构造函数中调用,而后者会引发异常,因为没有真正手动从队列断开连接的价值;
  • 显然增加了 getModel(),因为原始的一个与模型没有关系;
  • 增加了 getState(),这样我们就可以在不查询所有统计信息的情况下获取作业状态;
  • getStats() 还指示了优先级的人类可读描述以及作业将被认为就绪的时间戳(delayed_until)。它不是数组,而是一个 ArrayAccess 对象,因此可以在您最喜欢的 IDE 中启用代码补全提示;
  • 增加了几个常量,以帮助定义作业选项、状态和优先级;
  • 增加了一些方法以符合原始客户端,例如
    • watch() 方法包括一个字符串/数组参数 $tubes 和一个布尔类型的 $replace 参数;
    • ignore() 方法用于取消监视;
    • peek() 方法用于获取特定的作业;
    • kick() 方法用于一次性踢出多个作业;
    • using()watching()chosen() 方法用于查看正在使用或监视的内容。 原始客户端通过 use() 方法调用 choose(),但由于PHP < 7的保留字限制,这是不可能的;
  • 低级别方法如 read()write() 会抛出异常,因为它们在数据库支持的库中没有意义。