williamxsp/code-igniter-queue-worker

CI3 的队列和工作者系统。基于 yidas/codeigniter-queue-worker,它使用 Symfony Process 处理任务。

1.0.1 2019-04-03 15:38 UTC

This package is not auto-updated.

Last update: 2024-09-26 18:59:35 UTC


README

CodeIgniter 队列工作者


CodeIgniter 3 守护队列工作者管理控制器

Latest Stable Version License

此队列工作者扩展被收集到 yidas/codeigniter-pack,它是一个完整的 Codeigniter 框架解决方案。

此库仅提供工作者控制器,您需要实现自己的队列驱动程序,其中包含处理程序/进程。

功能

  • 原生 PHP-CLI 上的多进程实现

  • 动态工作者分发(守护进程)管理

  • 在后台永久运行,无需额外库

  • 通过启动器保证进程唯一性

概要

演示

每 3 秒添加 2~5 个工作者设置,运行监听器(守护进程)

$ php index.php job_controller/listen
2018-10-06 14:36:28 - Queue Listener - Job detect
2018-10-06 14:36:28 - Queue Listener - Start dispatch
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #1 (PID: 13254)
2018-10-06 14:36:28 - Queue Listener - Dispatch Worker #2 (PID: 13256)
2018-10-06 14:36:31 - Queue Listener - Dispatch Worker #3 (PID: 13266)
2018-10-06 14:36:34 - Queue Listener - Job empty
2018-10-06 14:36:34 - Queue Listener - Stop dispatch, total cost: 6.00s

介绍

此库为 Codeigniter 3 框架提供了一种带有多进程实现的守护队列工作者总解决方案,它包括监听器(守护进程)和工作者,用于从队列中处理新任务。您可以将您的应用程序队列(例如 Redis)与队列工作者控制器集成。

PHP 在核心语言级别上不支持多线程,此库通过管理多进程来实现多线程。

有关更多概念,以下图表显示了此库的实现结构

监听器(守护进程)可以继续运行以检测新任务,直到手动停止或您关闭终端。另一方面,工作者可以继续运行以处理新任务,直到没有任务剩余,此时工作者可以由监听器调用。

启动器适合启动监听器进程,正在运行的监听器进程将是唯一的,第二次启动将检测到现有的监听器而不会再次启动。

要求

此库需要以下内容

  • PHP CLI 5.4.0+
  • CodeIgniter 3.0.0+

安装

\application 文件夹下的 Codeigniter 项目中运行 Composer

composer require yidas/codeigniter-queue-worker

检查 Codeigniter application/config/config.php

$config['composer_autoload'] = TRUE;

您可以将供应商路径自定义到 $config['composer_autoload']

配置

首先,创建一个扩展工作控制器的控制器,然后使用您自己的队列驱动程序设计自己的处理程序以实现工作者控制器。以下是一些常见接口

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Initializer
    protected function init() {}
    
    // Worker
    protected function handleWork() {}
    
    // Listener
    protected function handleListen() {}
}

这些处理程序应设计为检测相同的作业队列,但用于不同的目的。例如,如果您使用 Redis 作为消息队列,监听器和工作者检测相同的 Redis 列表队列,监听器仅通过为工作者分叉来分配作业,而工作者则继续取出作业并进行处理,直到作业队列为空。

如何设计一个工作者

1. 构建初始化器

protected void init()

init() 方法是工作者控制器的构造函数,它为您提供定义初始化(例如 Codeigniter 库加载)的接口。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function init()
    {
        // Optional autoload (Load your own libraries or models)
        $this->load->library('myjobs');
    }
// ...

如上所述,myjobs 库由您的应用程序定义,它处理您的作业流程。

2. 构建工作者

protected boolean handleWork(object $static=null)

handleWork()方法是一个处理器的处理器,它持续地从任务队列中取出工作并进行处理。当这个方法返回false时,表示任务队列为空,并且工作器将关闭自己。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleWork()
    {
        // Your own method to get a job from your queue in the application
        $job = $this->myjobs->popJob();
        
        // return `false` for job not found, which would close the worker itself.
        if (!$job)
            return false;
        
        // Your own method to process a job
        $this->myjobs->processJob($job);
        
        // return `true` for job existing, which would keep handling.
        return true;
    }
