axl-media / laravel-sqs-fifo-queue
为 Amazon SQS FIFO 队列添加 Laravel 队列驱动程序。基于 shiftonelabs/laravel-sqs-fifo-queue
Requires
- php: >=5.5.0
- aws/aws-sdk-php: ~3.0
- illuminate/queue: >=4.1
- ramsey/uuid: ~3.0
Requires (Dev)
- illuminate/encryption: >=4.1
- mockery/mockery: ~0.9
- phpunit/phpunit: ~4.0
- shiftonelabs/codesniffer-standard: 0.*
- squizlabs/php_codesniffer: 2.*
- vlucas/phpdotenv: ~2.2
This package is auto-updated.
Last update: 2024-09-05 17:42:39 UTC
README
这个 Laravel/Lumen 包提供了一种队列驱动程序,用于 Amazon 的 SQS FIFO 队列。虽然 Laravel 可以直接与 Amazon 的 SQS 标准队列一起工作,但 FIFO 队列略有不同,并且 Laravel 无法正确处理。这正是这个包发挥作用的地方。
版本
此包已在 Laravel 4.1 到 Laravel 5.6 上进行了测试,尽管它可能在发布新版本时继续工作。本节将更新以反映已测试的版本。
安装
通过 Composer
$ composer require axl-media/laravel-sqs-fifo-queue
一旦更新了 composer 并安装了包,就需要加载服务提供者。
Laravel 5.5+ (5.5, 5.6)
此包使用自动包发现。服务提供者将自动注册。
Laravel 5.0 - 5.4
打开 config/app.php
并将以下行添加到 providers 数组中
AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class,
Laravel 4 (4.1, 4.2)
打开 app/config/app.php
并将以下行添加到 providers 数组中
'AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider',
Lumen 5 (5.0, 5.1, 5.2, 5.3, 5.4, 5.5, 5.6)
打开 bootstrap/app.php
并在 "注册服务提供者" 部分下添加以下行
$app->register(AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider::class);
配置
Laravel/Lumen 5.1+ (5.1, 5.2, 5.3, 5.4, 5.5, 5.6)
如果使用 Lumen,请在项目根目录下创建一个 config
目录(如果您还没有的话)。接下来,将 vendor/laravel/lumen-framework/config/queue.php
复制到 config/queue.php
。
现在,对于 Laravel 和 Lumen,打开 config/queue.php
并在 connections
数组中添加以下条目。
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'prefix' => env('SQS_PREFIX'),
'queue' => 'your-queue-name', // ex: queuename.fifo
'region' => 'your-queue-region', // ex: us-east-2
'group' => 'default',
'deduplicator' => 'unique',
],
示例 .env 文件
SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
如果您希望这是默认连接,也可以在 .env
文件中设置 QUEUE_DRIVER=sqs-fifo
。
Laravel/Lumen 5.0
如果使用 Lumen,请在项目根目录下创建一个 config
目录(如果您还没有的话)。接下来,将 vendor/laravel/lumen-framework/config/queue.php
复制到 config/queue.php
。
现在,对于 Laravel 和 Lumen,打开 config/queue.php
并在 connections
数组中添加以下条目
'sqs-fifo' => [
'driver' => 'sqs-fifo',
'key' => env('SQS_KEY'),
'secret' => env('SQS_SECRET'),
'queue' => env('SQS_PREFIX').'/your-queue-name',
'region' => 'your-queue-region',
'group' => 'default',
'deduplicator' => 'unique',
],
示例 .env 文件
SQS_KEY=ABCDEFGHIJKLMNOPQRST
SQS_SECRET=1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
SQS_PREFIX=https://sqs.us-east-2.amazonaws.com/123456789012
如果您希望这是默认连接,也可以在 .env
文件中设置 QUEUE_DRIVER=sqs-fifo
。
Laravel 4
打开 app/config/queue.php
并在 connections
数组中添加以下条目
'sqs-fifo' => array(
'driver' => 'sqs-fifo',
'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST
'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef
'queue' => 'your-queue-url', // ex: https://sqs.us-east-2.amazonaws.com/123456789012/queuename.fifo
'region' => 'your-queue-region', // ex: us-east-2
'group' => 'default',
'deduplicator' => 'unique',
),
如果您希望这是默认连接,也可以更新 'default'
键为 'sqs-fifo'
。
Capsule
如果您在 Lumen/Laravel 之外使用 illuminate\queue
组件 Capsule
use Illuminate\Queue\Capsule\Manager as Queue; use AXLMedia\LaravelSqsFifoQueue\LaravelSqsFifoQueueServiceProvider; $queue = new Queue; $queue->addConnection([ 'driver' => 'sqs-fifo', 'key' => 'your-public-key', // ex: ABCDEFGHIJKLMNOPQRST 'secret' => 'your-secret-key', // ex: 1a23bc/deFgHijKl4mNOp5qrS6TUVwXyz7ABCDef /** * Set "prefix" and/or "queue" depending on version, as described for Laravel versions above * 'prefix' => 'your-prefix', * 'queue' => 'your-queue-name', */ 'region' => 'your-queue-region', // ex: us-east-2 'group' => 'default', 'deduplicator' => 'unique', ], 'sqs-fifo'); // Make this Capsule instance available globally via static methods... (optional) $queue->setAsGlobal(); // Register the 'queue' alias in the Container, then register the SQS FIFO provider. $app = $queue->getContainer(); $app->instance('queue', $queue->getQueueManager()); (new LaravelSqsFifoQueueServiceProvider($app))->register();
凭证
如果使用提供 AWS 凭据的替代选项(例如使用 AWS 凭据文件),则可以省略 key
和 secret
配置选项。有关此信息的更多信息,请参阅AWS PHP SDK 指南。
用法
基本上,此队列驱动程序的用法与内置队列驱动程序相同。然而,当与 Amazon 的 SQS FIFO 队列一起工作时,有一些额外的事项需要考虑。
消息组
除了可以为每个连接有多个队列名称之外,SQS FIFO 队列还允许每个 FIFO 队列有多个“消息组”。这些消息组用于将相关作业分组在一起,并且每个组中的作业将按照 FIFO 顺序进行处理。这很重要,因为您的队列性能可能取决于能否正确分配消息组。如果您有 100 个作业在队列中,并且它们都属于一个消息组,那么一次只能有一个队列工作者能够处理作业。但是,如果它们可以合理地分成 5 个消息组,那么您可以有 5 个队列工作者处理来自队列的作业(每组一个)。FIFO 顺序是按消息组进行的。
当前默认情况下,所有排队的作业将被合并成一个组,这个组是在配置文件中定义的。在上面的配置中,所有排队的作业将被发送到名为default
的组中。可以通过使用onMessageGroup()
方法按作业改变组,下面将进行详细解释。
组ID不能为空,长度不能超过128个字符,可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。
在未来的版本中,消息组将能够分配给一个函数,如下面的去重器所示。
去重
当将作业发送到SQS FIFO队列时,亚马逊需要一种方法来确定作业是否是队列中已有作业的重复项。SQS FIFO队列有5分钟的去重间隔,这意味着如果在间隔内发送重复的消息,它将成功接受(无错误),但不会交付或处理。
确定重复消息通常有两种方法:要么无论内容如何都认为所有消息都是唯一的,要么如果消息内容相同,则认为它们是重复的。
此包通过deduplicator
配置选项处理去重。
要将所有消息视为唯一的,请将deduplicator
设置为unique
。
要将具有相同内容的消息视为重复项,有两种选择,取决于FIFO队列是如何配置的。如果FIFO队列已在亚马逊中配置为启用基于内容的去重
功能,则应将deduplicator
设置为sqs
。这告诉连接依赖亚马逊来确定内容的唯一性。然而,如果基于内容的去重
功能已禁用,则应将deduplicator
设置为content
。注意,如果禁用基于内容的去重
,且将deduplicator
设置为sqs
,则在尝试将作业发送到队列时将生成错误。
总结如下
sqs
- 当消息具有相同内容且在SQS FIFO队列上启用了基于内容的去重
时使用。content
- 当消息具有相同内容且在SQS FIFO队列上禁用了基于内容的去重
时使用。unique
- 当所有消息应被视为唯一时使用,无论内容如何。
如果需要不同的去重算法,可以在容器中注册自定义去重方法。
最后,默认情况下,所有排队的作业将使用配置文件中定义的去重器。可以使用withDeduplicator()
方法按作业更改。
延迟作业
SQS FIFO队列不支持按消息延迟,只支持按队列延迟。所需的延迟在亚马逊控制台中设置队列时定义。在FIFO队列上为作业设置延迟将没有影响。因此,使用later()
方法将作业推送到SQS FIFO队列将生成BadMethodCallException
。
要延迟作业,必须将作业push()
到一个已定义了交付延迟的SQS FIFO队列。
高级用法
按作业组和去重器
如果您需要更改特定作业的组或去重器,则需要访问onMessageGroup()
和withDeduplicator()
方法。这些方法通过AXLMedia\LaravelSqsFifoQueue\Bus\SqsFifoQueueable
特性提供。一旦将此特性添加到您的作业类中,就可以更改特定作业的组和/或去重器,而不会影响队列上的其他作业。
代码示例
工作
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use AXLMedia\LaravelSqsFifoQueue\Bus\SqsFifoQueueable; class ProcessCoin implements ShouldQueue { use InteractsWithQueue, Queueable, SqsFifoQueueable, SerializesModels; // }
用法
dispatch( (new \App\Jobs\ProcessCoin) ->onMessageGroup('quarter') ->withDeduplicator('unique') );
自定义去重器
去重器通过生成一个去重ID并发送到队列来工作。如果两个消息生成了相同的去重ID,则第二个消息被视为重复消息,如果它位于5分钟的去重间隔内,则消息不会被投递。
如果您有一些需要用于生成去重ID的自定义逻辑,您可以注册自己的自定义去重器。去重器存储在带有前缀 queue.sqs-fifo.deduplicator
的IoC容器中。例如,unique
去重器别名为 queue.sqs-fifo.deduplicator.unique
。
自定义去重器通过在IoC中注册一个新的前缀别名为创建。此别名应解析为一个实现 AXLMedia\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator
合约的新对象实例。您可以定义一个实现此合约的新类,或者创建一个新的 AXLMedia\LaravelSqsFifoQueue\Queue\Deduplicators\Callback
实例,它接受一个执行去重逻辑的 Closure
。定义的 Closure
应该接受两个参数:$payload
和 $queue
,其中 $payload
是要发送到队列的 json_encoded()
消息,而 $queue
是要发送消息的队列名称。生成的ID不能超过128个字符,并且可以包含字母数字字符和标点符号(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
)。
例如,如果您想创建一个 random
去重器,它会随机选择一些作业作为重复项,您可以在 AppServiceProvider
的 register()
方法中添加以下行
$this->app->bind('queue.sqs-fifo.deduplicator.random', function ($app) { return new \AXLMedia\LaravelSqsFifoQueue\Queue\Deduplicators\Callback(function ($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); }); }
或者,如果您更喜欢创建一个新类,您的类将如下所示
namespace App\Deduplicators; use AXLMedia\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator; class Random implements Deduplicator { public function generate($payload, $queue) { // Return the deduplication id generated for messages. Randomly 0 or 1. return mt_rand(0,1); } }
然后您可以在您的 AppServiceProvider
中这样注册该类
$this->app->bind('queue.sqs-fifo.deduplicator.random', App\Deduplicators\Random::class);
注册此别名后,您可以将配置中的 deduplicator
键更新为使用值 random
,或者您可以通过在作业上调用 withDeduplicator('random')
来为单个作业设置去重器。
贡献
欢迎贡献。请参阅 CONTRIBUTING 获取详细信息。
安全
如果您发现任何与安全相关的问题,请通过电子邮件 alex.g@axl.media 而不是使用问题跟踪器。
鸣谢
许可证
MIT许可证(MIT)。请参阅 许可证文件 获取更多信息。