jetea/queue

一个工作队列

2.0.1 2019-01-25 06:41 UTC

This package is auto-updated.

Last update: 2024-09-25 21:03:07 UTC


README

基于beanstalkd(及其他队列系统)的工作队列,易于派发和处理工作。

关键词

队列工作易于处理带超时的工作延迟工作retry_after给定连接给定队列给定通道内存限制重新加载

安装

composer require "jetea/queue:~2.0"

概述

创建队列

use Jetea\Queue\Drivers\Beanstalkd;
use Jetea\Queue\Queue;

$queue = new Queue(new Beanstalkd($host, $port));

创建工作

<?php

use Jetea\Queue\Job;

class ExampleJob extends Job
{
    /**
     * @var string job queue name (beanstalkd tube)
     */
    public $queue = 'default';

    /**
     * The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
     *
     * @var int 允许 worker 执行的最大秒数,超时 job 将会被 release 到 ready 状态.
     */
    public $retry_after = 60;

    /**
     * The number of times the job may be attempted.
     *
     * @var int 最大尝试次数
     */
    public $tries = 1;

    /**
     * @var array
     */
    public $words;

    public function __construct(array $words)
    {
        $this->words = $words;
    }

    public function handle()
    {
        var_export($this->words);

        var_dump($this->retry_after, $this->tries);

        // throw new \Exception('handle job with error...lol ^_^');
    }
}

当然,您可以在之后派发工作(推送延迟工作)。

派发工作

$queue->push(new ExampleJob(['i', 'love', 'china']));

注意:`$worker->daemon()` 是阻塞的。

$queue->later(60, new ExampleJob(['i', 'love', 'china']));

处理工作

$worker = new Worker($queue);

$worker->daemon();

默认情况下,工作进程将监听名为 `default` 的通道,您可以为工作进程指定队列(beanstalkd 通道)如下:

您还可以指定在没有工作时工作进程的睡眠时间,以及内存限制,如下:

$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);

$worker->daemon();

注意,如果您想重新加载队列工作进程,您应该在 `Jetea\Queue\Worker` 类上实现 `queueShouldRestart` 方法。

$sleep = 60;
$memoryLimit = 128;

$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);

$worker->daemon($sleep, $memoryLimit);

强烈建议扩展 `Jetea\Queue\Worker`,并实现以下方法:`logProcessError`,`handleWithObj`,`queueShouldRestart`。

推荐

队列包可以在其他队列系统上运行,您应该实现以下接口 Jetea\Queue\Drivers\Jobs\JobInterfaceJetea\Queue\Drivers\QueueInterface

其他

队列包可以在其他队列系统上运行,您应该实现以下接口 Jetea\Queue\Drivers\Jobs\JobInterfaceJetea\Queue\Drivers\QueueInterface

示例

链接

待办事项

  • 添加一些有用的命令,例如清除队列工作或踢出埋藏的工作等。