tarantool / queue
Tarantool Queue 的 PHP 绑定。
Requires
- php: ^7.2.5|^8
- tarantool/client: ^0.10
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.19
- tarantool/phpunit-extras: ^0.2
- vimeo/psalm: ^3.9|^4
README
Tarantool 是一个运行在 Lua 应用服务器中的 NoSQL 数据库。它集成了名为 LuaRocks 的 Lua 模块。本包提供了 Tarantool Queue LuaRock 的 PHP 绑定。
目录
安装
推荐通过 Composer 安装库
composer require tarantool/queue
开始前
为了使用队列,你首先需要确保你的 Tarantool 实例已配置、启动并运行。最低要求的配置可能看起来像这样
-- queues.lua box.cfg {listen = 3301} queue = require('queue') queue.create_tube('foobar', 'fifottl', {if_not_exists = true})
要启动实例,您需要将 queues.lua
文件复制(或符号链接)到 /etc/tarantool/instances.enabled
目录,并运行以下命令
sudo tarantoolctl start queues
使用队列
一旦你的实例启动,你可以通过创建一个队列对象开始,该对象使用你在 Lua 脚本中定义的队列(管)名称
use Tarantool\Queue\Queue; ... $queue = new Queue($client, 'foobar');
其中 $client
是来自 tarantool/client 包的 Tarantool\Client\Client
实例。
数据类型
在内部,Tarantool 使用 MessagePack 二进制格式来序列化和反序列化队列中存储的数据。它可以处理大多数 PHP 数据类型(除资源外)而无需任何手动预处理或后处理
$queue->put('foo'); $queue->put(true); $queue->put(42); $queue->put(4.2); $queue->put(['foo' => ['bar' => ['baz' => null]]]); $queue->put(new MyObject());
要了解更多关于对象序列化的信息,请点击此 链接。
任务
大多数 Queue API 方法返回一个包含以下获取器的 Task 对象
Task::getId() Task::getState() // States::READY, States::TAKEN, States::DONE, States::BURY or States::DELAYED Task::getData()
和一些糖方法
Task::isReady() Task::isTaken() Task::isDone() Task::isBuried() Task::isDelayed()
生产者 API
正如你所看到的,要将任务插入队列,你需要调用 put()
方法,它接受两个参数:你想要处理的数据以及可选的任务选项数组,该队列支持这些选项。例如,支持 delay
、ttl
、ttr
和 pri
选项的 fifottl
队列(我们已经在 Lua 配置文件中 之前 定义了)
use Tarantool\Queue\Options; $queue->put('foo', [Options::DELAY => 30.0]); $queue->put('bar', [Options::TTL => 5.0]); $queue->put('baz', [Options::TTR => 10.0, Options::PRI => 42]);
请在此处查看所有可用选项的完整列表 此处。
消费者 API
要预留任务以执行,请调用 take()
方法。它接受一个可选的 timeout
参数。如果提供了超时值,调用将等待 timeout
秒,直到队列中出现 READY
任务。该方法返回一个 Task 对象或 null
$taskOrNull = $queue->take(); // wait 2 seconds $taskOrNull = $queue->take(2.0); // wait 100 milliseconds $taskOrNull = $queue->take(.1);
执行成功后,可以将任务标记为已确认(这将从队列中删除任务)
$data = $task->getData(); // process $data $task = $queue->ack($task->getId());
或者将其放回队列中,如果它无法执行
$task = $queue->release($task->getId()); // for *ttl queues you can specify a delay $task = $queue->release($task->getId(), [Options::DELAY => 30.0]);
要查看任务而不更改其状态,请使用
$task = $queue->peek($task->getId());
隐藏(禁用)一个任务
$task = $queue->bury($task->getId());
将隐藏的任务重置回 就绪
状态
$count = $queue->kick(3); // kick 3 buried tasks
增加正在运行的任务的TTR和/或TTL(仅适用于 *ttl 队列)
$taskOrNull = $queue->touch($takenTask->getId(), 5.0); // increase ttr/ttl to 5 seconds
可以使用 delete()
方法永久删除(任何状态)的任务
$task = $queue->delete($task->getId());
删除队列中的所有任务
$queue->truncate();
有关详细API文档,请阅读队列 README 中的 "使用队列模块" 部分。
统计信息
stats()
方法提供了自队列创建以来累积的统计信息访问权限
$stats = $queue->stats();
此调用的结果可能如下所示
[ 'tasks' => [ 'taken' => 1, 'buried' => 1, 'ready' => 1, 'done' => 0, 'delayed' => 0, 'total' => 3, ], 'calls' => [ 'bury' => 1, 'put' => 3, 'take' => 1, ... ], ]
此外,您还可以指定一个键以返回数组的子集
$calls = $queue->stats('calls'); $total = $queue->stats('tasks.total');
自定义方法
感谢 队列 Lua 模块的灵活性,您可以轻松创建自己的队列驱动程序或通过添加额外功能扩展现有驱动程序。例如,假设您向 foobar
队列中添加了 put_many
方法,该方法可以原子性地插入多个任务
-- queues.lua ... queue.tube.foobar.put_many = function(self, items) local put = {} box.begin() for k, item in pairs(items) do put[k] = tube:put(unpack(item)) end box.commit() return put end
要从 PHP 中调用此方法,请使用 Queue::call()
$result = $queue->call('put_many', [ 'foo' => ['foo', [Options::DELAY => 30.0]], 'bar' => ['bar'], ]);
测试
使用 Docker 运行测试是最简单的方法。首先,使用 dockerfile.sh 生成器构建镜像
./dockerfile.sh | docker build -t queue -
然后运行一个 Tarantool 实例(需要用于集成测试)
docker network create tarantool-php
docker run -d --net=tarantool-php -p 3301:3301 --name=tarantool \
-v $(pwd)/tests/Integration/queues.lua:/queues.lua \
tarantool/tarantool:2 tarantool /queues.lua
然后运行单元测试和集成测试
docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue
此库在底层使用 PHPUnit,如果需要,您可以将额外的参数和选项传递给 phpunit
命令。例如,要运行仅单元测试,请执行
docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue \
vendor/bin/phpunit --testsuite=unit
许可证
此库在 MIT 许可下发布。有关详细信息,请参阅附带的 LICENSE 文件。