zfr/zfr-eb-worker

针对 AWS SQS 的 Elastic Beanstalk 工作环境进行薄封装

6.1.0 2017-10-23 20:57 UTC

README

Build Status

ZfrEbWorker 是围绕 SQS 的一种简单封装,旨在简化在 Elastic Beanstalk 中创建应用程序。

依赖关系

  • PHP 7.1+

安装

仅通过 Composer 支持 ZfrEbWorker 的官方安装

php composer.phar require 'zfr/zfr-eb-worker:6.*'

Elastic Beanstalk 如何工作?

您可以在此处了解更多有关 Elastic Beanstalk 工作器/CRON 的工作原理:http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html

注意,如果返回 200,Elastic Beanstalk 会自动删除队列中的消息。这就是为什么这个包没有“deleteMessage”的原因。

使用方法

库配置

AWS 配置

ZfrEbWorker 期望您将 Aws\Sdk 类注册到您选择的容器中。您可以自由地以您喜欢的任何方式配置 SDK,因此 ZfrEbWorker 不包含默认工厂。

以下是一个简单的 ContainerInterop 兼容工厂示例

<?php

use Aws\Sdk as AwsSdk;
use Interop\Container\ContainerInterface;
use RuntimeException;

class AwsSdkFactory
{
    /**
     * @param  ContainerInterface $container
     * @return AwsSdk
     */
    public function __invoke(ContainerInterface $container): AwsSdk
    {
        $config = $container->get('config');

        if (!isset($config['aws'])) {
            throw new RuntimeException('Key "aws" is missing');
        }

        return new AwsSdk($config['aws']);
    }
}

然后注册您的工厂(此示例使用 Zend\ServiceManager 风格的配置)

use Aws\Sdk as AwsSdk;

return [
    'dependencies' => [
        'factories' => [
            AwsSdk::class => AwsSdkFactory::class
        ]
    ]
];

最后,修改您的配置以将 aws 键添加到配置文件中(然后,请遵循官方 AWS SDK 文档以了解所有可能的键)

return [
    'aws' => [
        'region'      => 'us-east-1', // Replace by your region
        'Sqs'         => ['version' => '2012-11-05'], // Add all your other services
        'credentials' => [
            'key'    => 'YOUR_USER_KEY',
            'secret' => 'YOUR_SECRET_KEY'
        ]
    ]
];

工作器配置

首先,确保通过添加以下配置来配置 ZfrEbWorker 库

'zfr_eb_worker' => [
    'queues' => [
        'first_queue'  => 'https://sqs.us-east-1.amazon.com/foo',
        'second_queue' => 'https://sqs.us-east-1.amazon.com/bar'
    ],

    'messages' => [
        'project.created' => SendCampaignListener::class,
        'image.saved'     => ProcessImageListener::class
    ]

queues 是 AWS SQS 上托管队列名称和队列 URL 的关联数组,而 messages 是将消息名称映射到特定监听器的关联数组(每个监听器只是一个标准的中间件)。

注册 WorkerMiddleware

您应该在您的路由器中注册 WorkerMiddleware 以响应对 "/internal/worker" 路径的请求。此中间件消耗由 Elastic Beanstalk 工作环境发送的消息并将路由到映射的监听器。例如,在 Zend Expressive 中

use ZfrEbWorker\Middleware\WorkerMiddleware;

$app->post('/internal/worker', WorkerMiddleware::class);

配置 Elastic Beanstalk

然后,您应该配置您的 Elastic Beanstalk 工作环境以将消息推送到 "/internal/worker" URL(如果您使用 Zend Expressive,这是默认配置的 URL)。默认情况下,ZfrEbWorker 执行额外的安全检查以确保请求来自 localhost(因为守护程序直接安装在 EC2 实例上并在本地推送消息)

推送消息

您可以通过将 MessageQueueInterface 对象注入到您的类中来推送消息。

您可以使用 MessageQueueRepositoryInterface 实例轻松创建预配置的队列。例如,假设以下配置

'zfr_eb_worker' => [
    'queues' => [
        'first_queue'  => 'https://sqs.us-east-1.amazon.com/foo'
    ]
]

您可以使用存储库创建队列,配置了 URL

$queue = $queueRepository->getMessageQueue('first_queue');

一旦您有一个配置好的队列,您就可以添加一个或多个消息,然后刷新队列。刷新时,库将确保尽可能少地调用 SQS(使用优化的 SQS 批量 API)以及多个队列

$queue->push(new Message('image.saved', ['image_id' => 123]));
$queue->push(new Message('imave.saved', ['image_id' => 456]));

// ...

$queue->flush();

push 方法接受一个 MessageInterface 对象作为第一个参数,它是消息名称和有效负载的薄包装。ZfrEbWorker 提供了一个默认的 Message 类。

您还可以使用专门的 DelayedMessage 类推送延迟消息(最长 600 秒)

示例用法

$queue->push(new DelayedMessage('image.saved', ['image_id' => 123], 60));

注意:如果您使用 FIFO 队列,则此操作不会有任何效果。在 FIFO 队列中,延迟只能全局应用于队列。

FIFO 队列

从版本 6 开始,ZfrEbWorker 支持 FIFO 队列。您可以在第三个和第四个参数中分别提供自定义组 ID 和去重 ID

$message = new Message('image.saved', ['image_id' => 123], 'group_id', 'deduplication_id');
$queue->push($message);

如果您不提供组 ID,将生成一个默认的组 ID。

检索消息信息

ZfrEbWorker会自动将传入的请求调度到为给定事件指定的中间件。消息信息存储在以下各种请求属性中

use ZfrEbWorker\Middleware\WorkerMiddleware;

class MyEventMiddleware
{
    public function __invoke($request, $response, $out)
    {
        $queue          = $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE);
        $messageId      = $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE);
        $messagePayload = $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE);
        $name           = $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE);
    }
}

