zfr / zfr-eb-worker
针对 AWS SQS 的 Elastic Beanstalk 工作环境进行薄封装
Requires
- php: ^7.1
- aws/aws-sdk-php: ^3.0
- psr/container: ^1.0
- psr/http-message: ^1.0
- webimpress/http-middleware-compatibility: ^0.1.4
Requires (Dev)
- guzzlehttp/guzzle: ^6.0
- phpunit/phpunit: ~5.1
- squizlabs/php_codesniffer: ^2.3
- symfony/console: ^3.0
- zendframework/zend-diactoros: ^1.3
Suggests
- guzzlehttp/guzzle: If you want to use the CLI local worker and publisher (^6.0)
- symfony/console: If you want to use the CLI local worker and publisher (^3.0)
README
ZfrEbWorker 是围绕 SQS 的一种简单封装,旨在简化在 Elastic Beanstalk 中创建应用程序。
依赖关系
- PHP 7.1+
安装
仅通过 Composer 支持 ZfrEbWorker 的官方安装
php composer.phar require 'zfr/zfr-eb-worker:6.*'
Elastic Beanstalk 如何工作?
您可以在此处了解更多有关 Elastic Beanstalk 工作器/CRON 的工作原理:http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html
注意,如果返回 200,Elastic Beanstalk 会自动删除队列中的消息。这就是为什么这个包没有“deleteMessage”的原因。
使用方法
库配置
AWS 配置
ZfrEbWorker 期望您将 Aws\Sdk
类注册到您选择的容器中。您可以自由地以您喜欢的任何方式配置 SDK,因此 ZfrEbWorker 不包含默认工厂。
以下是一个简单的 ContainerInterop
兼容工厂示例
<?php use Aws\Sdk as AwsSdk; use Interop\Container\ContainerInterface; use RuntimeException; class AwsSdkFactory { /** * @param ContainerInterface $container * @return AwsSdk */ public function __invoke(ContainerInterface $container): AwsSdk { $config = $container->get('config'); if (!isset($config['aws'])) { throw new RuntimeException('Key "aws" is missing'); } return new AwsSdk($config['aws']); } }
然后注册您的工厂(此示例使用 Zend\ServiceManager 风格的配置)
use Aws\Sdk as AwsSdk; return [ 'dependencies' => [ 'factories' => [ AwsSdk::class => AwsSdkFactory::class ] ] ];
最后,修改您的配置以将 aws
键添加到配置文件中(然后,请遵循官方 AWS SDK 文档以了解所有可能的键)
return [ 'aws' => [ 'region' => 'us-east-1', // Replace by your region 'Sqs' => ['version' => '2012-11-05'], // Add all your other services 'credentials' => [ 'key' => 'YOUR_USER_KEY', 'secret' => 'YOUR_SECRET_KEY' ] ] ];
工作器配置
首先,确保通过添加以下配置来配置 ZfrEbWorker 库
'zfr_eb_worker' => [ 'queues' => [ 'first_queue' => 'https://sqs.us-east-1.amazon.com/foo', 'second_queue' => 'https://sqs.us-east-1.amazon.com/bar' ], 'messages' => [ 'project.created' => SendCampaignListener::class, 'image.saved' => ProcessImageListener::class ]
queues
是 AWS SQS 上托管队列名称和队列 URL 的关联数组,而 messages
是将消息名称映射到特定监听器的关联数组(每个监听器只是一个标准的中间件)。
注册 WorkerMiddleware
您应该在您的路由器中注册 WorkerMiddleware
以响应对 "/internal/worker" 路径的请求。此中间件消耗由 Elastic Beanstalk 工作环境发送的消息并将路由到映射的监听器。例如,在 Zend Expressive 中
use ZfrEbWorker\Middleware\WorkerMiddleware; $app->post('/internal/worker', WorkerMiddleware::class);
配置 Elastic Beanstalk
然后,您应该配置您的 Elastic Beanstalk 工作环境以将消息推送到 "/internal/worker" URL(如果您使用 Zend Expressive,这是默认配置的 URL)。默认情况下,ZfrEbWorker 执行额外的安全检查以确保请求来自 localhost(因为守护程序直接安装在 EC2 实例上并在本地推送消息)
推送消息
您可以通过将 MessageQueueInterface
对象注入到您的类中来推送消息。
您可以使用 MessageQueueRepositoryInterface
实例轻松创建预配置的队列。例如,假设以下配置
'zfr_eb_worker' => [ 'queues' => [ 'first_queue' => 'https://sqs.us-east-1.amazon.com/foo' ] ]
您可以使用存储库创建队列,配置了 URL
$queue = $queueRepository->getMessageQueue('first_queue');
一旦您有一个配置好的队列,您就可以添加一个或多个消息,然后刷新队列。刷新时,库将确保尽可能少地调用 SQS(使用优化的 SQS 批量 API)以及多个队列
$queue->push(new Message('image.saved', ['image_id' => 123])); $queue->push(new Message('imave.saved', ['image_id' => 456])); // ... $queue->flush();
push
方法接受一个 MessageInterface
对象作为第一个参数,它是消息名称和有效负载的薄包装。ZfrEbWorker 提供了一个默认的 Message
类。
您还可以使用专门的 DelayedMessage 类推送延迟消息(最长 600 秒)
示例用法
$queue->push(new DelayedMessage('image.saved', ['image_id' => 123], 60));
注意:如果您使用 FIFO 队列,则此操作不会有任何效果。在 FIFO 队列中,延迟只能全局应用于队列。
FIFO 队列
从版本 6 开始,ZfrEbWorker 支持 FIFO 队列。您可以在第三个和第四个参数中分别提供自定义组 ID 和去重 ID
$message = new Message('image.saved', ['image_id' => 123], 'group_id', 'deduplication_id'); $queue->push($message);
如果您不提供组 ID,将生成一个默认的组 ID。
检索消息信息
ZfrEbWorker会自动将传入的请求调度到为给定事件指定的中间件。消息信息存储在以下各种请求属性中
use ZfrEbWorker\Middleware\WorkerMiddleware; class MyEventMiddleware { public function __invoke($request, $response, $out) { $queue = $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE); $messageId = $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE); $messagePayload = $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE); $name = $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE); } }
注意:对于周期性任务,只有
Middleware::MESSAGE_NAME_ATTRIBUTE
可用。
如何静默忽略某些消息?
当ZfrEbWorker找不到处理消息的映射中间件时,它会抛出RuntimeException
,这使得Elastic Beanstalk稍后再次重试消息。但是,如果您不想处理特定消息且不想让Elastic Beanstalk稍后重试,您应将SilentFailingListener映射到该消息,如下所示
'zfr_eb_worker' => [ 'messages' => [ 'user.updated' => ZfrEbWorker\Listener\SilentFailingListener::class, ]
如何使用周期性任务?
Elastic Beanstalk还支持通过使用cron.yaml
文件(更多信息请参阅这里)来实现周期性任务。ZfrEbWorker以相同、统一的方式支持此用例。
简单地将所有周期性任务重定向到同一个"/internal/worker"路由,并确保您使用的任务名称是配置的一部分。例如,这里有一个名为"image.backup"的任务,每12小时运行一次
version: 1 cron: - name: "image.backup" url: "/internal/worker" schedule: "0 */12 * * *"
然后,在您的ZfrEbWorker配置中,只需像配置任何其他消息一样配置它
'zfr_eb_worker' => [ 'messages' => [ 'image.backup' => ImageBackupListener::class, ]
CLI命令
从版本3.3开始,ZfrEbWorker包含Symfony CLI命令,允许
- 轻松将消息推送到遵循ZfrEbWorker格式的队列中。
- 模拟原生Elastic Beanstalk工作者的使用,以获取消息并执行它们。
这个本地工作者仅适用于开发。在生产环境中,您应该使用原生Elastic Beanstalk工作者,它更快(在一次SQS调用中检索多达10条消息),并且内置在Elastic Beanstalk AMI中(它受监控...)。
设置
在使用这些CLI命令之前,您需要设置一些事情,具体请参阅以下章节。
添加依赖项
请确保您已在项目中添加了这两个依赖项(通常在composer.json
文件的require-dev
部分中)
{ "require-dev": { "symfony/console": "^3.0", "guzzlehttp/guzzle": "^6.0" } }
添加控制台入口点
ZfrEbWorker将WorkerCommand
和PublisherCommand
Symfony CLI命令添加到配置的console
顶级键。如果您使用的是已经使用Symfony CLI的框架,只需添加ZfrEbWorker\Cli\WorkerCommand
和/或ZfrEbWorker\Cli\PublisherCommand
命令。
如果您使用Zend\Expressive,这里是一个示例文件(例如,可以将其命名为console.php
),您可以将它添加到public
文件夹中,与您的index.php
文件一起
use Symfony\Component\Console\Application; chdir(dirname(__DIR__)); require 'vendor/autoload.php'; /** @var \Interop\Container\ContainerInterface $container */ $container = require 'config/container.php'; $application = new Application('Application console'); $commands = $container->get('config')['console']['commands']; foreach ($commands as $command) { $application->add($container->get($command)); } $application->run();
添加IAM权限
为了允许本地工作者工作,您需要将sqs:GetQueueUrl
、sqs:ReceiveMessage
、sqs:DeleteMessage
和sqs:SendMessage
权限添加到您在本地使用的IAM用户。
出于安全原因,我们建议您拥有生产和开发队列,以便您的开发IAM用户只能访问开发队列,不能干扰生产队列。
PublisherCommand
此命令允许轻松将消息添加到Elastic Beanstalk工作者。
使用以下命令:php console.php eb-publisher --payload="foo=bar&bar=baz" --queue=my-queue --name=user.created
payload
键支持HTML-like查询参数,因此如果您想添加以下JSON
{ "user": { "first_name": "John", "last_name": "Doe" } }
您可以使用以下payload:--payload='user[first_name]=John&user[last_name]=Doe'
WorkerCommand
此命令允许模拟原生Elastic Beanstalk工作者。
现在,您可以编写命令php console.php eb-worker --server=http://localhost --queue=my-queue
。此代码将自动轮询名为my-queue
的队列,并将消息推送到由server
选项指示的URL,其中添加了/internal/worker
路径(这是ZfrEbWorker的默认配置)。
因此,在这个示例中,每当添加一条新消息时,本地工作者将会向http://localhost/internal/worker
发起POST请求。本地工作者的行为与原生的Elastic Beanstalk工作者完全相同,并且添加了所有相同的HTTP头。