etouches / phplumber
管道库,允许异步和同步过程的混合。
v1.0.0-beta
2017-03-28 21:03 UTC
Requires
- php: >=5.3
Requires (Dev)
- phpunit/phpunit: 4.8.*
This package is not auto-updated.
Last update: 2024-09-29 02:49:48 UTC
README
Phplumber 是一个用于 PHP 的非常简单的管道库,它允许异步和同步过程的混合。使用它将大过程分解成多个步骤,使用队列和多进程。Phplumber 不包含硬编码的依赖项,可以由任何队列设置和任何存储机制支持。
需求
- PHP 5.3+
- 队列,例如 RabbitMQ 或 Redis
- 存储,例如关系型数据库表或 Redis
示例
让我们以创建和填充数据库为例。它需要多个步骤,其中一些可以并发完成。
- 创建数据库(一个进程)
- 创建和填充表(每个表一个进程)
- 创建依赖于多个表的视图(一个进程,依赖于所有表都存在)
Create table 1
/ \
Create database -> Create table 2 -> Create views
\ /
Create table 3
首先,我们定义我们的进程。顺序步骤扩展 Process
。可以多次运行并使用不同数据的步骤扩展 MultiProcess
。
class CreateDatabase extends Process { public function invoke($payload) { $database_name = $payload['database_name']; echo "Drop database $database_name if it exists...\n"; echo "Creating database $database_name...\n"; } }
class CreateTable extends MultiProcess { // Determine the data we need to queue for async processes public function getAsyncPayloads($payload) { $database_name = $payload['database_name']; $table_names = array('first_table', 'second_table', 'third_table'); $payloads = array(); foreach ($table_names as $table) { $payloads[] = array('database_name' => $database_name, 'table_name' => $table); } return $payloads; } public function invoke($payload) { $database_name = $payload['database_name']; $table_name = $payload['table']; echo "Connecting to database $database_name...\n"; echo "Creating and populating $table_name...\n"; switch ($payload['table']) { case 'first_table': // Create table and insert rows... break; // ... } } }
然后,我们定义进程的顺序。
class CreateAndFillDatabase extends ProcessList { protected function setup() { $this ->add('CreateDatabase') ->add('CreateTable') ->add('CreateViews'); } }
现在我们可以启动进程序列。
$equation = new CreateAndFillDatabase(); $equation->process(array('database_name' => 'test_db'));
请参阅 examples
目录,以获取完整的工作示例。
入门
- 实现
StorageInterface
。这将保存信号量数据。合适的存储引擎包括任何关系型数据库、nosql 或如 Redis 一样的键值存储。 - 扩展
Queue
类以集成队列系统。合适的队列引擎包括 Redis 和 RabbitMQ。 - 将每个进程编写为扩展
Process
的类(对于同步)或扩展MultiProcess
的类(对于异步)。 - 实现
ProcessFactoryInterface
。这将创建每个ProcessInterface
实例,允许你使用任何先决条件设置你的进程,例如数据库连接或配置选项。 - 通过扩展
ProcessList
将进程组合在一起。 - 实现一个工作守护进程,它实例化你的
Queue
实现,并调用consume()
来监听传入的消息。每个消息将调用多进程的单个部分。运行多个工作进程以并发执行进程。 - 在您的系统中选择一个位置启动整个工作流程,实例化您的
ProcessList
,并调用process()
,传递初始有效载荷。