注意:对于周期性任务,只有Middleware::MESSAGE_NAME_ATTRIBUTE可用。

如何静默忽略某些消息?

当ZfrEbWorker找不到处理消息的映射中间件时,它会抛出RuntimeException,这使得Elastic Beanstalk稍后再次重试消息。但是,如果您不想处理特定消息且不想让Elastic Beanstalk稍后重试,您应将SilentFailingListener映射到该消息,如下所示

'zfr_eb_worker' => [
    'messages' => [
        'user.updated' => ZfrEbWorker\Listener\SilentFailingListener::class,
    ]

如何使用周期性任务?

Elastic Beanstalk还支持通过使用cron.yaml文件(更多信息请参阅这里)来实现周期性任务。ZfrEbWorker以相同、统一的方式支持此用例。

简单地将所有周期性任务重定向到同一个"/internal/worker"路由,并确保您使用的任务名称是配置的一部分。例如,这里有一个名为"image.backup"的任务,每12小时运行一次

version: 1
cron:
  - name: "image.backup"
    url: "/internal/worker"
    schedule: "0 */12 * * *"

然后,在您的ZfrEbWorker配置中,只需像配置任何其他消息一样配置它

'zfr_eb_worker' => [
    'messages' => [
        'image.backup' => ImageBackupListener::class,
    ]

CLI命令

从版本3.3开始,ZfrEbWorker包含Symfony CLI命令,允许

  • 轻松将消息推送到遵循ZfrEbWorker格式的队列中。
  • 模拟原生Elastic Beanstalk工作者的使用,以获取消息并执行它们。

这个本地工作者仅适用于开发。在生产环境中,您应该使用原生Elastic Beanstalk工作者,它更快(在一次SQS调用中检索多达10条消息),并且内置在Elastic Beanstalk AMI中(它受监控...)。

设置

在使用这些CLI命令之前,您需要设置一些事情,具体请参阅以下章节。

添加依赖项

请确保您已在项目中添加了这两个依赖项(通常在composer.json文件的require-dev部分中)

{
  "require-dev": {
    "symfony/console": "^3.0",
    "guzzlehttp/guzzle": "^6.0"
  }
}

添加控制台入口点

ZfrEbWorker将WorkerCommandPublisherCommand Symfony CLI命令添加到配置的console顶级键。如果您使用的是已经使用Symfony CLI的框架,只需添加ZfrEbWorker\Cli\WorkerCommand和/或ZfrEbWorker\Cli\PublisherCommand命令。

如果您使用Zend\Expressive,这里是一个示例文件(例如,可以将其命名为console.php),您可以将它添加到public文件夹中,与您的index.php文件一起

use Symfony\Component\Console\Application;

chdir(dirname(__DIR__));
require 'vendor/autoload.php';

/** @var \Interop\Container\ContainerInterface $container */
$container = require 'config/container.php';

$application = new Application('Application console');

$commands = $container->get('config')['console']['commands'];

foreach ($commands as $command) {
    $application->add($container->get($command));
}

$application->run();

添加IAM权限

为了允许本地工作者工作,您需要将sqs:GetQueueUrlsqs:ReceiveMessagesqs:DeleteMessagesqs:SendMessage权限添加到您在本地使用的IAM用户。

出于安全原因,我们建议您拥有生产和开发队列,以便您的开发IAM用户只能访问开发队列,不能干扰生产队列。

PublisherCommand

此命令允许轻松将消息添加到Elastic Beanstalk工作者。

使用以下命令:php console.php eb-publisher --payload="foo=bar&bar=baz" --queue=my-queue --name=user.created

payload键支持HTML-like查询参数,因此如果您想添加以下JSON

{
  "user": {
    "first_name": "John",
    "last_name": "Doe"
  }
}

您可以使用以下payload:--payload='user[first_name]=John&user[last_name]=Doe'

WorkerCommand

此命令允许模拟原生Elastic Beanstalk工作者。

现在,您可以编写命令php console.php eb-worker --server=http://localhost --queue=my-queue。此代码将自动轮询名为my-queue的队列,并将消息推送到由server选项指示的URL,其中添加了/internal/worker路径(这是ZfrEbWorker的默认配置)。

因此,在这个示例中,每当添加一条新消息时,本地工作者将会向http://localhost/internal/worker发起POST请求。本地工作者的行为与原生的Elastic Beanstalk工作者完全相同,并且添加了所有相同的HTTP头。