ssigwart / sqs-queue-processor

处理 SQS 队列的库。

v1.0.0 2024-01-01 18:22 UTC

This package is auto-updated.

Last update: 2024-09-30 20:08:50 UTC


README

此库提供以下功能的 SQS 队列处理器:

  • 重复消息处理。
  • 延迟重试功能。
  • 错误处理选项。

安装

composer require ssigwart/sqs-queue-processor

使用方法

使用以下内容设置和运行队列处理器。

$sqsQueueProcessor = new SqsQueueProcessor(/* Fill in parameters */);
$sqsQueueProcessor->processMessages();

参数: SqsQueueProcessorConfiguration

  • 此参数指定在单个 SQS ReceiveMessage 调用中返回的最大消息数量(默认 10),用于接收消息的可见性超时(默认 300)以及等待消息的秒数(默认 20)。
  • 您还可以控制在处理消息时记录的内容。

参数: SqsQueueProcessorTimingInterface

  • 此参数将通知队列处理器何时停止处理消息,使用 shouldStopProcessingMessages
  • 要无限循环,始终返回 false。
  • 您还可以检查时间,在给定时间后停止它。
  • 您还可以让同一对象实现此接口和 SqsQueueProcessorMessageProviderInterface,并在收到空消息列表后停止。

参数: SqsQueueProcessorCleanupInterface

  • 此参数包括清理函数,如 cleanUpAfterExceptionProcessingMessage。如果使用事务性数据库,您可能需要在数据库上调用 ROLLBACK

参数: SqsQueueProcessorMessageProviderInterface

  • 此参数用于获取消息,删除消息以及更新消息的可见性超时。
  • 建议使用 AwsHighAvailabilitySqsMessageProvider,它实现了此接口。

参数: SqsQueueProcessorMessageStatusInterface

  • 此参数提供函数以确定消息是否正在处理或已完成处理。
  • 实现此接口的建议方法包括 Memcached、Redis 或数据库。

参数: SqsQueueProcessorSingleMessageProcessorInterface

  • 此参数是处理消息的核心接口。
  • 处理消息的工作应在此处完成。
  • 它应返回一个 SqsQueueProcessorSingleMessageProcessorResult
    • 对于成功处理完成的消息,返回 SqsQueueProcessorSingleMessageProcessorResult::newSuccessResult()
    • 对于您想延迟处理的消息,返回 SqsQueueProcessorSingleMessageProcessorResult::newDelayedProcessingResult(...) 并带有新的可见性超时。
    • 对于处理过程中出现错误的消息,返回 SqsQueueProcessorSingleMessageProcessorResult::newFailureResult(...),可能带有新的可见性超时。
  • 建议捕获任何 Throwable,执行清理并返回一个失败,但未捕获的异常将被视为失败。

参数: SqsQueueProcessorErrorReportingInterface

  • 此参数提供错误处理能力。
  • 以下是处理不同错误类型的建议方法
    • MSG_MARKED_AS_COMPLETED - 将原始 SqsMessage 存储在 S3 中,并发送警报或创建任务以审查消息,以确保其已成功处理。如果需要,可以使用 S3 数据重新排队消息。
    • MSG_MARKED_AS_IN_PROGRESS - 如果消息被标记为正在处理一段时间,发送警报或创建任务以审查消息。这可能是因为在消息处理失败时未清除正在处理的标志。
    • EXCEPTION_THROWN_HANDLING_MESSAGE - 发送警报或创建任务以调查错误。
    • FAILED_TO_MARK_AS_PROCESSED - 发送警报。这通常不会造成太大问题。如果消息未能标记为已处理并且 SQS 消息被第二次接收(例如,如果 SQS 删除失败),则成为问题。
    • FAILED_TO_DELETE_MSG - 发送警报或创建任务以删除消息。
    • FAILED_TO_CLEAR_MSG_IN_PROGRESS_FLAG - 发送警报或创建任务以清除标志,以便稍后可以再次处理消息。

参数: LoggerInterface

  • 此参数是 \Psr\Log\LoggerInterface,因此任何该接口的实现都可以使用。
  • 所有记录的消息将自动附加 SQS 消息 ID,以帮助区分不同 SQS 消息的日志消息。