ssigwart / sqs-queue-processor
处理 SQS 队列的库。
v1.0.0
2024-01-01 18:22 UTC
Requires
- php: >=8.1.0
- aws/aws-sdk-php: ^3.0
- psr/log: ^3.0
- ssigwart/aws-high-availability-sqs: ^1.0
Requires (Dev)
- phpunit/phpunit: ^10.4
README
此库提供以下功能的 SQS 队列处理器:
- 重复消息处理。
- 延迟重试功能。
- 错误处理选项。
安装
composer require ssigwart/sqs-queue-processor
使用方法
使用以下内容设置和运行队列处理器。
$sqsQueueProcessor = new SqsQueueProcessor(/* Fill in parameters */); $sqsQueueProcessor->processMessages();
参数: SqsQueueProcessorConfiguration
- 此参数指定在单个 SQS
ReceiveMessage
调用中返回的最大消息数量(默认 10),用于接收消息的可见性超时(默认 300)以及等待消息的秒数(默认 20)。 - 您还可以控制在处理消息时记录的内容。
参数: SqsQueueProcessorTimingInterface
- 此参数将通知队列处理器何时停止处理消息,使用
shouldStopProcessingMessages
。 - 要无限循环,始终返回 false。
- 您还可以检查时间,在给定时间后停止它。
- 您还可以让同一对象实现此接口和
SqsQueueProcessorMessageProviderInterface
,并在收到空消息列表后停止。
参数: SqsQueueProcessorCleanupInterface
- 此参数包括清理函数,如
cleanUpAfterExceptionProcessingMessage
。如果使用事务性数据库,您可能需要在数据库上调用ROLLBACK
。
参数: SqsQueueProcessorMessageProviderInterface
- 此参数用于获取消息,删除消息以及更新消息的可见性超时。
- 建议使用
AwsHighAvailabilitySqsMessageProvider
,它实现了此接口。
参数: SqsQueueProcessorMessageStatusInterface
- 此参数提供函数以确定消息是否正在处理或已完成处理。
- 实现此接口的建议方法包括 Memcached、Redis 或数据库。
参数: SqsQueueProcessorSingleMessageProcessorInterface
- 此参数是处理消息的核心接口。
- 处理消息的工作应在此处完成。
- 它应返回一个
SqsQueueProcessorSingleMessageProcessorResult
- 对于成功处理完成的消息,返回
SqsQueueProcessorSingleMessageProcessorResult::newSuccessResult()
。 - 对于您想延迟处理的消息,返回
SqsQueueProcessorSingleMessageProcessorResult::newDelayedProcessingResult(...)
并带有新的可见性超时。 - 对于处理过程中出现错误的消息,返回
SqsQueueProcessorSingleMessageProcessorResult::newFailureResult(...)
,可能带有新的可见性超时。
- 对于成功处理完成的消息,返回
- 建议捕获任何
Throwable
,执行清理并返回一个失败,但未捕获的异常将被视为失败。
参数: SqsQueueProcessorErrorReportingInterface
- 此参数提供错误处理能力。
- 以下是处理不同错误类型的建议方法
MSG_MARKED_AS_COMPLETED
- 将原始SqsMessage
存储在 S3 中,并发送警报或创建任务以审查消息,以确保其已成功处理。如果需要,可以使用 S3 数据重新排队消息。MSG_MARKED_AS_IN_PROGRESS
- 如果消息被标记为正在处理一段时间,发送警报或创建任务以审查消息。这可能是因为在消息处理失败时未清除正在处理的标志。EXCEPTION_THROWN_HANDLING_MESSAGE
- 发送警报或创建任务以调查错误。FAILED_TO_MARK_AS_PROCESSED
- 发送警报。这通常不会造成太大问题。如果消息未能标记为已处理并且 SQS 消息被第二次接收(例如,如果 SQS 删除失败),则成为问题。FAILED_TO_DELETE_MSG
- 发送警报或创建任务以删除消息。FAILED_TO_CLEAR_MSG_IN_PROGRESS_FLAG
- 发送警报或创建任务以清除标志,以便稍后可以再次处理消息。
参数: LoggerInterface
- 此参数是
\Psr\Log\LoggerInterface
,因此任何该接口的实现都可以使用。 - 所有记录的消息将自动附加 SQS 消息 ID,以帮助区分不同 SQS 消息的日志消息。