pmg/sqs-transport

SQS的Symfony Messenger传输

v0.5.0 2024-02-08 15:51 UTC

This package is auto-updated.

Last update: 2024-08-27 16:46:37 UTC


README

⚠️ 弃用通知:此项目不再维护,可能无法按预期工作。请使用Alli Platform Bundle代替。有关更多信息,请参阅此迁移指南:从PMG SQS Transport迁移到Alli Platform Bundle

安装

composer require pmg/sqs-transport

包使用

PMG\SqsTransport\Bundle\PmgSqsTransportBundle添加到您的应用程序kernel中。默认情况下,传输包将在创建传输工厂和扩展的传输实例时使用aws.sqs服务。此服务名称可配置,但应与AWS Bundle兼容。

class AppKernel extends Kernel
{
    // ...

    public function registerBundles()
    {
        $bundles = [
            new \Aws\Symfony\AwsBundle(),
            new \PMG\SqsTransport\Bundle\PmgSqsTransportBundle(),
        ];

        // ...

        return $bundles;
    }
}

SqsClient服务配置

如果您不使用AwsBundle并且想手动指定包含Aws\Sqs\SqsClient实例的服务,则需要一些包配置。

pmg_sqs_transport:
  sqs_client_service: your_sqs_service_id

消息传递配置

framework:
  # ...
  messenger:
    transports:
      # will create a transport with a queue URL of
      # https://queue.amazonaws.com/80398EXAMPLE/MyQueue
      sqs: sqs://queue.amazonaws.com/80398EXAMPLE/MyQueue
      
      # this will also make an https URL:
      # https://queue.amazonaws.com/80398EXAMPLE/MyQueue
      sqs_https: sqs+https://queue.amazonaws.com/80398EXAMPLE/MyQueue

      # or you may wish to use `http://`, like if running a localstack
      # instance for local dev. Queue url would be https://:4576/queue/MyQueue
      sqs_http: sqs+https://:4576/queue/MyQueue

      # specify how many message to receive at once with query params
      # must be >= 1 and <= 10
      sqs: sqs://queue.amazonaws.com/80398EXAMPLE/MyQueue?receive_count=10

      # specify a wait timeout when making a call to receive messages (in seconds)
      sqs: sqs://queue.amazonaws.com/80398EXAMPLE/MyQueue?receive_wait=10

      # or specify those things in `options`
      sqs:
        dsn: sqs://queue.amazonaws.com/80398EXAMPLE/MyQueue
        options:
          receive_wait: 10
          receive_count: 5

SQS标签

  • PMG\SqsTransport\Stamp\SqsReceiptHandleStamp:在通过get方法从SQS接收到消息时添加。这允许稍后对消息进行ackreject
  • PMG\SqsTransport\Stamp\SqsStringAttributeStamp:允许您添加一个自定义的消息属性,其中DataType设置为String,字符串值。
  • PMG\SqsTransport\Stamp\SqsNumberAttributeStamp:允许您添加一个自定义的消息属性,其中DataType设置为Number,数值。
$messageBus->dispatch(new YourMessage(), [
  new SqsStringAttributeStamp('stringAttributeName', 'attributeValue'),
  new SqsNumberAttributeStamp('numberAttributeName', 123),
]);

SQS每条消息最多有10个属性,但传输通常至少使用一个属性来发送传输序列化器的headers

重试时

symfony messenger worker修改包含一些重试元数据的信封。由于SQS中的消息无法就地修改,因此我们只需在发生这种情况时将新消息放入队列,而随后的ack调用将删除现有消息。