javibravo/simple-php-queue

此包已被废弃,不再维护。作者建议使用javibravo/simpleue包代替。

PHP包,以简单方式管理队列任务

2.1.0 2017-11-15 13:41 UTC

This package is not auto-updated.

Last update: 2022-02-01 12:48:32 UTC


README

Build Status Total Downloads Latest Stable Version

Simpleue提供了一种非常简单的方式来在PHP中运行工作进程以消费队列(消费者)。该库已开发用于轻松扩展以与不同的队列服务器一起工作,并开放以管理任何类型的作业。

当前实现

  • Redis队列适配器。
  • AWS SQS队列适配器。
  • Beanstalkd队列适配器。

您可以在simpleue-example中找到使用示例。

工作进程

该库有一个工作进程类,它运行一个无限循环(可以通过一些条件停止)并管理处理作业的所有阶段

  • 获取下一个作业。
  • 执行作业。
  • 作业成功后执行...
  • 作业失败后执行...
  • 执行错误后执行...
  • 没有作业时执行...

循环可以通过以下方法在控制下停止

  • 停止作业:作业处理器允许定义一个停止作业。
  • 最大迭代次数:可以在对象声明时指定。

每个工作进程有一个队列源并管理一种类型的作业。可以使用相同的队列源同时运行多个工作进程。

优雅退出

工作进程还可以处理一些posix信号,例如 SIGINTSIGTERM,因此当有人尝试手动停止它时(通常在shell中使用C-c键),它可以优雅地退出(等待当前队列作业完成)。

默认情况下,此行为是禁用的。要启用它,您需要在工作进程类的构造函数中传递一个额外的参数。下面是使用示例。

注意:此功能在HHVM上未测试,因此如果您在HHVM上运行它,可能不会按预期工作。

队列

该库提供了一个接口,允许为不同的队列服务器实现队列连接。目前,该库提供了以下实现

  • Redis队列适配器。
  • AWS SQS队列适配器。
  • Beanstalkd队列适配器。

队列接口管理与队列系统相关的所有内容,并抽象了关于该作业的信息。

它需要队列系统客户端

  • Redis:Predis\Client
  • AWS SQS:Aws\Sqs\SqsClient
  • Beanstalkd:Pheanstalk\Pheanstalk;

以及源 队列名称。消费者将需要额外的队列来管理流程

  • 处理队列(仅限Redis):它将存储从源队列中弹出的项,在它被处理时。
  • 失败队列:所有失败(根据作业定义)的作业都将添加到该队列。
  • 错误队列:在管理过程中抛出异常的所有作业都将添加到该队列。

重要

对于AWS SQS队列,在开始工作之前,所有队列都必须存在。

作业

作业接口用于管理队列中接收到的作业。它必须管理域业务逻辑并定义停止作业

作业被抽象化从队列系统中,因此相同的作业定义能够与不同的队列接口一起工作。作业始终从队列接收消息体。

如果您有不同的作业类型(发送邮件、裁剪图像等)并且您使用一个队列,您可以定义isMyJob。如果作业不是预期的类型,您可以发送作业回队列。

安装

在您的composer json文件中需要此包

{

    "require": {
        "javibravo/simpleue" : "dev-master",
    },

}

使用

第一步是定义和实现要管理的作业

<?php

namespace MyProject\MyJob;

use Simpleue\Job\Job;

class MyJob implements Job {

    public function manage($job) {
        ...
        try {
            ...
        } catch ( ... ) {
            return FALSE;
        }
        ...
        return TRUE;
    }

    ...
    
    public function isStopJob($job) {
        if ( ... )
            return TRUE;
        return FALSE;
    }
    
    public function isMyJob($job) {
            if ( ... )
                return TRUE;
            return FALSE;
    }
    ...

}

一旦定义了作业,我们就可以定义我们的消费者并开始运行

Redis 消费者

<?php

use Predis\Client;
use Simpleue\Queue\RedisQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$redisQueue = new RedisQueue(
    new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')),
    'my_queue_name'
);
$myNewConsumer = new QueueWorker($redisQueue, new MyJob());
$myNewConsumer->start();

AWS SQS 消费者

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

Beanstalkd 消费者

<?php

use Simpleue\Queue\BeanStalkdQueue;
use Simpleue\Worker\QueueWorker;
use Pheanstalk\Pheanstalk;
use MyProject\MyJob;

$beanStalkdClient = new Pheanstalk('localhost');

$beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob());
$myNewConsumer->start();

使用最大迭代次数

您可以通过两种方式设置最大迭代次数,下面都展示了

使用设置器方法

$myConsumer = new QueueWorker($myQueue, new MyJob());
$myConsumer->setMaxIterations(10); //any number
$myConsumer->start();

使用构造函数参数

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10);
$myConsumer->start();

启用优雅退出

要启用优雅退出,请在构造函数中传入一个额外的参数。

$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true);
$myConsumer->start();

AWS SQS 作业锁定以防止重复

当使用 AWS SQS 标准队列时,即使提供了 MessageVisibilityTimeout,有时工作者也可能接收到重复的消息。为了防止这种重复,您可以将 Redis 或 Memcached 锁定器提供给 SqsQueue 对象。如果您提供了锁定器对象且锁定失败,则作业将发送到错误队列。锁定器提供者不会移除/解锁作业。如有需要,您应手动解锁。您可以使用 getJobUniqId 方法获取作业键。

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Locker\MemcachedLocker;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$memcached = new \Memcached();
$memcached->addServer('localhost', 11211);
$memcachedLocker = new MemcachedLocker($memcached);

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30);
$sqsQueue->setLocker($memcachedLocker);

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

有关更多信息,请参阅http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery

(*) 我们的想法是支持任何队列系统,因此它是开放的。欢迎贡献。