bref/symfony-messenger

Symfony Messenger 与 AWS Lambda 上的 SQS 和 SNS 的桥梁

资助包维护!
mnapoli

1.3.3 2024-08-16 21:15 UTC

README

通过 Bref 使用 Symfony Messenger 在 AWS Lambda 上。

此桥梁允许消息被发送到 SQS、SNS 或 EventBridge,同时工作程序在 AWS Lambda 上处理这些消息。

安装

本指南假设

首先,安装此包

composer require bref/symfony-messenger

然后,在 config/bundles.php 中注册该包

return [
    // ...
    Bref\Symfony\Messenger\BrefMessengerBundle::class => ['all' => true],
];

现在可以使用 SQS、SNS 和 EventBridge 与 Symfony Messenger 一起使用。

使用方法

Symfony Messenger 发送消息。要创建消息,请参阅 Symfony Messenger 文档

要配置 消息 的发送位置,本文档中的所有示例均基于 Symfony 文档中的示例

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
             'App\Message\MyMessage': async

SQS

SQS 服务是一个类似于 RabbitMQ 的队列。要使用它,请将其 URL 设置在环境变量 MESSENGER_TRANSPORT_DSN

MESSENGER_TRANSPORT_DSN=https://sqs.us-east-1.amazonaws.com/123456789/my-queue

就是这样,消息将被发送到该队列。

该实现使用由 Symfony Amazon SQS Messenger 提供的 SQS 传输,因此支持所有这些功能。如果您已经使用该传输,则切换到 AWS Lambda 非常容易,并且不需要对消息发送进行任何更改。

创建 SQS 队列

您可以在控制台中自行创建队列,编写自定义 Cloudformation 或使用 Lift 的 Queue 构造 来为您处理。

以下是一个使用 Lift 的简单示例,请确保首先 安装插件 并查看 完整文档 了解更多详情。

# serverless.yml

service: my-app
provider:
    name: aws
    environment:
        ...
        MESSENGER_TRANSPORT_DSN: ${construct:jobs.queueUrl}

constructs:
    jobs:
        type: queue
        worker:
            handler: bin/consumer.php
            timeout: 20 # in seconds
            reservedConcurrency: 5 # max. 5 messages processed in parallel
            layers:
                - ${bref:layer.php-80}

plugins:
    - serverless-lift

在所有情况下,您都希望禁用 auto_setup 以避免额外的请求和权限问题。

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false

添加权限

当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端将自动从环境变量中读取它们 (配置环境变量)

您只需在 serverless.yml 中提供正确的 IAM 声明 即可。Lambda 将负责其余部分。使用 Messenger 向 SQS 发布所需的 IAM 权限是给定队列上的 sqs:SendMessage

如果您使用 Lift,则这会自动为您完成。

从 SQS 消费消息

  1. 如果您不使用 Lift,请在 serverless.yml 中创建将被 SQS 调用的函数
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sqs/
            - sqs:
                arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue
                # Only 1 item at a time to simplify error handling
                batchSize: 1
  1. 创建处理脚本(例如 bin/consumer.php
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SqsConsumer::class);

如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool)$_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SqsConsumer::class);
  1. 注册并配置 SqsConsumer 服务
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            # true enables partial SQS batch failure
            # Enabling this without proper SQS config will consider all your messages successful
            # See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
            $partialBatchFailure: false

现在,每当有消息发送到 SQS 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。

FIFO 队列

FIFO 队列保证消息的精确一次交付,并且必须有后缀 .fifo

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: 
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo'
# serverless.yml
resources:
    Resources:
        Queue:
            Type: AWS::SQS::Queue
            Properties:
                QueueName: my-queue.fifo
                FifoQueue: true

Symfony Amazon SQS Messenger 会自动计算/设置 FIFO 队列所需的 MessageGroupIdMessageDeduplicationId 参数,但您也可以显式设置它们

use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;

/* @var MessageBus $messageBus */
$messageBus->dispatch(new MyAsyncMessage(), [new AmazonSqsFifoStamp('my-group-message-id', 'my-deduplication-id')]);

其他方面与正常 SQS 队列相同。

SNS

AWS SNS 是“通知”而非“队列”。消息可能不会按照发送的顺序到达,它们可能一次性到达。要使用它,创建一个 SNS 主题并将其设置为 DSN

MESSENGER_TRANSPORT_DSN=sns://arn:aws:sns:us-east-1:1234567890:foobar

就这些,消息将被发送到该主题。

注意:当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端会自动从环境变量中读取它们[链接]

要从 SNS 消费消息

  1. serverless.yml 中创建将被 SNS 调用的函数
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sns/
            - sns:
                arn: arn:aws:sns:us-east-1:1234567890:my_sns_topic
  1. 创建处理脚本(例如 bin/consumer.php
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);

如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);
  1. 注册并配置 SnsConsumer 服务
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
        public: true
        autowire: true

