rybakit / phive-queue
$queue->push('I can be popped off after', '10 minutes');
Requires
- php: ^5.4|^7.0
Requires (Dev)
- pda/pheanstalk: ~3.0
Suggests
- ext-mongo: >=1.3.0
- ext-pdo
- ext-pdo_mysql
- ext-pdo_pgsql
- ext-pdo_sqlite
- ext-phpredis: >=2.2.3
- ext-sysvmsg
- ext-tarantool
- pda/pheanstalk: >=3.0
This package is auto-updated.
Last update: 2024-09-10 12:26:08 UTC
README
Phive Queue是一个基于时间的调度队列,支持多种后端。
目录
安装
推荐使用Composer来安装Phive Queue
$ composer require rybakit/phive-queue
使用示例
use Phive\Queue\InMemoryQueue; use Phive\Queue\NoItemAvailableException; $queue = new InMemoryQueue(); $queue->push('item1'); $queue->push('item2', new DateTime()); $queue->push('item3', time()); $queue->push('item4', '+5 seconds'); $queue->push('item5', 'next Monday'); // get the queue size $count = $queue->count(); // 5 // pop items off the queue // note that is not guaranteed that the items with the same scheduled time // will be received in the same order in which they were added $item123 = $queue->pop(); $item123 = $queue->pop(); $item123 = $queue->pop(); try { $item4 = $queue->pop(); } catch (NoItemAvailableException $e) { // item4 is not yet available } sleep(5); $item4 = $queue->pop(); // clear the queue (will remove 'item5') $queue->clear();
队列
目前有以下队列可用
- MongoQueue
- RedisQueue
- TarantoolQueue
- PheanstalkQueue
- GenericPdoQueue
- SqlitePdoQueue
- SysVQueue
- InMemoryQueue
MongoQueue
MongoQueue
需要Mongo PECL扩展(v1.3.0或更高版本)。
提示:在使用队列之前,强烈建议在eta
字段上创建索引
$ mongo my_db --eval 'db.my_coll.ensureIndex({ eta: 1 })'
构造函数
public MongoQueue::__construct(MongoClient $mongoClient, string $dbName, string $collName)
参数
mongoClient MongoClient实例
dbName 数据库名称
collName 集合名称
示例
use Phive\Queue\MongoQueue; $client = new MongoClient(); $queue = new MongoQueue($client, 'my_db', 'my_coll');
RedisQueue
对于RedisQueue
,您必须安装Redis PECL扩展(v2.2.3或更高版本)。
构造函数
public RedisQueue::__construct(Redis $redis)
参数
redis Redis实例
示例
use Phive\Queue\RedisQueue; $redis = new Redis(); $redis->connect('127.0.0.1'); $redis->setOption(Redis::OPT_PREFIX, 'my_prefix:'); // Since the Redis client v2.2.5 the RedisQueue has the ability to utilize serialization: // $redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP); $queue = new RedisQueue($redis);
TarantoolQueue
要使用TarantoolQueue
,您必须安装Tarantool PECL扩展和用于管理队列的Lua脚本。
构造函数
public TarantoolQueue::__construct(Tarantool $tarantool, string $tubeName [, int $space = null ])
参数
tarantool Tarantool实例
tubeName 管道名称
space 可选。空间编号。默认为0
示例
use Phive\Queue\TarantoolQueue; $tarantool = new Tarantool('127.0.0.1', 33020); $queue = new TarantoolQueue($tarantool, 'my_tube');
PheanstalkQueue
PheanstalkQueue
需要安装Pheanstalk库(Beanstalk客户端)。
$ composer require pda/pheanstalk:~3.0
构造函数
public PheanstalkQueue::__construct(Pheanstalk\PheanstalkInterface $pheanstalk, string $tubeName)
参数
pheanstalk Pheanstalk\PheanstalkInterface实例
tubeName 管道名称
示例
use Pheanstalk\Pheanstalk; use Phive\Queue\PheanstalkQueue; $pheanstalk = new Pheanstalk('127.0.0.1'); $queue = new PheanstalkQueue($pheanstalk, 'my_tube');
GenericPdoQueue
GenericPdoQueue
适用于支持存储过程/函数的PDO驱动程序(实际上除了SQLite以外的所有驱动程序)。
GenericPdoQueue
需要安装PDO和特定数据库的PDO驱动程序。此外,PDO错误模式必须设置为抛出异常(PDO::ERRMODE_EXCEPTION
)。
创建表和存储过程的SQL文件可以在res目录中找到。
构造函数
public GenericPdoQueue::__construct(PDO $pdo, string $tableName [, string $routineName = null ] )
参数
pdo PDO实例
tableName 表名称
routineName 可选。过程名称。默认为tableName
_pop
示例
use Phive\Queue\Pdo\GenericPdoQueue; $pdo = new PDO('pgsql:host=127.0.0.1;port=5432;dbname=my_db', 'db_user', 'db_pass'); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $queue = new GenericPdoQueue($pdo, 'my_table', 'my_routine');
SqlitePdoQueue
SqlitePdoQueue
需要安装PDO和SQLite PDO驱动程序。此外,PDO错误模式必须设置为抛出异常(PDO::ERRMODE_EXCEPTION
)。
创建表的SQL文件可以在res/sqlite目录中找到。
提示:出于性能考虑,强烈建议激活WAL模式
$pdo->exec('PRAGMA journal_mode=WAL');
构造函数
public SqlitePdoQueue::__construct(PDO $pdo, string $tableName)
参数
pdo PDO实例
tableName 表名称
示例
use Phive\Queue\Pdo\SqlitePdoQueue; $pdo = new PDO('sqlite:/opt/databases/my_db.sq3'); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $pdo->exec('PRAGMA journal_mode=WAL'); $queue = new SqlitePdoQueue($pdo, 'my_table');
SysVQueue
SysVQueue
需要PHP通过选项--enable-sysvmsg
进行编译。
构造函数
public SysVQueue::__construct(int $key [, bool $serialize = null [, int $perms = null ]] )
参数
key 消息队列的数字ID
serialize 可选。是否序列化一个项。默认为false
perms 可选。队列权限。默认为0666
示例
use Phive\Queue\SysVQueue; $queue = new SysVQueue(123456);
InMemoryQueue
InMemoryQueue
在不需要持久性的情况下可能很有用。它仅存在于RAM中,因此比其他队列操作更快。
构造函数
public InMemoryQueue::__construct()
示例
use Phive\Queue\InMemoryQueue; $queue = new InMemoryQueue();
项目类型
以下表格详细说明了跨队列支持的各种项目类型。
✓* — 如果启用了序列化器,则支持。
为了绕过特定队列不支持类型的限制,您可以在推送之前将项转换为非二进制字符串,并在弹出后将其转换回来。库附带了一个TypeSafeQueue
装饰器,可以为您完成此操作
use Phive\Queue\GenericPdoQueue; use Phive\Queue\TypeSafeQueue; $queue = new GenericPdoQueue(...); $queue = new TypeSafeQueue($queue); $queue->push(['foo' => 'bar']); $array = $queue->pop(); // ['foo' => 'bar'];
异常
在Queue接口中声明的每个队列方法,如果在调用方法时发生运行时错误,都将抛出异常。
例如,在下面的代码中,如果远程服务器不可达,push()
调用将失败并抛出MongoConnectionException
异常。
use Phive\Queue\MongoQueue; $queue = new MongoQueue(...); // mongodb server goes down here $queue->push('item'); // throws MongoConnectionException
但有时您可能想要捕获来自队列的异常,而不管底层的驱动程序如何。为此,只需使用ExceptionalQueue
装饰器包装您的队列对象
use Phive\Queue\ExceptionalQueue; use Phive\Queue\MongoQueue; $queue = new MongoQueue(...); $queue = new ExceptionalQueue($queue); // mongodb server goes down here $queue->push('item'); // throws Phive\Queue\QueueException
然后,要捕获队列级别的异常,请使用QueueException
类
use Phive\Queue\QueueException; ... try { do_something_with_a_queue(); } catch (QueueException $e) { // handle queue exception } catch (\Exception $e) { // handle base exception }
测试
Phive Queue使用PHPUnit进行单元和集成测试。为了运行测试,您首先需要使用composer安装库依赖项
$ composer install
然后您可以运行测试
$ phpunit
您还可以指定某些测试的自己的默认值(例如数据库名称、密码、队列大小等)。您可以通过命令行设置环境变量来实现
$ export PHIVE_PDO_PGSQL_PASSWORD="pgsql_password" $ export PHIVE_PDO_MYSQL_PASSWORD="mysql_password" $ phpunit
您还可以通过复制phpunit.xml.dist文件并按需自定义来创建自己的phpunit.xml
文件。
性能
要检查队列的性能,请运行
$ phpunit --group performance
此测试将插入一定数量的项目(默认为1000)到队列中,然后检索它们。它测量push
和pop
操作的平均时间,并输出结果统计,例如
RedisQueue::push() Total operations: 1000 Operations per second: 14031.762 [#/sec] Time per operation: 71.267 [ms] Time taken for test: 0.071 [sec] RedisQueue::pop() Total operations: 1000 Operations per second: 16869.390 [#/sec] Time per operation: 59.279 [ms] Time taken for test: 0.059 [sec] . RedisQueue::push() (delayed) Total operations: 1000 Operations per second: 15106.226 [#/sec] Time per operation: 66.198 [ms] Time taken for test: 0.066 [sec] RedisQueue::pop() (delayed) Total operations: 1000 Operations per second: 14096.416 [#/sec] Time per operation: 70.940 [ms] Time taken for test: 0.071 [sec]
您还可以通过更改phpunit.xml
文件中的PHIVE_PERF_QUEUE_SIZE
值或通过命令行设置环境变量来更改测试中涉及的项的数量
$ PHIVE_PERF_QUEUE_SIZE=5000 phpunit --group performance
并发
为了检查并发性,您必须安装Gearman服务器和German PECL扩展。一旦服务器已安装并启动,运行以下命令创建一定数量的进程(工作者)
$ php tests/worker.php
然后运行测试
$ phpunit --group concurrency
此测试将插入一定数量的项目(默认为100)到队列中,然后每个工作者尝试并行检索它们。
您还可以通过更改phpunit.xml
文件中的PHIVE_CONCUR_QUEUE_SIZE
值或通过命令行设置环境变量来更改测试中涉及的项的数量
$ PHIVE_CONCUR_QUEUE_SIZE=500 phpunit --group concurrency
许可
Phive Queue采用MIT许可证发布。有关详细信息,请参阅附带LICENSE文件。