mirays/codeigniter-queue-worker

CodeIgniter 3 队列工作管理控制器

2.0.1 2023-09-12 19:45 UTC

This package is auto-updated.

Last update: 2024-09-12 22:21:49 UTC


README

CodeIgniter 队列工作


CodeIgniter 3 守护进程队列工作管理控制器

License

此队列工作扩展被收集到 yidas/codeigniter-pack 中,它是一个完整的 Codeigniter 框架解决方案。

此库仅提供工作控制器,您需要实现自己的队列驱动程序,其中包含处理器/进程。

功能

  • 多进程 实现(原生 PHP-CLI)

  • 动态工作分发(守护进程) 管理

  • 永久在后台运行 而不需要额外的库

  • 进程唯一性保证 由启动器实现

概述

演示

每3秒添加2~5个工作进程设置,以运行监听器(守护进程)

$ php index.php job_controller/listen
2018-10-06 14:36:28 - Queue Listener - Job detect
2018-10-06 14:36:28 - Queue Listener - Start dispatch
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #1 (PID: 13254)
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #2 (PID: 13256)
2018-10-06 14:36:31 - Queue Listener - Dispatch Worker #3 (PID: 13266)
2018-10-06 14:36:34 - Queue Listener - Job empty
2018-10-06 14:36:34 - Queue Listener - Stop dispatch, total cost: 6.00s

介绍

此库为 Codeigniter 3 框架提供了一个带有多进程实现的守护进程队列工作总解决方案,它包括监听器(守护进程)和工作进程,用于处理队列中的新工作。您可以将应用程序队列(如 Redis)与队列工作控制器集成。

PHP 在核心语言级别不支持多线程,此库通过管理多进程来实现多线程。

有关更多概念,以下图表显示了此库的实现结构

监听器(守护进程)可以持续运行,以检测新工作,直到手动停止或您关闭终端。另一方面,工作进程可以继续运行,以处理新工作,直到没有剩余的工作,工作进程可以由监听器调用。

启动器适合启动监听器进程,运行中的监听器进程是唯一的,第二次启动会检测到现有的监听器而不会再次启动。

要求

此库需要以下内容

  • PHP CLI 5.4.0+
  • CodeIgniter 3.0.0+

安装

\application 文件夹下的 Codeigniter 项目中运行 Composer

composer require yidas/codeigniter-queue-worker

检查 Codeigniter application/config/config.php

$config['composer_autoload'] = TRUE;

您可以将供应商路径自定义到 $config['composer_autoload']

配置

首先,创建一个继承工作控制器的控制器,然后使用您自己的队列驱动程序来设计您自己的处理程序,以实现工作控制器。以下是一些常见接口

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Initializer
    protected function init() {}
    
    // Worker
    protected function handleWork() {}
    
    // Listener
    protected function handleListen() {}
}

这些处理程序应该被设计为检测相同的作业队列,但用于不同的目的。例如,如果您使用 Redis 作为消息队列,监听器和工作进程检测相同的 Redis 列表队列,监听器仅通过为工作进程分叉来分发工作,而工作进程继续取出工作并处理,直到作业队列为空。

如何设计一个工作进程

1. 构建初始化器

protected void init()

init() 方法是工作控制器的构造函数,它为您提供了定义初始化(如 Codeigniter 库加载)的接口。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function init()
    {
        // Optional autoload (Load your own libraries or models)
        $this->load->library('myjobs');
    }
// ...

如上所示,myjobs 库是由您的应用程序定义的,用于处理作业过程。 具有 Redis 的 myjobs 的示例代码

2. 构建工作进程

protected boolean handleWork(object $static=null)

handleWork() 方法是Worker的处理器,用于持续取出工作并执行处理。当此方法返回 false 时,意味着工作队列已空,Worker将自行关闭。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleWork()
    {
        // Your own method to get a job from your queue in the application
        $job = $this->myjobs->popJob();
        
        // return `false` for job not found, which would close the worker itself.
        if (!$job)
            return false;
        
        // Your own method to process a job
        $this->myjobs->processJob($job);
        
        // return `true` for job existing, which would keep handling.
        return true;
    }