// ...

3. 构建监听器

protected boolean handleListen(object $static=null)

handleListen()方法是一个处理器的处理器,它将工作器分派去处理工作,同时通过返回true来检测新的任务。当这个方法返回false时,表示任务队列为空,并且监听器将停止分派。

示例代码

class My_worker extends \yidas\queue\worker\Controller
{
    protected function handleListen()
    {
        // Your own method to detect job existence
        // return `true` for job existing, which leads to dispatch worker(s).
        // return `false` for job not found, which would keep detecting new job
        return $this->myjobs->exists();
    }
// ...

属性设置

您可以通过定义属性来自定义工作器。

use yidas\queue\worker\Controller as WorkerController;

class My_worker extends WorkerController
{
    // Setting for that a listener could fork up to 10 workers
    public $workerMaxNum = 10;
    
    // Enable text log writen into specified file for listener and worker
    public $logPath = 'tmp/my-worker.log';
}

公共属性

用法

有3个动作用于使用

  • listen:一个监听器(守护进程),通过派生工作器来管理和分派任务。
  • work:一个工作器,用于从队列中处理和解决任务。
  • launch:一个启动器,用于在后台运行listenwork进程,并保持其唯一运行。

您可以在配置队列工作器控制器后,使用Codeigniter 3 PHP-CLI命令来运行上述动作。

运行队列工作者

工作者

要处理队列中的新任务,您可以简单地运行工作器

$ php index.php myjob/work

作为您的工人处理器handleWork(),工作器将继续运行(返回true),直到任务队列为空(返回false)。

监听器

要启动一个监听器来管理工作器,您可以简单地运行监听器

$ php index.php myjob/listen

作为您的监听器处理器handleListen(),当检测到新任务时(返回true),监听器将分派工作器,直到任务队列为空,停止分派并监听下一个新任务(返回false)。

监听器通过为每个工作器派生运行进程来管理工作器,它实现了多进程,这可以显著提高任务队列的性能。

在后台运行

此库支持在后台永久运行监听器或工作器,它为您提供了将工作器作为服务运行的能力。

启动器

要在后台运行监听器或工作器,您可以调用启动器来启动进程

$ php index.php myjob/launch

默认情况下,启动器将启动listen进程,您也可以通过传递参数来启动work

$ php index.php myjob/launch/worker

启动器可以唯一地保持进程运行,这可以防止同时运行多个相同的监听器或工作器。例如,第一次启动监听器

$ php index.php myjob/launch
Success to launch process `listen`: myjob/listen.
Called command: php /srv/ci-project/index.php myjob/listen > /dev/null &
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.0  0.7 327144 29836 pts/3    R+   15:43   0:00 php /srv/ci-project/index.php myjob/listen

然后,当您再次启动监听器时,启动器将防止重复运行

$ php index.php myjob/launch
Skip: Same process `listen` is running: myjob/listen.
------
USER   PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
user 14650  0.4  0.9 337764 36616 pts/3    S   15:43   0:00 php /srv/ci-project/index.php myjob/listen

对于唯一的工作场景,您可能需要使用数据库作为应用程序队列,如果有多个工作器处理相同的任务,这会导致竞争条件。与memcache列表不同,数据库队列应该由同一时间只有一个工作器处理。

进程状态

启动监听器后,您可以通过命令ps aux|grep php来检查监听器服务。

...
www-data  2278  0.7  1.0 496852 84144 ?        S    Sep25  37:29 php-fpm: pool www
www-data  3129  0.0  0.4 327252 31064 ?        S    Sep10   0:34 php /srv/ci-project/index.php myjob/listen
...

根据上述内容,您可以管理监听器和工作器,例如通过命令kill 3129来杀死监听器。

当监听器检测到任务时,工作器将运行,正在运行的工作器进程也将显示在ps aux|grep php中。

手动地,您也可以在监听器或工作器末尾使用&(和号)来在后台运行。