chipslays/queue

PHP 队列处理的简单实现。

1.1.2 2021-11-17 12:59 UTC

This package is auto-updated.

Last update: 2024-09-17 20:04:44 UTC


README

Packagist Version GitHub

PHP 队列处理的简单实现。

安装

$ composer require chipslays/queue

用法

客户端

我们将某些内容推送到队列。

use Chipslays\Queue\Queue;
use Chipslays\Queue\Drivers\File;

require __DIR__ . '/vendor/autoload.php';

$driver = new File([
    'storage' => __DIR__ . '/storage/',
]);

$queue = new Queue($driver);
$queue->add('payment', ['user_id' => 1, 'amount' => 10]);

工作进程

我们有一个工作进程，它会从队列中取出条目并开始处理。

worker.php
use Chipslays\Queue\Queue;
use Chipslays\Queue\Drivers\File;

require __DIR__ . '/vendor/autoload.php';

$driver = new File([
    'storage' => __DIR__ . '/storage/',
]);

$queue = new Queue($driver);

while (true) {
    if (!$item = $queue->next('payment')) {
        continue;
    }

    echo 'channel: ' . $item->getChannel() . PHP_EOL;
    echo 'id: ' . $item->getId() . PHP_EOL;
    echo 'data: ' . print_r($item->getData(), true) . PHP_EOL;

    // the same values are also available via magic getters: $item->id, $item->channel, $item->data
}

定时任务

或者，我们可以使用 cron 定时任务来反复运行工作进程。

cron-worker.php
use Chipslays\Queue\Queue;
use Chipslays\Queue\Drivers\File;

require __DIR__ . '/vendor/autoload.php';

$driver = new File([
    'storage' => __DIR__ . '/storage/',
]);

$queue = new Queue($driver);

if (!$item = $queue->next('payment')) {
    exit;
}

echo 'channel: ' . $item->getChannel() . PHP_EOL;
echo 'id: ' . $item->getId() . PHP_EOL;
echo 'data: ' . print_r($item->getData(), true) . PHP_EOL;

// the same values are also available via magic getters: $item->id, $item->channel, $item->data

队列

队列操作的基类。

方法

__construct

/**
 * @param DriverInterface $driver
 */
public function __construct(DriverInterface $driver);

平面文件（文件系统）驱动

use Chipslays\Queue\Queue;
use Chipslays\Queue\Drivers\File;

require __DIR__ . '/vendor/autoload.php';

$driver = new File([
    'storage' => __DIR__ . '/storage/',
]);

$queue = new Queue($driver);

add（添加）

/**
 * Add item to queue.
 *
 * Returns the `id` of the added item.
 *
 * @param string $channel
 * @param array $data
 * @param int $sort
 * @return string
 */
public function add(string $channel, array $data, int $sort = QUEUE_DEFAULT_SORT): string;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
$id = $queue->add('payment', ['key' => 'value']);
echo $queue->position('payment', $id); // e.g. 1

get（获取）

/**
 * Get item by ID.
 *
 * @param string $channel
 * @param string $id
 * @return Item|null
 */
public function get(string $channel, string $id): ?Item;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
$id = $queue->add('payment', ['key' => 'value']);
$item = $queue->get('payment', $id);

echo 'channel: ' . $item->getChannel() . PHP_EOL;
echo 'id: ' . $item->getId() . PHP_EOL;
echo 'data: ' . print_r($item->getData(), true) . PHP_EOL;

// the same values are also available via magic getters: $item->id, $item->channel, $item->data

first（第一个）

/**
 * Get first item in queue.
 *
 * If the queue is empty or `channel` does not exist, returns `null`.
 *
 * @param string $channel
 * @return Item|null
 */
public function first(string $channel): ?Item;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
$item = $queue->first('payment');

if (!$item) {
    return;
}

echo 'channel: ' . $item->getChannel() . PHP_EOL;
echo 'id: ' . $item->getId() . PHP_EOL;
echo 'data: ' . print_r($item->getData(), true) . PHP_EOL;

// the same values are also available via magic getters: $item->id, $item->channel, $item->data

next（下一个）

/**
 * Get next item in queue.
 *
 * @param string $channel
 * @return Item|null
 */
public function next(string $channel): ?Item;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);

// somewhere in client code...
$queue->add('payment', ['currency' => 'EUR', 'amount' => 10]);

// somewhere in worker/cron code...
if (!$item = $queue->next('payment')) {
    return;
}

echo 'channel: ' . $item->getChannel() . PHP_EOL;
echo 'id: ' . $item->getId() . PHP_EOL;
echo 'data: ' . print_r($item->getData(), true) . PHP_EOL;

// the same values are also available via magic getters: $item->id, $item->channel, $item->data

delete（删除）

/**
 * Delete item from queue.
 *
 * Returns `true` on successful deletion and `false` on failure.
 *
 * @param string|Item $channel Channel name, or an `Item` (e.g. the result of the `first` method).
 * @param string|null $id Item id; may be omitted when an `Item` is passed as `$channel`.
 * @return boolean
 */
public function delete($channel, string $id = null): bool;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
$item = $queue->first('payment');

if (!$item) {
    return;
}

// Delete by passing the item received from the `first` method.
$queue->delete($item);

// Alternatively, delete by `channel` and `id`.
$queue->delete($item->channel, $item->id);

list（列表）

/**
 * Get list of queue items.
 *
 * Returns an array of ids; if `channel` does not exist, returns `null`.
 *
 * @param string $channel
 * @return array|null
 */
public function list(string $channel): ?array;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
print_r($queue->list('payment'));

// Array
// (
//     [0] => item_id_1
//     [1] => item_id_2
//     [2] => item_id_3
// )

注意：对于不同的驱动，id 的命名可能不同！

count（计数）

/**
 * Get count of items in queue.
 *
 * Returns the count; if `channel` does not exist, returns 0.
 *
 * @param string $channel
 * @return integer
 */
public function count(string $channel): int;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
echo $queue->count('payment'); // e.g. 32

position（位置）

/**
 * Get item position in queue.
 *
 * Returns the position; if `channel` or `id` does not exist, returns 0.
 *
 * @param string $channel
 * @param string $id
 * @return int
 */
public function position(string $channel, string $id): int;

示例

use Chipslays\Queue\Queue;

$queue = new Queue($driver);
$id = $queue->add('payment', ['key' => 'value']);
echo $queue->position('payment', $id); // e.g. 1