rybakit/phive-queue

$queue->push('I can be popped off after', '10 minutes');

v0.12.0 2015-12-29 10:57 UTC

README

Build Status Scrutinizer Code Quality Code Coverage

Phive Queue是一个基于时间的调度队列,支持多种后端。

目录

安装

推荐使用Composer来安装Phive Queue

$ composer require rybakit/phive-queue

使用示例

use Phive\Queue\InMemoryQueue;
use Phive\Queue\NoItemAvailableException;

$queue = new InMemoryQueue();

$queue->push('item1');
$queue->push('item2', new DateTime());
$queue->push('item3', time());
$queue->push('item4', '+5 seconds');
$queue->push('item5', 'next Monday');

// get the queue size
$count = $queue->count(); // 5

// pop items off the queue
// note that is not guaranteed that the items with the same scheduled time
// will be received in the same order in which they were added
$item123 = $queue->pop();
$item123 = $queue->pop();
$item123 = $queue->pop();

try {
    $item4 = $queue->pop();
} catch (NoItemAvailableException $e) {
    // item4 is not yet available
}

sleep(5);
$item4 = $queue->pop();

// clear the queue (will remove 'item5')
$queue->clear();

队列

目前有以下队列可用

MongoQueue

MongoQueue需要Mongo PECL扩展(v1.3.0或更高版本)。

提示:在使用队列之前,强烈建议在eta字段上创建索引

$ mongo my_db --eval 'db.my_coll.ensureIndex({ eta: 1 })'
构造函数
public MongoQueue::__construct(MongoClient $mongoClient, string $dbName, string $collName)

参数

mongoClient MongoClient实例
dbName 数据库名称
collName 集合名称

示例
use Phive\Queue\MongoQueue;

$client = new MongoClient();
$queue = new MongoQueue($client, 'my_db', 'my_coll');

RedisQueue

对于RedisQueue,您必须安装Redis PECL扩展(v2.2.3或更高版本)。

构造函数
public RedisQueue::__construct(Redis $redis)

参数

redis Redis实例

示例
use Phive\Queue\RedisQueue;

$redis = new Redis();
$redis->connect('127.0.0.1');
$redis->setOption(Redis::OPT_PREFIX, 'my_prefix:');

// Since the Redis client v2.2.5 the RedisQueue has the ability to utilize serialization:
// $redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP);

$queue = new RedisQueue($redis);

TarantoolQueue

要使用TarantoolQueue,您必须安装Tarantool PECL扩展和用于管理队列的Lua脚本

构造函数
public TarantoolQueue::__construct(Tarantool $tarantool, string $tubeName [, int $space = null ])

参数

tarantool Tarantool实例
tubeName 管道名称
space 可选。空间编号。默认为0

示例
use Phive\Queue\TarantoolQueue;

$tarantool = new Tarantool('127.0.0.1', 33020);
$queue = new TarantoolQueue($tarantool, 'my_tube');

PheanstalkQueue

PheanstalkQueue需要安装Pheanstalk库(Beanstalk客户端)。

$ composer require pda/pheanstalk:~3.0
构造函数
public PheanstalkQueue::__construct(Pheanstalk\PheanstalkInterface $pheanstalk, string $tubeName)

参数

pheanstalk Pheanstalk\PheanstalkInterface实例
tubeName 管道名称

示例
use Pheanstalk\Pheanstalk;
use Phive\Queue\PheanstalkQueue;

$pheanstalk = new Pheanstalk('127.0.0.1');
$queue = new PheanstalkQueue($pheanstalk, 'my_tube');

GenericPdoQueue

GenericPdoQueue适用于支持存储过程/函数的PDO驱动程序(实际上除了SQLite以外的所有驱动程序)。

GenericPdoQueue需要安装PDO和特定数据库的PDO驱动程序。此外,PDO错误模式必须设置为抛出异常(PDO::ERRMODE_EXCEPTION)。

创建表和存储过程的SQL文件可以在res目录中找到。

构造函数
public GenericPdoQueue::__construct(PDO $pdo, string $tableName [, string $routineName = null ] )

参数

pdo PDO实例
tableName 表名称
routineName 可选。过程名称。默认为tableName_pop

示例
use Phive\Queue\Pdo\GenericPdoQueue;

$pdo = new PDO('pgsql:host=127.0.0.1;port=5432;dbname=my_db', 'db_user', 'db_pass');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$queue = new GenericPdoQueue($pdo, 'my_table', 'my_routine');

SqlitePdoQueue

SqlitePdoQueue需要安装PDOSQLite PDO驱动程序。此外,PDO错误模式必须设置为抛出异常(PDO::ERRMODE_EXCEPTION)。

创建表的SQL文件可以在res/sqlite目录中找到。

提示:出于性能考虑,强烈建议激活WAL模式

$pdo->exec('PRAGMA journal_mode=WAL');
构造函数
public SqlitePdoQueue::__construct(PDO $pdo, string $tableName)

参数

pdo PDO实例
tableName 表名称

示例
use Phive\Queue\Pdo\SqlitePdoQueue;

$pdo = new PDO('sqlite:/opt/databases/my_db.sq3');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('PRAGMA journal_mode=WAL');