// ...

3. 构建监听器

protected boolean handleListen(object $static=null)

handleListen() 方法是Listener的处理器,用于在检测到新工作项时返回 true,并将Worker调度到处理工作。当此方法返回 false 时,意味着工作队列已空,Listener将停止调度。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleListen()
    {
        // Your own method to detect job existence
        // return `true` for job existing, which leads to dispatch worker(s).
        // return `false` for job not found, which would keep detecting new job
        return $this->myjobs->exists();
    }
// ...

属性设置

您可以通过定义属性来自定义Worker。

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Setting for that a listener could fork up to 10 workers
    public $workerMaxNum = 10;
    
    // Enable text log writen into specified file for listener and worker
    public $logPath = 'tmp/my-worker.log';
}

公共属性

用法

有3个操作可用

  • listen - 管理和调度工作的Listener(守护进程),通过分叉Worker来实现。
  • work - 处理和解决来自队列的工作的Worker。
  • launch - 在后台运行 listenwork 进程并保持其唯一运行的启动器。

在配置队列Worker控制器后,您可以使用Codeigniter 3 PHP-CLI命令运行上述操作。

运行队列工作进程

工作进程

要处理来自队列的新工作,您可以简单地运行Worker

$ php index.php myjob/work

作为您的Worker处理器 handleWork(),Worker将继续运行(返回 true),直到工作队列为空(返回 false)。

监听器

要启动一个用于管理Worker的Listener,您可以简单地运行Listener

$ php index.php myjob/listen

作为您的Listener处理器 handleListen(),Listener将在检测到新工作项时调度Worker(返回 true),直到工作队列为空,停止调度并监听下一个新工作项(返回 false)。

Listener通过为每个Worker分叉运行进程来管理Worker,实现了多进程,这可以显著提高工作队列的性能。

在后台运行

此库支持在后台永久运行Listener或Worker,您可以将Worker作为服务运行。

启动器

要后台运行Listener或Worker,您可以调用启动器来启动进程

$ php index.php myjob/launch

默认情况下,启动器将启动 listen 进程,您也可以通过传递参数来启动 work

$ php index.php myjob/launch/worker

启动器可以保持启动的进程唯一运行,这可以防止同时运行多个相同的Listener或Worker。例如,首次启动Listener

$ php index.php myjob/launch
Success to launch process `listen`: myjob/listen.
Called command: php /srv/ci-project/index.php myjob/listen > /dev/null &
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.0  0.7 327144 29836 pts/3    R+   15:43   0:00 php /srv/ci-project/index.php myjob/listen

然后,当您再次启动Listener时,启动器将防止重复运行

$ php index.php myjob/launch
Skip: Same process `listen` is running: myjob/listen.
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.4  0.9 337764 36616 pts/3    S   15:43   0:00 php /srv/ci-project/index.php myjob/listen

对于唯一的工作场景,您可以使用数据库作为应用程序队列,如果有多个Worker处理相同的工作项,则会导致竞争条件。与memcache列表不同,数据库队列应该由一个Worker在同一时间处理。

进程状态

启动Listener后,您可以通过命令 ps aux|grep php 检查Listener服务。

...
www-data  2278  0.7  1.0 496852 84144 ?        S    Sep25  37:29 php-fpm: pool www
www-data  3129  0.0  0.4 327252 31064 ?        S    Sep10   0:34 php /srv/ci-project/index.php myjob/listen
...

根据上述内容,您可以管理Listener和Worker,例如通过命令 kill 3129 杀死Listener。

Listener检测到工作项时,Worker将运行,运行的Worker进程也将显示在 ps aux|grep php 中。

手动地,您也可以在Listener或Worker的末尾使用 &(和号)来在后台运行。