lorenzo/cakephp-sqs

CakePHP Aws Simple Queue System

安装数: 270,546

依赖: 0

建议者: 0

安全性: 0

星标: 20

关注者: 8

分支: 15

开放问题: 3

类型:cakephp-plugin

2.1.1 2018-06-18 14:17 UTC

This package is auto-updated.

Last update: 2024-08-25 21:51:11 UTC


README

Build Status

此插件是用于原始 AWS SDK 相关的 Simple Queue Service (SQS) 类的适配器。它提供的是一个工具类,该类将根据 Configure 类中存储的信息处理 HTTP 客户端的构建和配置。

此外,它还提供了一个 Task 类,可以帮助您快速构建长生命周期的工作进程。

要求

  • CakePHP 3.4.x
  • PHP 5.6+

安装

此插件支持的唯一安装方法是使用 composer。只需将以下内容添加到您的 composer.json 配置文件中:

composer require lorenzo/cakephp-sqs

启用插件

您需要在您的 config/bootstrap.php 文件中启用插件

Plugin::load('CakeSQS');

配置

插件期望在 Configure 键中以下配置数据:

Configure::write('CakeSQS', [
    'connection' => [
        /**
         * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Sqs.SqsClient.html#___construct
         * for all possible config params
         */
        'credentials' => [
            'key' => 'Your amazon key',
            'secret' => 'Your amazon secret',
        ],
        'version' => '2012-11-05', // you don't need to change this
        'region' => 'eu-central-1' // must match the region where the sqs queue was created
    ],
    'queues' => [
        'testQueue1' => 'sqs queue url, for example: https://sqs.eu-central-1.amazonaws.com/12345/someQueue'
    ]
]);

或者,您可以通过 setConfig 配置连接器类

$this->SimpleQueue = new SimpleQueue();
$this->SimpleQueue->setConfig([
    'connection.credentials' => [
        'key' => 'SOMEKEY',
        'secret' => 'SOMESECRET',
    ],
    'queues.testQueue1' => 'sqs queue url, for example: https://sqs.eu-central-1.amazonaws.com/12345/someQueue'
]);

存储消息到队列中

$this->SimpleQueue->send('testQueue1', [
    'key1' => 'value1',
    'key2' => 'value2',
]);

返回值是一个 \Aws\Result 对象

批量存储多个消息

$this->SimpleQueue->send('testQueue1', [
    'key1' => 'value1',
    'key2' => 'value2',
]);

此方法的返回值是一个数组,包含所有无法存储的消息(如果有),在数组中的位置与您发送的位置相同。例如,如果您的数组中的第二个和第四个消息失败,则数组将包含

[1 => 'Error Message', 3 => 'Another error message']

从队列中接收消息

$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');

返回值未从 AWS SDK 中更改。有关更多信息,请参阅其文档。例如

$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');
$msgResult->search('Messages');
// would return
[
    [
        'MessageId' => '929a9cfe-0d45-494c-80fc-41ec6e2e017d',
        'ReceiptHandle' => 'AQEBEYj8zzBaKn8PpJiHgvyznyo7H0BQYH7MBy7429K/ad53lXLRh5yu2Yb0EH9o22WskOCTX7enwcGxTc7JQLQPcJwFwJB/L29pVOyDvZc8fI2XPjd+7jbN91H6PqfHUUsryiDHkA36ZH0tWKjFOVt986GKptqdON+BbinT2KIjd5NLwN2sr7kWgWKhva6YSC/BIWTsSUyAfiFGRDLksNtMiXJk2nFzwvINGU7khBdDpZ0xZxmhhPvT3TPQeSukZNEp859yZLVA9t69Vx2Rrtf/3vGfZj9NjSVrEMcquP8zDrmIicp5+ILtm1qYJxq2lsYH0LHTwGtIQC1nW+J7D/t3JAFZdgohsdXEl3T+KIig2APUgJz4Mp/ze3gzIrY7/Y+plII+MnrISdBSmDnoRRpF/g==',
        'MD5OfBody' => 'ff45cc3835165307ef414c23ca2c6f67',
        'Body' => '{"key1":"value1","key2":"value2"}'
    ]
];

从队列中删除消息

$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');
$handle = $msgResult['Messages'][0]['ReceiptHandle'];
$this->SimpleQueue->deleteMessage('testQueue1', $handle);

设置工作进程 shell

如前所述,此插件可以帮助您创建长生命周期的工人壳,该壳将逐个处理来自 CakeSQS 中一组队列的消息。以下是一个示例

<?php

namespace App\Shell;

use Cake\Console\Shell;
use Cake\Core\Configure;
use CakeSQS\SimpleQueue;

/**
 * Sqs shell command.
 */
class SqsShell extends Shell
{

    public $tasks = ['CakeSQS.QueueWorker'];

    public function send()
    {
        $credentials = [
            'key' => 'xxx',
            'secret' => 'yyy',
        ];

        Configure::write('CakeSQS.connection.credentials', $credentials);
        Configure::write('CakeSQS.queues.testQueue1', 'https://sqs.eu-central-1.amazonaws.com/zzzz/testQueue1');

        $queue = new SimpleQueue();
        debug($queue->send('testQueue1', 'some-data'));
    }

    public function workForever()
    {
        $credentials = [
            'key' => 'xxx',
            'secret' => 'yyy',
        ];

        Configure::write('CakeSQS.connection.credentials', $credentials);
        Configure::write('CakeSQS.queues.testQueue1', 'https://sqs.eu-central-1.amazonaws.com/zzz/testQueue1');

        $this->QueueWorker->addFunction('testQueue1', function ($item) {
            debug($item);

            return true; // return true to delete the message upon processing, false to leave the message in the queue
        });
        $this->QueueWorker->work();
    }
}

用于处理作业的注册函数将在使用 json_decode 解码后从队列中接收消息体。

重要:返回 true 删除处理后的消息,返回 false 将消息留在队列中稍后重新处理

使用 SIGHUP 软杀死工作进程

要在迭代循环中中途杀死长生命周期的工作进程 shell,您可以在命令行上通过调用 kill -1 PID 发送 SIGHUP(PID 是运行中的工作进程 shell 的进程 ID)。

这将导致当前迭代完成,然后停止工作进程。