kickpeach / queue
基于beanstalkd实现的任务队列,方便分发任务和解决任务
dev-master
2019-02-06 14:37 UTC
Requires
- php: >=7.1.3
- pda/pheanstalk: ^3.1
Requires (Dev)
- phpunit/phpunit: ^5.4
This package is not auto-updated.
Last update: 2024-09-27 12:22:10 UTC
README
基于beanstalkd实现的任务队列,方便分发任务和解决任务,此库可适用于以下场景:
- 用作延时队列:比如可以用于如果用户30分钟内不操作,任务关闭。
- 用作定时任务:比如可以用于专门的后台任务。
- 用作异步操作:这是所有消息队列都最常用的,先将任务放入,顺序执行。
- 用作循环队列:使用release命令可以循环执行任务,比如可以做负载均衡任务分发。
- 用作兜底机制:比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止
关于Beanstalkd
Beanstalkd 是一个轻量级的内存型队列,利用了和 Memcache 类似的协议。依赖 libevent 单线程事件分发机制,可以部署多个实例,但是高并发支持还是不太友好;
几个重要的概念
- job:一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中。
- tube:一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象。
- producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。
- consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态。
Job的生命周期
任务在队列中被称为 Job. 一个 Job 在 Beanstalkd 中有以下的生命周期:
- put 将一个任务放置进 tube 中
- deayed 这个任务现在在等待中,需要若干秒才能准备完毕【延迟队列】
- ready 这个任务已经准备好了,可以消费了。所有的消费都是要从取 ready 状态的 job
- reserved 这个任务已经被消费者消费
- release 这个 job 执行失败了,把它放进 ready 状态队列中。让其他队列执行
- bury 这个 job 执行失败了,但不希望其他队列执行,先把它埋起来
put with delay release with delay ----------------> [DELAYED] <------------. | | | (time passes) | | | put v reserve | delete -----------------> [READY] ---------> [RESERVED] --------> *poof* ^ ^ | | | \ release | | | `-------------' | | | | kick | | | | bury | [BURIED] <---------------' | | delete `--------> *poof*
如何使用
具体实例可参考测试例子
安装
composer require kickpeach/queue -vvv
概述
创建队列
use KickPeach\Queue\Drivers\Beanstalkd;
use KickPeach\Queue\Queue;
$queue = new Queue(new Beanstalkd($host, $port));
创建任务
<?php
use KickPeach\Queue\Job;
class ExampleJob extends Job
{
/**
* @var string job queue name (beanstalkd tube)
*/
public $queue = 'default';
/**
* The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
*
* @var int 允许 worker 执行的最大秒数,超时 job 将会被 release 到 ready 状态.
*/
public $retry_after = 60;
/**
* The number of times the job may be attempted.
*
* @var int 最大尝试次数
*/
public $tries = 1;
/**
* @var array
*/
public $words;
public function __construct(array $words)
{
$this->words = $words;
}
public function handle()
{
var_export($this->words);
var_dump($this->retry_after, $this->tries);
// throw new \Exception('handle job with error...lol ^_^');
}
}
通过定义 $queue
指定工作队列,通过定义 $tries
指定最大任务尝试次数,通过定义 $retry_after
指定超时时间。
分发任务
$queue->push(new ExampleJob(['i', 'love', 'china']));
当然,您可以稍后分发任务(推送一个延时任务)
$queue->later(60, new ExampleJob(['i', 'love', 'china']));
处理任务
$worker = new Worker($queue);
$worker->daemon();
注意:
$worker->daemon()
是阻塞的。
默认情况下,工作者将监听名为 default
的 tube,您可以指定工作者队列(beanstalkd tube),如:
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon();
您还可以指定在没有任务时工作者将睡眠多长时间,以及内存限制,如:
$sleep = 60;
$memoryLimit = 128;
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon($sleep, $memoryLimit);
许可证
The MIT License (MIT)。