symbiote/silverstripe-sqs-jobqueue

一个用于与 AWS 的简单队列服务 (SQS) 交互的模块

安装次数: 8,224

依赖项: 1

建议者: 0

安全: 0

星星: 1

关注者: 3

分支: 7

开放问题: 2

类型:silverstripe-vendormodule

4.0.0 2024-09-13 00:35 UTC

This package is auto-updated.

Last update: 2024-09-13 00:37:11 UTC


README

一个用于发送和消费 SQS 任务的模块。可以配置为队列作业的触发器。

当用作队列作业处理程序时,队列作业的运行方式略有不同——除了不再需要计划任务之外。

  • 在调用 QueuedJobService->queueJob 时,会向 SQS 发送一条消息
  • 消费者接收到该消息,然后处理该作业 ID
  • 所有“类型 1”即即时作业随后在该执行线程中处理
  • 如果添加的作业打算在将来运行,处理程序将作业类型设置为“已安排”。
  • 一个单独的 SQS 任务运行,寻找任何类型为“已安排”的作业并执行这些作业
  • 该任务在 30 秒后再次排队运行
  • “已安排”任务运行程序还将查找任何处于“等待”状态的作业;这是暂停作业被选中以供进一步执行而不需要触发另一个 SQS 消息的方法

作为队列作业处理程序的使用配置

---
Name: jobrunner
After: queuedjobs
---
SilverStripe\Core\Injector\Injector:
  QueueHandler:
    class: Symbiote\SqsJobQueue\Service\SqsQueueHandler
    properties:
      sqsService: '%$SqsService'
  SqsClient:
    class: Aws\Sqs\SqsClient
    constructor:
      connection_details: 
        region: ap-southeast-2
        version: latest
        credentials: 
          key: YourKey
          secret: YourSecret

它期望存在一个名为 'jobqueue' 的队列 - 如果队列名称不同,

SilverStripe\Core\Injector\Injector:
  SqsService:
    properties:
      queueName: your-queue-name

编写任务

定义一个包含方法的类。这是任务运行程序;不需要任何特定的实现。

例如

namespace \Whatever\Class\Implements;

class Work {

    public function doStuff() {

    }
}

将配置添加到您的项目中,将方法名称绑定到 SqsService

SilverStripe\Core\Injector\Injector:
  MyJobName: 
    class: \Whatever\Class\Implements\Work
  SqsService:
    properties:
      handlers: 
        doStuff: %$MyJobName

触发任务

要触发任务,调用

$sqsService->sendSqsMessage(['args' => ['param1', 'param2']], 'taskName');

为了方便起见,SqsService 实现了一个 __call() 方法,将调用映射为

$sqsService->taskName($arg1, $arg2)

转换成

$sqsService->sendSqsMessage(['args' => [$arg1, $arg2]], 'taskName');

sendSqsMessage 又将其转换为如下消息结构

$message = [
    'message' => [
        'args' => [$arg1, $arg2'],
        'handler' => 'taskName',
    ]
];

$sqsMessage = [
    'QueueUrl' => 'sqs://in.amazon'
    'MessageBody' => json_encode($message)
];

因此,要手动触发消息,从您自己的代码中创建 $sqsMessage 结构,并使用 $this->client->sendMessage($sqsMessage); 发送

运行

php vendor/symbiote/silverstripe-sqs-jobqueue/sqs-worker.php

开发环境

如果您没有 SQS,可以通过将 AWS 队列替换为基于文件的 SQS 队列来运行文件队列系统。

---
Name: local_sqs_config
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
  SqsService:
    properties:
      client: '%$FileSqsClient'
  FileSqsClient:
    class: Symbiote\SqsJobQueue\Service\FileBasedSqsQueue

默认情况下,这将在 sqs-jobqueue/code/service/.queueus 中创建序列化数据(在 FileBasedSqsQueue 类上可配置)。

像以前一样运行 sqs-worker。

Docker

如果您正在使用 https://github.com/symbiote/docker-runtime,这将为您启动一个 sqsrunner 容器,它会自动运行 sqs-worker。

故障排除

因此,对于项目,您的配置可能看起来像这样,其中默认只针对测试环境使用基于文件的队列系统

---
Name: prod_sqs
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
  Symbiote\SqsJobQueue\Service\SqsService:
    properties:
      client: '%$SqsClient'
  SqsClient:
    class: Aws\Sqs\SqsClient
    constructor:
      connection_details:
        region: ap-southeast-2
        version: latest
---
Name: dev_sqs
After: '#sqs_config'
Only:
  environment: test
---
SilverStripe\Core\Injector\Injector:
  Symbiote\SqsJobQueue\Service\SqsService:
    properties:
      client: '%$Symbiote\SqsJobQueue\Service\FileBasedSqsQueue'

在生产环境中,您仍然需要通过本地配置提供 credentials

要使基于文件的队列系统正常工作,您的本地配置将需要类似以下内容

---
Name: jobrunner
After: 
  - queuedjobs
---
SilverStripe\Core\Injector\Injector:
  QueueHandler:
    class: Symbiote\SqsJobQueue\Service\SqsQueueHandler
    properties:
      sqsService: '%$Symbiote\SqsJobQueue\Service\SqsService'
  Symbiote\SqsJobQueue\Service\SqsService:
    properties:
      queueName: '%sqs_jobqueue_name%'
---
Name: sqs_location
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
  Symbiote\SqsJobQueue\Service\FileBasedSqsQueue:
    properties:
      queuePath: /var/www/html/mysite/fake-sqs-queues

重要部分是确保 /var/www/html/mysite/fake-sqs-queues 可写,因为您的队列作业将写入该位置。

Docker 测试

一旦所有配置就绪,最简单的方法是

  • 排队一个作业(例如,使用 https://github.com/symbiote/silverstripe-queuedjobs/ 模块)
  • 确保一个文件被写入 /var/www/html/mysite/fake-sqs-queues
  • 运行 docker logs sqsrunner 以查看它是否拾取了作业
  • 查看队列中的作业管理员界面以确保作业已完成