mle86 / wq
一个简单的用于工作队列和任务处理的库。
v0.21.1
2022-05-24 09:37 UTC
Requires
- php: >=8.0
- psr/log: ^1.0 | ^2.0 | ^3.0
Requires (Dev)
Suggests
- ext-pcntl: Needed if you want to use the SignalSafeWorkProcessor.
This package is auto-updated.
Last update: 2024-09-24 14:27:15 UTC
README
此软件包提供了一种简单的方法将任何类型的PHP任务放入工作队列(如Beanstalkd或Redis),以便稍后执行。
这是 版本 0.21.1。
安装和依赖关系
$ composer require mle86/wq
它需要PHP 8.0+,并且没有其他依赖(除了PHPUnit/Coveralls用于开发和PSR-3接口)。
适配器实现
您还需要安装至少一个包含 WorkServerAdapter
实现的软件包,例如
- mle86/wq-beanstalkd(Beanstalkd服务器适配器),
- mle86/wq-redis(Redis服务器适配器),
- mle86/wq-amqp(RabbitMQ服务器适配器)。
基本概念
-
工作 是应该恰好执行一次的事情。可能它是发送电子邮件,可能是类似webhook的外部API调用,也可能是某种慢速清理过程。无论如何,我们谈论的是一个可以立即执行但为了应用程序的性能更好,最好将其放入 工作队列 中的工作单元。
-
工作队列 是一个应该在某个时间点执行的作业列表。它们存储在某种 工作服务器 中。在PHP世界中,Beanstalkd 是一个知名的工作服务器。它可以存储任意数量的工作队列,尽管它称之为“管道”。
不同的工作队列或管道通常用于分离作业类型。例如,同一个工作服务器可能有一个“mail
”队列用于发送外部邮件,一个“cleanup
”队列用于各种清理作业,以及一个“webhook
”队列用于发送外部webhook调用。
此软件包提供了一些有用的类来设置简单的工作队列系统。
快速入门
这是我们的作业实现。它代表可以发送的电子邮件。
<?php use mle86\WQ\Job\AbstractJob; class EMail extends AbstractJob { protected $recipient; protected $subject; protected $message; public function __construct(string $recipient, string $subject, string $message) { $this->recipient = $recipient; $this->subject = $subject; $this->message = $message; } public function send() { if (mail($this->recipient, $this->subject, $this->message)) { // ok, has been sent! } else { throw new \RuntimeException ("mail() failed"); } } }
我们有一些使用该电子邮件类的代码
<?php use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer; $mailJob = new EMail("test@myproject.xyz", "Hello?", "This is a test mail."); $workServer = BeanstalkdWorkServer::connect("localhost"); $workServer->storeJob("mail", $mailJob);
最后,我们有一个后台工作脚本,它定期检查工作服务器以获取新的电子邮件作业
<?php use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer; use mle86\WQ\WorkProcessor; $queue = "mail"; printf("%s worker %d starting.\n", $queue, getmypid()); $processor = new WorkProcessor(BeanstalkdWorkServer::connect("localhost")); $fn_handler = function(EMail $mailJob) { $mailJob->send(); // don't catch exceptions here, or the WorkProcessor won't see them. }; while (true) { try { $processor->processNextJob($queue, $fn_handler); } catch (\Throwable $e) { echo $e . "\n"; // TODO: add some real logging here } }
文档
类参考
- Job 接口
- AbstractJob 基类
- QueueEntry 包装类
- WorkServerAdapter 接口
- AffixAdapter 包装类
- BlackHoleWorkServer 模拟类
- WorkProcessor 类
- SignalSafeWorkProcessor 类
- JobResult 枚举类
- JobContext 数据传输对象类
- 异常