heureka/php-rq

基于 Redis 的几个简单且可靠的队列

v3.0.0 2023-03-08 13:18 UTC

README

Travis CI Packagist Join the chat at https://gitter.im/heureka/php-rq

PhpRQ

这个库是一组 PHP 文件和 Lua 脚本,它使您能够轻松实现基于 Redis 的队列系统。由于 Lua 脚本使用频繁,该库完全使用集成测试进行测试。该库中实现了多种类型的队列,因此您可以选择最适合您需求的队列。

所有队列在多线程环境中都能很好地工作。您需要注意的唯一一点是,在有多个消费者的情况下,无法保证项目的顺序。例如,如果多个消费者意外退出,然后您使用重新入队方法将项目放回队列中,那么您很可能会丢失项目的顺序。如果您想依赖项目的顺序,那么您必须一次只使用一个消费者,在失败后拒绝整个批次,并在获取另一块项目之前重新入队所有内容。

队列

这是最基本类型的队列。如果您不需要队列中项目的唯一性,那么您可能想使用队列。

如果您随机拒绝项目,或者在多线程环境中使用队列,则没有可能保证项目的顺序。队列会尽力维护项目的顺序,但您必须记住这只是 尽力而为 的方法。只有当您使用单个消费者,在失败后拒绝整个批次,并在获取新项目之前重新入队失败的项目时,才能保证项目的顺序。

队列 - 通用用法

您可以使用 addItemaddItems 方法向队列中添加项目。如果您想从队列中获取项目,则可以使用 getItemsgetAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItemackItems)或拒绝(失败;rejectItemrejectItemsrejectBatch)它们。如果您拒绝一个项目,则它将被移动到队列的头部。如果您错过承认或撤销项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItemsreEnqueueAllItemsdropTimedOutItemsdropAllItems(取决于您的需求)来清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的 Redis。

Queue::getRedisClient

返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。

Queue::getCount

返回队列中项目的数量。

Queue::addItem

参数: $item 任何可以转换为字符串的对象

向队列中添加项目。

Queue::addItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

向队列中添加多个项目。

Queue::getItems

参数: (int) $size 您希望从队列中返回的项目数量

返回队列中的 $size 个元素。您可以设置任何您喜欢的 $size,项目将以固定的安全大小分块从队列中获取,以防止服务器过载。

从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。

Queue::getAllItems

返回队列中的所有项目。项目将以固定的安全大小分块从队列中获取,以防止服务器过载。

从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。

Queue::ackItem

参数: $item 任何可以转换为字符串的对象

确认项目 - 从处理队列中移除项目。

Queue::ackItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

确认多个项目 - 从处理队列中移除它们。

Queue::rejectItem

参数: $item 任何可以转换为字符串的对象

撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为第一个获取的项目)

注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。

Queue::rejectItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

撤销多个项目 - 从处理队列中移除项目并将其按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。

Queue::rejectBatch

撤销处理队列中剩余的所有项目,并按相反顺序将其放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

Queue::reEnqueueTimedOutItems

参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数

您应该定期调用此方法(或reEnqueueAllItems)或将超时项目从处理队列放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。

Queue::reEnqueueAllItems

在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将所有处理队列的项目按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。请注意,此方法可以将仍然有效的处理队列(即未失败的那些)重新入队。

注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。

Queue::dropTimedOutItems

参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数

您应该定期调用此方法(或在从队列获取项目之前)以从处理队列中删除超时项目。

Queue::dropAllItems

在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将从所有处理队列中删除项目。请注意,此方法可以删除仍然有效的处理队列(即未失败的那些)。

Queue::clearQueue

从队列及其处理列表中删除所有项目。对于测试目的很有用。

UniqueQueue

此类型队列在您不需要推入队列的所有项目,只需每种“类型”中的一个项目时很有用。例如,如果您想为按类别刷新缓存的产品构建一个队列,那么您可能希望将类别ID发送到队列。如果处理缓存刷新的进程太慢,那么您可能会向队列发送多个更新请求(类别ID)。在这种情况下,不需要刷新缓存两次 - 您只需要一次,但进程还没有处理到您的项目。解决方案是忽略第二个项目。这正是UniqueQueue的目的。它保证了队列中项目的唯一性。

如果您随机拒绝项目或在多线程环境中使用队列,则无法保证项目的顺序。UniqueQueue 尽力保持项目的顺序,但请记住这只是最佳努力方法。只有在使用单个消费者、在失败后拒绝整个批次并在获取新项目之前重新入队失败项目时,才能保证项目的顺序。

UniqueQueue - 通用用法

您可以使用 addItemaddItems 方法向队列中添加项目。如果您想从队列中获取项目,则可以使用 getItemsgetAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItemackItems)或拒绝(失败;rejectItemrejectItemsrejectBatch)它们。如果您拒绝一个项目,则它将被移动到队列的头部。如果您错过承认或撤销项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItemsreEnqueueAllItemsdropTimedOutItemsdropAllItems(取决于您的需求)来清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的 Redis。

UniqueQueue::getRedisClient

返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。

UniqueQueue::getCount

返回队列中项目的数量。

