有线00 / custom-queue
从外部队列获取队列消息
Requires
- php: >=5.4.0
- ext-json: *
- aws/aws-sdk-php: ~3.3
- illuminate/queue: ~5.0
Requires (Dev)
- mockery/mockery: ^1.0@dev
- orchestra/testbench: 3.8.x-dev
- php-mock/php-mock-mockery: dev-master
- phpunit/phpunit: 7.5.x-dev
This package is auto-updated.
Last update: 2024-09-08 11:25:32 UTC
README
Laravel队列在内部运行良好,当工作项通过SomeJob::dispatch()
推送并随后通过Laravel队列工作者检索时。
但如果工作项是从除Laravel以外的外部服务或消息代理推送到的队列呢?例如,AWS S3存储桶 > SNS > SQS?在这些情况下,Laravel队列工作者无法识别负载,无法解析它,并且因为工作项负载缺少预期的job
和data
属性等而中断。
CustomQueue
旨在解决此问题。它将检索一个工作项,将负载重新打包成Laravel可以识别的格式,然后通过用户指定的作业处理器进行处理。
没有custom-queue的负载
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "ap-southeast-2",
"eventTime": "2019-03-22T06:31:40.395Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:blahblah:blah"
},
"requestParameters": {
"sourceIPAddress": "10.10.10.10"
},
"responseElements": {
"x-amz-request-id": "C53F65ECD63F53F8",
"x-amz-id-2": "blah="
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "folder-name",
"bucket": {
"name": "bucket-name",
"ownerIdentity": {
"principalId": "A2XHNNJ3IERBDC"
},
"arn": "arn:aws:s3:::bucket-name"
},
"object": {
"key": "file-drop/droptest.csv",
"size": 12,
"eTag": "tagid",
"sequencer": "005C94814C54E35D75"
}
}
}
]
}
有custom-queue的负载
{
"type": "job",
"job": "custom-sqs",
"data": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-2\",\"eventTime\":\"2019-03-22T06:31:40.395Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:blahblah:blah\"},\"requestParameters\":{\"sourceIPAddress\":\"10.10.10.10\"},\"responseElements\":{\"x-amz-request-id\":\"C53F65ECD63F53F8\",\"x-amz-id-2\":\"blah=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"folder-name\",\"bucket\":{\"name\":\"bucket-name\",\"ownerIdentity\":{\"principalId\":\"A2XHNNJ3IERBDC\"},\"arn\":\"arn:aws:s3:::bucket-name\"},\"object\":{\"key\":\"file-drop/droptest.csv\",\"size\":12,\"eTag\":\"tagid\",\"sequencer\":\"005C94814C54E35D75\"}}}]}"
}
**注意:** job: custom-sqs
指定了用于处理作业的作业处理器。data
只是原始负载。
安装
通过composer安装包
composer require wired00/custom-queue
发布CustomQueue
配置文件到您的项目
vendor:publish
以下为customqueue.php
配置的详细信息
使用
配置
queue.php
设置自定义外部SQS连接
'custom-sqs' => [
'driver' => 'custom-sqs',
'key' => env('AWS_ACCESS_KEY_ID', 'your-public-key'),
'secret' => env('AWS_SECRET_ACCESS_KEY', 'your-secret-key'),
'queue' => env('SQS_QUEUE', 'your-queue-url'),
'region' => env('AWS_REGION', 'us-east-1'),
],
从您的Laravel .env
设置所有这些值。例如
AWS_ACCESS_KEY_ID=ASIAWMC25A2L7MDO6NGA
AWS_SECRET_ACCESS_KEY=3qZLVRShxQvx2xbTSKD5bllObtwHNH3O/9NqvFNc
AWS_SECURITY_TOKEN=YOUR-AWS-SECURITY-TOKEN
SQS_QUEUE=https://sqs.ap-southeast-2.amazonaws.com/123/your-sqs-queue-name
AWS_REGION=ap-southeast-2
QUEUE_CONNECTION=custom-sqs
customqueue.php
customqueue.php
配置文件仅包含处理器类路径和标识符之间的映射。
示例包含标识符custom-sqs
和类路径App\Jobs\ProcessSQS::class
。这指定了从队列检索作业负载时,将始终添加job: App\Jobs\ProcessSQS::class
键值对,只要处理连接类型为custom-sqs
。作业将通过ProcessSQS->handle()
进行处理。
目前CustomQueue仅支持自定义SQS检索,但将来可能会支持RabbitMQ和Redis。在这些情况下,customqueue.php
将包含标识符,例如custom-redis
和custom-rabbitmq
。
处理器文件
处理器文件是customqueue.php
中通过类路径引用的文件。它们必须实现Wired00\CustomQueue\Contracts\CustomQueueJobHandler
。
这些的常见命名空间是App\Jobs
。
例如
<?php
namespace App\Jobs;
use Wired00\CustomQueue\Contracts\CustomQueueJobHandler as HandlerContract;
use Illuminate\Queue\Jobs\Job;
class ProcessSQS implements HandlerContract
{
/**
* Execute the job.
*
* @param Job $job
* @param null $data
* @return void
*/
public function handle(Job $job, $data = null)
{
// process code (you can access job payload via $data)
// $job->delete();
}
}
注意
-
handle()
接受当前作业和包含从SQS队列弹出的作业负载等重要内容的$data
。 -
可能需要
$job->delete();
,具体取决于队列如何处理。在我这个例子中,SQS
在检索作业后似乎不会自动删除作业。因此,需要delete()
来在处理后删除作业。
许可证
这是基于未维护的、非功能的Laravel外部队列包(kristianedlund/laravel-external-queue)构建的。
它是MIT许可证,你可以随意使用它。一切谨慎,不承担任何责任。MIT许可证。