jumaphelix/php-dag

Directed Acyclic Graph (DAG) task execution in PHP, with parallel processing

v2.0.5 2024-08-26 07:40 UTC

README

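  • Directed Acyclic Graph (DAG) task execution in PHP, with parallel processing.
  • This package is for you when you have a set of tasks to execute and some of them depend on others, i.e. they should only run after the tasks they depend on have completed.
  • Tasks are first resolved according to their dependencies and then executed in parallel using Swoole. If a task needs data produced by a task it depends on, it can read that task's output.
  • You can also use this package on its own, simply as a way to run synchronous tasks in parallel.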

Requirements

  • PHP >= 8.1
  • Swoole: this package only works when the Swoole extension is installed and enabled in your php.ini configuration.

Installation

composer require phelixjuma/php-dag

Usage

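Consider a set of five tasks A, B, C, D and E, where task A can only run after task D has completed, and task B can only run after task C has completed:

C -> B
D -> A
E

In this case tasks C, D and E can run concurrently, task A should start as soon as D completes, and task B should start as soon as C completes.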
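To make this scheduling rule concrete, the sketch below computes the earliest start and finish time of each task when every task starts as soon as all of its parents have finished. It is plain PHP and not part of php-dag; the function name `earliestSchedule` is hypothetical, and the durations are the `sleep()` times used in the execution example further down (A: 1 s, B: 2 s, C: 1 s, D: 3 s, E: 3 s).

```php
<?php
// Sketch (not part of php-dag): earliest start/finish time of each task when a
// task starts as soon as all of its parents have finished.
// Assumes the graph is acyclic; a cycle would recurse forever.

/**
 * @param array<string, int>          $durations task ID => duration in seconds
 * @param array<string, list<string>> $parents   task ID => IDs of its parent tasks
 * @return array<string, array{start: int, finish: int}>
 */
function earliestSchedule(array $durations, array $parents): array
{
    $schedule = [];

    // A task starts when its slowest parent finishes (or at 0 with no parents).
    $resolve = function (string $id) use (&$resolve, &$schedule, $durations, $parents): int {
        if (isset($schedule[$id])) {
            return $schedule[$id]['finish'];
        }
        $start = 0;
        foreach ($parents[$id] ?? [] as $parent) {
            $start = max($start, $resolve($parent));
        }
        $schedule[$id] = ['start' => $start, 'finish' => $start + $durations[$id]];
        return $schedule[$id]['finish'];
    };

    foreach (array_keys($durations) as $id) {
        $resolve((string) $id);
    }
    ksort($schedule);
    return $schedule;
}

$durations = ['A' => 1, 'B' => 2, 'C' => 1, 'D' => 3, 'E' => 3];
$parents   = ['A' => ['D'], 'B' => ['C']];

$schedule   = earliestSchedule($durations, $parents);
$makespan   = max(array_column($schedule, 'finish')); // 4 seconds
$sequential = array_sum($durations);                  // 10 seconds
```

Under these assumptions A starts at 3 s and B at 1 s, and all five tasks finish after about 4 s instead of 10 s when run one after another; real timings also include scheduling overhead.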

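DAG Topological Sort

As long as there are no circular dependencies, we can represent these tasks as a DAG and sort them topologically, using the dependency information given, to determine the order of execution.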

<?php

use JumaPhelix\DAG\DAG;
use JumaPhelix\DAG\Task;

// Load Composer's autoloader
require __DIR__ . '/vendor/autoload.php';

// instantiate DAG
$dag = new DAG();

// Create tasks as instances or mocks of Task
$taskA = new Task('A', function() {});
$taskB = new Task('B', function() {});
$taskC = new Task('C', function() {});
$taskD = new Task('D', function() {});
$taskE = new Task('E', function() {});

// Add tasks to the DAG
$dag->addTask($taskA);
$dag->addTask($taskB);
$dag->addTask($taskC);
$dag->addTask($taskD);
$dag->addTask($taskE);

// Define dependencies with the first param defining the child and the second defining the parent
$dag->addParent('A', 'D');
$dag->addParent('B', 'C');

// Sort the tasks to show the order of execution
$sortedTasks = $dag->topologicalSort();
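`topologicalSort()` does the ordering inside the library. For intuition, here is a minimal sketch of one standard way to compute such an order, Kahn's algorithm, in plain PHP. It is not php-dag's implementation, `kahnSort` is a hypothetical name, and the library may return a different but equally valid order.

```php
<?php
// Sketch of Kahn's algorithm (not php-dag's implementation): repeatedly emit a
// task whose parents have all been emitted; leftover tasks mean there is a cycle.

/**
 * @param list<string>                $tasks   all task IDs, in insertion order
 * @param array<string, list<string>> $parents child ID => parent IDs
 * @return list<string> task IDs in a valid execution order
 */
function kahnSort(array $tasks, array $parents): array
{
    $inDegree = array_fill_keys($tasks, 0);   // number of unfinished parents
    $children = array_fill_keys($tasks, []);  // parent ID => child IDs

    foreach ($parents as $child => $parentIds) {
        foreach ($parentIds as $parent) {
            $inDegree[$child]++;
            $children[$parent][] = $child;
        }
    }

    // Start with every task that has no parents.
    $queue = array_keys(array_filter($inDegree, fn (int $d) => $d === 0));
    $order = [];

    while ($queue !== []) {
        $id = array_shift($queue);
        $order[] = $id;
        foreach ($children[$id] as $child) {
            // A child becomes ready once its last parent has been emitted.
            if (--$inDegree[$child] === 0) {
                $queue[] = $child;
            }
        }
    }

    if (count($order) !== count($tasks)) {
        throw new RuntimeException('Circular dependency detected');
    }
    return $order;
}

$order = kahnSort(['A', 'B', 'C', 'D', 'E'], ['A' => ['D'], 'B' => ['C']]);
```

With the dependencies above this yields C, D, E, B, A. In a graph such as X depends on Y and Y depends on X, no task ever reaches zero unfinished parents, which is how the sketch detects a circular dependency.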

Task Execution with Dependencies

Let's look at a simple example of how the tasks in the scenario above might be implemented:

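use JumaPhelix\DAG\DAG;
use JumaPhelix\DAG\Task;
// Assumption: TaskExecutor and SharedDataManager live in the same namespace as DAG and Task
use JumaPhelix\DAG\TaskExecutor;
use JumaPhelix\DAG\SharedDataManager;

// A helper function that task A delegates its work to
function taskA($parentResults, $name) {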

    sleep(1);

    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task A completed in 1 second for $name";

    return $message;
}

// Here we assume that all the tasks modify this same data: each runs on its own, but the dependencies are respected
$data = [];

// Set up the tasks as DAG
$dag = new DAG();

// A shared data manager lets concurrent tasks modify the same data in a lock-safe manner, which avoids data corruption
$dataManager = new SharedDataManager($data);

// Create tasks as instances or mocks of Task
$taskA = new Task('A', function ($parentResults = null) use($dataManager) {

    $args = ['name' => "Phelix"];

// This task delegates its work to the taskA() function defined outside the closure
    $result = taskA($parentResults, ...$args);

    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function($data) use($result) {
        $data['A'] = $result;
        return $data;
    });

    return $result;

});

// This task handles everything within a closure 
$taskB = new Task('B', function($parentResults = null) use($dataManager)  {

    sleep(2);

    $message = "";
    // if there's data from a parent task, it can be consumed as shown here
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task B completed in 2 seconds";

    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function($data) use($message) {
        $data['B'] = $message;
        return $data;
    });

    return $message;

});

$taskC = new Task('C', function($parentResults = null) use($dataManager) {

    sleep(1);

    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task C completed in 1 second";

    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function($data) use($message) {
        $data['C'] = $message;
        return $data;
    });

    return $message;

});
$taskD = new Task('D', function($parentResults = null) use($dataManager) {

    sleep(3);

    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task D completed in 3 seconds";

    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function($data) use($message) {
        $data['D'] = $message;
        return $data;
    });

    return $message;

});
$taskE = new Task('E', function($parentResults = null) use($dataManager) {

    sleep(3);

    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task E completed in 3 seconds";

    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function($data) use($message) {
        $data['E'] = $message;
        return $data;
    });

    return $message;
});

// Add tasks to the DAG
$dag->addTask($taskA);
$dag->addTask($taskB);
$dag->addTask($taskC);
$dag->addTask($taskD);
$dag->addTask($taskE);

// Define dependencies: A runs after D, and B runs after C
$dag->addParent('A', 'D');
$dag->addParent('B', 'C');

// Initialize the task executor
$executor = new TaskExecutor($dag);

// Execute tasks
$executor->execute();

$executionTime = $executor->getExecutionTime(); // Tasks will run in parallel and execute in a much shorter time than if they were run synchronously

// We can get the results from each of the tasks
$allResults = $executor->getResults();

// Or we can get the result from the last task
$lastResult = $executor->getFinalResult();

// We can get the final value of the shared data as modified by all the tasks
$sharedData = $dataManager->getData();
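The executor's job can be summarized as: run each task after its parents and hand it the parents' results. The hypothetical sequential runner below shows only that data flow, with no parallelism and no Swoole. It assumes `$parentResults` is an array keyed by parent task ID, which the README does not specify, and `runSequentially` is an illustrative name, not a php-dag API.

```php
<?php
// Hypothetical sequential reference runner (not php-dag's TaskExecutor).
// Runs callables in a dependency-respecting order and passes each task its
// parents' results, keyed by parent ID (an assumption).

/**
 * @param list<string>                $order     task IDs in topological order
 * @param array<string, callable>     $callbacks task ID => fn(?array $parentResults): mixed
 * @param array<string, list<string>> $parents   child ID => parent IDs
 * @return array<string, mixed> task ID => result
 */
function runSequentially(array $order, array $callbacks, array $parents): array
{
    $results = [];
    foreach ($order as $id) {
        $parentResults = [];
        foreach ($parents[$id] ?? [] as $parent) {
            $parentResults[$parent] = $results[$parent];
        }
        // Like the README's closures, tasks without parents receive null.
        $results[$id] = $callbacks[$id]($parentResults ?: null);
    }
    return $results;
}

// Build callbacks that format parent results the same way the README's tasks do.
$make = fn (string $name): Closure => function (?array $parentResults = null) use ($name): string {
    $message = '';
    foreach ($parentResults ?? [] as $parentResult) {
        $message .= "($parentResult) ";
    }
    return $message . "Task $name done";
};

$ids = ['A', 'B', 'C', 'D', 'E'];
$results = runSequentially(
    ['C', 'D', 'E', 'A', 'B'],
    array_combine($ids, array_map($make, $ids)),
    ['A' => ['D'], 'B' => ['C']]
);
```

Here `$results['A']` is `(Task D done) Task A done`, showing how a child sees its parent's output; the real executor additionally runs independent tasks at the same time.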

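Basic Parallelization

Suppose you only want to parallelize tasks that have no dependencies on each other. For example, I need to loop over 1,000 records from a database and run an operation on each that takes 1 second; in a normal PHP loop that takes 1,000 seconds in total. Running the operations in parallel can bring the total down to roughly the time of a single operation (about 1 second), provided they can all actually run concurrently; in practice the available resources limit how far this scales.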
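The arithmetic behind this claim can be written as a rough estimate. The sketch assumes equal-length, fully independent operations and a hypothetical cap `$concurrency` on how many run at once; the real cap depends on the executor, Swoole's configuration and system resources, so these numbers are estimates, not measurements.

```php
<?php
// Back-of-the-envelope estimate (not a measurement): wall time for $tasks
// independent operations of equal length when at most $concurrency run at once.

function estimateWallTime(int $tasks, float $secondsEach, int $concurrency): float
{
    if ($tasks <= 0) {
        return 0.0;
    }
    $concurrency = max(1, $concurrency);
    // The tasks run in ceil(tasks / concurrency) rounds, each one operation long.
    return (int) ceil($tasks / $concurrency) * $secondsEach;
}

$sequential = estimateWallTime(1000, 1.0, 1);    // plain loop
$unbounded  = estimateWallTime(1000, 1.0, 1000); // everything at once
$limited    = estimateWallTime(1000, 1.0, 100);  // hypothetical cap of 100
```

With no cap the estimate is 1 second, matching the claim above; with a hypothetical cap of 100 concurrent operations it becomes 10 seconds, still far below the 1,000 seconds of a plain loop.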

$data = [];

// Set up the tasks as DAG
$dag = new DAG();
$dataManager = new SharedDataManager($data);

$count = 10000;

for ($i = 0; $i < $count; $i++) {

    $dag->addTask(new Task($i, function () use($i, $dataManager) {

        $time = 2;
        sleep($time);

        $response = $time . " seconds";

        $dataManager->modifyData(function($data) use($i, $response) {
            $data[$i] = $response;
            return $data;
        });

        return $response;

    }));
}

// Initialize the task executor
$executor = new TaskExecutor($dag);

// Execute tasks
$executor->execute();

// Total execution time will be approx 2 seconds down from 20,000 seconds had the tasks run in a normal loop
$executionTime = $executor->getExecutionTime();

// All results from each of the tasks
$allResults = $executor->getResults();

// The result from the last task to execute
$lastResult = $executor->getFinalResult();

// Shared data final state
$sharedData = $dataManager->getData();
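`SharedDataManager` is used through a callback that receives the current data and returns the modified array. The hypothetical class below reproduces only that callback contract in a single process, to show why each callback must `return $data`; `SharedDataSketch` is an illustrative name, it does no locking, and it is not the library's class.

```php
<?php
// Hypothetical, single-process stand-in for SharedDataManager. It mirrors the
// README's callback contract only and performs no locking.

final class SharedDataSketch
{
    public function __construct(private array $data = []) {}

    public function modifyData(callable $modifier): void
    {
        // PHP arrays are passed by value, so a change only sticks because the
        // callback's return value replaces the stored data.
        $this->data = $modifier($this->data);
    }

    public function getData(): array
    {
        return $this->data;
    }
}

$manager = new SharedDataSketch();
foreach ([0, 1, 2] as $i) {
    // Each "task" writes under its own key, as in the loop above.
    $manager->modifyData(function (array $data) use ($i): array {
        $data[$i] = "task $i done";
        return $data;
    });
}
$final = $manager->getData();
```

A callback that forgot to `return $data` would replace the stored data with `null` (and fail the `array` type check here), which is why every task in the examples returns the modified array.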