gaozhitw/laravel-sqs-fifo-queue

为Amazon SQS FIFO队列添加Laravel队列驱动。

dev-master 2022-10-29 10:15 UTC

This package is auto-updated.

Last update: 2024-09-29 06:14:55 UTC


README

Latest Version on Packagist Software License Build Status Coverage Status Quality Score Total Downloads

此Laravel/Lumen包提供了一个用于Amazon SQS FIFO队列的队列驱动程序。虽然Laravel可以与Amazon的SQS标准队列无缝工作,但FIFO队列略有不同,并且Laravel无法正确处理。这就是这个包的用武之地。

版本

此包已在Laravel 4.1至Laravel 8.x上进行了测试,尽管它可能在发布后继续在后续版本上工作。本节将更新以反映包已实际测试的版本。

安装

通过Composer

$ composer require shiftonelabs/laravel-sqs-fifo-queue

一旦更新了Composer并安装了包,就需要加载服务提供者。

Laravel 5.5+、6.x、7.x、8.x(5.5、5.6、5.7、5.8、6.x、7.x、8.x)

此包使用自动包发现。服务提供者将自动注册。

Laravel 5.0 - 5.4

打开 config/app.php 并将以下行添加到提供者数组中

ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class,

Laravel 4(4.1、4.2)

打开 app/config/app.php 并将以下行添加到提供者数组中

'ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider',

Lumen 5、6、7、8(5.0、5.1、5.2、5.3、5.4、5.5、5.6、5.7、5.8、6.x、7.x、8.x)

打开 bootstrap/app.php 并在“注册服务提供者”部分下添加以下行

$app->register(ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);

配置

Laravel/Lumen 5.1+、6.x、7.x、8.x(5.1、5.2、5.3、5.4、5.5、5.6、5.7、5.8、6.x、7.x、8.x)

如果使用Lumen,请创建项目根目录下的config目录(如果您还没有的话)。接下来,将vendor/laravel/lumen-framework/config/queue.php复制到config/queue.php

现在,对于Laravel和Lumen,打开config/queue.php并在connections数组中添加以下条目。

'sqs-fifo' => [
    'driver' => 'sqs-fifo',
    'key' => env('SQS_KEY'),
    'secret' => env('SQS_SECRET'),
    'prefix' => env('SQS_PREFIX'),
    'suffix' => env('SQS_SUFFIX'),
    'queue' => 'your-queue-name',    // ex: queuename.fifo
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
    'allow_delay' => env('SQS_ALLOW_DELAY'),
],

示例 .env文件

SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
SQS_ALLOW_DELAY=false

如果您希望这是默认连接,还可以在>= 5.7的.env文件中将QUEUE_CONNECTION=sqs-fifo设置为默认连接,或者对于< 5.7.env文件,设置为QUEUE_DRIVER=sqs-fifo

Laravel/Lumen 5.0

如果使用Lumen,请创建项目根目录下的config目录(如果您还没有的话)。接下来,将vendor/laravel/lumen-framework/config/queue.php复制到config/queue.php

现在,对于Laravel和Lumen,打开config/queue.php并在connections数组中添加以下条目

'sqs-fifo' => [
    'driver' => 'sqs-fifo',
    'key'    => env('SQS_KEY'),
    'secret' => env('SQS_SECRET'),
    'queue'  => env('SQS_PREFIX').'/your-queue-name',
    'region' => 'your-queue-region',
    'group' => 'default',
    'deduplicator' => 'unique',
    'allow_delay' => env('SQS_ALLOW_DELAY'),
],

示例 .env文件

SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
SQS_ALLOW_DELAY=false

如果您希望这是默认连接,还可以在.env文件中将QUEUE_DRIVER=sqs-fifo设置为默认连接。

Laravel 4

打开 app/config/queue.php 并在 connections 数组中添加以下条目

'sqs-fifo' => array(
    'driver' => 'sqs-fifo',
    'key'    => 'your-public-key',   // ex: ABCDEFGHIJKLMNOPQRST
    'secret' => 'your-secret-key',   // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
    'queue'  => 'your-queue-url',    // ex: https://sqs.us-east-2.amazonaws.com/123456789012/queuename.fifo
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
    'allow_delay' => false,
),

如果您希望这是默认连接,还可以将 'default' 键更新为 'sqs-fifo'

Capsule

如果在使用Lumen/Laravel之外使用illuminate\queue组件Capsule

use Illuminate\Queue\Capsule\Manager as Queue;
use ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider;

$queue = new Queue;

$queue->addConnection([
    'driver' => 'sqs-fifo',
    'key'    => 'your-public-key',   // ex: ABCDEFGHIJKLMNOPQRST
    'secret' => 'your-secret-key',   // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
    /**
     * Set "prefix"/"suffix" and/or "queue" depending on version, as described for Laravel versions above
     * 'prefix' => 'your-prefix',
     * 'suffix' => 'your-suffix',
     * 'queue' => 'your-queue-name',
     */
    'region' => 'your-queue-region', // ex: us-east-2
    'group' => 'default',
    'deduplicator' => 'unique',
    'allow_delay' => false,
], 'sqs-fifo');

