ssigwart/aws-high-availability-sqs

实现高可用 SQS 调用的库。

v1.0.3 2024-01-07 03:14 UTC

This package is auto-updated.

Last update: 2024-09-07 05:10:30 UTC


README

此库使编写高可用 SQS 消息变得简单。它包括以下功能:

  • 可以向 SQS 发送任何大小的消息。如果消息太大,它将被存储在 S3 中,而其引用将存储在 SQS 中。
  • 如果主队列不可用,可以向多个备份队列之一发送消息。

设置

您应该在不同的区域中设置至少 2 个 SQS 队列和 2 个 S3 存储桶。SQS 队列是您想要处理的消息。S3 存储桶将用于发送的超大队列消息。

S3 生命周期规则

当删除由 S3 文件支持的 SQS 消息时,代码将尝试删除该 S3 文件。但是,在某些情况下,S3 文件可能不会被删除。例如,在调用 AwsHighAvailabilitySqsReceiver::deleteMessage 时,建议捕获并忽略 AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException 异常。此外,任何从队列过期或最终进入死信队列的消息将不会删除文件。因此,建议在 S3 存储桶上设置生命周期规则。您可以在 https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html 上找到设置对象过期时间的说明。30 天是一个安全的选项。

用法

API 需要您传入一个 \Aws\Sdk 对象。在下面的示例中,使用 $awsSdk 作为此对象。

发送 SQS 消息

  1. 创建要发送到的可用 SQS 队列列表。
    • 这是一个 SqsAvailableQueues 对象,包括主队列和可选的替代队列。
  2. 创建用于存储大型消息的 S3 存储桶列表。
    • 这是一个 S3AvailableUploadFileBucketAndKeyLocations 对象。
    • 您可以将实现 S3FileBucketAndKeyProviderInterface 的任何内容添加到位置列表中。
    • 最简单的选项是使用 S3FileBucketAndKey,它实现了此接口。
  3. 设置 SqsMessageSendingMetadata 以包含要发送的消息的元数据。
  4. 创建一个 AwsHighAvailabilitySqsSender 对象并调用 sendMessageWithS3LargeMessageBacking
    • 您可以使用返回的对象获取使用的队列和消息 ID。
// Set up SQS queues
$primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1');
$backupQueue = new SqsAvailableQueue('us-east-2', 'https://sqs.us-east-2.amazonaws.com/123456789012/queue_in_us_east_2');
$availableQueues = new SqsAvailableQueues($primaryQueue);
$availableQueues->addAvailableQueue($backupQueue);

// Set up S3 locations
$primaryLocation = new S3FileBucketAndKey('us-east-1', 'phpunit-test-us-east-1', 'us-east-1/path/to/dir/');
$backupLocation = new S3FileBucketAndKey('us-east-2', 'phpunit-test-us-east-2', 'us-east-2/path/to/dir/');
$s3Locations = new S3AvailableUploadFileBucketAndKeyLocations($primaryLocation);
$s3Locations->addAlternativeLocation($backupLocation);

// Send messages
$sqsSender = new AwsHighAvailabilitySqsSender($awsSdk);
$result = $sqsSender->sendMessageWithS3LargeMessageBacking($availableQueues, $s3Locations, $queueMsgBody, null);
print 'Selected Queue: ' . $result->getSelectedQueue()->getQueueUrl() . PHP_EOL;
print 'Message ID: ' . $result->getSqsMessageId() . PHP_EOL;

SqsMessageSendingMetadata 选项

SqsMessageSendingMetadata 类允许您在发送消息时自定义以下内容:

  • SQS 交付延迟。
  • SQS 消息属性。

接收 SQS 消息

  1. 创建一个包含您想要接收的队列的 SqsAvailableQueue 对象。
  2. 设置 SqsMessageReceivingMetadata 以包含接收消息的元数据。
  3. 创建一个 AwsHighAvailabilitySqsReceiver 对象并调用 receivedMessagesWithS3LargeMessageBacking
    • 返回的对象包括 SQS 消息,您可以使用 getSqsMessages 获取这些消息。
$primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1');

// Set up metadata
$metadata = new SqsMessageReceivingMetadata();
$metadata->setMaxNumMessages(7);
$metadata->setVisibilityTimeout(300);
$metadata->setWaitTime(15);

$sqsReceiver = new AwsHighAvailabilitySqsReceiver($awsSdk);
$sqsMessagesResult = $sqsReceiver->receivedMessagesWithS3LargeMessageBacking($primaryQueue, $metadata);
print 'Number of Messages: ' . $sqsMessagesResult->getNumMessages() . PHP_EOL;
foreach ($sqsMessagesResult->getSqsMessages() as $sqsMessage)
{
	print 'Messages ID: ' . $sqsMessage->getMessageId() . PHP_EOL;
	print 'Receipt Handle: ' . $sqsMessage->getReceiptHandle() . PHP_EOL;
	print 'Message Body: ' . $sqsMessage->getMessage() . PHP_EOL;
}

SqsMessageReceivingMetadata 选项

SqsMessageReceivingMetadata 类允许您在接收消息时自定义以下内容:

  • 要接收的消息的最大数量(默认:10)
  • 接收到的 SQS 消息的可见性超时。
  • 接收消息时的 SQS 等待时间。

删除 SQS 消息

  1. 创建一个包含您想要删除的队列的 SqsAvailableQueue 对象。
  2. 创建一个 AwsHighAvailabilitySqsReceiver 对象并调用 deleteMessage
    • 对于任何失败,将抛出 AwsHighAvailabilitySqsDeleteException
    • 然而,如果抛出了 AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException 异常,则表示 SQS 消息已成功删除,因此建议除了可能记录异常外,可以忽略该异常。
$primaryQueue = new SqsAvailableQueue('us-east-1', 'https://sqs.us-east-1.amazonaws.com/123456789012/queue_in_us_east_1');
$sqsReceiver = new AwsHighAvailabilitySqsReceiver($awsSdk);
try {
	$sqsReceiver->deleteMessage($primaryQueue, $sqsMessage);
} catch (AwsHighAvailabilitySqsDeleteS3FileDeletionFailureException $e) {
	error_log($e->getMessage());
} catch (AwsHighAvailabilitySqsDeleteException $e) {
	// Handle error. Possibly try to delete it again or add application code to prevent the message from being processed again later.
}