drealecs / thread-worker
PHP多线程工作进程消息库
Requires
- php: >=5.3.0
- ext-redis: *
- rhumsaa/uuid: 2.5.*
Requires (Dev)
- phpunit/phpunit: 3.7.*
This package is auto-updated.
Last update: 2024-09-09 02:11:11 UTC
README
简介
Thread-worker是一个库,允许在同一台计算机或不同计算机上通过多个PHP进程并行执行任务。
该库借鉴了其他语言中的许多概念。
概念
-
任务表示一个应该异步执行的功能。
-
队列是一系列任务。任务可以由一个进程放入队列,由另一个进程取出执行。
-
执行器是队列的一个包装器。任务将从一个PHP脚本传递给它,在另一个PHP脚本中使用RemoteExecutor来处理这些任务。
-
TaskResult或TaskException是任务的结果。
Task、TaskResult和TaskException是作为“消息”序列化的实体,并写入/从队列中读取。
API
任务
将在远程执行的代码无法与调用代码共享变量或上下文。当定义一个可以异步执行的功能时,必须通过扩展\ThreadWorker\Task并实现方法run()
来创建它
class AddTask extends ThreadWorker\Task { public function run($a, $b) { returns $a + $b; } }
然后,任务可以这样使用
$task = new AddTask(3, 5); $result = $task();
这将使$result
等于8。
就像一个函数一样,任务可以返回一个值或没有。
当然,上面的例子是本地、同步地执行任务。要异步执行它,我们需要一个队列和一个执行器。
队列
为了同步工作任务,使用队列概念。
有一个队列,有人把任务放进去,然后有工作进程去取并运行它。
任务队列的接口
-
public function queue($task, $captureResult); - 调用以将任务排队执行。有不需要返回结果的任务和需要返回结果的任务。通常不需要返回结果的任务更可取,因为调用代码可以执行其他操作并退出范围或甚至完成执行。为了实现这一点,
$captureResult
必须为false
,在这种情况下,方法不返回任何内容。如果我们需要远程执行多个任务并将它们的结果连接起来,我们可能需要将第二个参数作为true
传递,并且queue()
方法将返回一个任务标识符,稍后可以使用它来查询和检索任务结果。 -
public function start() - 被可以执行任务的脚本调用。这是一个阻塞方法,它将阻塞,直到队列中有任务。它返回一个RemoteTask,它是任务和其TaskResult的容器。
-
public function end($remoteTask) - 被执行任务的脚本调用。它标记任务已完成,并且任务是一个返回响应的任务,它存储TaskResult。
-
public function getResult($taskId) - 通常由排队执行任务的脚本调用。它只能调用一次,并且会阻塞,直到任务执行完成。
-
public function isQueued|isRunning|isFinished($taskId) - 可以查询任务状态的方法。
-
公共函数 getQueueSize() 和 getRunningSize() - 可以查询队列当前工作流容量的方法。
目前只有一个队列实现:\ThreadWorker\RedisQueue,并计划实现:AMQPQueue、MySQLQueue。
执行器
执行器封装了一个队列,提供了一个更简单的接口来处理队列任务、执行任务和获取任务结果。
有一个名为 QueueExecutor 的执行器,它有两个方法
- void execute(Task $task) - 将任务添加到队列
- QueuedTask submit(Task $task) - 将任务添加到队列,并返回一个 QueuedTask 实例,可以用来查询任务状态和检索任务结果。
让我们看一个例子
$queue = new ThreadWorker\RedisQueue('example'); $executor = new ThreadWorker\QueueExecutor($queue); $task = new AddTask(3, 5); $queuedTask = $executor->submit($task); $result = $queuedTask->getResult()->getValue();
QueueExecutor 被扩展为 RemoteExecutor,用于异步执行任务。工作者的代码如下
$queue = new ThreadWorker\RedisQueue('example'); $worker = new ThreadWorker\RemoteExecutor($queue); $worker->work();
将 RemoteExecutor 实例作为额外的参数传递给任务的 run()
方法,并可用于排队更多任务。