UniqueQueue::addItem

参数: $item 任何可以转换为字符串的对象

向队列中添加项目。

UniqueQueue::addItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

向队列中添加多个项目。

UniqueQueue::getItems

参数: (int) $size 您希望从队列中返回的项目数量

返回队列中的 $size 个元素。您可以设置任何您喜欢的 $size,项目将以固定的安全大小分块从队列中获取,以防止服务器过载。

从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。

UniqueQueue::getAllItems

返回队列中的所有项目。项目将以固定的安全大小分块从队列中获取,以防止服务器过载。

从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。

UniqueQueue::ackItem

参数: $item 任何可以转换为字符串的对象

确认项目 - 从处理队列中移除项目。

UniqueQueue::ackItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

确认多个项目 - 从处理队列中移除它们。

UniqueQueue::rejectItem

参数: $item 任何可以转换为字符串的对象

撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为第一个获取的项目)

注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。

UniqueQueue::rejectItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

撤销多个项目 - 从处理队列中移除项目并将其按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。

UniqueQueue::rejectBatch

撤销处理队列中剩余的所有项目,并按相反顺序将其放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

UniqueQueue::reEnqueueTimedOutItems

参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数

您应该定期调用此方法(或reEnqueueAllItems)或将超时项目从处理队列放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。

注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。

UniqueQueue::reEnqueueAllItems

在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将所有处理队列的项目按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。请注意,此方法可以将仍然有效的处理队列(即未失败的那些)重新入队。

注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。

UniqueQueue::dropTimedOutItems

参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数

您应该定期调用此方法(或在从队列获取项目之前)以从处理队列中删除超时项目。

UniqueQueue::dropAllItems

在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将从所有处理队列中删除项目。请注意,此方法可以删除仍然有效的处理队列(即未失败的那些)。

UniqueQueue::clearQueue

从队列及其处理列表中删除所有项目。对于测试目的很有用。

如果您有某些独特的操作需要处理,例如每天处理一次,那么池(Pool)是您的绝佳工具。池中的项目按其分配的时间标记进行处理。当一个项目成功处理后,它的时间标记会按设定的周期(5分钟、小时、天、周等)增加,并且只有当它的时间标记低于当前时间时,它才会再次被处理。

如果池中的项目大多数时间都是恒定的,这很有用。那么使用池而不是队列是明智的选择。

Pool - 通用用法

您可以使用 addItemaddItems 方法将项目添加到池中。如果您想从池中获取项目,则可以使用 getItemsgetAllItems 方法。当您从池中获取项目时,您可以确认(成功;ackItemackItems)或简单地忽略它们。如果您不再希望处理某些项目,则可以使用 removeItemremoveItems 将它们从池中移除。您还可以使用 clearPool 方法清除整个池。

Pool::getRedisClient

返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。

Pool::getCount

返回池中的项目数。

Pool::getCountToProcess

返回需要处理的项目数。

Pool::isInPool

参数:(数组|混合)$item 单个项目或可以转换为字符串的项目数组

检查给定的项目(或多个项目)是否在池中 - 返回布尔值。对于多个项目,返回由项目索引的布尔值数组。

Pool::addItem

参数: $item 任何可以转换为字符串的对象

将项目添加到池中。

Pool::addItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

将多个项目添加到池中。

Pool::getItems

参数:(int)$size 从池中返回的项目数

返回 $size 个应处理的池项目。您可以将 $size 设置为您喜欢的任何数字 - 项目逐个从池中取出,因此服务器不会被压垮。

从池中取出的项目的时间标记会更改为一个浮点数(一个"处理"标记)。这样,您可以确保所有项目都得到处理(例如,如果处理过程崩溃)。

Pool::getAllItems

返回应处理的池中所有项目。项目逐个从池中取出,因此服务器不会被压垮。

从池中取出的项目的时间标记会更改为一个浮点数(一个"处理"标记)。这样,您可以确保所有项目都得到处理(例如,如果处理过程崩溃)。

Pool::ackItem

参数: $item 任何可以转换为字符串的对象

确认项目 - 增加项目的时间标记。

Pool::ackItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

确认多个项目 - 增加项目的时间标记。

Pool::removeItem

参数: $item 任何可以转换为字符串的对象

从池中删除指定的项目。

Pool::removeItems

参数: (数组) $items 包含任何可以转换为字符串的对象的数组

从池中删除指定的项目。

Pool::clearPool

从池中删除所有项目。对于测试目的很有用。

基础

此功能在抽象类 Base 中实现,因此您可以从库中的所有对象类型(如 Queue、Pool 等)调用它。

safeExecution()

PhpRQ 方法的功能包装器,用于在数据包丢失的情况下避免抛出 Predis\Connection\ConnectionException。

示例

$queue = new \PhpRQ\Queue($predisConnection);

$items = $queue->safeExecution(
    function() use ($queue) {$queue->getItems(1);},
    function($returnValue) { /* what should happen in case of success */ },
    function() { /* what should happen in case of failure */ },
    $retryWait,  // Microseconds to wait between retries.
    $maxAttempts // Maximal count of attempts.
);