virge / graph
由 RabbitMQ 通过 Virge::Graphite 驱动的 PHP 工作流服务
Requires (Dev)
- virge/api: ~2.0
- virge/cli: ~4.0
- virge/core: ~2.0
- virge/db: ~3.0
- virge/enigma: ~2.0
- virge/graphite: ~3.0
- virge/orm: ~3.0
This package is not auto-updated.
Last update: 2024-09-14 18:53:02 UTC
README
Virge::Graph 是一个简单的流程框架,提供可扩展的后端队列作业处理。
Virge::Graph 需要 RabbitMQ (https://rabbitmq.cn/) 和 MySQL 数据库 (https://mysqlserver.cn/)。
Virge::Graph 使用 Virge::Graphite 队列库来处理所有工作进程和扩展。https://github.com/siosphere/virge-graphite
入门
您需要设置 RabbitMQ 服务器和 MySQL 服务器。这两个服务器都可以通过 https://dockerd.com.cn/ 轻松设置
您需要在项目中包含 virge/graph 包
composer require virge/graph:dev-master
您还需要创建 Virge::Reactor 并设置一些配置文件。
如果从头开始一个项目,您可以使用 virge/project
composer create-project virge/project
这将为您设置 Reactor。
要添加到现有项目中,建议创建一个子文件夹来存放 virge 配置文件和 reactor。
myproject/
virge/
config/
database.php
queue.php
reactor.php
vadmin.php
Reactor
Reactor 将自动加载所有配置文件,并将所需的服 务注册到 Virge。
<?php
//reactor.php
$BASE_DIR = dirname(__FILE__) . '/';
$loader = include $BASE_DIR . '../vendor/autoload.php';
$loader->add('Virge', $BASE_DIR . '../');
class Reactor extends Virge\Core\BaseReactor
{
public function registerCapsules($capsules = [])
{
parent::registerCapsules([
new \Virge\Graph\GraphCapsule(),
new \Virge\Graphite\GraphiteCapsule(),
new \Virge\Cli\Capsule(),
new \Virge\ORM\Capsule(),
new \Virge\Database\Capsule(),
]);
}
}
//require our workflows
require_once 'workflows.php'
您还需要在 config/ 中设置/创建两个配置文件,您可以从示例目录复制这些文件
database.php
<?php
return [
'service' => 'virge/database',
'connections' => array(
'default' => array(
'hostname' => 'localhost',
'username' => '',
'password' => '',
'database' => 'virge_graph',
),
),
];
queue.php
<?php
return [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
];
您还需要一个入口点来运行 reactor 和处理您的队列,包含在示例目录中的轻量级管理工具“vadmin”可用于复制,或创建一个新文件
vadmin.php
<?php
use Virge\Core\Config;
error_reporting(E_ALL &~ E_NOTICE &~ E_STRICT);
/**
*
* @author Michael Kramer
*/
chdir(dirname(__FILE__));
require_once 'reactor.php';
// Create new Notifier instance.
$config = Config::get('app');
$args = array();
if(isset($argv[2])){
for($i = 2; $i <= $argc; $i++) {
$args[] = isset($argv[$i]) ? $argv[$i] : NULL;
}
}
$command = isset($argv[1]) ? $argv[1] : null;
$reactor = new Reactor();
$reactor->run('prod', 'cli', 'execute', array($command, $args));
我们还需要创建和设置我们的工作流程文件,这个文件只需要包含我们工作流程的定义,工作流程类可以通过 composer 自动加载,请参阅 https://getcomposer.org.cn/doc/
workflows.php
<?php
use MyApp\MyWorkflow;
use Virge\Graph;
Graph::workflow('simple', MyWorkflow::class);
Reactor 和工作流程必须在您希望推送工作流程的地方包含在您的应用程序中。如果使用 virge/project,则它们将自动从资源目录中包含。
简单模式
Virge::Graph 具有简单的工作流程模式,允许一个队列处理您的工作流程。这不允许您单独扩展单个任务,但使设置更加简单。
定义工作流程
工作流程是一个定义了一系列任务、它们之间的依赖关系以及它们的生命周期回调的类。
class SimpleWorkflow extends Virge\Graph\Component\Workflow
{
public function defineWorkflow()
{
Graph::simple();
Graph::task('hello', function(Job $job, TaskResult $result) {
$result->setResult('hello');
});
Graph::task('world', function(Job $job, TaskResult $result) {
$result->setResult('world');
})
->dependsOn(['hello'])
->onComplete(function(Job $job) {
$hello = $job->getTask('hello');
$world = $job->getTask('world');
//write the result
file_put_contents('./results.log', $hello->getResult() . ' ' . $world->getResult() . "\n", FILE_APPEND);
})
;
}
}
任务可以是闭包,也可以是具有“run”方法的类。
class HelloTask
{
const TASK_ID = 'hello';
public function run(Job $job, TaskResult $result)
{
$result->setResult('hello');
}
}
在定义时,只需传递 className 即可
Graph::task(HelloTask::TASK_ID, HelloTask::class);
将作业推送到工作流程
作业是 Virge\Graph\Component\Workflow\Job 对象。您可以使用“setData”函数设置初始数据。您可以在 setData 中设置复杂的数据,但请注意,它会被序列化并推送到多个队列,因此不建议放置大量数据,最好传递简单数据,并在每个工作进程上加载数据。
use Virge\Graph;
$job = new Job('simple');
$job->setData('test123');
Graph::push($job);
处理队列
您必须设置至少一个工作进程来处理默认的 virge:graph 队列,该队列处理任务的调度、生命周期事件、更新任务进度以及与数据库同步结果。
php -f vadmin.php virge:graphite:worker virge:graph
在高级工作流程中,每个任务都将排队到它自己的队列中,以实现水平扩展。
# replace workflowId and taskId below
php -f vadmin.php virge:graphite:worker virge:graph:workflowId:taskId