etouches/phplumber

管道库,允许异步和同步过程的混合。

v1.0.0-beta 2017-03-28 21:03 UTC

This package is not auto-updated.

Last update: 2024-09-29 02:49:48 UTC


README

Phplumber 是一个用于 PHP 的非常简单的管道库,它允许异步和同步过程的混合。使用它将大过程分解成多个步骤,使用队列和多进程。Phplumber 不包含硬编码的依赖项,可以由任何队列设置和任何存储机制支持。

需求

  • PHP 5.3+
  • 队列,例如 RabbitMQ 或 Redis
  • 存储,例如关系型数据库表或 Redis

示例

让我们以创建和填充数据库为例。它需要多个步骤,其中一些可以并发完成。

  1. 创建数据库(一个进程)
  2. 创建和填充表(每个表一个进程)
  3. 创建依赖于多个表的视图(一个进程,依赖于所有表都存在)
                         Create table 1
                       /                \
    Create database ->   Create table 2   -> Create views
                       \                /
                         Create table 3

首先,我们定义我们的进程。顺序步骤扩展 Process。可以多次运行并使用不同数据的步骤扩展 MultiProcess

class CreateDatabase extends Process
{
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        echo "Drop database $database_name if it exists...\n";
        echo "Creating database $database_name...\n";
    }
}
class CreateTable extends MultiProcess
{
    // Determine the data we need to queue for async processes
    public function getAsyncPayloads($payload)
    {
        $database_name = $payload['database_name'];
        $table_names = array('first_table', 'second_table', 'third_table');
        
        $payloads = array();
        foreach ($table_names as $table) {
            $payloads[] = array('database_name' => $database_name, 'table_name' => $table);
        }
        return $payloads;
    }
    
    public function invoke($payload)
    {
        $database_name = $payload['database_name'];
        $table_name = $payload['table'];
        echo "Connecting to database $database_name...\n";
        echo "Creating and populating $table_name...\n";
        
        switch ($payload['table']) {
            case 'first_table':
                // Create table and insert rows...
                break;
            // ...
        }
    }
}

然后,我们定义进程的顺序。

class CreateAndFillDatabase extends ProcessList
{
    protected function setup()
    {
        $this
            ->add('CreateDatabase')
            ->add('CreateTable')
            ->add('CreateViews');
    }
}

现在我们可以启动进程序列。

$equation = new CreateAndFillDatabase();
$equation->process(array('database_name' => 'test_db'));

请参阅 examples 目录,以获取完整的工作示例。

入门

  1. 实现 StorageInterface。这将保存信号量数据。合适的存储引擎包括任何关系型数据库、nosql 或如 Redis 一样的键值存储。
  2. 扩展 Queue 类以集成队列系统。合适的队列引擎包括 Redis 和 RabbitMQ。
  3. 将每个进程编写为扩展 Process 的类(对于同步)或扩展 MultiProcess 的类(对于异步)。
  4. 实现 ProcessFactoryInterface。这将创建每个 ProcessInterface 实例,允许你使用任何先决条件设置你的进程,例如数据库连接或配置选项。
  5. 通过扩展 ProcessList 将进程组合在一起。
  6. 实现一个工作守护进程,它实例化你的 Queue 实现,并调用 consume() 来监听传入的消息。每个消息将调用多进程的单个部分。运行多个工作进程以并发执行进程。
  7. 在您的系统中选择一个位置启动整个工作流程,实例化您的 ProcessList,并调用 process(),传递初始有效载荷。