zxin/ php-rq
基于Redis的几个简单可靠的队列
Requires
- php: >=7.1.0
- predis/predis: ^1.1.7
Requires (Dev)
- mockery/mockery: ^1.2.3
- nette/finder: ~2.5
- phpunit/phpunit: ~7.5.15 | ^8.5.17
README
PhpRQ
这个库是一系列PHP文件和Lua脚本,使您能够轻松实现基于Redis的排队系统。该库通过集成测试进行完全测试(因为Lua脚本的广泛使用)。该库实现了多种类型的队列,因此您可以选择最适合您需求的一种。
所有队列在多线程环境中都能很好地工作。您需要注意的唯一一件事是,如果有多个消费者,则无法保证项目顺序。例如,如果有多个消费者意外退出,然后您使用重新入队方法将项目放回队列,那么您很可能会丢失项目顺序。如果您想依赖项目顺序,则必须一次只使用一个消费者,在失败后拒绝整个批次,并在获取另一部分项目之前重新入队所有内容。
队列
这是最基本的队列类型。如果您不需要队列中项目的唯一性,那么您可能想使用队列。
如果您随机拒绝项目或队列在多线程环境中使用,则**无法保证项目顺序**。队列会尽可能保持项目顺序,但您必须记住,这只是一种**最佳努力**的方法。只有当您使用单个消费者、在失败后拒绝整个批次并在获取新项目之前重新入队失败的项目时,才能保证项目顺序。
队列 - 通用用法
您可以使用 addItem 和 addItems 方法将项目添加到队列中。如果您想从队列中获取项目,则可以使用 getItems 和 getAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItem,ackItems)或拒绝(失败;rejectItem,rejectItems,rejectBatch)它们。如果您拒绝了一个项目,则该项目将被移动到队列的头部。如果您错过了承认或撤销一个项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItems,reEnqueueAllItems,dropTimedOutItems 或 dropAllItems(根据您的需求)清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的Redis。
Queue::getRedisClient
返回使用的Redis客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当时您想重新连接连接。
Queue::getCount
返回队列中的项目数量。
Queue::addItem
参数: $item 可以转换为字符串的任何内容
将项目添加到队列中。
Queue::addItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
将多个项目添加到队列中。
Queue::getItems
参数: (int) $size 您希望从队列中返回的项目数量
返回队列中的 $size 个元素。您可以设置 $size 为您喜欢的任何数字 - 项目将通过固定安全大小分块从队列中获取,以便服务器不会被压垮。
从队列中获取的项目被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
Queue::getAllItems
返回队列中的所有项目。项目以固定安全大小的块从队列中检索,以便服务器不会被压垮。
从队列中获取的项目被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
Queue::ackItem
参数: $item 可以转换为字符串的任何内容
确认项目 - 从处理队列中移除项目。
Queue::ackItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
确认多个项目 - 从处理队列中移除它们。
Queue::rejectItem
参数: $item 可以转换为字符串的任何内容
撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为下一个检索的项目)。
注意:顺序仅由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认了另一个,顺序就会丢失。
Queue::rejectItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
撤销多个项目 - 从处理队列中移除这些项目,并以相反的顺序将它们放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
注意:顺序仅由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认了另一个,顺序就会丢失。
Queue::rejectBatch
撤销处理队列中剩余的所有项目,并以相反的顺序将它们放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
Queue::reEnqueueTimedOutItems
参数: (int) $timeout 处理队列及其中的所有项目被视为失败后的秒数
您应该定期调用此方法(或在从队列中检索项目之前)将超时的项目从处理队列放回队列头部,以相反的顺序(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
注意:顺序仅由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“跟在第二批次之后”。
Queue::reEnqueueAllItems
在从队列中检索项目之前,如果您只有一个消费者,应调用此方法。此方法将所有处理队列中的项目以相反的顺序放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。请注意,此方法可以将仍有效的处理队列重新入队(即不是失败的那些)。
注意:顺序仅由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“跟在第二批次之后”。
Queue::dropTimedOutItems
参数: (int) $timeout 处理队列及其中的所有项目被视为失败后的秒数
您应该定期调用此方法(或在从队列中检索项目之前)以从处理队列中丢弃超时的项目。
Queue::dropAllItems
在从队列中检索项目之前,如果您只有一个消费者,应调用此方法。此方法从所有处理队列中丢弃项目。请注意,此方法可以丢弃仍有效的处理队列(即不是失败的那些)。
Queue::clearQueue
从队列及其处理列表中删除所有项目。对于测试目的很有用。
UniqueQueue
这种队列在有多个项目需要入队时非常有用,但只需保留每种“类型”的一个项目。假设您想为按类别刷新缓存的产品创建一个队列。在这种情况下,您可能需要将类别ID发送到队列。如果处理刷新缓存的进程太慢,那么可能会向队列发送多个更新请求(类别ID)。在这种情况下,不需要刷新缓存两次 - 您只需要一次,但进程尚未到达您的项目。解决方案是忽略第二个项目。这正是UniqueQueue的目的。它确保队列中项目的一致性。
如果随机拒绝项目或队列在多线程环境中使用,则无法保证项目的顺序。UniqueQueue会尽可能保持项目的顺序,但您必须记住,这只是尽最大努力的方法。只有当您使用单消费者、在失败后拒绝整个批次并在新项目到来之前重新入队失败的项目时,才能保证项目的顺序。
UniqueQueue - 一般用法
您可以使用 addItem 和 addItems 方法将项目添加到队列中。如果您想从队列中获取项目,则可以使用 getItems 和 getAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItem,ackItems)或拒绝(失败;rejectItem,rejectItems,rejectBatch)它们。如果您拒绝了一个项目,则该项目将被移动到队列的头部。如果您错过了承认或撤销一个项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItems,reEnqueueAllItems,dropTimedOutItems 或 dropAllItems(根据您的需求)清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的Redis。
UniqueQueue::getRedisClient
返回使用的Redis客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当时您想重新连接连接。
UniqueQueue::getCount
返回队列中的项目数量。
UniqueQueue::addItem
参数: $item 可以转换为字符串的任何内容
将项目添加到队列中。
UniqueQueue::addItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
将多个项目添加到队列中。
UniqueQueue::getItems
参数: (int) $size 您希望从队列中返回的项目数量
返回队列中的 $size 个元素。您可以设置 $size 为您喜欢的任何数字 - 项目将通过固定安全大小分块从队列中获取,以便服务器不会被压垮。
从队列中获取的项目被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
UniqueQueue::getAllItems
返回队列中的所有项目。项目以固定安全大小的块从队列中检索,以便服务器不会被压垮。
从队列中获取的项目被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
UniqueQueue::ackItem
参数: $item 可以转换为字符串的任何内容
确认项目 - 从处理队列中移除项目。
UniqueQueue::ackItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
确认多个项目 - 从处理队列中移除它们。
UniqueQueue::rejectItem
参数: $item 可以转换为字符串的任何内容
撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为下一个检索的项目)。
注意:顺序仅由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认了另一个,顺序就会丢失。
UniqueQueue::rejectItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
撤销多个项目 - 从处理队列中移除这些项目,并以相反的顺序将它们放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
注意:顺序仅由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认了另一个,顺序就会丢失。
UniqueQueue::rejectBatch
撤销处理队列中剩余的所有项目,并以相反的顺序将它们放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
UniqueQueue::reEnqueueTimedOutItems
参数: (int) $timeout 处理队列及其中的所有项目被视为失败后的秒数
您应该定期调用此方法(或在从队列中检索项目之前)将超时的项目从处理队列放回队列头部,以相反的顺序(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。
注意:顺序仅由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“跟在第二批次之后”。
UniqueQueue::reEnqueueAllItems
在从队列中检索项目之前,如果您只有一个消费者,应调用此方法。此方法将所有处理队列中的项目以相反的顺序放回队列头部(即它们将成为下一个检索的项目,顺序与之前检索的顺序相同)。请注意,此方法可以将仍有效的处理队列重新入队(即不是失败的那些)。
注意:顺序仅由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“跟在第二批次之后”。
UniqueQueue::dropTimedOutItems
参数: (int) $timeout 处理队列及其中的所有项目被视为失败后的秒数
您应该定期调用此方法(或在从队列中检索项目之前)以从处理队列中丢弃超时的项目。
UniqueQueue::dropAllItems
在从队列中检索项目之前,如果您只有一个消费者,应调用此方法。此方法从所有处理队列中丢弃项目。请注意,此方法可以丢弃仍有效的处理队列(即不是失败的那些)。
UniqueQueue::clearQueue
从队列及其处理列表中删除所有项目。对于测试目的很有用。
池
如果您需要每天执行某些独特的操作,那么Pool是一个很好的工具。池中的项目按照它们分配的时间标记进行处理。当项目成功处理后,它的时间标记会增加设定的周期(5分钟、小时、天、周等...),直到时间标记低于实际时间,它才不会再次被处理。
如果池中的项目大部分时间都是恒定的,那么使用Pool而不是队列是明智的。
Pool - 一般用法
您可以使用addItem和addItems方法向池中添加项目。如果您想从池中获取项目,则可以使用getItems和getAllItems方法。当您从池中获取项目时,您可以确认(成功;ackItem,ackItems)或忽略它们。如果您不再希望处理某些项目,则可以使用removeItem和removeItems将它们从池中删除。您还可以使用clearPool方法清除整个池。
Pool::getRedisClient
返回使用的Redis客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当时您想重新连接连接。
Pool::getCount
返回池中的项目数量。
Pool::getCountToProcess
返回需要处理的项目数量。
Pool::isInPool
参数:(数组|混合)$item 单个项目或可以转换为字符串的数组项目
检查给定项目(或多个项目)是否在池中 - 返回布尔值。对于多个项目,返回由项目索引的布尔值数组。
Pool::addItem
参数: $item 可以转换为字符串的任何内容
将项目添加到池中。
Pool::addItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
将多个项目添加到池中。
Pool::getItems
参数:(整数)$size 从池中返回的项目数量
返回池中应该处理的$size个元素。您可以设置$size为任何您喜欢的数字 - 项目会一个接一个地从池中获取,这样服务器就不会过载。
从池中获取的项目的时间标记已更改为浮点数(一个“处理”标签)。这样您可以确保所有项目都得到处理(例如如果进程崩溃)。
Pool::getAllItems
返回池中所有应该处理的项目。项目逐个从池中获取,这样服务器就不会过载。
从池中获取的项目的时间标记已更改为浮点数(一个“处理”标签)。这样您可以确保所有项目都得到处理(例如如果进程崩溃)。
Pool::ackItem
参数: $item 可以转换为字符串的任何内容
确认项目 - 增加项目的时间标记。
Pool::ackItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
确认多个项目 - 增加项目的时间标记。
Pool::removeItem
参数: $item 可以转换为字符串的任何内容
从池中删除指定项目。
Pool::removeItems
参数: (array) $items 包含可以转换为字符串的任何内容的数组
从池中删除指定项目。
Pool::clearPool
从池中删除所有项目。在测试目的上很有用。
Base
此功能实现在抽象类Base
中,因此您可以从库中所有对象类型(如队列、池等)调用它。
safeExecution()
PhpRQ方法的调用功能包装器,用于重试,以避免在数据包丢失的情况下抛出Predis\Connection\ConnectionException。
示例
$queue = new \PhpRQ\Queue($predisConnection); $items = $queue->safeExecution( function() use ($queue) {$queue->getItems(1);}, function($returnValue) { /* what should happen in case of success */ }, function() { /* what should happen in case of failure */ }, $retryWait, // Microseconds to wait between retries. $maxAttempts // Maximal count of attempts. );