chipslays / queue
简单实现PHP中的队列处理。
1.1.2
2021-11-17 12:59 UTC
Requires
- chipslays/collection: ^1.0
README
简单实现PHP中的队列处理。
安装
$ composer require chipslays/queue
用法
客户端
我们将某些内容推送到队列。
use Chipslays\Queue\Queue; use Chipslays\Queue\Drivers\File; require __DIR__ . '/vendor/autoload.php'; $driver = new File([ 'storage' => __DIR__ . '/storage/', ]); $queue = new Queue($driver); $queue->add('payment', ['user_id' => 1, 'amount' => 10]);
工作进程
我们有工作进程,它会从队列中获取值并开始处理。
worker.php
use Chipslays\Queue\Queue; use Chipslays\Queue\Drivers\File; require __DIR__ . '/vendor/autoload.php'; $driver = new File([ 'storage' => __DIR__ . '/storage/', ]); $queue = new Queue($driver); while (true) { if (!$item = $queue->next('payment')) { continue; } echo 'channel: ' . $item->getChannel() . PHP_EOL; echo 'id: ' . $item->getId() . PHP_EOL; echo 'data: ' . print_r($item->getData(), true) . PHP_EOL; // also can be getting by magic getter: $item->id, $item->channel, $item->data }
定时任务
或者,我们可以使用cron作业来循环工作进程。
cron-worker.php
use Chipslays\Queue\Queue; use Chipslays\Queue\Drivers\File; require __DIR__ . '/vendor/autoload.php'; $driver = new File([ 'storage' => __DIR__ . '/storage/', ]); $queue = new Queue($driver); if (!$item = $queue->next('payment')) { exit; } echo 'channel: ' . $item->getChannel() . PHP_EOL; echo 'id: ' . $item->getId() . PHP_EOL; echo 'data: ' . print_r($item->getData(), true) . PHP_EOL; // also can be getting by magic getter: $item->id, $item->channel, $item->data
队列
队列操作的基类。
方法
__construct
/** * @param DriverInterface $driver */ public function __construct(DriverInterface $driver);
平面文件(文件系统)驱动
use Chipslays\Queue\Queue; use Chipslays\Queue\Drivers\File; require __DIR__ . '/vendor/autoload.php'; $driver = new File([ 'storage' => __DIR__ . '/storage/', ]); $queue = new Queue($driver);
添加
/** * Add item to queue. * * Returns the `id` of the added item. * * @param string $channel * @param array $data * @param int $sort * @return string */ public function add(string $channel, array $data, int $sort = QUEUE_DEFAULT_SORT): string;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); $id = $queue->add('payment', ['key' => 'value']); echo $queue->position('payment', $id); // e.g. 1
获取
/** * Get item by ID. * * @param string $channel * @param string $id * @return Item|null */ public function get(string $channel, string $id): ?Item;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); $id = $queue->add('payment', ['key' => 'value']); $item = $queue->get('payment', $id)); echo 'channel: ' . $item->getChannel() . PHP_EOL; echo 'id: ' . $item->getId() . PHP_EOL; echo 'data: ' . print_r($item->getData(), true) . PHP_EOL; // also can be getting by magic getter: $item->id, $item->channel, $item->data
第一个
/** * Get first item in queue. * * If queue is empty or `channel` not exists returns `null`. * * @param string $channel * @return Item|null */ public function first(string $channel): ?Item;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); $item = $queue->first('payment'); if (!$item) { return; } echo 'channel: ' . $item->getChannel() . PHP_EOL; echo 'id: ' . $item->getId() . PHP_EOL; echo 'data: ' . print_r($item->getData(), true) . PHP_EOL; // also can be getting by magic getter: $item->id, $item->channel, $item->data
下一个
/** * Get next item in queue. * * @param string $channel * @return Item|null */ public function next(string $channel): ?Item;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); // somewhere in client code... $queue->add('payment', ['currency' => 'EUR', 'amount' => 10]); // somewhere in worker/cron code... if (!$item = $queue->next('payment')) { return; } echo 'channel: ' . $item->getChannel() . PHP_EOL; echo 'id: ' . $item->getId() . PHP_EOL; echo 'data: ' . print_r($item->getData(), true) . PHP_EOL; // also can be getting by magic getter: $item->id, $item->channel, $item->data
删除
/** * Delete item from queue. * * Returns `true` on success delete and `false` on fail. * * @param string|Item $channel e.g. Can be passed as result from `first` method. * @param string $id * @return boolean */ public function delete($channel, string $id = null): bool;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); $item = $queue->first('payment'); if (!$item) { return; } // Delete by pass received item from `first` method. $queue->delete($item); // Delete by `channel` and `id`. $queue->delete($item->channel, $item->id);
列表
/** * Get list of queue items. * * Returns array of `id's`, if `channel` not exists returns `null`. * * @param string $channel * @return array|null */ public function list(string $channel): ?array;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); print_r($queue->list('payment')); // Array // // [0] => item_id_1 // [1] => item_id_2 // [2] => item_id_3 // )
注意:对于每个驱动,
id
的名称可能不同!
计数
/** * Get count of items in queue. * * Returns count, if `channel` not exists returns 0. * * @param string $channel * @return integer */ public function count(string $channel): int;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); echo $queue->count('payment'); // e.g. 32
位置
/** * Get item position in queue. * * Return position, if `channel` or `id` not exists returns 0. * * @param string $channel * @param string $id * @return int */ public function position(string $channel, string $id): int;
示例
use Chipslays\Queue\Queue; $queue = new Queue($driver); $id = $queue->add('payment', ['key' => 'value']); echo $queue->position('payment', $id); // e.g. 1