javibravo/simpleue

PHP 包,以简单的方式管理队列任务

2.1.0 2017-11-15 13:41 UTC

This package is not auto-updated.

Last update: 2024-09-14 18:20:35 UTC


README

Build Status Total Downloads Latest Stable Version

Simpleue 提供了一种非常简单的方式来运行工作进程(消费者)以消费队列(队列)。该库已被开发,以便易于扩展以与不同的队列服务器一起工作,并开放以管理任何类型的作业。

当前实现

  • Redis 队列适配器。
  • AWS SQS 队列适配器。
  • Beanstalkd 队列适配器。

您可以在 simpleue-example 中找到使用示例。

工作进程

该库有一个工作进程类,它运行一个无限循环(可以通过某些条件停止)并管理处理作业的所有阶段

  • 获取下一个作业。
  • 执行作业。
  • 作业成功后执行...
  • 作业失败后执行...
  • 执行错误后执行...
  • 没有作业时执行...

循环可以通过以下方法在控制下停止

  • 停止作业:作业处理器允许定义一个停止作业。
  • 最大迭代次数:可以在声明对象时指定。

每个工作进程有一个队列源并管理一种类型的作业。可以使用相同的队列源同时运行多个工作进程。

优雅退出

工作进程还能够处理一些 POSIX 信号,例如 SIGINTSIGTERM,以便在有人尝试手动停止它时(通常在 shell 中使用 C-c 键组合)优雅地退出(等待当前队列作业完成)。

默认情况下,此行为是禁用的。要启用它,您需要在工作进程类的构造函数中传递一个额外的参数。下面是示例。

注意:此功能尚未在 HHVM 上进行测试,因此如果您在 HHVM 上运行它,可能不会按预期工作。

队列

该库提供了一个接口,允许实现不同队列服务器的队列连接。目前,该库提供了以下实现

  • Redis 队列适配器。
  • AWS SQS 队列适配器。
  • Beanstalkd 队列适配器。

队列接口管理与队列系统相关的所有操作,并抽象了关于该作业的信息。

它需要队列系统客户端

  • Redis:Predis\Client
  • AWS SQS:Aws\Sqs\SqsClient
  • Beanstalkd:Pheanstalk\Pheanstalk;

以及源 队列名称。消费者需要额外的队列来管理进程

  • 处理队列(仅限 Redis):它将存储从源队列弹出的项目,在它被处理时。
  • 失败队列:所有失败的作业(根据作业定义)都将添加到此队列。
  • 错误队列:所有在管理过程中抛出异常的作业都将添加到此队列。

重要

对于 AWS SQS 队列,所有队列必须在开始工作之前存在。

作业

作业接口用于管理队列中接收到的作业。它必须管理领域业务逻辑并定义 停止作业

作业从队列系统中抽象出来,因此相同的作业定义可以与不同的队列接口一起工作。作业始终从队列接收消息体。

如果您有不同类型的作业(发送邮件、裁剪图像等)并使用一个队列,您可以定义 isMyJob。如果作业不是预期类型,您可以将其发送回队列。

安装

在您的composer json文件中要求该包

{

    "require": {
        "javibravo/simpleue" : "dev-master",
    },

}

用法

第一步是定义和实现要管理的作业

<?php

namespace MyProject\MyJob;

use Simpleue\Job\Job;

class MyJob implements Job {

    public function manage($job) {
        ...
        try {
            ...
        } catch ( ... ) {
            return FALSE;
        }
        ...
        return TRUE;
    }

    ...
    
    public function isStopJob($job) {
        if ( ... )
            return TRUE;
        return FALSE;
    }
    
    public function isMyJob($job) {
            if ( ... )
                return TRUE;
            return FALSE;
    }
    ...

}

一旦定义了作业,我们就可以定义我们的消费者并开始运行

Redis消费者

<?php

use Predis\Client;
use Simpleue\Queue\RedisQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$redisQueue = new RedisQueue(
    new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')),
    'my_queue_name'
);
$myNewConsumer = new QueueWorker($redisQueue, new MyJob());
$myNewConsumer->start();

AWS SQS消费者

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

Beanstalkd消费者

<?php

use Simpleue\Queue\BeanStalkdQueue;
use Simpleue\Worker\QueueWorker;
use Pheanstalk\Pheanstalk;
use MyProject\MyJob;

$beanStalkdClient = new Pheanstalk('localhost');

$beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob());
$myNewConsumer->start();

使用最大迭代次数

您可以通过两种方式设置最大迭代次数,下面都展示了

使用setter方法

$myConsumer = new QueueWorker($myQueue, new MyJob());
$myConsumer->setMaxIterations(10); //any number
$myConsumer->start();

使用构造函数参数

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10);
$myConsumer->start();

启用优雅退出

要启用优雅退出,请在构造函数中传入一个额外的参数。

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true);
$myConsumer->start();

AWS SQS作业锁定以防止重复

当使用AWS SQS标准队列时,有时即使提供了MessageVisibilityTimeout,工作者仍然会接收到重复的消息。为了防止这种重复,您可以向SqsQueue对象提供Redis或Memcached锁定器。如果您提供了锁定器对象且锁定失败,则作业将被发送到错误队列。锁定器提供者不会删除/解锁作业。如果需要,您应手动解锁。您可以使用getJobUniqId方法获取作业键。

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Locker\MemcachedLocker;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$memcached = new \Memcached();
$memcached->addServer('localhost', 11211);
$memcachedLocker = new MemcachedLocker($memcached);

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30);
$sqsQueue->setLocker($memcachedLocker);

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

有关更多信息,请参阅http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery

(*) 该想法是支持任何队列系统,因此它是开放的。欢迎贡献。