mehr-it/lara-sqs-ext

Laravel 的扩展和可扩展 SQS 队列驱动程序(长轮询,附加参数)

3.1.2 2022-12-17 18:46 UTC

This package is auto-updated.

Last update: 2024-09-17 22:36:56 UTC


README

Latest Version on Packagist Build Status

此包为 Laravel 中的 Amazon SQS 队列提供扩展队列功能。默认情况下,它添加了对长轮询的支持,自动将可见性超时设置为作业超时,并允许更长的延迟(SQS 最大为 15 分钟)。当然,您可以在任何时候手动设置可见性超时。

它还添加了对监听锁的支持,以仅使用单个工作进程轮询队列,从而节省不必要的成本。

此包也是进一步扩展的绝佳起点。

安装

composer require mehr-it/lara-sqs-ext

此包使用 Laravel 的包自动发现,因此服务提供程序将自动加载。

队列配置

只需像配置 Laravel 中任何其他 SQS 队列一样配置队列连接,但使用 sqs-ext 作为驱动程序。

'sqs-conn' => [
	'driver' => 'sqs-ext',
	'key'    => '112233445566778899',
	'secret' => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
	'prefix' => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
	'queue'  => 'msgs',
	'region' => 'eu-central-1',
],

长轮询

要启用长轮询,您可以在队列配置中添加 message_wait_timeout 选项。这将 WaitTimeSeconds 参数设置为配置的时间量。

'sqs-conn' => [
	'driver'               => 'sqs-ext',
	'key'                  => '112233445566778899',
	'secret'               => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
	'prefix'               => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
	'queue'                => 'msgs',
	'region'               => 'eu-central-1',
	'message_wait_timeout' => 20,
],

有效的等待超时时间在 0 到 20 秒之间。如果您使用单个工作进程查询多个队列,长轮询可能不是合适的配置。

有关长轮询的更多信息,请参阅 AWS SDK 文档

扩展作业延迟

Amazon SQS 分发的消息的最大延迟时间为 15 分钟。对于某些用例,这可能不够。使用 extend_delay 选项允许在派发作业时使用更长的延迟。

'sqs-conn' => [
    'driver'               => 'sqs-ext',
    'key'                  => '112233445566778899',
    'secret'               => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
    'prefix'               => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
    'queue'                => 'msgs',
    'region'               => 'eu-central-1',
    'message_wait_timeout' => 20,
    'extend_delay'         => true,
],

启用扩展延迟后,作业将在 SQS 上延迟最多 15 分钟。对于更长的延迟,作业将在每次接收时返回到队列,直到延迟时间结束。

注意:延迟时间超过 15 分钟的消息将在延迟时间结束前“在途中”。SQS 的 “在途中”消息限制为 120,000 条

监听锁

当使用长轮询并在同一队列上使用多个工作进程时,您应将 listen_lock 选项设置为 true。这同步工作进程进程,允许一次只有一个工作进程轮询队列。这可以在使用许多工作进程时节省大量费用。

'sqs-conn' => [
    'driver'               => 'sqs-ext',
    'key'                  => '112233445566778899',
    'secret'               => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
    'prefix'               => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
    'queue'                => 'msgs',
    'region'               => 'eu-central-1',
    'message_wait_timeout' => 20,
    'listen_lock'          => true,
    // optionally specify custom lock file
    'listen_lock_file'     => '/path/to/file',
    // optionally specify listen lock timeout (in seconds)
    'listen_lock_timeout'  => 5
],

一旦长轮询 API 请求返回(无论是否接收到消息),监听锁就会释放,另一个进程可以获取它。

listen_lock_timeout 值指定队列驱动程序在向工作进程循环返回空响应之前尝试获取监听锁的时间长度。此值不应过高,以便工作进程可以定期检查重启和其他信号。

消息加密

如果您想端到端加密任何消息,可以为您的队列设置 encrypttrue

'sqs-conn' => [
	'driver'  => 'sqs-ext',
	'key'     => '112233445566778899',
	'secret'  => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
	'prefix'  => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
	'queue'   => 'msgs',
	'region'  => 'eu-central-1',
	'encrypt' => true,
],

Laravel 的内部加密函数将用于加密整个消息字符串。请注意,加密使用 Base64 编码,这会使消息大小增加约 36%。

可见性超时

可见性超时是 AWS SQS 的一个关键概念,但在 Laravel 的默认 SQS 实现中并没有得到很好的利用。此包提供了此功能的先进用法。

有关可见超时信息,请参阅AWS 文档

自动将可见超时设置为作业超时

如果你的作业有超时,SQS 消息对其他订阅者的不可见时间应完全相同。不幸的是,Laravel 并不会自动设置它。

然而,如果指定了超时,SqsExtJob 会自动将 SQS 消息的可见超时设置为作业超时

我们认为这对所有 SQS 作业都很有意义。这就是为什么这种行为默认是激活的。

但是,你可以通过设置 $automaticQueueVisibility = false 来自定义时间值或禁用此行为。以下示例手动设置了具有比作业超时优先级的可见超时

class MyJob implements ShouldQueue {
	
	// will set visibility timeout to 45sec, regardless of job's timeout
	protected $automaticQueueVisibility = 45;
}

