ssigwart / aws-high-availability-sqs
实现高可用 SQS 调用的库。
v1.0.3
2024-01-07 03:14 UTC
Requires
- php: >=8.0.0
- aws/aws-sdk-php: ^3.0
- ssigwart/aws-high-availability-s3: ^1.0
Requires (Dev)
- phpunit/phpunit: ^10.4
README
此库使编写高可用 SQS 消息变得简单。它包括以下功能:
- 可以向 SQS 发送任何大小的消息。如果消息太大,它将被存储在 S3 中,而其引用将存储在 SQS 中。
- 如果主队列不可用,可以向多个备份队列之一发送消息。
设置
您应该在不同的区域中设置至少 2 个 SQS 队列和 2 个 S3 存储桶。SQS 队列是您想要处理的消息。S3 存储桶将用于发送的超大队列消息。
S3 生命周期规则
当删除由 S3 文件支持的 SQS 消息时,代码将尝试删除该 S3 文件。但是,在某些情况下,S3 文件可能不会被删除。例如,在调用 AwsHighAvailabilitySqsReceiver::deleteMessage
时,建议捕获并忽略 AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException
异常。此外,任何从队列过期或最终进入死信队列的消息将不会删除文件。因此,建议在 S3 存储桶上设置生命周期规则。您可以在 https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html 上找到设置对象过期时间的说明。30 天是一个安全的选项。
用法
API 需要您传入一个 \Aws\Sdk
对象。在下面的示例中,使用 $awsSdk
作为此对象。
发送 SQS 消息
- 创建要发送到的可用 SQS 队列列表。
- 这是一个
SqsAvailableQueues
对象,包括主队列和可选的替代队列。
- 这是一个
- 创建用于存储大型消息的 S3 存储桶列表。
- 这是一个
S3AvailableUploadFileBucketAndKeyLocations
对象。 - 您可以将实现
S3FileBucketAndKeyProviderInterface
的任何内容添加到位置列表中。 - 最简单的选项是使用
S3FileBucketAndKey
,它实现了此接口。
- 这是一个
- 设置
SqsMessageSendingMetadata
以包含要发送的消息的元数据。 - 创建一个
AwsHighAvailabilitySqsSender
对象并调用sendMessageWithS3LargeMessageBacking
。- 您可以使用返回的对象获取使用的队列和消息 ID。
// Set up SQS queues $primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1'); $backupQueue = new SqsAvailableQueue('us-east-2', 'https://sqs.us-east-2.amazonaws.com/123456789012/queue_in_us_east_2'); $availableQueues = new SqsAvailableQueues($primaryQueue); $availableQueues->addAvailableQueue($backupQueue); // Set up S3 locations $primaryLocation = new S3FileBucketAndKey('us-east-1', 'phpunit-test-us-east-1', 'us-east-1/path/to/dir/'); $backupLocation = new S3FileBucketAndKey('us-east-2', 'phpunit-test-us-east-2', 'us-east-2/path/to/dir/'); $s3Locations = new S3AvailableUploadFileBucketAndKeyLocations($primaryLocation); $s3Locations->addAlternativeLocation($backupLocation); // Send messages $sqsSender = new AwsHighAvailabilitySqsSender($awsSdk); $result = $sqsSender->sendMessageWithS3LargeMessageBacking($availableQueues, $s3Locations, $queueMsgBody, null); print 'Selected Queue: ' . $result->getSelectedQueue()->getQueueUrl() . PHP_EOL; print 'Message ID: ' . $result->getSqsMessageId() . PHP_EOL;
SqsMessageSendingMetadata
选项
SqsMessageSendingMetadata
类允许您在发送消息时自定义以下内容:
- SQS 交付延迟。
- SQS 消息属性。
接收 SQS 消息
- 创建一个包含您想要接收的队列的
SqsAvailableQueue
对象。 - 设置
SqsMessageReceivingMetadata
以包含接收消息的元数据。 - 创建一个
AwsHighAvailabilitySqsReceiver
对象并调用receivedMessagesWithS3LargeMessageBacking
。- 返回的对象包括 SQS 消息,您可以使用
getSqsMessages
获取这些消息。
- 返回的对象包括 SQS 消息,您可以使用
$primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1'); // Set up metadata $metadata = new SqsMessageReceivingMetadata(); $metadata->setMaxNumMessages(7); $metadata->setVisibilityTimeout(300); $metadata->setWaitTime(15); $sqsReceiver = new AwsHighAvailabilitySqsReceiver($awsSdk); $sqsMessagesResult = $sqsReceiver->receivedMessagesWithS3LargeMessageBacking($primaryQueue, $metadata); print 'Number of Messages: ' . $sqsMessagesResult->getNumMessages() . PHP_EOL; foreach ($sqsMessagesResult->getSqsMessages() as $sqsMessage) { print 'Messages ID: ' . $sqsMessage->getMessageId() . PHP_EOL; print 'Receipt Handle: ' . $sqsMessage->getReceiptHandle() . PHP_EOL; print 'Message Body: ' . $sqsMessage->getMessage() . PHP_EOL; }
SqsMessageReceivingMetadata
选项
SqsMessageReceivingMetadata
类允许您在接收消息时自定义以下内容:
- 要接收的消息的最大数量(默认:10)
- 接收到的 SQS 消息的可见性超时。
- 接收消息时的 SQS 等待时间。
删除 SQS 消息
- 创建一个包含您想要删除的队列的
SqsAvailableQueue
对象。 - 创建一个
AwsHighAvailabilitySqsReceiver
对象并调用deleteMessage
。- 对于任何失败,将抛出
AwsHighAvailabilitySqsDeleteException
。 - 然而,如果抛出了
AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException
异常,则表示 SQS 消息已成功删除,因此建议除了可能记录异常外,可以忽略该异常。
- 对于任何失败,将抛出
$primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1'); $sqsReceiver = new AwsHighAvailabilitySqsReceiver($awsSdk); try { $sqsReceiver->deleteMessage($primaryQueue, $sqsMessage); } catch (AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException $e) { error_log($e->getMessage()); } catch (AwsHighAvailabilitySqsDeleteException $e) { // Handle error. Possibly try to delete it again or add application code to prevent the message from being processed again later. }