bref / symfony-messenger
Symfony Messenger 与 AWS Lambda 上的 SQS 和 SNS 的桥梁
Requires
- php: >=8.0
- ext-json: *
- async-aws/event-bridge: ^1.0
- async-aws/sns: ^1.0
- async-aws/sqs: ^1.2|^2.0
- bref/bref: ^1.5 || ^2.0
- symfony/amazon-sqs-messenger: ^5.4 || ^6.0 || ^7.0
- symfony/config: ^5.4 || ^6.0 || ^7.0
- symfony/dependency-injection: ^5.4 || ^6.0 || ^7.0
- symfony/http-kernel: ^5.4 || ^6.0 || ^7.0
- symfony/messenger: ^5.4 || ^6.0 || ^7.0
- symfony/yaml: ^5.4 || ^6.0 || ^7.0
Requires (Dev)
- matthiasnoback/symfony-dependency-injection-test: ^4.3 || ^5.0
- mnapoli/hard-mode: ^0.3.0
- nyholm/symfony-bundle-test: ^3.0
- phpspec/prophecy: ^1.15
- phpspec/prophecy-phpunit: ^2.0
- phpstan/phpstan: ^1.7.10
- phpunit/phpunit: ^9.4
- symfony/framework-bundle: ^5.4 || ^6.0 || ^7.0
README
通过 Bref 使用 Symfony Messenger 在 AWS Lambda 上。
此桥梁允许消息被发送到 SQS、SNS 或 EventBridge,同时工作程序在 AWS Lambda 上处理这些消息。
安装
本指南假设
- Symfony 已安装
- Symfony Messenger 已安装
- Bref 已安装,并且配置为部署 Symfony
首先,安装此包
composer require bref/symfony-messenger
然后,在 config/bundles.php
中注册该包
return [ // ... Bref\Symfony\Messenger\BrefMessengerBundle::class => ['all' => true], ];
现在可以使用 SQS、SNS 和 EventBridge 与 Symfony Messenger 一起使用。
使用方法
Symfony Messenger 发送消息。要创建消息,请参阅 Symfony Messenger 文档。
要配置 消息 的发送位置,本文档中的所有示例均基于 Symfony 文档中的示例
# config/packages/messenger.yaml framework: messenger: transports: async: '%env(MESSENGER_TRANSPORT_DSN)%' routing: 'App\Message\MyMessage': async
SQS
SQS 服务是一个类似于 RabbitMQ 的队列。要使用它,请将其 URL 设置在环境变量 MESSENGER_TRANSPORT_DSN
中
MESSENGER_TRANSPORT_DSN=https://sqs.us-east-1.amazonaws.com/123456789/my-queue
就是这样,消息将被发送到该队列。
该实现使用由 Symfony Amazon SQS Messenger 提供的 SQS 传输,因此支持所有这些功能。如果您已经使用该传输,则切换到 AWS Lambda 非常容易,并且不需要对消息发送进行任何更改。
创建 SQS 队列
您可以在控制台中自行创建队列,编写自定义 Cloudformation 或使用 Lift 的 Queue 构造 来为您处理。
以下是一个使用 Lift 的简单示例,请确保首先 安装插件 并查看 完整文档 了解更多详情。
# serverless.yml service: my-app provider: name: aws environment: ... MESSENGER_TRANSPORT_DSN: ${construct:jobs.queueUrl} constructs: jobs: type: queue worker: handler: bin/consumer.php timeout: 20 # in seconds reservedConcurrency: 5 # max. 5 messages processed in parallel layers: - ${bref:layer.php-80} plugins: - serverless-lift
在所有情况下,您都希望禁用 auto_setup
以避免额外的请求和权限问题。
# config/packages/messenger.yaml framework: messenger: transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: auto_setup: false
添加权限
当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端将自动从环境变量中读取它们 (配置环境变量)。
您只需在 serverless.yml
中提供正确的 IAM 声明 即可。Lambda 将负责其余部分。使用 Messenger 向 SQS 发布所需的 IAM 权限是给定队列上的 sqs:SendMessage
。
如果您使用 Lift,则这会自动为您完成。
从 SQS 消费消息
- 如果您不使用 Lift,请在
serverless.yml
中创建将被 SQS 调用的函数
functions: worker: handler: bin/consumer.php timeout: 20 # in seconds reservedConcurrency: 5 # max. 5 messages processed in parallel layers: - ${bref:layer.php-80} events: # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sqs/ - sqs: arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue # Only 1 item at a time to simplify error handling batchSize: 1
- 创建处理脚本(例如
bin/consumer.php
)
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer; require dirname(__DIR__) . '/config/bootstrap.php'; $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(SqsConsumer::class);
如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer; use Symfony\Component\Dotenv\Dotenv; require dirname(__DIR__).'/vendor/autoload.php'; (new Dotenv())->bootEnv(dirname(__DIR__).'/.env'); $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool)$_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(SqsConsumer::class);
- 注册并配置
SqsConsumer
服务
# config/services.yaml services: Bref\Symfony\Messenger\Service\Sqs\SqsConsumer: public: true autowire: true arguments: # true enables partial SQS batch failure # Enabling this without proper SQS config will consider all your messages successful # See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details. $partialBatchFailure: false
现在,每当有消息发送到 SQS 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。
FIFO 队列
FIFO 队列保证消息的精确一次交付,并且必须有后缀 .fifo
# config/packages/messenger.yaml framework: messenger: transports: async: dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo'
# serverless.yml resources: Resources: Queue: Type: AWS::SQS::Queue Properties: QueueName: my-queue.fifo FifoQueue: true
Symfony Amazon SQS Messenger 会自动计算/设置 FIFO 队列所需的 MessageGroupId
和 MessageDeduplicationId
参数,但您也可以显式设置它们
use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp; /* @var MessageBus $messageBus */ $messageBus->dispatch(new MyAsyncMessage(), [new AmazonSqsFifoStamp('my-group-message-id', 'my-deduplication-id')]);
其他方面与正常 SQS 队列相同。
SNS
AWS SNS 是“通知”而非“队列”。消息可能不会按照发送的顺序到达,它们可能一次性到达。要使用它,创建一个 SNS 主题并将其设置为 DSN
MESSENGER_TRANSPORT_DSN=sns://arn:aws:sns:us-east-1:1234567890:foobar
就这些,消息将被发送到该主题。
注意:当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端会自动从环境变量中读取它们[链接]。
要从 SNS 消费消息
- 在
serverless.yml
中创建将被 SNS 调用的函数
functions: worker: handler: bin/consumer.php timeout: 20 # in seconds reservedConcurrency: 5 # max. 5 messages processed in parallel layers: - ${bref:layer.php-80} events: # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sns/ - sns: arn: arn:aws:sns:us-east-1:1234567890:my_sns_topic
- 创建处理脚本(例如
bin/consumer.php
)
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\Sns\SnsConsumer; require dirname(__DIR__) . '/config/bootstrap.php'; $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(SnsConsumer::class);
如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\Sns\SnsConsumer; use Symfony\Component\Dotenv\Dotenv; require dirname(__DIR__).'/vendor/autoload.php'; (new Dotenv())->bootEnv(dirname(__DIR__).'/.env'); $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(SnsConsumer::class);
- 注册并配置
SnsConsumer
服务
# config/services.yaml services: Bref\Symfony\Messenger\Service\Sns\SnsConsumer: public: true autowire: true
现在,每当有消息发送到 SNS 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。
EventBridge
AWS EventBridge 是一个消息路由服务。它与 SNS 类似,但功能更强大。要使用它,按照以下方式配置 DSN
# "myapp" is the EventBridge "source", i.e. a namespace for your application's messages # This source name will be reused in `serverless.yml` later. MESSENGER_TRANSPORT_DSN=eventbridge://myapp
可选地,您可以通过 event_bus_name
查询参数设置 EventBusName,可以是名称或 ARN
MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=custom-bus MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=arn:aws:events:us-east-1:123456780912:event-bus/custom-bus
就这些,消息将被发送到 EventBridge。
注意:当在 AWS Lambda 上运行 Symfony 时,不需要配置凭证。AWS 客户端会自动从环境变量中读取它们[链接]。
要消费来自 EventBridge 的消息
- 在
serverless.yml
中创建将被 EventBridge 调用的函数
functions: worker: handler: bin/consumer.php timeout: 20 # in seconds reservedConcurrency: 5 # max. 5 messages processed in parallel layers: - ${bref:layer.php-80} events: # Read more at https://www.serverless.com/framework/docs/providers/aws/events/event-bridge/ - eventBridge: # In case of you change bus name in config/packages/messenger.yaml (i.e eventbridge://myapp?event_bus_name=custom-bus) you need to set bus name like below # eventBus: custom-bus # This filters events we listen to: only events from the "myapp" source. # This should be the same source defined in config/packages/messenger.yaml pattern: source: - myapp
- 创建处理脚本(例如
bin/consumer.php
)
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer; require dirname(__DIR__) . '/config/bootstrap.php'; $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(EventBridgeConsumer::class);
如果您使用的是 Symfony 5.1 或更高版本,请使用以下内容代替
<?php declare(strict_types=1); use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer; use Symfony\Component\Dotenv\Dotenv; require dirname(__DIR__).'/vendor/autoload.php'; (new Dotenv())->bootEnv(dirname(__DIR__).'/.env'); $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']); $kernel->boot(); // Return the Bref consumer service return $kernel->getContainer()->get(EventBridgeConsumer::class);
- 注册并配置
EventBridgeConsumer
服务
# config/services.yaml services: Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer: public: true autowire: true arguments: $transportName: 'async' # Optionnally, if you have different buses in config/packages/messenger.yaml, set $bus like below: # $bus: '@event.bus'
现在,每当有消息发送到为该来源配置的 EventBridge 时,Lambda 函数将被调用。Bref 消费者类会将消息放回 Symfony Messenger 以进行处理。
错误处理
AWS Lambda 有错误处理机制(重试和处理失败的消息)。因此,此包不集成 Symfony Messenger 的重试机制。相反,它与 Lambda 的重试机制协同工作。
本节仍在进行中,欢迎贡献以改进它。
当消息在 SQS 中失败时,默认情况下会返回到 SQS 队列。它将重试,直到消息过期。以下是一个使用 SQS 设置重试和“死信队列”的示例
# serverless.yml resources: Resources: Queue: Type: AWS::SQS::Queue Properties: # This needs to be at least 6 times the lambda function's timeout # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html VisibilityTimeout: '960' RedrivePolicy: deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn # Jobs will be retried 5 times # The number needs to be at least 5 per https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html maxReceiveCount: 5 # The dead letter queue is a SQS queue that receives messages that failed to be processed DeadLetterQueue: Type: AWS::SQS::Queue Properties: # Messages are stored up to 14 days (the max) MessageRetentionPeriod: 1209600
当使用 SNS 和 EventBridge 时,默认情况下消息将重试 2 次。
配置
配置 AWS 客户端
默认情况下,AWS 客户端(SQS、SNS、EventBridge)已预配置以在 AWS Lambda 上工作(感谢 AWS Lambda 填充的环境变量[链接])。
但是,您可以自定义 AWS 客户端,例如在 AWS Lambda 之外使用它们(本地、在 EC2 上)或模拟它们进行测试。这些客户端作为 Symfony 服务注册,键为
bref.messenger.sqs_client
bref.messenger.sns_client
bref.messenger.eventbridge_client
例如,要自定义 SQS 客户端
services: bref.messenger.sqs_client: class: AsyncAws\Sqs\SqsClient public: true # the AWS clients must be public arguments: # Apply your own config here - region: us-east-1
自动传输识别
自动传输识别主要由默认的TransportNameResolvers处理,适用于SNS和SQS,确保传输名称自动传递到您的消息处理器。然而,在需要手动指定传输名称或调整默认行为的情况下,您可以通过在config/services.yaml文件中您的服务定义中设置$transportName
参数来实现。此参数应与config/packages/messenger.yaml中定义的传输名称匹配。例如,对于一个SNSConsumer,您可以这样配置:
# config/packages/messenger.yaml framework: messenger: transports: async: '%env(MESSENGER_TRANSPORT_DSN)%'
# config/services.yaml services: Bref\Symfony\Messenger\Service\Sns\SnsConsumer: public: true autowire: true arguments: # Pass the transport name used in config/packages/messenger.yaml $transportName: 'async'
禁用传输
默认情况下,此包注册了Symfony Messenger的SQS、SNS和EventBridge传输。
如果您想禁用某些传输(例如在冲突的情况下),可以删除BrefMessengerBundle
从config/bundles.php
,并在您的应用程序配置中重新配置您想要的传输。查看Resources/config/services.yaml
以复制您想要的部分。
自定义序列化器
如果您想改变消息的序列化方式,例如使用Happyr消息序列化器,您需要在传输和消费者两端都添加序列化器。例如:
# config/packages/messenger.yaml framework: messenger: transports: async: dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue' serializer: 'Happyr\MessageSerializer\Serializer' # config/services.yaml services: Bref\Symfony\Messenger\Service\Sqs\SqsConsumer: public: true autowire: true arguments: $serializer: '@Happyr\MessageSerializer\Serializer'