$queue = new SqlitePdoQueue($pdo, 'my_table');

SysVQueue

SysVQueue需要PHP通过选项--enable-sysvmsg进行编译。

构造函数
public SysVQueue::__construct(int $key [, bool $serialize = null [, int $perms = null ]] )

参数

key 消息队列的数字ID
serialize 可选。是否序列化一个项。默认为false
perms 可选。队列权限。默认为0666

示例
use Phive\Queue\SysVQueue;

$queue = new SysVQueue(123456);

InMemoryQueue

InMemoryQueue 在不需要持久性的情况下可能很有用。它仅存在于RAM中,因此比其他队列操作更快。

构造函数
public InMemoryQueue::__construct()
示例
use Phive\Queue\InMemoryQueue;

$queue = new InMemoryQueue();

项目类型

以下表格详细说明了跨队列支持的各种项目类型。

✓* — 如果启用了序列化器,则支持。

为了绕过特定队列不支持类型的限制,您可以在推送之前将项转换为非二进制字符串,并在弹出后将其转换回来。库附带了一个TypeSafeQueue装饰器,可以为您完成此操作

use Phive\Queue\GenericPdoQueue;
use Phive\Queue\TypeSafeQueue;

$queue = new GenericPdoQueue(...);
$queue = new TypeSafeQueue($queue);

$queue->push(['foo' => 'bar']);
$array = $queue->pop(); // ['foo' => 'bar'];

异常

Queue接口中声明的每个队列方法,如果在调用方法时发生运行时错误,都将抛出异常。

例如,在下面的代码中,如果远程服务器不可达,push()调用将失败并抛出MongoConnectionException异常。

use Phive\Queue\MongoQueue;

$queue = new MongoQueue(...);

// mongodb server goes down here

$queue->push('item'); // throws MongoConnectionException

但有时您可能想要捕获来自队列的异常,而不管底层的驱动程序如何。为此,只需使用ExceptionalQueue装饰器包装您的队列对象

use Phive\Queue\ExceptionalQueue;
use Phive\Queue\MongoQueue;

$queue = new MongoQueue(...);
$queue = new ExceptionalQueue($queue);

// mongodb server goes down here

$queue->push('item'); // throws Phive\Queue\QueueException

然后,要捕获队列级别的异常,请使用QueueException

use Phive\Queue\QueueException;

...

try {
    do_something_with_a_queue();
} catch (QueueException $e) {
    // handle queue exception
} catch (\Exception $e) {
    // handle base exception
}

测试

Phive Queue使用PHPUnit进行单元和集成测试。为了运行测试,您首先需要使用composer安装库依赖项

$ composer install

然后您可以运行测试

$ phpunit

您还可以指定某些测试的自己的默认值(例如数据库名称、密码、队列大小等)。您可以通过命令行设置环境变量来实现

$ export PHIVE_PDO_PGSQL_PASSWORD="pgsql_password"
$ export PHIVE_PDO_MYSQL_PASSWORD="mysql_password"
$ phpunit

您还可以通过复制phpunit.xml.dist文件并按需自定义来创建自己的phpunit.xml文件。

性能

要检查队列的性能,请运行

$ phpunit --group performance

此测试将插入一定数量的项目(默认为1000)到队列中,然后检索它们。它测量pushpop操作的平均时间,并输出结果统计,例如

RedisQueue::push()
   Total operations:      1000
   Operations per second: 14031.762 [#/sec]
   Time per operation:    71.267 [ms]
   Time taken for test:   0.071 [sec]

RedisQueue::pop()
   Total operations:      1000
   Operations per second: 16869.390 [#/sec]
   Time per operation:    59.279 [ms]
   Time taken for test:   0.059 [sec]
.
RedisQueue::push() (delayed)
   Total operations:      1000
   Operations per second: 15106.226 [#/sec]
   Time per operation:    66.198 [ms]
   Time taken for test:   0.066 [sec]

RedisQueue::pop() (delayed)
   Total operations:      1000
   Operations per second: 14096.416 [#/sec]
   Time per operation:    70.940 [ms]
   Time taken for test:   0.071 [sec]

您还可以通过更改phpunit.xml文件中的PHIVE_PERF_QUEUE_SIZE值或通过命令行设置环境变量来更改测试中涉及的项的数量

$ PHIVE_PERF_QUEUE_SIZE=5000 phpunit --group performance

并发

为了检查并发性,您必须安装Gearman服务器和German PECL扩展。一旦服务器已安装并启动,运行以下命令创建一定数量的进程(工作者)

$ php tests/worker.php

然后运行测试

$ phpunit --group concurrency

此测试将插入一定数量的项目(默认为100)到队列中,然后每个工作者尝试并行检索它们。

您还可以通过更改phpunit.xml文件中的PHIVE_CONCUR_QUEUE_SIZE值或通过命令行设置环境变量来更改测试中涉及的项的数量

$ PHIVE_CONCUR_QUEUE_SIZE=500 phpunit --group concurrency

许可

Phive Queue采用MIT许可证发布。有关详细信息,请参阅附带LICENSE文件。