igorsantos07 / phalcon-queue-db
用于数据库存储的 Phalcon 队列接口
Requires
- ext-phalcon: ^2.0.11 || ^3.0
Requires (Dev)
- ext-sqlite3: *
- codeception/codeception: ^2.2
This package is not auto-updated.
Last update: 2023-12-28 21:41:09 UTC
README
此包与 \Phalcon\Queue\Beanstalk
并列,为那些不想安装和维护 Beanstalk 服务器的用户提供作业队列。
这主要适用于作业吞吐量较低的情况。如果您计划同时处理大量作业和工作者,建议使用像 Beanstalk 这样更快的工具,因为这可能会对您的数据库和磁盘 I/O 产生很大压力,从而减慢作业速度。
安装
- 将此包添加到您的 composer 安装中:
composer require igorsantos07/phalcon-queue-db
- 通过导入
sql/
目录中的一个文件来创建所需的表。您可以将它复制到迁移或您的设置中所需的任何地方,以创建一个新表并使其运行: - 阅读本文档的其余部分以了解如何使用它并获得收益!
使用
与大多数作业队列系统一样,这里的想法是将应用某部分的一些负载移除并在外部运行。因此,有两个部分相互交互:当您 [排队作业][#job-queuing] 时,以及当您 [处理它][#job-processing] 时。
由于此包基于 Phalcon 的原始 Beanstalk 实现,您还可能发现阅读 基础类文档 和一个关于 Beanstalk 的 几乎完整的教程 有用。但要注意,还实现了一些原始类中没有的功能,我们没有遵循 Beanstalk 的严格行为,因为基础类也没有遵循 - 我们在保持向后兼容性的成本下追求额外的功能。
以下示例中考虑以下
uses
use \Phalcon\Queue\Db as DbQueue; use \Phalcon\Queue\Db\Job as Job;
作业排队
实际的队列
要获取队列对象,只需实例化它。唯一参数是您的数据库连接名称,如 Phalcon DI 中所示 - 默认为 db
。您还希望将队列本身设置在您的依赖注入容器中。
$queue = new DbQueue(); //gets a database connection named db from the DI
$outsiderQueue = new DbQueue('weird_db'); another queue, in another db?
向队列中添加内容
因此,您想稍后执行某些操作。假设您需要一次性发送大量电子邮件,如果这在用户请求期间发生,将花费很长时间。我们有“作业管”的概念,即不同的管子接收不同类型的作业,允许您为每种类型的作业拥有专门的工作者。
如果没有指定管子,则默认管子称为...没错,是“默认”。
class ImportantController { function veryImportantAction() { // Do some stuff and ends up with an emails array. // Instead of sending all those emails from the user request, // we are going to hand this job to a worker. $queue = new DbQueue(); $queue->choose('email_notification'); //sets the tube we'll be using $queue->put($emailList); //tell the user to be happy because stuff went ok } }
此阶段的一些有用代码片段是:
DbQueue::choose($tube)
- 定义将内容放在哪个管道上DbQueue::using()
- 告诉您正在使用哪个管道来放置内容DbQueue::put($body, $options)
- 将作业存储在队列中,并返回其ID
原始Beanstalk实现的一个区别是:作业体不需要是字符串。只要您为作业提供可序列化的内容,即可。
作业选项
也可以为作业定义一些特定的选项。
我们将在下一节中看到这些选项是如何相互作用的,关于检索作业并在其上工作。
//adds a job on top of the queue $queue->put($bossEmails, [DbQueue::OPT_PRIORITY => Job::PRIORITY_HIGHEST]); //adds a job to be ran only later (in seconds) $queue->put($taskReminder, [DbQueue::OPT_DELAY => 60 * 10]);
Job
类中有几个常量定义了其他优先级预设。如果没有给出优先级,则默认为Job::PRIORITY_MEDIUM
。
作业处理
从您的命令行脚本(以下称为worker),您可以通过在队列上使用peek或reserve方法来处理作业。peek作业建议仅用于验证和维护:实际工作应仅在保留的作业上完成。
$queue->watch('email_notification'); while ($job = $queue->reserve()) { $payload = $job->getBody(); //do stuff with the payload if ($worked) { $job->delete(); } else { $job->bury(); } }
有用的信息
DbQueue::watch($tubes, $replace)
- 定义要获取作业的哪些管道。每个被观察的管道都进入一个栈,并且在任何被观察的管道中找到的第一个作业将在reserve()
或[peek*()
调用][#peeking-into-the-queue]时检索。默认的观察管道是“default”。DbQueue::ignore($tube)
- 从观察列表中删除管道。请注意,观察列表永远不会为空:如果您尝试忽略最后一个,实际上您的调用将被忽略。DbQueue::reserve($timeout)
- 一旦有可用的作业,就返回一个作业。$timeout
使该方法在等待指定数量的秒后返回false。另一个reserve()
调用不会检索已保留的作业,只会检索另一个。因此,此方法是线程安全的 - 您可以同时运行许多工作者,并且没有两个工作者会收到相同的作业。Job::getBody()
- 检索在DbQueue::put()
上最初提供的作业有效负载。Job::delete()
- 当完成对给定作业的工作后,删除它!Job::bury($priority)
- 以特殊的“埋藏”状态将作业存储回来,表示作业未能完成。Job::release()
- 将作业返回到队列,而不将其标记为失败。
查看队列
对于维护任务,您可以使用各种peeking方法来查看当前队列状态
DbQueue::peek($id)
返回由特定ID指定的作业DbQueue::peekReady()
获取队列中下一个准备就绪的作业,或falseDbQueue::peekBuried()
获取队列中第一个埋藏的作业,或falseDbQueue::peekDelayed()
获取队列中第一个延迟的作业,或false
请记住,作业总是按优先级和年龄排序:紧急作业总是首先,然后是较旧的作业返回在较新的作业之前。
将作业踢回
要将埋藏的作业放回正常队列以进行处理,或将延迟的作业推进队列,您可以使用Job::kick()
。如果您想一次将多个埋藏的作业踢回队列,还可以使用DbQueue::kick($numberOfJobs)
。
提示:您可能想在DI中创建一个持久的单独数据库连接。这样,每次调用工作者时就不需要重新启动连接。
维护
最后但同样重要的是,有一系列辅助方法可以帮助您获取有关当前队列状态的信息。
Job::stats()
将为您提供作业 ID、年龄、通道、状态以及延迟和优先级详细信息。还有一些类常量与这些信息相对应,例如Job::PRIORITY_*
或Job::ST_*
。 注意:您无法从已删除的作业中获取统计信息,好吗?Job::getId()
和Job::getState()
将为您提供作业的 ID 和状态——后者与Job::ST_*
常量之一相匹配。DbQueue::stats()
将检索有关所有通道的统计信息,而statsTube($tube)
将为您提供有关单个通道的信息——默认情况下,是默认使用的通道。这些统计信息将包括作业总数以及埋藏、延迟、紧急和就绪作业的数量——以及通道名称。DbQueue::watching()
和DbQueue::using()/chosen()
——这些将回答作业将从中获取的通道以及它们将被放置的位置。DbQueue::listTubes()
将告诉您队列中所有当前活动的通道。之前使用过但当前没有作业排队等待的通道将不会显示。
额外奖励:作业模型
由于这是一个数据库库,我们确实使用模型与实际数据库表交互。但话虽如此,您几乎不需要这样做。
好吧,如果您确实需要,您可以使用 Job::getModel()
获取与作业相关的模型,或者直接使用 Db\Model
类。
与原始实现的主要差异
这是一个不完整的列表,列出了原始 Beanstalk 客户端、Phalcon 的一个以及我们最终实现的酷队列系统之间的差异。
从基本的 Beanstalk 客户端
这里的实现参考是 Python 的 Beanstalkc
- 不需要将主体作为字符串。它在存储时进行序列化,在检索前进行反序列化,保持其精确的结构和类型;
- 作业工作流程并不严格遵循,因为 Phalcon 本身也没有严格遵循,我们希望至少保持一些向后兼容性,以便人们可以从一个迁移到另一个,而不会遇到太多麻烦。例如,可以删除未保留的作业,因为这正是当前 Phalcon 文档所建议的 😑
- 仍然没有 TTR/
touch()
实现。请参阅 问题 #2。
从 Phalcon 的 Queue\Beanstalk
添加了一些功能,一方面是为了符合原始 Beanstalk 客户端,另一方面是为了与其他 PHP 中找到的队列系统(如 Laravel 的)保持一致。有一个特定的测试(tests/unit/OriginalTest.php
),它模仿了 Phalcon 代码库中的队列测试。
请记住,这里的一些内容也可能与原始客户端相比是新增功能。
- 依赖于默认的 DI 容器以获取数据库连接。因此,相关的
connect()
和 `disconnect()` 方法已经更改:前者直接在构造函数中调用,而后者会引发异常,因为没有真正手动从队列断开连接的价值; - 显然增加了
getModel()
,因为原始的一个与模型没有关系; - 增加了
getState()
,这样我们就可以在不查询所有统计信息的情况下获取作业状态; getStats()
还指示了优先级的人类可读描述以及作业将被认为就绪的时间戳(delayed_until
)。它不是数组,而是一个 ArrayAccess 对象,因此可以在您最喜欢的 IDE 中启用代码补全提示;- 增加了几个常量,以帮助定义作业选项、状态和优先级;
- 增加了一些方法以符合原始客户端,例如
watch()
方法包括一个字符串/数组参数$tubes
和一个布尔类型的$replace
参数;ignore()
方法用于取消监视;peek()
方法用于获取特定的作业;kick()
方法用于一次性踢出多个作业;using()
、watching()
和chosen()
方法用于查看正在使用或监视的内容。 原始客户端通过use()
方法调用choose()
,但由于PHP < 7的保留字限制,这是不可能的;
- 低级别方法如
read()
和write()
会抛出异常,因为它们在数据库支持的库中没有意义。