yejune/simpleue

用于以简单方式管理队列任务的 PHP 包

1.0.1 2017-07-28 04:40 UTC

This package is auto-updated.

Last update: 2024-09-14 12:36:31 UTC


README

Build Status Total Downloads Latest Stable Version

Simpleue 提供了一种非常简单的方式来运行工作进程以消费队列(消费者)在 PHP 中。该库已被开发,以便轻松扩展以与不同的队列服务器一起工作,并开放以管理任何类型的作业。

当前实现

  • Redis 队列适配器。
  • AWS SQS 队列适配器。
  • Beanstalkd 队列适配器。

您可以在 simpleue-example 中找到使用示例

工作进程

该库有一个工作进程类,它运行一个无限循环(可以用一些条件停止)并管理处理作业的所有阶段

  • 获取下一个作业。
  • 执行作业。
  • 作业成功后执行 ...
  • 作业失败后执行 ...
  • 执行错误后执行 ...
  • 没有作业时执行 ...

可以使用以下方法在控制下停止循环

  • 停止作业:作业处理器允许定义一个停止作业。
  • 最大迭代次数:可以在对象声明时指定。

每个工作进程有一个队列源,并管理一种类型的作业。可以使用相同的队列源并行运行多个工作进程。

队列

该库提供了一个接口,允许为不同的队列服务器实现队列连接。目前,库提供以下实现

  • Redis 队列适配器。
  • AWS SQS 队列适配器。
  • Beanstalkd 队列适配器。

队列接口管理所有与队列系统相关的功能,并抽象了关于作业的信息。

它需要队列系统客户端

  • Redis:Predis\Client
  • AWS SQS:Aws\Sqs\SqsClient
  • Beanstalkd:Pheanstalk\Pheanstalk;

以及源 队列名称。消费者需要额外的队列来管理过程

  • 处理队列(仅限 Redis):它将存储从源队列弹出的项目,在它被处理时。
  • 失败队列:所有失败的作业(根据作业定义)将添加到此队列。
  • 错误队列:所有在管理过程中抛出异常的作业将添加到此队列。

重要

对于 AWS SQS 队列,在开始工作之前,所有队列都必须存在。

作业

作业接口用于管理队列中接收到的作业。它必须管理领域业务逻辑并定义 停止作业

作业从队列系统中抽象出来,因此相同的作业定义可以与不同的队列接口一起工作。作业始终从队列接收消息体,

安装

在您的 composer json 文件中添加包

{

    "require": {
        "javibravo/simpleue" : "dev-master",
    },

}

使用方法

第一步是定义和实现要管理的 作业

<?php

namespace MyProject\MyJob;

use Simpleue\Job\Job;

class MyJob implements Job {

    public function manage($job) {
        ...
        try {
            ...
        } catch ( ... ) {
            return FALSE;
        }
        ...
        return TRUE;
    }

    ...
    
    public function isStopJob($job) {
        if ( ... )
            return TRUE;
        return FALSE;
    }
    
    ...

}

一旦定义了作业,我们就可以定义我们的消费者并开始运行

Redis 消费者

<?php

use Predis\Client;
use Simpleue\Queue\RedisQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$redisQueue = new RedisQueue(
    new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')),
    'my_queue_name'
);
$myNewConsumer = new QueueWorker($redisQueue, new MyJob());
$myNewConsumer->start();

AWS SQS 消费者

<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$sqsClient = new SqsClient([
    'profile' => 'aws-profile',
    'region' => 'eu-west-1',
    'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();

Beanstalkd 消费者

<?php

use Simpleue\Queue\BeanStalkdQueue;
use Simpleue\Worker\QueueWorker;
use Pheanstalk\Pheanstalk;
use MyProject\MyJob;

$beanStalkdClient = new Pheanstalk('localhost');

$beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name');

$myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob());
$myNewConsumer->start();

(*) 我们的想法是支持任何队列系统,因此它是开放的。欢迎贡献。