yidas/codeigniter-queue-worker

CodeIgniter 3 队列工作器管理控制器

1.0.1 2019-04-03 07:15 UTC

This package is auto-updated.

Last update: 2024-09-07 07:59:48 UTC


README

CodeIgniter 队列工作器


CodeIgniter 3 守护进程队列工作器管理控制器

Latest Stable Version License Total Downloads

此队列工作器扩展被收集到 yidas/codeigniter-pack,这是一个针对 Codeigniter 框架的完整解决方案。

此库仅提供工作控制器,您需要实现自己的队列驱动程序,其中包含处理/进程。

功能

  • 多进程 实现在原生 PHP-CLI 上

  • 动态工作器调度(守护进程) 管理

  • 在后台永久运行 而无需额外库

  • 由启动器提供的 进程唯一性保证 功能

概要

演示

每 3 秒添加 2~5 个工作器设置,运行一个监听器(守护进程)

$ php index.php job_controller/listen
2018-10-06 14:36:28 - Queue Listener - Job detect
2018-10-06 14:36:28 - Queue Listener - Start dispatch
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #1 (PID: 13254)
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #2 (PID: 13256)
2018-10-06 14:36:31 - Queue Listener - Dispatch Worker #3 (PID: 13266)
2018-10-06 14:36:34 - Queue Listener - Job empty
2018-10-06 14:36:34 - Queue Listener - Stop dispatch, total cost: 6.00s

简介

此库为 Codeigniter 3 框架提供了一个带有多进程实现的守护进程队列工作器完整解决方案,它包括用于从队列中处理新任务的监听器(守护进程)和工作器。您可以将应用程序队列(例如 Redis)与队列工作器控制器集成。

PHP 在核心语言级别上缺乏对多线程的支持,此库通过管理多进程来实现多线程。

有关更多概念,以下图表显示了此库的实现结构

监听器(守护进程)可以继续运行以检测新任务,直到手动停止或您关闭终端。另一方面,工作器可以继续运行以处理新任务,直到没有任务为止,工作器可以被监听器调用。

启动器适合启动监听器进程,运行中的监听器进程将是唯一的,第二次启动将检测到存在的监听器而不会再次启动。

要求

此库需要以下条件

  • PHP CLI 5.4.0+
  • CodeIgniter 3.0.0+

安装

在 Codeigniter 项目的 \application 文件夹下运行 Composer

composer require yidas/codeigniter-queue-worker

检查 Codeigniter application/config/config.php

$config['composer_autoload'] = TRUE;

您可以将供应商路径自定义到 $config['composer_autoload']

配置

首先,创建一个扩展工作控制器的控制器,然后使用您自己的队列驱动程序来设计您自己的处理程序以实现工作控制器。以下是一些常用接口

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Initializer
    protected function init() {}
    
    // Worker
    protected function handleWork() {}
    
    // Listener
    protected function handleListen() {}
}

这些处理程序应设计为检测相同的作业队列,但用于不同的目的。例如,如果您使用 Redis 作为消息队列,则监听器和工作器检测相同的 Redis 列表队列,监听器仅通过派生工作器来分发作业,而工作器继续取出作业并处理,直到作业队列为空。

如何设计工作器

1. 构建 Initializer

protected void init()

方法 init() 是工作控制器的构造函数,它为您提供了一个定义初始化的接口,例如 Codeigniter 库加载。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function init()
    {
        // Optional autoload (Load your own libraries or models)
        $this->load->library('myjobs');
    }
// ...

如上所述,myjobs 库是由您自己的应用程序定义的,它处理您的作业流程。 具有 Redis 的 myjobs 示例代码

2. 构建 Worker

protected boolean handleWork(object $static=null)

handleWork() 方法是Worker的处理程序,它持续取出任务并执行处理。当此方法返回 false 时,表示任务队列已空,Worker将自行关闭。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleWork()
    {
        // Your own method to get a job from your queue in the application
        $job = $this->myjobs->popJob();
        
        // return `false` for job not found, which would close the worker itself.
        if (!$job)
            return false;
        
        // Your own method to process a job
        $this->myjobs->processJob($job);
        
        // return `true` for job existing, which would keep handling.
        return true;
    }