// Make this Capsule instance available globally via static methods... (optional)
$queue->setAsGlobal();

// Register the 'queue' alias in the Container, then register the SQS FIFO provider.
$app = $queue->getContainer();
$app->instance('queue', $queue->getQueueManager());
(new LaravelSqsFifoQueueServiceProvider($app))->register();

凭证

如果使用提供AWS凭证的替代选项(例如,使用AWS凭证文件),则可以省略keysecret配置选项。有关替代选项的更多信息,请参阅此处AWS PHP SDK指南

AWS STS会话令牌

如果需要指定AWS STS临时会话令牌,请添加配置选项'token' => env('AWS_SESSION_TOKEN'),。这对于某些特定环境(如AWS Lambda)是必需的。

队列后缀

suffix 配置选项用于在不指定环境后缀的情况下支持不同环境的队列。例如,如果您有一个 emails-staging.fifo 队列和一个 emails-production.fifo 队列,可以根据环境设置 suffix 配置为 -staging-production,而您的代码可以继续使用 emails.fifo 而无需了解环境。因此,Job::dispatch()->onQueue('emails.fifo') 将使用配置中定义的 suffix 所决定的 emails-staging.fifoemails-production.fifo 队列。

此功能是在 Laravel 7.x 中引入到纯 SQS 队列的,主要用于支持 Laravel Vapor。更多详细信息请参阅特性请求

与标准的 Laravel 实现相比,此实现有两个不同之处

  • 此包为 Laravel 5.1 一直到 Laravel 8.x 的版本增加了对队列后缀的支持。
  • SQS FIFO 队列必须以 .fifo 后缀结尾。如上例所示,配置中定义的任何 suffix 都将位于必需的 .fifo 后缀之前。不要在后缀配置或队列名称中指定 .fifo,否则队列名称将无法正确生成。

用法

基本上,此队列驱动程序的用法与内置的队列驱动程序相同。但是,当与 Amazon 的 SQS FIFO 队列一起工作时,还有一些额外的事项需要考虑。

消息组

除了可以为每个连接具有多个队列名称之外,SQS FIFO 队列还允许每个 FIFO 队列具有多个“消息组”。这些消息组用于将相关作业分组在一起,并且每个组内的作业将按 FIFO 顺序进行处理。这很重要,因为您的队列性能可能取决于是否能够正确分配消息组。如果您有 100 个作业在队列中,并且它们都属于一个消息组,那么一次只有一个队列工作者可以处理作业。然而,如果它们可以合理地分成 5 个消息组,那么您就可以有 5 个队列工作者从队列中处理作业(每组一个)。FIFO 排序是按消息组进行的。

目前,默认情况下,所有排队作业将合并到一个组中,如配置文件中定义的那样。在上面的配置中,所有排队作业将作为 default 组的一部分发送。可以使用 onMessageGroup() 方法更改组,该方法将在下文进行详细说明。

组 ID 不能为空,不能超过 128 个字符,并且可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~)。

在未来的版本中,消息组将被能够分配给一个函数,如下面的去重器。

去重

当将作业发送到 SQS FIFO 队列时,Amazon 需要一种方法来确定作业是否是队列中已有作业的副本。SQS FIFO 队列有一个 5 分钟的去重间隔,这意味着如果在间隔内发送重复的消息,它将成功接受(无错误),但不会交付或处理。

确定重复消息通常有两种方法:要么无论内容如何都认为所有消息都是唯一的,要么如果消息具有相同的内容,则认为它们是重复的。

此包通过 deduplicator 配置选项处理去重。

要使所有消息都被认为是唯一的,请将 deduplicator 设置为 unique

要使具有相同内容的消息被视为重复,有两种选择,具体取决于FIFO队列的配置方式。如果FIFO队列在Amazon中已启用基于内容的去重功能,则应将deduplicator设置为sqs。这告诉连接依赖于Amazon来确定内容唯一性。但是,如果基于内容的去重功能已禁用,则应将deduplicator设置为content。注意,如果基于内容的去重已禁用,而将deduplicator设置为sqs,则在尝试向队列发送作业时将生成错误。

总结

  • sqs - 当相同内容的消息应被视为重复且在SQS FIFO队列上启用了基于内容的去重时使用。
  • content - 当相同内容的消息应被视为重复但在SQS FIFO队列上禁用了基于内容的去重时使用。
  • unique - 当所有消息都应被视为唯一,无论内容如何。

如果需要不同的去重算法,可以在容器中注册自定义去重方法。