你甚至可以使用 $automaticQueueVisibilityExtra 属性为作业的超时添加额外的时间。

手动设置可见超时

SqsExtJob 类有一个新的方法 setVisibilityTimeout,允许你手动设置可见超时。这在需要为作业处理获取更多时间时特别有用。

如果你手动设置了可见超时,请注意作业超时仍然适用,并且你的工作进程将在该时间后停止运行!

InteractsWithSqsQueue 特性实现了 setVisibilityTimeout 方法,就像 InteractsWithQueue 特性对其他方法所做的那样。

速率限制

你可以通过在队列配置中传递节流来配置队列的速率限制。它使用由 mehr-it/lara-token-bucket 实现的 "令牌桶" 算法。

为了在分布式系统中正确地使用速率限制,需要一个集中式缓存。

'sqs-conn' => [
    'driver'    => 'sqs-ext',
    'key'       => '112233445566778899',
    'secret'    => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
    'prefix'    => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
    'queue'     => 'msgs',
    'region'    => 'eu-central-1',
    'cache'     => 'the-centralized-cache',
    'throttles'=> [
        // queue name or wildcard as key
        'msgs' => [
            // the number of tokens added per second
            'rate'  => 0.5,
            // the maximum number of tokens in the bucket
            'burst' => 4,
            // the initial number of tokens in the bucket
            'initial' => 0,        
        ]       
    ]
],

如果你在每个连接上工作多个队列或使用队列工作进程中的通配符,你可以为每个队列指定一个节流。节流甚至支持通配符,这意味着你可以将节流配置应用到匹配通配符的每个队列上。

'sqs-conn' => [
    'driver'    => 'sqs-ext',
    'key'       => '112233445566778899',
    'secret'    => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
    'prefix'    => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
    'queue'     => 'apiQueue1',
    'region'    => 'eu-central-1',
    // use a centralized cache
    'cache'     => 'the-centralized-cache',
    'throttles' => [
        // queue name or wildcard as key
        'apiQueue*' => [
            // the number of tokens added per second
            'rate'  => 0.5,
            // the maximum number of tokens in the bucket
            'burst' => 4,     
        ]       
    ]
],

即使使用通配符,每个队列也是单独节流的。但是,你可以传递 bucketName 选项,以明确指定多个队列的共享令牌桶。

通配符队列工作进程

当启动工作进程时,Laravel 本地允许指定工作进程应工作的队列。尽管可以给出多个队列名称,但工作进程在处理多个队列时并不非常智能。

这个包允许将通配符作为 "queue" 传递给工作进程

# you should set sleep to zero, to efficiently switch between queues
./artisan queue:work sqs-conn --queue=apiQueue* --sleep=0

根据上面的示例,工作进程将检索以 "apiQueue" 开头的所有可用队列列表,并依次轮询它们以获取新消息。但是在选择下一个轮询的队列时,它会考虑以下因素

节流队列

如果一个队列需要由于速率限制而节流,则它不会在配额再次可用之前被轮询。

空队列

如果一个队列在上次轮询时没有返回消息,则轮询将在此暂停,直到发送新作业或暂停超时到期。暂停时间可以在队列配置中进行配置。这可以防止你的工作进程在其他队列已满时浪费时间在空队列上。

使用缓存实现暂停发送作业时恢复暂停的工作进程。因此,集中式缓存对于分布式系统至关重要。

请参阅以下配置示例,用于与通配符工作进程一起使用的适当配置:

'sqs-conn' => [
    'driver'                     => 'sqs-ext',
    'key'                        => '112233445566778899',
    'secret'                     => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
    'prefix'                     => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
    'queue'                      => 'apiQueue1',
    'region'                     => 'eu-central-1',
    // Long polling must not be used with wildcard workers, as this disables
    // queue pausing which is more efficient than long polling when working
    // on multiple queues. 
    'message_wait_timeout'       => 0,
    // use a centralized cache
    'cache'                      => 'the-centralized-cache',
    // interval for regular queue list updates (in seconds) 
    'throttles' => [
        /* define throttles here */
    ],   
    'queue_list_update_interval' => 60,
    // Maximum pause time, for empty queues (in seconds).
    // This value must not be too high, since delayed or in-the-flight
    // messages are not visible. The queue might look empty but will
    // have available messages later without the worker pause being aborted.
    'queue_pause_time'           => 20,
],

扩展

该库中的类提供了很好的扩展类的入口点。如果你希望添加更多功能,请查看源代码。

如果你只需要扩展作业类,你也可以在队列配置中这样做

'sqs-conn' => [
	'driver'   => 'sqs-ext',
	'key'      => '112233445566778899',
	'secret'   => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
	'prefix'   => 'https://sqs.eu-central-1.amazonaws.com/11223344556677',
	'queue'    => 'msgs',
	'region'   => 'eu-central-1',
	'job_type' => 'myJobClass'
],

myJobClass 通过应用程序服务容器进行解析。以下参数传递给解析器

[
	'container'      => $container, 		// the application container
	'sqs'            => $sqs,				// the SQS client
	'job'            => $job,				// the raw SQS message
	'connectionName' => $connectionName,	// the queue connection name
	'queue'          => $queue,				// the queue URL
	'queueOptions'   => $this->options,		// the queue options (configuration)
]