// ...

3. 构建 Listener

protected boolean handleListen(object $static=null)

handleListen() 方法是Listener的处理程序,它在检测到新任务时返回 true,并将Worker分配以处理任务。当此方法返回 false 时,表示任务队列已空,Listener将停止分配。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleListen()
    {
        // Your own method to detect job existence
        // return `true` for job existing, which leads to dispatch worker(s).
        // return `false` for job not found, which would keep detecting new job
        return $this->myjobs->exists();
    }
// ...

属性设置

您可以通过定义属性来自定义Worker。

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Setting for that a listener could fork up to 10 workers
    public $workerMaxNum = 10;
    
    // Enable text log writen into specified file for listener and worker
    public $logPath = 'tmp/my-worker.log';
}

公共属性

使用方法

有3种使用方法

  • listen 一个监听器(守护进程),通过克隆Worker来管理和分配任务。
  • work 一个Worker,用于从队列中处理和解决任务。
  • launch 一个启动器,用于在后台运行 listenwork 进程,并保持其唯一运行。

在配置了队列Worker控制器后,您可以通过使用Codeigniter 3 PHP-CLI命令来运行上述操作。

运行队列工作器

工作器

要处理队列中的新任务,您可以简单地运行Worker

$ php index.php myjob/work

作为Worker处理程序 handleWork(),Worker将一直运行(返回 true),直到任务队列为空(返回 false)。

监听器

要启动一个监听器来管理Worker,您可以简单地运行Listener

$ php index.php myjob/listen

作为您的监听器处理程序 handleListen(),当检测到新任务时(返回 true),监听器将分配Worker,直到任务队列为空,停止分配并监听下一个新任务(返回 false)。

监听器通过为每个Worker克隆一个运行进程来管理Worker,它实现了多进程,可以显著提高任务队列的性能。

在后台运行

此库支持在后台永久运行Listener或Worker,它为您提供将Worker作为服务运行的能力。

启动器

要在后台运行Listener或Worker,您可以调用启动器来启动进程

$ php index.php myjob/launch

默认情况下,启动器会启动 listen 进程,您也可以通过参数启动 work

$ php index.php myjob/launch/worker

启动器可以唯一地启动进程运行,这可以防止同时运行多个相同的监听器或Worker。例如,第一次启动监听器

$ php index.php myjob/launch
Success to launch process `listen`: myjob/listen.
Called command: php /srv/ci-project/index.php myjob/listen > /dev/null &
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.0  0.7 327144 29836 pts/3    R+   15:43   0:00 php /srv/ci-project/index.php myjob/listen

然后,当您再次启动监听器时,启动器将防止重复运行

$ php index.php myjob/launch
Skip: Same process `listen` is running: myjob/listen.
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.4  0.9 337764 36616 pts/3    S   15:43   0:00 php /srv/ci-project/index.php myjob/listen

对于唯一的工作场景,您可能需要使用数据库作为应用程序队列,如果有多个Worker处理相同的任务,则会导致竞态条件。与memcache列表不同,数据库队列应该由一个Worker同时处理。

进程状态

启动监听器后,您可以通过命令 ps aux|grep php 检查监听器服务

...
www-data  2278  0.7  1.0 496852 84144 ?        S    Sep25  37:29 php-fpm: pool www
www-data  3129  0.0  0.4 327252 31064 ?        S    Sep10   0:34 php /srv/ci-project/index.php myjob/listen
...

根据上述内容,您可以管理监听器和Worker,例如通过命令 kill 3129 杀死监听器。

Worker将在监听器检测到任务时运行,正在运行的Worker进程也将显示在 ps aux|grep php 中。

手动地,您也可以在监听器或Worker的末尾使用 &(一个和号)来在后台运行。