jumaphelix / php-dag
Directed acyclic graph (DAG) task execution in PHP with parallel processing
v2.0.5
2024-08-26 07:40 UTC
Requires
- php: >=8.0
- ext-openswoole: *
- openswoole/core: 22.1.5
Requires (Dev)
- phpunit/phpunit: ^9.3
README
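- Directed acyclic graph (DAG) task execution in PHP with parallel processing
- This package is for you if you have a set of tasks to execute and some of them depend on others, meaning they should only run after the tasks they depend on have completed.
- The package first resolves the tasks according to their dependencies and then executes them in parallel using Swoole. If a task needs data from the tasks it depends on, it can read their outputs.
- You can also use the package on its own, simply as a way to run synchronous tasks in parallel.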
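To make "reading the output of dependency tasks" concrete, here is a minimal sequential sketch in plain PHP. It has no Swoole and no parallelism. The function `runSequentially`, the `$parents` map and the convention of keying parent results by task id are all hypothetical and chosen for illustration. This is not php-dag's API or its executor.

```php
<?php
// Hypothetical sketch: run tasks one at a time in dependency order,
// handing each task the return values of its parents (keyed by parent id).

/**
 * @param array<string, callable> $tasks   task id => callable(array $parentResults)
 * @param array<string, string[]> $parents task id => ids of the tasks it depends on
 * @return array<string, mixed>            task id => result
 */
function runSequentially(array $tasks, array $parents): array
{
    $results = [];
    $visiting = [];

    // Recursive closure: compute a task after (recursively) computing its parents.
    $run = function (string $id) use (&$run, &$results, &$visiting, $tasks, $parents) {
        if (array_key_exists($id, $results)) {
            return $results[$id]; // already computed, reuse it
        }
        if (isset($visiting[$id])) {
            throw new RuntimeException("Circular dependency at task $id");
        }
        $visiting[$id] = true;
        $parentResults = [];
        foreach ($parents[$id] ?? [] as $parentId) {
            $parentResults[$parentId] = $run($parentId);
        }
        unset($visiting[$id]);
        return $results[$id] = $tasks[$id]($parentResults);
    };

    foreach (array_keys($tasks) as $id) {
        $run((string) $id);
    }
    return $results;
}

$tasks = [
    'C' => fn(array $p) => 'C done',
    'B' => fn(array $p) => 'B got: ' . implode(', ', $p),
];
$results = runSequentially($tasks, ['B' => ['C']]);
echo $results['B'], PHP_EOL; // B got: C done
```

In this sketch, a task always sees its parents' results and never runs before them. A cycle raises an exception instead of recursing forever.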
Requirements
- PHP >= 8.1
- Swoole: this package only works when the Swoole extension is installed and enabled in your php.ini configuration file.
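Before you use the package, a small pre-flight check can confirm that the extension is actually loaded. This sketch relies only on PHP's built-in `extension_loaded()` and `phpversion()`. The helper name `swooleExtensionLoaded` is hypothetical. The sketch also assumes the extension registers either as `openswoole` (the package's Composer requirements list `ext-openswoole`) or as `swoole`.

```php
<?php
// Hypothetical helper: report which Swoole-family extension (if any) is loaded.
// Assumption: the extension name is "openswoole" (OpenSwoole) or "swoole".
function swooleExtensionLoaded(): ?string
{
    foreach (['openswoole', 'swoole'] as $name) {
        if (extension_loaded($name)) {
            return $name;
        }
    }
    return null;
}

$ext = swooleExtensionLoaded();
if ($ext === null) {
    fwrite(STDERR, "No Swoole extension loaded; enable it in php.ini (run `php --ini` to find the file).\n");
} else {
    echo "Found extension: $ext (version " . phpversion($ext) . ")\n";
}
```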
Installation
composer require phelixjuma/php-dag
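Usage
Consider a set of five tasks A, B, C, D and E. Task A can only run after task D has completed, and task B can only run after task C has completed:
C -> B
D -> A
E
Here, tasks C, D and E can run concurrently. Task A should start as soon as D has completed, and task B as soon as C has completed.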
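The dependency notation above can be written as a plain PHP map from each task to its parents. The sketch below derives two things from it: which tasks can start straight away, and which tasks become candidates to start when a given task finishes. The `$parents` and `$children` layout is a hypothetical representation and not php-dag's internal one.

```php
<?php
// The five-task example as task => parents (hypothetical representation).
$parents = [
    'A' => ['D'],
    'B' => ['C'],
    'C' => [],
    'D' => [],
    'E' => [],
];

// Invert it: task => children that may become ready when it finishes.
$children = array_fill_keys(array_keys($parents), []);
foreach ($parents as $task => $deps) {
    foreach ($deps as $dep) {
        $children[$dep][] = $task;
    }
}

// Tasks without parents can start immediately and concurrently.
$readyAtStart = array_keys(array_filter($parents, fn(array $deps) => $deps === []));

echo 'Start concurrently: ', implode(', ', $readyAtStart), PHP_EOL;
foreach ($children as $task => $kids) {
    if ($kids !== []) {
        echo "When $task finishes, start: ", implode(', ', $kids), PHP_EOL;
    }
}
```

The script prints C, D and E as the tasks that can start at once. It then shows that B can follow C and A can follow D. A task with several parents would only become ready after all of its parents have finished.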
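DAG topological sort
As long as there are no circular dependencies, we can represent these tasks as a DAG. We can then sort them topologically from the given dependency information to determine the execution order.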
use JumaPhelix\DAG\DAG;
use JumaPhelix\DAG\Task;

// Instantiate the DAG
$dag = new DAG();

// Create tasks as instances or mocks of Task
$taskA = new Task('A', function() {});
$taskB = new Task('B', function() {});
$taskC = new Task('C', function() {});
$taskD = new Task('D', function() {});
$taskE = new Task('E', function() {});

// Add tasks to the DAG
$dag->addTask($taskA);
$dag->addTask($taskB);
$dag->addTask($taskC);
$dag->addTask($taskD);
$dag->addTask($taskE);

// Define dependencies: the first parameter is the child, the second is the parent
$dag->addParent('A', 'D');
$dag->addParent('B', 'C');

// Sort the tasks to get the order of execution
$sortedTasks = $dag->topologicalSort();
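Kahn's algorithm is a standard way to order a DAG and detect cycles, and it can illustrate what a topological sort produces. The sketch below is self-contained and assumes dependencies are given as a child => parents map. `kahnSort` is a hypothetical helper. `DAG::topologicalSort()` may use a different algorithm and may return a different order that is equally valid.

```php
<?php
/**
 * Kahn's algorithm: returns task ids in an order where every parent comes
 * before its children, or throws if the graph contains a cycle.
 *
 * @param string[]                $tasks   all task ids
 * @param array<string, string[]> $parents child id => ids it depends on
 * @return string[]
 */
function kahnSort(array $tasks, array $parents): array
{
    $inDegree = array_fill_keys($tasks, 0); // number of unfinished parents
    $children = array_fill_keys($tasks, []);
    foreach ($parents as $child => $deps) {
        foreach ($deps as $parent) {
            $inDegree[$child]++;
            $children[$parent][] = $child;
        }
    }

    // Start with every task that has no dependencies.
    $queue = array_keys(array_filter($inDegree, fn(int $d) => $d === 0));
    $order = [];
    while ($queue !== []) {
        $task = array_shift($queue);
        $order[] = $task;
        foreach ($children[$task] as $child) {
            if (--$inDegree[$child] === 0) {
                $queue[] = $child; // all of its parents are now ordered
            }
        }
    }

    // Tasks left over are part of a cycle.
    if (count($order) !== count($tasks)) {
        throw new RuntimeException('Circular dependency detected');
    }
    return $order;
}

$order = kahnSort(['A', 'B', 'C', 'D', 'E'], ['A' => ['D'], 'B' => ['C']]);
echo implode(' -> ', $order), PHP_EOL; // C -> D -> E -> B -> A
```

For the five-task example, this sketch yields C, D, E, B, A. Any order in which every parent comes before its children is valid, so the order php-dag returns may differ.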
Executing tasks with dependencies
Let's look at a simple example that implements the tasks from the scenario above.
use JumaPhelix\DAG\DAG;
use JumaPhelix\DAG\Task;
// Assumed to live in the same namespace as DAG and Task
use JumaPhelix\DAG\SharedDataManager;
use JumaPhelix\DAG\TaskExecutor;

function taskA($parentResults, $name) {
    sleep(1);
    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task A completed in 1 second for $name";
    return $message;
}

// Here, we assume that all the tasks modify this same data: each runs on its own, but the dependencies are respected
$data = [];

// Set up the tasks as a DAG
$dag = new DAG();

// A shared data manager lets concurrent tasks modify the same data by reference in a lock-safe manner, avoiding data corruption
$dataManager = new SharedDataManager($data);

// Create tasks as instances or mocks of Task
$taskA = new Task('A', function ($parentResults = null) use ($dataManager) {
    $args = ['name' => "Phelix"];
    // This task calls an external function (taskA, defined above)
    $result = taskA($parentResults, ...$args);
    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function ($data) use ($result) {
        $data['A'] = $result;
        return $data;
    });
    return $result;
});

// This task handles everything within a closure
$taskB = new Task('B', function ($parentResults = null) use ($dataManager) {
    sleep(2);
    $message = "";
    // If there's data from a parent task, it can be consumed as shown here
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task B completed in 2 seconds";
    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function ($data) use ($message) {
        $data['B'] = $message;
        return $data;
    });
    return $message;
});

$taskC = new Task('C', function ($parentResults = null) use ($dataManager) {
    sleep(1);
    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task C completed in 1 second";
    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function ($data) use ($message) {
        $data['C'] = $message;
        return $data;
    });
    return $message;
});

$taskD = new Task('D', function ($parentResults = null) use ($dataManager) {
    sleep(3);
    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task D completed in 3 seconds";
    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function ($data) use ($message) {
        $data['D'] = $message;
        return $data;
    });
    return $message;
});

$taskE = new Task('E', function ($parentResults = null) use ($dataManager) {
    sleep(3);
    $message = "";
    if (!empty($parentResults)) {
        foreach ($parentResults as $parentResult) {
            $message .= "($parentResult) ";
        }
    }
    $message .= "Task E completed in 3 seconds";
    // Modify the shared data in a lock-safe way
    $dataManager->modifyData(function ($data) use ($message) {
        $data['E'] = $message;
        return $data;
    });
    return $message;
});

// Add tasks to the DAG
$dag->addTask($taskA);
$dag->addTask($taskB);
$dag->addTask($taskC);
$dag->addTask($taskD);
$dag->addTask($taskE);

// Define dependencies (child first, then parent): A runs after D, B runs after C
$dag->addParent('A', 'D');
$dag->addParent('B', 'C');

// Initialize the task executor
$executor = new TaskExecutor($dag);

// Execute tasks
$executor->execute();

// Tasks run in parallel, so this is much shorter than if they were run synchronously
$executionTime = $executor->getExecutionTime();

// We can get the results from each of the tasks
$allResults = $executor->getResults();

// Or we can get the result from the last task
$lastResult = $executor->getFinalResult();

// We can get the final value of the shared data as modified by all the tasks
$sharedData = $dataManager->getData();
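The example's durations (A: 1 s, B: 2 s, C: 1 s, D: 3 s, E: 3 s) can be combined with its dependencies to put a number on "much shorter". The sketch below computes each task's earliest finish time. It assumes unlimited parallelism and zero scheduling overhead, so the result is a lower bound and not necessarily what `getExecutionTime()` reports. `earliestFinish` is a hypothetical helper.

```php
<?php
// Durations (seconds) and parents taken from the example above.
$duration = ['A' => 1, 'B' => 2, 'C' => 1, 'D' => 3, 'E' => 3];
$parents  = ['A' => ['D'], 'B' => ['C']];

// Earliest finish of a task = latest finish among its parents (0 if none)
// plus its own duration, assuming every ready task can run immediately.
function earliestFinish(string $task, array $duration, array $parents, array &$memo): int
{
    if (!isset($memo[$task])) {
        $start = 0;
        foreach ($parents[$task] ?? [] as $parent) {
            $start = max($start, earliestFinish($parent, $duration, $parents, $memo));
        }
        $memo[$task] = $start + $duration[$task];
    }
    return $memo[$task];
}

$memo = [];
$finish = [];
foreach (array_keys($duration) as $task) {
    $finish[$task] = earliestFinish($task, $duration, $parents, $memo);
}

$sequential = array_sum($duration); // one task after another
$parallel   = max($finish);         // longest dependency chain (critical path)
printf("Sequential: %ds, ideal parallel: %ds\n", $sequential, $parallel);
```

Run one after another, the five tasks take 10 seconds. In parallel, the bound is set by the longest chain, D followed by A, which takes 4 seconds.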
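Basic parallelization
Suppose you only want to parallelize tasks that have no dependencies. For example, say I need to loop over 1,000 records from a database and perform an operation that takes 1 second on each. In a normal PHP loop, this takes 1,000 seconds in total. Executing the operations in parallel can ideally keep the total close to 1 second, largely independent of the number of records.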
use JumaPhelix\DAG\DAG;
use JumaPhelix\DAG\Task;
// Assumed to live in the same namespace as DAG and Task
use JumaPhelix\DAG\SharedDataManager;
use JumaPhelix\DAG\TaskExecutor;

$data = [];

// Set up the tasks as a DAG
$dag = new DAG();
$dataManager = new SharedDataManager($data);

$count = 10000;
for ($i = 0; $i < $count; $i++) {
    $dag->addTask(new Task($i, function () use ($i, $dataManager) {
        $time = 2;
        sleep($time);
        $response = $time . " seconds";
        $dataManager->modifyData(function ($data) use ($i, $response) {
            $data[$i] = $response;
            return $data;
        });
        return $response;
    }));
}

// Initialize the task executor
$executor = new TaskExecutor($dag);

// Execute tasks
$executor->execute();

// Total execution time will be approx. 2 seconds, down from 20,000 seconds had the tasks run in a normal loop
$executionTime = $executor->getExecutionTime();

// All results from each of the tasks
$allResults = $executor->getResults();

// The result from the last task to execute
$lastResult = $executor->getFinalResult();

// Shared data final state
$sharedData = $dataManager->getData();
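The "approx. 2 seconds" figure assumes that all 10,000 tasks really run at the same time. For n independent tasks of t seconds each, with at most k running concurrently, a rough estimate is `ceil(n / k) * t`. The sketch below only does that arithmetic. `estimateSeconds` and the concurrency cap are hypothetical and are not php-dag settings.

```php
<?php
// Back-of-the-envelope wall-clock estimate for $n independent tasks of $t
// seconds each, when at most $maxConcurrent run at the same time.
// $maxConcurrent is a hypothetical parameter, not a php-dag option.
function estimateSeconds(int $n, int $t, int $maxConcurrent): int
{
    // Tasks run in ceil(n / k) "rounds", each lasting t seconds.
    return (int) ceil($n / $maxConcurrent) * $t;
}

echo estimateSeconds(10000, 2, 1), PHP_EOL;     // 20000: a plain loop
echo estimateSeconds(10000, 2, 10000), PHP_EOL; // 2: every task at once
echo estimateSeconds(10000, 2, 256), PHP_EOL;   // 80: a capped pool
```

Any limit on how many tasks can truly run at once, such as a worker pool size or blocking calls, moves the real figure away from the ideal 2 seconds.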