lorenzo / cakephp-sqs
CakePHP Aws Simple Queue System
Requires
- aws/aws-sdk-php: ~3.32
- cakephp/cakephp: ^3.3.0
Requires (Dev)
- php-vcr/php-vcr: dev-fix-stream-emulation as 1.3.3
- php-vcr/phpunit-testlistener-vcr: ~2.0
- phpunit/phpunit: 5.*
This package is auto-updated.
Last update: 2024-08-25 21:51:11 UTC
README
此插件是用于原始 AWS SDK 相关的 Simple Queue Service (SQS) 类的适配器。它提供的是一个工具类,该类将根据 Configure 类中存储的信息处理 HTTP 客户端的构建和配置。
此外,它还提供了一个 Task 类,可以帮助您快速构建长生命周期的工作进程。
要求
- CakePHP 3.4.x
- PHP 5.6+
安装
此插件支持的唯一安装方法是使用 composer。只需将以下内容添加到您的 composer.json 配置文件中:
composer require lorenzo/cakephp-sqs
启用插件
您需要在您的 config/bootstrap.php
文件中启用插件
Plugin::load('CakeSQS');
配置
插件期望在 Configure 键中以下配置数据:
Configure::write('CakeSQS', [
'connection' => [
/**
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Sqs.SqsClient.html#___construct
* for all possible config params
*/
'credentials' => [
'key' => 'Your amazon key',
'secret' => 'Your amazon secret',
],
'version' => '2012-11-05', // you don't need to change this
'region' => 'eu-central-1' // must match the region where the sqs queue was created
],
'queues' => [
'testQueue1' => 'sqs queue url, for example: https://sqs.eu-central-1.amazonaws.com/12345/someQueue'
]
]);
或者,您可以通过 setConfig
配置连接器类
$this->SimpleQueue = new SimpleQueue();
$this->SimpleQueue->setConfig([
'connection.credentials' => [
'key' => 'SOMEKEY',
'secret' => 'SOMESECRET',
],
'queues.testQueue1' => 'sqs queue url, for example: https://sqs.eu-central-1.amazonaws.com/12345/someQueue'
]);
存储消息到队列中
$this->SimpleQueue->send('testQueue1', [
'key1' => 'value1',
'key2' => 'value2',
]);
返回值是一个 \Aws\Result 对象
批量存储多个消息
$this->SimpleQueue->send('testQueue1', [
'key1' => 'value1',
'key2' => 'value2',
]);
此方法的返回值是一个数组,包含所有无法存储的消息(如果有),在数组中的位置与您发送的位置相同。例如,如果您的数组中的第二个和第四个消息失败,则数组将包含
[1 => 'Error Message', 3 => 'Another error message']
从队列中接收消息
$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');
返回值未从 AWS SDK 中更改。有关更多信息,请参阅其文档。例如
$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');
$msgResult->search('Messages');
// would return
[
[
'MessageId' => '929a9cfe-0d45-494c-80fc-41ec6e2e017d',
'ReceiptHandle' => 'AQEBEYj8zzBaKn8PpJiHgvyznyo7H0BQYH7MBy7429K/ad53lXLRh5yu2Yb0EH9o22WskOCTX7enwcGxTc7JQLQPcJwFwJB/L29pVOyDvZc8fI2XPjd+7jbN91H6PqfHUUsryiDHkA36ZH0tWKjFOVt986GKptqdON+BbinT2KIjd5NLwN2sr7kWgWKhva6YSC/BIWTsSUyAfiFGRDLksNtMiXJk2nFzwvINGU7khBdDpZ0xZxmhhPvT3TPQeSukZNEp859yZLVA9t69Vx2Rrtf/3vGfZj9NjSVrEMcquP8zDrmIicp5+ILtm1qYJxq2lsYH0LHTwGtIQC1nW+J7D/t3JAFZdgohsdXEl3T+KIig2APUgJz4Mp/ze3gzIrY7/Y+plII+MnrISdBSmDnoRRpF/g==',
'MD5OfBody' => 'ff45cc3835165307ef414c23ca2c6f67',
'Body' => '{"key1":"value1","key2":"value2"}'
]
];
从队列中删除消息
$msgResult = $this->SimpleQueue->receiveMessage('testQueue1');
$handle = $msgResult['Messages'][0]['ReceiptHandle'];
$this->SimpleQueue->deleteMessage('testQueue1', $handle);
设置工作进程 shell
如前所述,此插件可以帮助您创建长生命周期的工人壳,该壳将逐个处理来自 CakeSQS 中一组队列的消息。以下是一个示例
<?php
namespace App\Shell;
use Cake\Console\Shell;
use Cake\Core\Configure;
use CakeSQS\SimpleQueue;
/**
* Sqs shell command.
*/
class SqsShell extends Shell
{
public $tasks = ['CakeSQS.QueueWorker'];
public function send()
{
$credentials = [
'key' => 'xxx',
'secret' => 'yyy',
];
Configure::write('CakeSQS.connection.credentials', $credentials);
Configure::write('CakeSQS.queues.testQueue1', 'https://sqs.eu-central-1.amazonaws.com/zzzz/testQueue1');
$queue = new SimpleQueue();
debug($queue->send('testQueue1', 'some-data'));
}
public function workForever()
{
$credentials = [
'key' => 'xxx',
'secret' => 'yyy',
];
Configure::write('CakeSQS.connection.credentials', $credentials);
Configure::write('CakeSQS.queues.testQueue1', 'https://sqs.eu-central-1.amazonaws.com/zzz/testQueue1');
$this->QueueWorker->addFunction('testQueue1', function ($item) {
debug($item);
return true; // return true to delete the message upon processing, false to leave the message in the queue
});
$this->QueueWorker->work();
}
}
用于处理作业的注册函数将在使用 json_decode
解码后从队列中接收消息体。
重要:返回 true 删除处理后的消息,返回 false 将消息留在队列中稍后重新处理
使用 SIGHUP 软杀死工作进程
要在迭代循环中中途杀死长生命周期的工作进程 shell,您可以在命令行上通过调用 kill -1 PID
发送 SIGHUP(PID 是运行中的工作进程 shell 的进程 ID)。
这将导致当前迭代完成,然后停止工作进程。