现在,每当有消息发送到 SNS 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。

EventBridge

AWS EventBridge 是一个消息路由服务。它与 SNS 类似,但功能更强大。要使用它,按照以下方式配置 DSN

# "myapp" is the EventBridge "source", i.e. a namespace for your application's messages
# This source name will be reused in `serverless.yml` later.
MESSENGER_TRANSPORT_DSN=eventbridge://myapp

可选地,您可以通过 event_bus_name 查询参数设置 EventBusName,可以是名称或 ARN

MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=custom-bus
MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=arn:aws:events:us-east-1:123456780912:event-bus/custom-bus

就这些,消息将被发送到 EventBridge。

注意:当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端会自动从环境变量中读取它们[链接]

要消费来自 EventBridge 的消息

  1. serverless.yml 中创建将被 EventBridge 调用的函数
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/event-bridge/
            -   eventBridge:
                    # In case of you change bus name in config/packages/messenger.yaml (i.e eventbridge://myapp?event_bus_name=custom-bus) you need to set bus name like below
                    # eventBus: custom-bus
                    # This filters events we listen to: only events from the "myapp" source.
                    # This should be the same source defined in config/packages/messenger.yaml
                    pattern:
                        source:
                            - myapp
  1. 创建处理脚本(例如 bin/consumer.php
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(EventBridgeConsumer::class);

如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(EventBridgeConsumer::class);
  1. 注册并配置 EventBridgeConsumer 服务
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer:
        public: true
        autowire: true
        arguments:
            $transportName: 'async'
            # Optionnally, if you have different buses in config/packages/messenger.yaml, set $bus like below:
            # $bus: '@event.bus'

现在,每当有消息发送到为该来源配置的 EventBridge 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。

错误处理

AWS Lambda 有错误处理机制(重试和处理失败的消息)。因此,此包不集成 Symfony Messenger 的重试机制。相反,它与 Lambda 的重试机制协同工作。

本节仍在进行中,欢迎贡献以改进它。

当消息在 SQS 中失败时,默认情况下会返回到 SQS 队列。它将重试,直到消息过期。以下是一个使用 SQS 设置重试和“死信队列”的示例

# serverless.yml
resources:
    Resources:
        Queue:
            Type: AWS::SQS::Queue
            Properties:
                # This needs to be at least 6 times the lambda function's timeout
                # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                VisibilityTimeout: '960'
                RedrivePolicy:
                    deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
                    # Jobs will be retried 5 times
                    # The number needs to be at least 5 per https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    maxReceiveCount: 5
        # The dead letter queue is a SQS queue that receives messages that failed to be processed
        DeadLetterQueue:
            Type: AWS::SQS::Queue
            Properties:
                # Messages are stored up to 14 days (the max)
                MessageRetentionPeriod: 1209600

当使用 SNS 和 EventBridge 时,默认情况下消息将重试 2 次。

配置

配置 AWS 客户端

默认情况下,AWS 客户端(SQS、SNS、EventBridge)已预配置以在 AWS Lambda 上工作(感谢 AWS Lambda 填充的环境变量[链接])。

但是,您可以自定义 AWS 客户端,例如在 AWS Lambda 之外使用它们(本地、在 EC2 上)或模拟它们进行测试。这些客户端作为 Symfony 服务注册,键为

  • bref.messenger.sqs_client
  • bref.messenger.sns_client
  • bref.messenger.eventbridge_client

例如,要自定义 SQS 客户端

services:
    bref.messenger.sqs_client:
        class: AsyncAws\Sqs\SqsClient
        public: true # the AWS clients must be public
        arguments:
            # Apply your own config here
            -
                region: us-east-1

自动传输识别

自动传输识别主要由默认的TransportNameResolvers处理,适用于SNS和SQS,确保传输名称自动传递到您的消息处理器。然而,在需要手动指定传输名称或调整默认行为的情况下,您可以通过在config/services.yaml文件中您的服务定义中设置$transportName参数来实现。此参数应与config/packages/messenger.yaml中定义的传输名称匹配。例如,对于一个SNSConsumer,您可以这样配置:

# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      async: '%env(MESSENGER_TRANSPORT_DSN)%'
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'

禁用传输

默认情况下,此包注册了Symfony Messenger的SQS、SNS和EventBridge传输。

如果您想禁用某些传输(例如在冲突的情况下),可以删除BrefMessengerBundleconfig/bundles.php,并在您的应用程序配置中重新配置您想要的传输。查看Resources/config/services.yaml以复制您想要的部分。

自定义序列化器

如果您想改变消息的序列化方式,例如使用Happyr消息序列化器,您需要在传输和消费者两端都添加序列化器。例如:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: 
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
                serializer: 'Happyr\MessageSerializer\Serializer'

# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            $serializer: '@Happyr\MessageSerializer\Serializer'