darkghosthunter/laralocker

此包已被弃用且不再维护。未建议替代包。

避免在队列作业、监听器和通知中发生竞态条件

v2.0.0 2021-04-12 05:07 UTC

This package is auto-updated.

Last update: 2022-02-27 21:38:04 UTC


README

此包的问题通过使用 唯一作业 得到解决。

此包不再维护。

Laralocker

John Doyle - Unsplash (UL) #dAW17ADBZEM

Latest Stable Version License Coverage Status Maintainability

使用此简单的锁定预留系统避免在作业、监听器和通知中发生 竞态条件

要求

  • PHP 7.4, 8.0 或更高版本
  • Laravel 7.x, 8.x 或更高版本

安装

启动 composer

composer require darkghosthunter/laralocker

何时使用 Laralocker?

任何可能存在 竞态条件 的情况。

例如,假设我们需要为已售出的票创建一个顺序序列号,如 AAAA-BBBB-CCCC。这是通过推送到队列的作业完成的。这引入了三个问题

  • 如果有两个或更多作业同时启动,它们会同时检查最后一个售出的票,并 保存具有相同序列号的下一张票
  • 如果我们使用队列中的 悲观锁定,我们可能会成为 死锁 的受害者(锁定行不能永久修改)。
  • 如果我们只有一个队列工作者,它将一次处理一张票。当一分钟内有大量用户购买 1,000 张票时,单个队列工作者将花费很长时间来处理所有票。音乐会五分钟后开始,希望你的 CPU 是顶级的 AMD EPYC!

使用此包,所有票可以同时分发,无需担心碰撞,只需为处理预留一个 槽位(如 ID)。

它是如何工作的

此包允许您的作业、监听器或通知成为 Lockable。只需添加三行代码,作业就会 前瞻 查找一个空闲的“槽位”,并预留它。

为了简单起见,我将将通知和监听器视为作业,因为所有这些都可以推送到队列。

一旦作业完成处理,它将释放“槽位”,并将该槽位标记为下一个作业的起点,这样它们就不会从头开始前瞻。

这在您的作业需要顺序数据时很有用:序列号、计算结果、时间戳,等等。

用法

  1. Lockable 接口添加到您的作业、通知或监听器中。
  2. 添加 HandlesLock 特性。
  3. 添加 LockerJobMiddleware 中间件。
  4. 实现 startFrom()next()

只有当作业实现了ShouldQueue接口时,作业中间件才会运行。如果您需要作业在同一个进程中运行而不绕过中间件,请使用dispatch_sync()MyJob::dispatchSync()

示例

以下是一个完整的示例,演示了一个简单的监听器,当给定的用户购买给定的音乐会门票时,它会处理序列密钥。完成后,用户将能够打印他的门票并在音乐会现场使用它。

<?php

namespace App\Listeners;

use App\Ticket;
use App\Events\TicketSold;
use App\Notifications\TicketAvailableNotification;
use DarkGhostHunter\Laralocker\Contracts\Lockable;
use DarkGhostHunter\Laralocker\LockerJobMiddleware;
use DarkGhostHunter\Laralocker\HandlesLockerSlot;
use Illuminate\Contracts\Queue\ShouldQueue;
use SerialGenerator\SerialGenerator;

class CreateTicket implements ShouldQueue, Lockable
{
    use HandlesLockerSlot;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array
     */
    public function middleware()
    {
        return [new LockerJobMiddleware()];
    }

    /**
     * Return the starting slot for the Jobs.
     *
     * @return mixed
     */
    public function startFrom()
    {
        // Get the latest stored ticket serial key.
        return Ticket::latest()->value('serial_key');
    }

    /**
     * The next slot to check for availability.
     *
     * @param mixed $slot
     * @return mixed
     */
    public function next($slot)
    {
        // Ask our hypothetical generator to create a new serial from that.
        return SerialGenerator::baseSerial($slot)->getNextSerial();
    }

    /**
     * Handle the event.
     *
     * @param \App\Listeners\TicketSold $event
     * @return void
     */
    public function handle(TicketSold $event)
    {
        // Create a new Ticket instance with this locked slot value.  
        $ticket = Ticket::make([
            'serial_key' => $this->slot,
        ]);

        // Associate the Ticket to the Concert and the User 
        $ticket->concert()->associate($event->concert);
        $ticket->user()->associate($event->user);

        // Save the Ticket into the system
        $ticket->save();

        // Notify the user that his ticket bought is available
        $event->user->notify(
            new TicketAvailableNotification($ticket)        
        );
    }
}

让我们开始检查每个方法的作用。

startFrom()

当作业询问从哪里开始时,这将被用来获取“最后使用的槽位”。如果是第一个,返回null是可以的。

一旦检索到这个起始点,锁器会将它保存在缓存中。后续对起始点的调用将使用缓存,而不是在每个作业中执行此方法。

这仅在第一个作业触碰到队列时使用,或者如果缓存返回null(可能是由于你清除了它)。

你应该返回一个字符串,或者一个可以表示为字符串的对象实例

next($slot)

