silverstripe-australia / silverstripe-sqs-jobqueue
Requires
- aws/aws-sdk-php: ^3.320.0
README
一个用于发送和消费 SQS 任务的模块。可以配置为作为队列作业的触发器。
当用作队列作业处理器时,队列作业的运行方式有一些细微的变化 - 除了不再需要 Cron 作业。
- 在调用 QueuedJobService->queueJob 时,会向 SQS 发送消息
- 消费者拾取该消息,然后处理该作业 ID
- 所有“类型 1”即即时作业随后在该执行线程中处理
- 如果添加的作业旨在将来运行,处理程序将作业类型设置为“计划”。
- 一个单独的 SQS 任务运行,寻找任何类型为“计划”的作业并执行这些作业
- 该任务将在 30 秒后再次排队运行
- “计划”任务运行器还将寻找任何处于“等待”状态的作业;这是如何在没有触发另一个 SQS 消息的情况下重新启动暂停作业的执行
作为队列作业处理器的配置
---
Name: jobrunner
After: queuedjobs
---
SilverStripe\Core\Injector\Injector:
QueueHandler:
class: Symbiote\SqsJobQueue\Service\SqsQueueHandler
properties:
sqsService: '%$SqsService'
SqsClient:
class: Aws\Sqs\SqsClient
constructor:
connection_details:
region: ap-southeast-2
version: latest
credentials:
key: YourKey
secret: YourSecret
它期望存在一个名为 'jobqueue' 的队列 - 如果队列名称不同,
SilverStripe\Core\Injector\Injector:
SqsService:
properties:
queueName: your-queue-name
编写任务
定义一个带有方法类的类。这是任务运行器;无需任何特定的实现。
例如
namespace \Whatever\Class\Implements; class Work { public function doStuff() { } }
将配置添加到您的项目中,将方法名称绑定到 SqsService
SilverStripe\Core\Injector\Injector: MyJobName: class: \Whatever\Class\Implements\Work SqsService: properties: handlers: doStuff: %$MyJobName
触发任务
要触发任务,调用
$sqsService->sendSqsMessage(['args' => ['param1', 'param2']], 'taskName');
为了方便,SqsService 实现了一个 __call()
方法,它将类似于
$sqsService->taskName($arg1, $arg2)
的调用重映射为
$sqsService->sendSqsMessage(['args' => [$arg1, $arg2]], 'taskName');
sendSqsMessage
然后将此转换为如下消息结构:
$message = [ 'message' => [ 'args' => [$arg1, $arg2'], 'handler' => 'taskName', ] ]; $sqsMessage = [ 'QueueUrl' => 'sqs://in.amazon' 'MessageBody' => json_encode($message) ];
因此,要手动触发消息,从您自己的代码中创建 $sqsMessage
结构,然后使用 $this->client->sendMessage($sqsMessage);
发送。
运行
php vendor/symbiote/silverstripe-sqs-jobqueue/sqs-worker.php
开发环境
如果您没有 SQS,可以通过用基于文件的 SQS 队列替换 AWS 队列来运行基于文件的队列系统。
---
Name: local_sqs_config
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
SqsService:
properties:
client: '%$FileSqsClient'
FileSqsClient:
class: Symbiote\SqsJobQueue\Service\FileBasedSqsQueue
默认情况下,这将创建在 sqs-jobqueue/code/service/.queueus 中的序列化数据(在 FileBasedSqsQueue 类上可配置)。
像以前一样运行 sqs-worker。
Docker
如果您使用 https://github.com/symbiote/docker-runtime,这将为您启动一个 sqsrunner
容器,该容器会自动运行 sqs-worker。
故障排除
因此,对于一个项目,您的配置可能如下所示,其中默认仅适用于测试环境中的基于文件的队列系统
---
Name: prod_sqs
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
Symbiote\SqsJobQueue\Service\SqsService:
properties:
client: '%$SqsClient'
SqsClient:
class: Aws\Sqs\SqsClient
constructor:
connection_details:
region: ap-southeast-2
version: latest
---
Name: dev_sqs
After: '#sqs_config'
Only:
environment: test
---
SilverStripe\Core\Injector\Injector:
Symbiote\SqsJobQueue\Service\SqsService:
properties:
client: '%$Symbiote\SqsJobQueue\Service\FileBasedSqsQueue'
在生产环境中,您仍然需要使用本地配置提供 credentials
。
要使基于文件的队列系统工作,您的本地配置需要如下所示
---
Name: jobrunner
After:
- queuedjobs
---
SilverStripe\Core\Injector\Injector:
QueueHandler:
class: Symbiote\SqsJobQueue\Service\SqsQueueHandler
properties:
sqsService: '%$Symbiote\SqsJobQueue\Service\SqsService'
Symbiote\SqsJobQueue\Service\SqsService:
properties:
queueName: '%sqs_jobqueue_name%'
---
Name: sqs_location
After: '#sqs_config'
---
SilverStripe\Core\Injector\Injector:
Symbiote\SqsJobQueue\Service\FileBasedSqsQueue:
properties:
queuePath: /var/www/html/mysite/fake-sqs-queues
重要的是要确保 /var/www/html/mysite/fake-sqs-queues
可写,因为您的队列作业将被写入那里。
Docker 测试
一旦所有配置都设置完成,测试的最简单方法是
- 排队一个任务(例如使用https://github.com/symbiote/silverstripe-queuedjobs/模块)
- 确保一个文件被写入到
/var/www/html/mysite/fake-sqs-queues
- 运行
docker logs sqsrunner
以查看它是否已经抓取了该任务 - 查看队列任务管理界面以确保任务已完成