ride/lib-queue

作业队列的抽象库

1.3.0 2020-05-06 10:55 UTC

This package is auto-updated.

Last update: 2024-09-06 21:26:39 UTC


README

PHP Ride 框架的队列抽象库。

使用队列系统处理耗时任务。

库中包含的内容

QueueJob

QueueJob 接口供您实现。它包含运行您的任务的逻辑。

您的作业实现可能需要数据来工作。您应该将所需数据的引用而不是初始化的对象附加到作业中,例如 id。由于 id 比对象实例更容易序列化,这会使作业更小,便于存储。

当您的作业被调用时,您应使用引用在当前状态中查找数据,而不是在作业入队时的状态。

QueueManager

要实现 QueueManager 接口,以便为底层队列系统提供。这是队列系统的外观。您可以使用它检查系统、队列或作业的状态。它也用于入队作业和更新它们的状态。

QueueJob 接收 QueueManager 当它被调用时。队列管理器应被扩展,以便它可以被队列作业用于查找它们的引用。

QueueJobStatus

当您询问有关队列作业的信息时,QueueManager 将返回 QueueJobStatus 实例。此接口也由底层队列系统实现。

QueueDispatcher

QueueJob 包含它所在的队列。当您创建一个作业时,您可能不知道可用的队列或它们的可用性。您可以使用 QueueDispatcher 来入队作业。这个调度器的目标是选择适合作业的正确队列并执行入队过程。

库提供了 2 个调度器。

SingleQueueDispatcher

SingleQueueDispatcher 是一个非常简单的调度器。当您只有一个静态队列时使用。

RoundRobinQueueDispatcher

RoundRobinQueueDispatcher 使用队列的数量进行初始化。当这个调度器入队一个作业时,它将把它推送到作业数量最少的队列。

QueueWorker

QueueWorker 用于从队列中抓取作业并调用它们。这个工作流程很可能是一个在后台运行的命令或守护进程。您需要为每个队列有一个工作进程。

提供了一个通用的实现,即 GenericQueueWorker 类。

代码示例

查看此代码示例,了解此库的一些可能性

<?php

use ride\library\queue\dispatcher\QueueDispatcher;
use ride\library\queue\job\AbstractQueueJob;
use ride\library\queue\worker\GenericQueueWorker;
use ride\library\queue\QueueManager;

// push some jobs to the queue
function pushJobs(QueueDispatcher $queueDispatcher, array $files) {
    foreach ($files as $index => $file) {
        // create a job with the needed references
        $job = new MyQueueJob($file);
         
        // you can optionally set a priority
        // jobs with a smaller priority value are more urgent
        $job->setPriority(100);
         
        // the queue dispatcher will select a queue and push your job to it
        $queueJobStatus = $queueDispatcher->queue($job);
    
        // you get the job id and more properties through the resulting queue job status     
        $files[$index] = $queueJobStatus->getId();
    }
    
    return $files;
}

// work horse
function work(QueueManager $queueManager, $queue = 'default') {
    // use a generic worker
    $queueWorker = new GenericQueueWorker();
    
    // this worker will poll the queue every 3 seconds for a job forever
    // you can provide a 0 second sleep time, the worker will then stop when it has no jobs
    $queueWorker->work($queueManager, $queue, 3);
}

// example job which executes pngcrush on the provided path
class MyQueueJob extends AbstractQueueJob {

    public function __construct($path) {
        $this->path = $path;
    }

    public function run(QueueManager $queueManager) {
        // you can update the description of your job while running
        $queueManager->updateStatus($this->getJobId(), 'Checking file ' . $this->path);
        
        if (file_exists($this->path) && !is_directory($this->path)) {
            return;
        }
        
        $command = 'pngcrush -nofilecheck -rem alla -bail -blacken -ow ' . $this->path; 
        
        $queueManager->updateStatus($this->getJobId(), 'Invoking ' . $command);
        
        exec($command, $output, $code);
        
        $queueManager->updateStatus($this->getJobId(), $command . ' returned ' . $code);
    }
    
}

相关模块

您可以通过以下相关模块检查此库

安装

您可以使用 Composer 来安装此库。

composer require ride/lib-queue