最后,默认情况下,所有队列作业将使用配置文件中定义的去重器。可以使用withDeduplicator()方法按作业更改此设置。

延迟作业

SQS FIFO队列不支持按消息延迟,只支持按队列延迟。所需的延迟在Amazon控制台中设置队列时定义在队列本身上。尝试为发送到FIFO队列的作业设置延迟将没有任何效果。为了延迟作业,可以将作业push()到已定义为具有交付延迟的SQS FIFO队列。

由于不支持按消息延迟,使用later()方法将作业推送到SQS FIFO队列将默认抛出BadMethodCallException异常。但是,可以通过使用allow_delay配置选项来更改此行为。

将队列的allow_delay配置选项设置为true将允许later()方法在该队列上推送作业而不抛出异常。但是,发送到later()方法的延迟参数将被完全忽略,因为延迟时间已在SQS上定义在队列本身。

注意:Laravel 5.4.36 - 5.6.35中存在一个bug,会导致所有使用\Illuminate\Bus\Queueable特质的所有队列的Mailable对象都使用later()方法。如果您受到影响,您需要将队列的allow_delay配置选项设置为true,即使它们不允许延迟。这将防止抛出异常。

高级用法

按作业组和去重器

如果您需要更改特定作业的组或去重器,则需要访问onMessageGroup()withDeduplicator()方法。这些方法通过ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable特质提供。一旦将此特质添加到您的作业类中,就可以更改特定作业的组和/或去重器,而不会影响队列上的任何其他作业。

代码示例

作业

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable;

class ProcessCoin implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels;

    //
}

用法

dispatch(
    (new \App\Jobs\ProcessCoin())
        ->onMessageGroup('quarter')
        ->withDeduplicator('unique')
);

注意:Laravel 4通常不支持将作业实例传递到队列中,但此包可以。作业实例将转换为仅类名,因此队列系统将继续工作,但在转换之前将提取作业实例上的消息组和去重器。

通知

<?php

namespace App\Notifications;

use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Notification;
use Illuminate\Contracts\Queue\ShouldQueue;
use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable;

class InvoicePaid extends Notification implements ShouldQueue
{
    use Queueable, SqsFifoQueueable;

    //
}

用法

$user->notify(
    (new InvoicePaid($invoice))->onMessageGroup($invoice->id)
);

邮件

<?php

namespace App\Mail;

use Illuminate\Bus\Queueable;
use Illuminate\Mail\Mailable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Contracts\Queue\ShouldQueue;
use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable;

class OrderShipped extends Mailable implements ShouldQueue
{
    use Queueable, SerializesModels, SqsFifoQueueable;

    //
}

用法

Mail::to($request->user())
    ->cc($moreUsers)
    ->bcc($evenMoreUsers)
    ->queue(new OrderShipped($order)->onMessageGroup($order->number));

自定义去重器

去重器通过生成发送到队列的去重ID来工作。如果两个消息生成相同去重ID,则第二个消息被视为重复,如果在5分钟去重间隔内,则不会投递该消息。

如果您有一些需要用于生成去重ID的自定义逻辑,您可以注册自己的自定义去重器。去重器存储在以queue.sqs-fifo.deduplicator为前缀的IoC容器中。例如,unique去重器被别名为queue.sqs-fifo.deduplicator.unique

自定义去重器通过在IoC中注册一个新的带前缀的别名来创建。这个别名应该解析到一个实现ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator接口的新对象实例。您可以选择定义一个新的实现此接口的类,或者创建一个新的ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback实例,它接受一个执行去重逻辑的Closure。定义的Closure应接受两个参数:$payload$queue,其中$payload是要发送到队列的json_encoded()消息,而$queue是消息要发送到的队列名称。生成的ID不得超过128个字符,可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~)。

例如,如果您想创建一个random去重器,它会随机选择一些工作作为重复项,您可以在AppServiceProviderregister()方法中添加以下行

$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) {
    return new \ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) {
        // Return the deduplication id generated for messages. Randomly 0 or 1.
        return mt_rand(0,1);
    });
}

或者,如果您更喜欢创建一个新类,您的类可能如下所示

namespace App\Deduplicators;

use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;

class Random implements Deduplicator
{
    public function generate($payload, $queue)
    {
        // Return the deduplication id generated for messages. Randomly 0 or 1.
        return mt_rand(0,1);
    }
}

您可以将这个类注册到您的AppServiceProvider中,如下所示

$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);

注册此别名后,您可以将配置中的deduplicator键更新为使用值random,或者您可以通过在作业上调用withDeduplicator('random')来为单个作业设置去重器。

贡献

欢迎贡献。请参阅CONTRIBUTING以获取详细信息。

安全

如果您发现任何与安全相关的问题,请通过电子邮件patrick@shiftonelabs.com联系,而不是使用问题跟踪器。

致谢

许可证

MIT许可证(MIT)。请参阅许可证文件以获取更多信息。