heureka / php-rq
基于 Redis 的几个简单且可靠的队列
Requires
- php: >=7.3.0
- predis/predis: 2.0.0
Requires (Dev)
- mockery/mockery: ~1.2.3
- nette/finder: ~2.5
- phpunit/phpunit: ~7.5.15
README
PhpRQ
这个库是一组 PHP 文件和 Lua 脚本,它使您能够轻松实现基于 Redis 的队列系统。由于 Lua 脚本使用频繁,该库完全使用集成测试进行测试。该库中实现了多种类型的队列,因此您可以选择最适合您需求的队列。
所有队列在多线程环境中都能很好地工作。您需要注意的唯一一点是,在有多个消费者的情况下,无法保证项目的顺序。例如,如果多个消费者意外退出,然后您使用重新入队方法将项目放回队列中,那么您很可能会丢失项目的顺序。如果您想依赖项目的顺序,那么您必须一次只使用一个消费者,在失败后拒绝整个批次,并在获取另一块项目之前重新入队所有内容。
队列
这是最基本类型的队列。如果您不需要队列中项目的唯一性,那么您可能想使用队列。
如果您随机拒绝项目,或者在多线程环境中使用队列,则没有可能保证项目的顺序。队列会尽力维护项目的顺序,但您必须记住这只是 尽力而为 的方法。只有当您使用单个消费者,在失败后拒绝整个批次,并在获取新项目之前重新入队失败的项目时,才能保证项目的顺序。
队列 - 通用用法
您可以使用 addItem 和 addItems 方法向队列中添加项目。如果您想从队列中获取项目,则可以使用 getItems 和 getAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItem,ackItems)或拒绝(失败;rejectItem,rejectItems,rejectBatch)它们。如果您拒绝一个项目,则它将被移动到队列的头部。如果您错过承认或撤销项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItems,reEnqueueAllItems,dropTimedOutItems 或 dropAllItems(取决于您的需求)来清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的 Redis。
Queue::getRedisClient
返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。
Queue::getCount
返回队列中项目的数量。
Queue::addItem
参数: $item 任何可以转换为字符串的对象
向队列中添加项目。
Queue::addItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
向队列中添加多个项目。
Queue::getItems
参数: (int) $size 您希望从队列中返回的项目数量
返回队列中的 $size 个元素。您可以设置任何您喜欢的 $size,项目将以固定的安全大小分块从队列中获取,以防止服务器过载。
从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
Queue::getAllItems
返回队列中的所有项目。项目将以固定的安全大小分块从队列中获取,以防止服务器过载。
从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
Queue::ackItem
参数: $item 任何可以转换为字符串的对象
确认项目 - 从处理队列中移除项目。
Queue::ackItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
确认多个项目 - 从处理队列中移除它们。
Queue::rejectItem
参数: $item 任何可以转换为字符串的对象
撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为第一个获取的项目)
注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。
Queue::rejectItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
撤销多个项目 - 从处理队列中移除项目并将其按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。
Queue::rejectBatch
撤销处理队列中剩余的所有项目,并按相反顺序将其放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
Queue::reEnqueueTimedOutItems
参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数
您应该定期调用此方法(或reEnqueueAllItems)或将超时项目从处理队列放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。
Queue::reEnqueueAllItems
在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将所有处理队列的项目按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。请注意,此方法可以将仍然有效的处理队列(即未失败的那些)重新入队。
注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。
Queue::dropTimedOutItems
参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数
您应该定期调用此方法(或在从队列获取项目之前)以从处理队列中删除超时项目。
Queue::dropAllItems
在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将从所有处理队列中删除项目。请注意,此方法可以删除仍然有效的处理队列(即未失败的那些)。
Queue::clearQueue
从队列及其处理列表中删除所有项目。对于测试目的很有用。
UniqueQueue
此类型队列在您不需要推入队列的所有项目,只需每种“类型”中的一个项目时很有用。例如,如果您想为按类别刷新缓存的产品构建一个队列,那么您可能希望将类别ID发送到队列。如果处理缓存刷新的进程太慢,那么您可能会向队列发送多个更新请求(类别ID)。在这种情况下,不需要刷新缓存两次 - 您只需要一次,但进程还没有处理到您的项目。解决方案是忽略第二个项目。这正是UniqueQueue的目的。它保证了队列中项目的唯一性。
如果您随机拒绝项目或在多线程环境中使用队列,则无法保证项目的顺序。UniqueQueue 尽力保持项目的顺序,但请记住这只是最佳努力方法。只有在使用单个消费者、在失败后拒绝整个批次并在获取新项目之前重新入队失败项目时,才能保证项目的顺序。
UniqueQueue - 通用用法
您可以使用 addItem 和 addItems 方法向队列中添加项目。如果您想从队列中获取项目,则可以使用 getItems 和 getAllItems 方法。当您从队列中获取项目时,您必须承认(成功;ackItem,ackItems)或拒绝(失败;rejectItem,rejectItems,rejectBatch)它们。如果您拒绝一个项目,则它将被移动到队列的头部。如果您错过承认或撤销项目,则它将保留在处理队列中,直到您使用 reEnqueueTimedOutItems,reEnqueueAllItems,dropTimedOutItems 或 dropAllItems(取决于您的需求)来清除它。您必须以某种方式清除失败的处理队列,否则遗忘的数据会填满您的 Redis。
UniqueQueue::getRedisClient
返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。
UniqueQueue::getCount
返回队列中项目的数量。
UniqueQueue::addItem
参数: $item 任何可以转换为字符串的对象
向队列中添加项目。
UniqueQueue::addItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
向队列中添加多个项目。
UniqueQueue::getItems
参数: (int) $size 您希望从队列中返回的项目数量
返回队列中的 $size 个元素。您可以设置任何您喜欢的 $size,项目将以固定的安全大小分块从队列中获取,以防止服务器过载。
从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
UniqueQueue::getAllItems
返回队列中的所有项目。项目将以固定的安全大小分块从队列中获取,以防止服务器过载。
从队列中获取的项目将被添加到处理队列中。这样,如果进程意外退出,则可以使用重新入队方法保留从队列中获取的项目。
UniqueQueue::ackItem
参数: $item 任何可以转换为字符串的对象
确认项目 - 从处理队列中移除项目。
UniqueQueue::ackItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
确认多个项目 - 从处理队列中移除它们。
UniqueQueue::rejectItem
参数: $item 任何可以转换为字符串的对象
撤销项目 - 从处理队列中移除项目并将其放回队列头部(即它将成为第一个获取的项目)
注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。
UniqueQueue::rejectItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
撤销多个项目 - 从处理队列中移除项目并将其按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
注意:顺序只能由尽力而为方法保证,例如,如果您拒绝了一些项目然后确认另一个项目,则顺序将丢失。
UniqueQueue::rejectBatch
撤销处理队列中剩余的所有项目,并按相反顺序将其放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
UniqueQueue::reEnqueueTimedOutItems
参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数
您应该定期调用此方法(或reEnqueueAllItems)或将超时项目从处理队列放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。
注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。
UniqueQueue::reEnqueueAllItems
在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将所有处理队列的项目按相反顺序放回队列头部(即它们将成为第一个获取的项目,顺序与之前获取的顺序相同)。请注意,此方法可以将仍然有效的处理队列(即未失败的那些)重新入队。
注意:顺序只能由尽力而为方法保证,例如,如果第一批次失败,第二批次成功,第三批次再次失败,项目的顺序无法保留,因为第一批次现在“排在第二批次之后”。
UniqueQueue::dropTimedOutItems
参数:(int)$timeout 处理队列及其所有项目被认为失败后的秒数
您应该定期调用此方法(或在从队列获取项目之前)以从处理队列中删除超时项目。
UniqueQueue::dropAllItems
在从队列获取项目之前,您应该调用此方法,但仅当您有单个消费者时。此方法将从所有处理队列中删除项目。请注意,此方法可以删除仍然有效的处理队列(即未失败的那些)。
UniqueQueue::clearQueue
从队列及其处理列表中删除所有项目。对于测试目的很有用。
池
如果您有某些独特的操作需要处理,例如每天处理一次,那么池(Pool)是您的绝佳工具。池中的项目按其分配的时间标记进行处理。当一个项目成功处理后,它的时间标记会按设定的周期(5分钟、小时、天、周等)增加,并且只有当它的时间标记低于当前时间时,它才会再次被处理。
如果池中的项目大多数时间都是恒定的,这很有用。那么使用池而不是队列是明智的选择。
Pool - 通用用法
您可以使用 addItem 和 addItems 方法将项目添加到池中。如果您想从池中获取项目,则可以使用 getItems 和 getAllItems 方法。当您从池中获取项目时,您可以确认(成功;ackItem,ackItems)或简单地忽略它们。如果您不再希望处理某些项目,则可以使用 removeItem 和 removeItems 将它们从池中移除。您还可以使用 clearPool 方法清除整个池。
Pool::getRedisClient
返回使用的 Redis 客户端实例(PhpRQ\ClientInterface)。这在多线程环境中很有用,当您想重新连接连接时。
Pool::getCount
返回池中的项目数。
Pool::getCountToProcess
返回需要处理的项目数。
Pool::isInPool
参数:(数组|混合)$item 单个项目或可以转换为字符串的项目数组
检查给定的项目(或多个项目)是否在池中 - 返回布尔值。对于多个项目,返回由项目索引的布尔值数组。
Pool::addItem
参数: $item 任何可以转换为字符串的对象
将项目添加到池中。
Pool::addItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
将多个项目添加到池中。
Pool::getItems
参数:(int)$size 从池中返回的项目数
返回 $size 个应处理的池项目。您可以将 $size 设置为您喜欢的任何数字 - 项目逐个从池中取出,因此服务器不会被压垮。
从池中取出的项目的时间标记会更改为一个浮点数(一个"处理"标记)。这样,您可以确保所有项目都得到处理(例如,如果处理过程崩溃)。
Pool::getAllItems
返回应处理的池中所有项目。项目逐个从池中取出,因此服务器不会被压垮。
从池中取出的项目的时间标记会更改为一个浮点数(一个"处理"标记)。这样,您可以确保所有项目都得到处理(例如,如果处理过程崩溃)。
Pool::ackItem
参数: $item 任何可以转换为字符串的对象
确认项目 - 增加项目的时间标记。
Pool::ackItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
确认多个项目 - 增加项目的时间标记。
Pool::removeItem
参数: $item 任何可以转换为字符串的对象
从池中删除指定的项目。
Pool::removeItems
参数: (数组) $items 包含任何可以转换为字符串的对象的数组
从池中删除指定的项目。
Pool::clearPool
从池中删除所有项目。对于测试目的很有用。
基础
此功能在抽象类 Base
中实现,因此您可以从库中的所有对象类型(如 Queue、Pool 等)调用它。
safeExecution()
PhpRQ 方法的功能包装器,用于在数据包丢失的情况下避免抛出 Predis\Connection\ConnectionException。
示例
$queue = new \PhpRQ\Queue($predisConnection); $items = $queue->safeExecution( function() use ($queue) {$queue->getItems(1);}, function($returnValue) { /* what should happen in case of success */ }, function() { /* what should happen in case of failure */ }, $retryWait, // Microseconds to wait between retries. $maxAttempts // Maximal count of attempts. );