在检索到起始槽位后,队列工作者会将它放入此方法以获取下一个应由下一个作业预留的空闲槽位。它可以接收你设置的任何内容,甚至是null

如果下一个槽位已被另一个作业“预留”,它将递归调用next($slot),直到找到一个未被预留的槽位。

例如,如果您的初始槽位是null,则该方法将接收null,加上十,然后返回10。锁器将检查10是否已被预留,如果它不可用,则再次调用next($slot),但这次使用10,依此类推,直到找到一个未被预留的槽位,比如60

cache()(可选)

这完全是可选的。如果你想特定作业使用另一个缓存存储,可以在这里返回它。只需记住,在使用之前,请先正确配置你想要在应用程序中使用的缓存驱动器

如果你的缓存与标记兼容,比如redismemcached,你可以在这里透明地设置你的标记。这允许你在出错时清除标记,或者对其有更细粒度的控制。

/**
 * Use a non-default Cache repository for handling slots (optional)
 *
 * @return \Illuminate\Contracts\Cache\Repository
 */
public function cache()
{
    return Cache::store('redis')->tag('tickets_queue');
}

$slotTtl(可选)

同样,这也是完全可选的。默认情况下,槽位在缓存中保留60秒。如果你的作业需要花费很长时间,你可以设置一个更大的ttl,比如10分钟。

总是建议设置一个最大值,以避免槽位在缓存存储中“爬升”。

/**
 * Maximum Slot reservation time
 *
 * @var \Illuminate\Support\Carbon|int
 */
public $slotTtl = 180;

如果你没有使用$slotTtl,锁器将自动从$timeoutretryUntil()或配置文件中的默认值中按此顺序获取它。

$prefix(可选)

同样可选,这管理了作业槽位预留所使用的前缀,以避免与其他缓存键冲突。

/**
 * Prefix for slots reservations
 *
 * @var string
 */
public string $prefix = 'ticket_locking';

配置

Laralocker无需额外操作即可正常工作,但如果你需要更改默认配置,只需发布配置文件。

php artisan vendor:publish --provider=DarkGhostHunter\Laralocker\LaralockerServiceProvider --tag="config"

你将在配置目录中找到一个laralocker.php文件,内容如下:

<?php 
return [
    'cache' => null,
    'prefix' => 'queue_locker',
    'ttl' => 60,
];

内容基本上是自解释的,但让我们逐一描述。

缓存

return [
    'cache' => 'redis',
];

当设置为 null 时,Laralocker 使用应用程序的默认缓存。在新的 Laravel 安装中,它是 file 存储。

如果您需要高性能,您可能希望切换到 redissqsmemecached 或您应用程序可用的任何其他存储。这必须是您 config/cache.php 文件中描述的 stores 之一。

前缀

return [
    'prefix' => 'app_slots_queue',
];

为了避免与其他缓存键冲突,Laralock 将使用一个字符串为槽位添加前缀。如果您在应用程序中出于任何原因使用了此前缀,您可能希望更改它。

槽位预留有效期

return [
    'ttl' => 300,
];

在缓存中预留的槽位始终有一个最大有效期,在此之后将自动释放。这是一种避免缓存中出现僵尸预留的机制。

当然,一些作业可能需要一段时间才能处理。如果您的作业可能需要很长时间,您可能希望将其扩展到安全值。

释放和清除槽位

当作业失败时,不会调用 releaseSlot()。这将允许在作业失败时不要更新最后一个槽位,并将槽位保留到过期。

如果您释放的作业没有使用 Queueable 特性,请确保在作业失败时调用 clearSlot()。这将删除槽位预留,以便其他作业可以预订它。

详细内部工作原理

好奇它是如何工作的吗?系好安全带

处理 作业时,作业会将自身传递给 Locker。此类将检查使用缓存使用的最后一个作业槽位。

如果没有使用最后一个槽位(因为它是队列中的第一个,或者缓存已被刷新),它将调用 startFrom() 并将返回值保存到槽位中,永远保存到缓存中,以避免每次都调用 startFrom()

接下来,Locker 将初始槽位传递给 next($slot),然后检查结果槽位是否空闲。它将递归调用 next($slot),直到找到非预留槽位。

找到后,Locker 将使用缓存保留它,为缓存键设置保存的存活时间,以避免在缓存中保留僵尸预留。

Locker 将将使用的槽位复制到作业的 $slot 属性中,然后作业继续执行。这样,开发人员就可以在作业中使用槽位(如我们的 Ticket 示例所示)。

一旦作业调用 releaseSlot(),Locker 将将 $slot 保存为缓存中使用的最后一个槽位,永远保存。这将允许其他作业从该槽位开始,而不是从第一个槽位开始检查,并遇到在缓存中过期的未预留槽位。

如果作业失败,则不会更新“最后一个槽位”,并且槽位将保留到过期。

如果槽位已经保存为最后一个,它将比较作业开始时的日期和时间戳,并且只有在它更近时才更新。这允许不保存“较旧”的槽位,从而使槽位可以继续前进。

最后,它将“释放”缓存中的当前预留槽位,避免缓存中的僵尸键。

许可证

MIT 许可证(MIT)。有关更多信息,请参阅 许可证文件