richardmichel / laravel-sqs-fifo-queue
Requires
- php: >=5.5.0
- aws/aws-sdk-php: ~3.0
- illuminate/queue: >=4.1
- illuminate/support: >=4.1
- ramsey/uuid: >=3.0
Requires (Dev)
- illuminate/encryption: >=4.1
- mockery/mockery: ~0.9
- phpunit/phpunit: ~4.0
- shiftonelabs/codesniffer-standard: 0.*
- squizlabs/php_codesniffer: 2.*
- vlucas/phpdotenv: ~2.2
README
这个Laravel/Lumen包提供了一个用于亚马逊SQS FIFO队列的队列驱动程序。虽然Laravel可以开箱即用地与亚马逊SQS标准队列一起工作,但FIFO队列略有不同,且Laravel无法正确处理。这就是这个包的作用所在。
版本
此包已在Laravel 4.1到Laravel 8.x上进行了测试,尽管它可能仍然适用于后续版本,因为它们被发布。本节将更新以反映实际测试的版本。
安装
通过Composer
$ composer require richardmichel/laravel-sqs-fifo-queue
一旦更新了Composer并安装了包,就需要加载服务提供者。
Laravel 5.5+、6.x、7.x、8.x (5.5, 5.6, 5.7, 5.8, 6.x, 7.x, 8.x)
此包使用自动包发现。服务提供者将自动注册。
Laravel 5.0 - 5.4
打开 config/app.php
并将以下行添加到提供者数组中
ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class,
Laravel 4 (4.1, 4.2)
打开 app/config/app.php
并将以下行添加到提供者数组中
'ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider',
Lumen 5, 6, 7, 8 (5.0, 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8, 6.x, 7.x, 8.x)
打开 bootstrap/app.php
并在“注册服务提供者”部分下添加以下行
$app->register(ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);
配置
Laravel/Lumen 5.1+, 6.x, 7.x, 8.x (5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8, 6.x, 7.x, 8.x)
如果使用Lumen,请在项目根目录下创建一个名为config
的目录(如果尚未创建)。然后,将vendor/laravel/lumen-framework/config/queue.php
复制到config/queue.php
。
现在,对于Laravel和Lumen,打开config/queue.php
并将以下条目添加到connections
数组中。
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'prefix' => env('SQS_PREFIX'),
'suffix' => env('SQS_SUFFIX'),
'queue' => 'your-queue-name', // ex: queuename.fifo
'region' => 'your-queue-region', // ex: us-east-2
'group' => 'default',
'deduplicator' => 'unique',
'allow_delay' => env('SQS_ALLOW_DELAY'),
],
示例.env文件
SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
SQS_ALLOW_DELAY=false
如果您希望这是默认连接,请还在.env
文件中设置QUEUE_CONNECTION=sqs-fifo
(对于>= 5.7)或QUEUE_DRIVER=sqs-fifo
(对于< 5.7)。
Laravel/Lumen 5.0
如果使用Lumen,请在项目根目录下创建一个名为config
的目录(如果尚未创建)。然后,将vendor/laravel/lumen-framework/config/queue.php
复制到config/queue.php
。
现在,对于Laravel和Lumen,打开config/queue.php
并将以下条目添加到connections
数组中
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'queue' => env('SQS_PREFIX').'/your-queue-name',
'region' => 'your-queue-region',
'group' => 'default',
'deduplicator' => 'unique',
'allow_delay' => env('SQS_ALLOW_DELAY'),
],
示例.env文件
SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
SQS_ALLOW_DELAY=false
如果您希望这是默认连接,请也在.env
文件中设置QUEUE_DRIVER=sqs-fifo
。
Laravel 4
打开 app/config/queue.php
并将以下条目添加到 connections
数组中
'sqs-fifo' => array(
'driver' => 'sqs-fifo',
'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST
'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
'queue' => 'your-queue-url', // ex: https://sqs.us-east-2.amazonaws.com/123456789012/queuename.fifo
'region' => 'your-queue-region', // ex: us-east-2
'group' => 'default',
'deduplicator' => 'unique',
'allow_delay' => false,
),
如果您希望这是默认连接,请也更新'default'
键为'sqs-fifo'
。
Capsule
如果在Lumen/Laravel之外使用illuminate\queue
组件Capsule
use Illuminate\Queue\Capsule\Manager as Queue; use ShiftOneLabs\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider; $queue = new Queue; $queue->addConnection([ 'driver' => 'sqs-fifo', 'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST 'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef /** * Set "prefix"/"suffix" and/or "queue" depending on version, as described for Laravel versions above * 'prefix' => 'your-prefix', * 'suffix' => 'your-suffix', * 'queue' => 'your-queue-name', */ 'region' => 'your-queue-region', // ex: us-east-2 'group' => 'default', 'deduplicator' => 'unique', 'allow_delay' => false, ], 'sqs-fifo'); // Make this Capsule instance available globally via static methods... (optional) $queue->setAsGlobal(); // Register the 'queue' alias in the Container, then register the SQS FIFO provider. $app = $queue->getContainer(); $app->instance('queue', $queue->getQueueManager()); (new LaravelSqsFifoQueueServiceProvider($app))->register();
凭证
如果使用提供AWS凭证的替代选项之一(例如,使用AWS凭证文件),则可以省略key
和secret
配置选项。有关替代选项的更多信息,请参阅AWS PHP SDK指南。
AWS STS会话令牌
如果需要指定AWS STS临时会话令牌,可以添加配置选项'token' => env('AWS_SESSION_TOKEN')
。这对于某些特定环境(如AWS Lambda)是必需的。
队列后缀
配置选项 suffix
用于支持不同环境下的队列,无需在使用队列时指定环境后缀。例如,如果您有一个 emails-staging.fifo
队列和一个 emails-production.fifo
队列,您可以根据环境将 suffix
配置设置为 -staging
或 -production
,而您的代码可以继续使用 emails.fifo
,无需知道环境。因此,Job::dispatch()->onQueue('emails.fifo')
将使用配置中定义的 suffix
对应的队列,即 emails-staging.fifo
或 emails-production.fifo
。
此功能是在 Laravel 7.x 中引入到纯 SQS 队列的,主要是为了支持 Laravel Vapor。有关此功能的更多详细信息,请参阅 功能 PR。
标准 Laravel 实现与这个实现之间有两个区别
- 此包支持从 Laravel 5.1 开始的队列后缀。
- SQS FIFO 队列必须以
.fifo
后缀结尾。如上例所示,配置中定义的任何suffix
都将在所需的.fifo
后缀之前。不要在后缀配置或队列名称中指定.fifo
,否则队列名称将无法正确生成。
用法
基本上,此队列驱动器的用法与内置队列驱动器相同。然而,当处理 Amazon 的 SQS FIFO 队列时,还有一些额外的事情需要考虑。
消息组
除了可以为每个连接有多个队列名称之外,SQS FIFO 队列还允许每个 FIFO 队列有多个“消息组”。这些消息组用于将相关作业组合在一起,并且每个组内的作业按照 FIFO 顺序处理。这很重要,因为您的队列性能可能取决于能否正确分配消息组。如果您有 100 个作业在队列中,并且它们都属于一个消息组,那么一次只能有一个队列工作器能够处理这些作业。如果它们可以被逻辑地分成 5 个消息组,那么您可以有 5 个队列工作器处理来自队列的作业(每个组一个)。FIFO 排序是按消息组进行的。
目前,默认情况下,所有队列作业将汇总到一个组中,该组在配置文件中定义。在上面的配置中,所有队列作业都将作为 default
组的一部分发送。可以使用 onMessageGroup()
方法按作业更改组,这将在下面进行更多解释。
组 ID 不能为空,不能超过 128 个字符,并且可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。
在未来版本中,消息组将被能够分配给一个函数,如下面的去重器。
去重
在向 SQS FIFO 队列发送作业时,Amazon 需要一种方法来确定作业是否是队列中已存在的作业的重复。SQS FIFO 队列有一个 5 分钟的去重间隔,这意味着如果在间隔内发送重复的消息,它将成功接受(无错误),但不会被投递或处理。
确定重复消息通常有两种方式:要么所有消息都被视为唯一的,不考虑内容,要么如果消息内容相同,则被视为重复。
此包通过 deduplicator
配置选项处理去重。
为了使所有消息都被视为唯一的,将 deduplicator
设置为 unique
。
为了将内容相同的消息视为重复,有两种选择,具体取决于FIFO队列的配置方式。如果FIFO队列已在Amazon中配置为启用基于内容去重
功能,则应将deduplicator
设置为sqs
。这表示连接将依赖Amazon来确定内容唯一性。然而,如果禁用了基于内容去重
功能,则应将deduplicator
设置为content
。注意,如果禁用了基于内容去重
,并将deduplicator
设置为sqs
,则在尝试将作业发送到队列时将生成错误。
总结如下:
sqs
- 当需要将内容相同的消息视为重复,并且SQS FIFO队列启用了基于内容去重
时使用。content
- 当需要将内容相同的消息视为重复,但SQS FIFO队列禁用了基于内容去重
时使用。unique
- 当需要将所有消息视为唯一,无论内容如何。
如果需要不同的去重算法,可以在容器中注册自定义去重方法。
最后,默认情况下,所有队列中的作业将使用配置文件中定义的去重器。可以使用withDeduplicator()
方法按作业更改此设置。
延迟作业
SQS FIFO队列不支持按消息延迟,只支持按队列延迟。所需的延迟在Amazon控制台设置队列时定义。尝试为发送到FIFO队列的作业设置延迟将没有效果。为了延迟作业,可以将作业push()
到一个已定义有交付延迟的SQS FIFO队列。
由于不支持按消息延迟,使用later()
方法将作业推送到SQS FIFO队列将默认抛出BadMethodCallException
异常。但是,可以使用allow_delay
配置选项来更改此行为。
将队列的allow_delay
配置选项设置为true
将允许later()
方法将该队列上的作业推送到该队列而不会抛出异常。但是,由于延迟时间在SQS队列本身上定义,因此发送到later()
方法的延迟参数将被完全忽略。
注意:Laravel 5.4.36 - 5.6.35中存在一个bug,导致所有使用\Illuminate\Bus\Queueable
特性的队列Mailable
对象都会使用later()
方法。如果您受到影响,则需要将队列的allow_delay
配置选项设置为true
,即使它们不允许延迟。这将防止抛出异常。
高级用法
按作业组和去重器
如果您需要更改特定作业的组或去重器,则需要访问onMessageGroup()
和withDeduplicator()
方法。这些方法通过ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable
特性提供。一旦将此特性添加到您的作业类中,就可以更改该特定作业的组和/或去重器,而不会影响队列上的其他作业。
代码示例
作业
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class ProcessCoin implements ShouldQueue { use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels; // }
用法
dispatch( (new \App\Jobs\ProcessCoin()) ->onMessageGroup('quarter') ->withDeduplicator('unique') );
注意:Laravel 4通常不支持将作业实例传递到队列,但此包支持。作业实例将转换为仅类名,因此队列系统将继续工作,但在转换之前将提取作业实例的消息组和去重器。
通知
<?php namespace App\Notifications; use Illuminate\Bus\Queueable; use Illuminate\Notifications\Notification; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class InvoicePaid extends Notification implements ShouldQueue { use Queueable, SqsFifoQueueable; // }
用法
$user->notify( (new InvoicePaid($invoice))->onMessageGroup($invoice->id) );
可邮寄
<?php namespace App\Mail; use Illuminate\Bus\Queueable; use Illuminate\Mail\Mailable; use Illuminate\Queue\SerializesModels; use Illuminate\Contracts\Queue\ShouldQueue; use ShiftOneLabs\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class OrderShipped extends Mailable implements ShouldQueue { use Queueable, SerializesModels, SqsFifoQueueable; // }
用法
Mail::to($request->user()) ->cc($moreUsers) ->bcc($evenMoreUsers) ->queue(new OrderShipped($order)->onMessageGroup($order->number));
自定义去重器
去重器通过生成一个去重ID并将其发送到队列来工作。如果两个消息生成相同的去重ID,则第二个消息被视为重复,如果它在5分钟的去重间隔内,则不会投递该消息。
如果您有一些需要使用的自定义逻辑来生成去重ID,您可以注册自己的自定义去重器。去重器存储在具有前缀 queue.sqs-fifo.deduplicator
的IoC容器中。例如,unique
去重器被别名为 queue.sqs-fifo.deduplicator.unique
。
自定义去重器通过在IoC中注册一个新的前缀别名字符串来创建。此别名应解析为一个新的对象实例,该实例实现了 ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator
合同。您可以定义一个实现此合同的新类,或者创建一个新的 ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback
实例,该实例接受一个执行去重逻辑的 Closure
。定义的 Closure
应接受两个参数:$payload
和 $queue
,其中 $payload
是要发送到队列的 json_encoded()
消息,而 $queue
是消息将被发送到的队列名称。生成的ID不能超过128个字符,并且可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。
例如,如果您想创建一个 random
去重器,该去重器将随机选择一些工作作为重复项,您可以在您的 AppServiceProvider
的 register()
方法中添加以下行
$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) { return new \ShiftOneLabs\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); }); }
或者,如果您更喜欢创建一个新的类,您的类可能看起来像这样
namespace App\Deduplicators; use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator; class Random implements Deduplicator { public function generate($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); } }
然后您可以在您的 AppServiceProvider
中注册这个类,如下所示
$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);
注册了此别名后,您可以将配置中的 deduplicator
键更新为使用值 random
,或者您可以通过在作业上调用 withDeduplicator('random')
来设置单个作业的去重器。
贡献
欢迎贡献。请参阅 CONTRIBUTING 了解详情。
安全
如果您发现任何与安全相关的问题,请通过电子邮件 patrick@shiftonelabs.com 联系,而不是使用问题跟踪器。
致谢
许可
MIT许可(MIT)。请参阅 许可文件 了解更多信息。
git tag -a v1.0.1 -m "version 1.0.1" git push origin v1.0.1