shiftonelabs / laravel-sqs-fifo-queue
为 Amazon SQS FIFO 队列添加 Laravel 队列驱动。
Requires
- php: >=8.0.2
- aws/aws-sdk-php: ~3.0
- illuminate/queue: >=9.0
- illuminate/support: >=9.0
- ramsey/uuid: >=4.2.2
Requires (Dev)
- illuminate/mail: >=9.0
- illuminate/notifications: >=9.0
- mockery/mockery: ~1.3
- phpunit/phpunit: ~9.3 || ~10.0
- shiftonelabs/codesniffer-standard: 0.*
- squizlabs/php_codesniffer: 3.*
- vlucas/phpdotenv: ~5.4
README
此 Laravel/Lumen 包提供 Amazon SQS FIFO 队列的队列驱动。虽然 Laravel 可以直接与 Amazon SQS 标准队列一起工作,但 FIFO 队列略有不同,并且 Laravel 无法正确处理。这就是这个包的用武之地。
版本
此包已在 Laravel 4.1 至 Laravel 11.x 上进行测试,尽管它可能在发布的新版本上继续工作。本节将更新以反映已测试的版本。
此说明文件已更新,以显示当前支持的最新版本信息(9.x - 11.x)。对于 Laravel 4.1 至 Laravel 8.x,请查看 2.x 分支。
安装
通过 Composer
$ composer require shiftonelabs/laravel-sqs-fifo-queue
更新 composer 并安装包后,需要加载服务提供者。
Laravel 9.x, 10.x, 11.x
此包使用自动包发现。服务提供者将自动注册。
Lumen 9.x, 10.x, 11.x
打开 bootstrap/app.php
,在“注册服务提供者”部分下添加以下行
$app->register(ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);
配置
Laravel/Lumen 9.x, 10.x, 11.x
如果使用 Lumen,请在项目根目录中创建一个 config
目录(如果您还没有的话)。然后,将 vendor/laravel/lumen-framework/config/queue.php
复制到 config/queue.php
。
现在,对于 Laravel 和 Lumen,打开 config/queue.php
,并在 connections
数组中添加以下条目。
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_FIFO_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
'queue' => env('SQS_FIFO_QUEUE', 'default.fifo'),
'suffix' => env('SQS_FIFO_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'after_commit' => false,
'group' => 'default',
'deduplicator' => env('SQS_FIFO_DEDUPLICATOR', 'unique'),
'allow_delay' => env('SQS_FIFO_ALLOW_DELAY', false),
],
示例 .env 文件
AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQRST
AWS_SECRET_ACCESS_KEY=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_FIFO_PREFIX=https://sqs.us-east-1.amazonaws.com/123456789012
SQS_FIFO_QUEUE=queuename.fifo
SQS_FIFO_SUFFIX=-staging
AWS_DEFAULT_REGION=us-east-1
SQS_FIFO_DEDUPLICATOR=unique
SQS_FIFO_ALLOW_DELAY=false
如果您想将其设置为默认连接,还可以在 .env
文件中设置 QUEUE_CONNECTION=sqs-fifo
。
Capsule
如果在使用 Lumen/Laravel 之外使用 illuminate\queue
组件 Capsule
use Illuminate\Queue\Capsule\Manager as Queue; use ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider; $queue = new Queue; $queue->addConnection([ 'driver' => 'sqs-fifo', 'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST 'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef 'prefix' => 'your-prefix', // ex: https://sqs.us-east-1.amazonaws.com/your-account-id 'queue' => 'your-queue-name', // ex: queuename.fifo 'suffix' => 'your-suffix', // ex: -staging 'region' => 'your-queue-region', // ex: us-east-1 'after_commit' => false, 'group' => 'default', 'deduplicator' => 'unique', 'allow_delay' => false, ], 'sqs-fifo'); // Make this Capsule instance available globally via static methods... (optional) $queue->setAsGlobal(); // Register the 'queue' alias in the Container, then register the SQS FIFO provider. $app = $queue->getContainer(); $app->instance('queue', $queue->getQueueManager()); (new LaravelSqsFifoQueueServiceProvider($app))->register();
凭证
如果您使用 AWS 凭据文件等替代选项提供 AWS 凭据,则可以省略 key
和 secret
配置选项。有关替代选项的更多信息,请参阅AWS PHP SDK 指南。
AWS STS 会话令牌
如果需要指定 AWS STS 临时会话令牌,则可以添加 'token' => env('AWS_SESSION_TOKEN'),
配置选项。对于一些特定环境,例如 AWS Lambda,这是必需的。
队列后缀
使用 suffix
配置选项可以在不指定环境后缀的情况下使用队列支持不同环境。例如,如果您有一个 emails-staging.fifo
队列和一个 emails-production.fifo
队列,您可以根据环境将 suffix
配置设置为 -staging
或 -production
,并且您的代码可以继续使用 emails.fifo
,而无需了解环境。因此,Job::dispatch()->onQueue('emails.fifo')
将使用定义在配置中的 suffix
,是 emails-staging.fifo
还是 emails-production.fifo
队列。
注意:SQS FIFO 队列必须以 .fifo
后缀结尾。如上例所示,配置中定义的任何 suffix
都将出现在所需的 .fifo
后缀之前。不要在后缀配置或队列名称中指定 .fifo
,否则队列名称将无法正确生成。
作业与数据库事务
使用 after_commit
配置选项 来控制数据库事务中进行队列作业的处理方式。当设置为 true
时,此选项确保在数据库事务提交之后,才真正发送在数据库事务中排队的作业。如果事务回滚,作业将被丢弃且不会发送。如果没有活跃的数据库事务,作业将立即发送。
队列事件监听器
目前,此包无法处理 队列事件监听器。对此功能的实现需要接管 Laravel 的事件调度器,而这不是此包目前准备要做的事情。为了解决这个问题,您需要将队列监听器转换为非队列形式,然后发送一个新的队列作业来处理工作。
使用方法
基本上,此队列驱动程序的使用与内置队列驱动程序相同。但是,在与 Amazon 的 SQS FIFO 队列一起工作时,还有一些额外的事项需要考虑。
消息组
除了每个连接可以拥有多个队列名称之外,SQS FIFO 队列还允许每个 FIFO 队列拥有多个“消息组”。这些消息组用于将相关作业分组在一起,并且每个组内的作业按 FIFO 顺序处理。这很重要,因为您的队列性能可能取决于正确分配消息组。如果您有 100 个作业在队列中,并且它们都属于一个消息组,那么一次只有一个队列工作者能够处理这些作业。如果它们可以合理地分成 5 个消息组,那么您就可以有 5 个队列工作者处理队列中的作业(每个组一个)。FIFO 排序是按消息组进行的。
目前,默认情况下,所有队列作业都将合并成一个组,如配置文件中定义的那样。在上面的配置中,所有队列作业都将作为 default
组的一部分发送。可以使用 onMessageGroup()
方法更改每个作业的组,这将在下面进行更多解释。
组 ID 不能为空,长度不能超过 128 个字符,并且可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。
此外,AWS 强制要求组 ID 必须是字符串类型。虽然在文档块中进行了类型提示,但在此包中不强制执行变量类型。为了避免 AWS 错误,在发送到 AWS 之前,指定的消息组将使用 strval()
方法转换为字符串。
在未来版本中,消息组将能够分配给一个函数,如下面的去重器所示。
去重
当将作业发送到 SQS FIFO 队列时,Amazon 需要一种方法来确定作业是否是队列中已存在的作业的副本。SQS FIFO 队列有一个 5 分钟的去重间隔,这意味着如果在间隔内发送重复的消息,它将成功接受(没有错误),但不会交付或处理。
确定重复消息通常有两种方法:要么无论内容如何都认为所有消息都是唯一的,要么如果消息内容相同,则认为它们是重复的。
此包通过 deduplicator
配置选项处理去重。
要使所有消息都视为唯一,请将 deduplicator
设置为 unique
。
要将内容相同的消息视为重复,有两种选择,具体取决于FIFO队列的配置方式。如果FIFO队列在Amazon中已启用基于内容的去重
功能,则应将deduplicator
设置为sqs
。这告诉连接依赖Amazon来确定内容唯一性。然而,如果禁用了基于内容的去重
功能,则应将deduplicator
设置为content
。注意,如果禁用了基于内容的去重
,并将deduplicator
设置为sqs
,则在尝试向队列发送任务时将生成错误。
总结如下:
sqs
- 当需要将内容相同的消息视为重复,并且SQS FIFO队列启用了基于内容的去重
功能时使用。content
- 当需要将内容相同的消息视为重复,但SQS FIFO队列禁用了基于内容的去重
功能时使用。unique
- 当所有消息都应该被视为唯一,无论内容如何时使用。
如果需要不同的去重算法,可以在容器中注册自定义去重方法。
最后,默认情况下,所有队列中的任务将使用配置文件中定义的去重器。可以使用withDeduplicator()
方法按任务更改此设置。
延迟任务
SQS FIFO队列不支持按消息延迟,只支持按队列延迟。所需延迟在Amazon控制台设置队列时定义在队列本身上。尝试在发送到FIFO队列的任务上设置延迟将没有任何效果。为了延迟任务,可以将任务push()
到一个已经设置了交付延迟的SQS FIFO队列。
由于不支持按消息延迟,使用later()
方法将任务推送到SQS FIFO队列将默认抛出BadMethodCallException
异常。但是,可以使用allow_delay
配置选项来更改此行为。
将队列的allow_delay
配置选项设置为true
将允许在队列上使用later()
方法推送任务而不会抛出异常。然而,发送到later()
方法的延迟参数将被完全忽略,因为延迟时间是在队列本身上的SQS中定义的。
高级用法
按任务组和去重器
如果您需要更改特定任务的组或去重器,您将需要访问onMessageGroup()
和withDeduplicator()
方法。这些方法通过ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable
特质提供。一旦将此特质添加到您的任务类中,就可以为该特定任务更改组以及/或去重器,而不会影响队列上的任何其他任务。
代码示例
任务
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class ProcessCoin implements ShouldQueue { use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels; // }
使用方法
dispatch( (new \App\Jobs\ProcessCoin()) ->onMessageGroup('quarter') ->withDeduplicator('unique') );
通知
<?php namespace App\Notifications; use Illuminate\Bus\Queueable; use Illuminate\Notifications\Notification; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class InvoicePaid extends Notification implements ShouldQueue { use Queueable, SqsFifoQueueable; // }
使用方法
$user->notify( (new InvoicePaid($invoice))->onMessageGroup($invoice->id) );
可邮寄
<?php namespace App\Mail; use Illuminate\Bus\Queueable; use Illuminate\Mail\Mailable; use Illuminate\Queue\SerializesModels; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class OrderShipped extends Mailable implements ShouldQueue { use Queueable, SerializesModels, SqsFifoQueueable; // }
使用方法
Mail::to($request->user()) ->cc($moreUsers) ->bcc($evenMoreUsers) ->queue(new OrderShipped($order)->onMessageGroup($order->number));
自定义去重器
去重器通过生成发送到队列的去重ID来工作。如果两个消息生成了相同去重ID,则第二个消息被视为重复,如果它在5分钟去重间隔内,则该消息将不会交付。
如果您有一些需要用于生成去重ID的自定义逻辑,您可以注册自己的自定义去重器。去重器存储在IoC容器中,带有前缀queue.sqs-fifo.deduplicator
。例如,unique
去重器被别名到queue.sqs-fifo.deduplicator.unique
。
自定义去重器通过在IoC中注册一个新的前缀别名来创建。这个别名应该解析到一个实现 ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator
接口的新对象实例。您可以选择定义一个新的实现此接口的类,或者创建一个新的 ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback
实例,它接受一个执行去重逻辑的 Closure
。定义的 Closure
应该接受两个参数:$payload
和 $queue
,其中 $payload
是发送到队列的 json_encoded()
消息,而 $queue
是消息将被发送到的队列名称。生成的ID不能超过128个字符,可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。此外,AWS要求字符串类型。为了防止AWS错误,结果将在发送到AWS之前通过 strval()
转换。
例如,如果您想创建一个 random
去重器,它会随机选择一些作业作为重复项,您可以在您的 AppServiceProvider
的 register()
方法中添加以下行:
$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) { return new \ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); }); }
或者,如果您更喜欢创建一个新的类,您的类可能如下所示:
namespace App\Deduplicators; use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator; class Random implements Deduplicator { public function generate($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); } }
然后您可以在您的 AppServiceProvider
中这样注册该类:
$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);
注册了这个别名后,您可以将配置中的 deduplicator
键更新为使用值 random
,或者您可以通过在作业上调用 withDeduplicator('random')
来为单个作业设置去重器。
贡献
欢迎贡献。请参阅 CONTRIBUTING 获取详细信息。
安全
如果您发现任何安全相关的问题,请通过电子邮件 [email protected] 反馈,而不是使用问题跟踪器。
致谢
许可证
MIT许可证(MIT)。有关更多信息,请参阅 许可证文件。