有线00/custom-queue

从外部队列获取队列消息

v0.3 2019-04-01 06:09 UTC

This package is auto-updated.

Last update: 2024-09-08 11:25:32 UTC


README

Laravel队列在内部运行良好,当工作项通过SomeJob::dispatch()推送并随后通过Laravel队列工作者检索时。

但如果工作项是从除Laravel以外的外部服务或消息代理推送到的队列呢?例如,AWS S3存储桶 > SNS > SQS?在这些情况下,Laravel队列工作者无法识别负载,无法解析它,并且因为工作项负载缺少预期的jobdata属性等而中断。

CustomQueue旨在解决此问题。它将检索一个工作项,将负载重新打包成Laravel可以识别的格式,然后通过用户指定的作业处理器进行处理。

没有custom-queue的负载

  {
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "ap-southeast-2",
      "eventTime": "2019-03-22T06:31:40.395Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:blahblah:blah"
      },
      "requestParameters": {
        "sourceIPAddress": "10.10.10.10"
      },
      "responseElements": {
        "x-amz-request-id": "C53F65ECD63F53F8",
        "x-amz-id-2": "blah="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "folder-name",
        "bucket": {
          "name": "bucket-name",
          "ownerIdentity": {
            "principalId": "A2XHNNJ3IERBDC"
          },
          "arn": "arn:aws:s3:::bucket-name"
        },
        "object": {
          "key": "file-drop/droptest.csv",
          "size": 12,
          "eTag": "tagid",
          "sequencer": "005C94814C54E35D75"
        }
      }
    }
  ]
}

有custom-queue的负载

{
  "type": "job",
  "job": "custom-sqs",
  "data": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-2\",\"eventTime\":\"2019-03-22T06:31:40.395Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:blahblah:blah\"},\"requestParameters\":{\"sourceIPAddress\":\"10.10.10.10\"},\"responseElements\":{\"x-amz-request-id\":\"C53F65ECD63F53F8\",\"x-amz-id-2\":\"blah=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"folder-name\",\"bucket\":{\"name\":\"bucket-name\",\"ownerIdentity\":{\"principalId\":\"A2XHNNJ3IERBDC\"},\"arn\":\"arn:aws:s3:::bucket-name\"},\"object\":{\"key\":\"file-drop/droptest.csv\",\"size\":12,\"eTag\":\"tagid\",\"sequencer\":\"005C94814C54E35D75\"}}}]}"
}

**注意:** job: custom-sqs指定了用于处理作业的作业处理器。data只是原始负载。

安装

通过composer安装包

composer require wired00/custom-queue

发布CustomQueue配置文件到您的项目

vendor:publish

以下为customqueue.php配置的详细信息

使用

配置

queue.php

设置自定义外部SQS连接

        'custom-sqs' => [
            'driver' => 'custom-sqs',
            'key' => env('AWS_ACCESS_KEY_ID', 'your-public-key'),
            'secret' => env('AWS_SECRET_ACCESS_KEY', 'your-secret-key'),
            'queue' => env('SQS_QUEUE', 'your-queue-url'),
            'region' => env('AWS_REGION', 'us-east-1'),
        ],

从您的Laravel .env设置所有这些值。例如

AWS_ACCESS_KEY_ID=ASIAWMC25A2L7MDO6NGA
AWS_SECRET_ACCESS_KEY=3qZLVRShxQvx2xbTSKD5bllObtwHNH3O/9NqvFNc
AWS_SECURITY_TOKEN=YOUR-AWS-SECURITY-TOKEN
SQS_QUEUE=https://sqs.ap-southeast-2.amazonaws.com/123/your-sqs-queue-name
AWS_REGION=ap-southeast-2
QUEUE_CONNECTION=custom-sqs

customqueue.php

customqueue.php配置文件仅包含处理器类路径和标识符之间的映射。

示例包含标识符custom-sqs和类路径App\Jobs\ProcessSQS::class。这指定了从队列检索作业负载时,将始终添加job: App\Jobs\ProcessSQS::class键值对,只要处理连接类型为custom-sqs。作业将通过ProcessSQS->handle()进行处理。

目前CustomQueue仅支持自定义SQS检索,但将来可能会支持RabbitMQ和Redis。在这些情况下,customqueue.php将包含标识符,例如custom-rediscustom-rabbitmq

处理器文件

处理器文件是customqueue.php中通过类路径引用的文件。它们必须实现Wired00\CustomQueue\Contracts\CustomQueueJobHandler

这些的常见命名空间是App\Jobs

例如

<?php

namespace App\Jobs;

use Wired00\CustomQueue\Contracts\CustomQueueJobHandler as HandlerContract;
use Illuminate\Queue\Jobs\Job;

class ProcessSQS implements HandlerContract
{
    /**
     * Execute the job.
     *
     * @param Job $job
     * @param null $data
     * @return void
     */
    public function handle(Job $job, $data = null)
    {
        // process code (you can access job payload via $data)
		    // $job->delete();
    }
}

注意

  • handle()接受当前作业和包含从SQS队列弹出的作业负载等重要内容的$data

  • 可能需要$job->delete();,具体取决于队列如何处理。在我这个例子中,SQS在检索作业后似乎不会自动删除作业。因此,需要delete()来在处理后删除作业。

许可证

这是基于未维护的、非功能的Laravel外部队列包(kristianedlund/laravel-external-queue)构建的。

它是MIT许可证,你可以随意使用它。一切谨慎,不承担任何责任。MIT许可证