tarantool/queue

Tarantool Queue 的 PHP 绑定。

资助包维护!
rybakit

v0.10.0 2022-09-13 21:49 UTC

This package is auto-updated.

Last update: 2024-08-29 23:34:33 UTC


README

Quality Assurance Scrutinizer Code Quality Code Coverage Mentioned in Awesome PHP Telegram

Tarantool 是一个运行在 Lua 应用服务器中的 NoSQL 数据库。它集成了名为 LuaRocks 的 Lua 模块。本包提供了 Tarantool Queue LuaRock 的 PHP 绑定。

目录

安装

推荐通过 Composer 安装库

composer require tarantool/queue

开始前

为了使用队列,你首先需要确保你的 Tarantool 实例已配置、启动并运行。最低要求的配置可能看起来像这样

-- queues.lua

box.cfg {listen = 3301}

queue = require('queue')
queue.create_tube('foobar', 'fifottl', {if_not_exists = true})

你可以在官方 Tarantool 文档 中阅读有关 box 配置的更多信息。有关队列配置的更多信息,请参阅 此处

要启动实例,您需要将 queues.lua 文件复制(或符号链接)到 /etc/tarantool/instances.enabled 目录,并运行以下命令

sudo tarantoolctl start queues

使用队列

一旦你的实例启动,你可以通过创建一个队列对象开始,该对象使用你在 Lua 脚本中定义的队列(管)名称

use Tarantool\Queue\Queue;

...

$queue = new Queue($client, 'foobar');

其中 $client 是来自 tarantool/client 包的 Tarantool\Client\Client 实例。

数据类型

在内部,Tarantool 使用 MessagePack 二进制格式来序列化和反序列化队列中存储的数据。它可以处理大多数 PHP 数据类型(除资源外)而无需任何手动预处理或后处理

$queue->put('foo');
$queue->put(true);
$queue->put(42);
$queue->put(4.2);
$queue->put(['foo' => ['bar' => ['baz' => null]]]);
$queue->put(new MyObject());

要了解更多关于对象序列化的信息,请点击此 链接

任务

大多数 Queue API 方法返回一个包含以下获取器的 Task 对象

Task::getId()
Task::getState() // States::READY, States::TAKEN, States::DONE, States::BURY or States::DELAYED
Task::getData()

和一些糖方法

Task::isReady()
Task::isTaken()
Task::isDone()
Task::isBuried()
Task::isDelayed()

生产者 API

正如你所看到的,要将任务插入队列,你需要调用 put() 方法,它接受两个参数:你想要处理的数据以及可选的任务选项数组,该队列支持这些选项。例如,支持 delayttlttrpri 选项的 fifottl 队列(我们已经在 Lua 配置文件中 之前 定义了)

use Tarantool\Queue\Options;

$queue->put('foo', [Options::DELAY => 30.0]);
$queue->put('bar', [Options::TTL => 5.0]);
$queue->put('baz', [Options::TTR => 10.0, Options::PRI => 42]);

请在此处查看所有可用选项的完整列表 此处

消费者 API

要预留任务以执行,请调用 take() 方法。它接受一个可选的 timeout 参数。如果提供了超时值,调用将等待 timeout 秒,直到队列中出现 READY 任务。该方法返回一个 Task 对象或 null

$taskOrNull = $queue->take();

// wait 2 seconds
$taskOrNull = $queue->take(2.0);

// wait 100 milliseconds
$taskOrNull = $queue->take(.1);

执行成功后,可以将任务标记为已确认(这将从队列中删除任务)

$data = $task->getData();

// process $data

$task = $queue->ack($task->getId());

或者将其放回队列中,如果它无法执行

$task = $queue->release($task->getId());

// for *ttl queues you can specify a delay
$task = $queue->release($task->getId(), [Options::DELAY => 30.0]);

要查看任务而不更改其状态,请使用

$task = $queue->peek($task->getId());

隐藏(禁用)一个任务

$task = $queue->bury($task->getId());

将隐藏的任务重置回 就绪 状态

$count = $queue->kick(3); // kick 3 buried tasks

增加正在运行的任务的TTR和/或TTL(仅适用于 *ttl 队列)

$taskOrNull = $queue->touch($takenTask->getId(), 5.0); // increase ttr/ttl to 5 seconds

可以使用 delete() 方法永久删除(任何状态)的任务

$task = $queue->delete($task->getId());

删除队列中的所有任务

$queue->truncate();

有关详细API文档,请阅读队列 README 中的 "使用队列模块" 部分。

统计信息

stats() 方法提供了自队列创建以来累积的统计信息访问权限

$stats = $queue->stats();

此调用的结果可能如下所示

[
    'tasks' => [
        'taken'   => 1,
        'buried'  => 1,
        'ready'   => 1,
        'done'    => 0,
        'delayed' => 0,
        'total'   => 3,
    ],
    'calls' => [
        'bury' => 1,
        'put'  => 3,
        'take' => 1,
        ...
    ],
]

此外,您还可以指定一个键以返回数组的子集

$calls = $queue->stats('calls');
$total = $queue->stats('tasks.total');

自定义方法

感谢 队列 Lua 模块的灵活性,您可以轻松创建自己的队列驱动程序或通过添加额外功能扩展现有驱动程序。例如,假设您向 foobar 队列中添加了 put_many 方法,该方法可以原子性地插入多个任务

-- queues.lua

...

queue.tube.foobar.put_many = function(self, items)
    local put = {}

    box.begin()
    for k, item in pairs(items) do
        put[k] = tube:put(unpack(item))
    end
    box.commit()

    return put
end

要从 PHP 中调用此方法,请使用 Queue::call()

$result = $queue->call('put_many', [
    'foo' => ['foo', [Options::DELAY => 30.0]],
    'bar' => ['bar'],
]);

测试

使用 Docker 运行测试是最简单的方法。首先,使用 dockerfile.sh 生成器构建镜像

./dockerfile.sh | docker build -t queue -

然后运行一个 Tarantool 实例(需要用于集成测试)

docker network create tarantool-php
docker run -d --net=tarantool-php -p 3301:3301 --name=tarantool \
    -v $(pwd)/tests/Integration/queues.lua:/queues.lua \
    tarantool/tarantool:2 tarantool /queues.lua

然后运行单元测试和集成测试

docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue

此库在底层使用 PHPUnit,如果需要,您可以将额外的参数和选项传递给 phpunit 命令。例如,要运行仅单元测试,请执行

docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue \
    vendor/bin/phpunit --testsuite=unit

许可证

此库在 MIT 许可下发布。有关详细信息,请参阅附带的 LICENSE 文件。