PHP包,以简单的方式管理队列任务

dev-master 2023-01-29 21:09 UTC

This package is not auto-updated.

Last update: 2024-09-24 03:03:01 UTC


README

Build Status Total Downloads Latest Stable Version

Simpleue提供了一种非常简单的方式来运行工作者以消费队列(消费者)在PHP中。该库已被开发以轻松扩展以与不同的队列服务器一起工作,并对外开放以管理任何类型的作业。

当前实现

  • Redis队列适配器。
  • AWS SQS队列适配器。
  • Beanstalkd队列适配器。

您可以在simpleue-example中找到一个使用示例

工作者

该库有一个工作者类,它运行一个无限循环(可以通过一些条件停止)并管理处理作业的所有阶段

  • 获取下一个作业。
  • 执行作业。
  • 作业成功后执行...
  • 作业失败后执行...
  • 执行错误后执行...
  • 没有作业时执行...

循环可以通过以下方法在控制下停止

  • 停止作业:作业处理器允许定义一个停止作业。
  • 最大迭代次数:可以在声明对象时指定。

每个工作者有一个队列源,并管理一种类型的作业。可以使用相同的队列源同时运行多个工作者。

优雅退出

工作者还可以处理一些posix信号,例如SIGINTSIGTERM,以便在有人尝试手动停止它时(通常在shell中使用C-c键),它可以优雅地退出(等待当前队列作业完成)。

此行为默认禁用。要启用它,您需要在工作者类的构造函数中传递一个额外的参数。下面是使用示例。

注意:此功能在HHVM上未测试,因此如果您在HHVM上运行,可能无法按预期工作

队列

该库提供了一种接口,允许实现不同队列服务器的队列连接。目前该库提供了以下实现

  • Redis队列适配器。
  • AWS SQS队列适配器。
  • Beanstalkd队列适配器。

队列接口管理与队列系统相关的所有内容,并抽象了与该作业相关的内容。

它需要队列系统客户端

  • Redis:Predis\Client
  • AWS SQS:Aws\Sqs\SqsClient
  • Beanstalkd:Pheanstalk\Pheanstalk;

以及源队列名称。消费者需要额外的队列来管理进程

  • 处理队列(仅限Redis):它将存储从源队列弹出的项目,在处理过程中。
  • 失败队列:所有失败的作业(根据作业定义)都将添加到该队列。
  • 错误队列:所有在管理过程中抛出异常的作业都将添加到该队列。

重要

对于AWS SQS队列,所有队列必须在开始工作之前存在。

作业

作业接口用于管理队列中接收到的作业。它必须管理域业务逻辑并定义停止作业。

作业从队列系统中抽象出来,因此相同的作业定义可以与不同的队列接口一起工作。作业始终从队列接收消息体。

如果您有不同的作业类型(发送邮件、裁剪图像等)并使用一个队列,您可以定义isMyJob。如果作业不是预期的类型,可以将作业发送回队列。

安装

在您的composer json文件中需要此包

{

    "require": {
        "javibravo/simpleue" : "dev-master",
    },

}

使用

第一步是定义和实现要管理的作业

<?php

namespace MyProject\MyJob;

use Simpleue\Job\Job;

class MyJob implements Job {

    public function manage($job) {
        ...
        try {
            ...
        } catch ( ... ) {
            return FALSE;
        }
        ...
        return TRUE;
    }

    ...
    
    public function isStopJob($job) {
        if ( ... )
            return TRUE;
        return FALSE;
    }
    
    public function isMyJob($job) {
            if ( ... )
                return TRUE;
            return FALSE;
    }
    ...

}

一旦定义了作业,我们就可以定义我们的消费者并开始运行

Redis消费者

<?php

use Predis\Client;
use Simpleue\Queue\RedisQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$redisQueue = new RedisQueue(
    new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')),
    'my_queue_name'
);
$myNewConsumer = new QueueWorker($redisQueue, new MyJob());
$myNewConsumer->start();

AWS SQS消费者

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

Beanstalkd 消费者

<?php

use Simpleue\Queue\BeanStalkdQueue;
use Simpleue\Worker\QueueWorker;
use Pheanstalk\Pheanstalk;
use MyProject\MyJob;

$beanStalkdClient = new Pheanstalk('localhost');

$beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob());
$myNewConsumer->start();

使用 maxIterations

您可以通过两种方式设置最大迭代次数,下面都进行了展示

使用 Setter 方法

$myConsumer = new QueueWorker($myQueue, new MyJob());
$myConsumer->setMaxIterations(10); //any number
$myConsumer->start();

使用构造函数参数

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10);
$myConsumer->start();

启用优雅退出

要启用优雅退出,请在构造函数中传递一个额外的参数。

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true);
$myConsumer->start();

AWS SQS 作业锁定以防止重复

当使用 AWS SQS 标准队列时,有时即使提供了 MessageVisibilityTimeout,工作者仍然可能会接收到重复的消息。为了防止这种重复,您可以将 Redis Locker 传递给 SqsQueue 对象。如果您提供了 locker 对象并且锁定失败,则作业将被发送到错误队列。锁提供者不会删除/解锁作业。如果需要,您应该手动解锁。您可以使用 getJobUniqId 方法获取作业键。

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Locker\RedisLocker;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$redis = new \Redis();
$redis->addServer('localhost');
$redisLocker = new RedisLocker($redis);

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30);
$sqsQueue->setLocker($redisLocker);

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

更多信息请参阅 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery

(*) 该想法是支持任何队列系统,因此对此是开放的。欢迎贡献。