drealecs/thread-worker

PHP多线程工作进程消息库

v1.1.0-beta 2014-01-14 23:02 UTC

This package is auto-updated.

Last update: 2024-09-09 02:11:11 UTC


README

PHP多线程工作进程消息/事件库 构建状态

简介

Thread-worker是一个库,允许在同一台计算机或不同计算机上通过多个PHP进程并行执行任务。

该库借鉴了其他语言中的许多概念。

概念

  • 任务表示一个应该异步执行的功能。

  • 队列是一系列任务。任务可以由一个进程放入队列,由另一个进程取出执行。

  • 执行器是队列的一个包装器。任务将从一个PHP脚本传递给它,在另一个PHP脚本中使用RemoteExecutor来处理这些任务。

  • TaskResultTaskException是任务的结果。

TaskTaskResultTaskException是作为“消息”序列化的实体,并写入/从队列中读取。

API

任务

将在远程执行的代码无法与调用代码共享变量或上下文。当定义一个可以异步执行的功能时,必须通过扩展\ThreadWorker\Task并实现方法run()来创建它

class AddTask extends ThreadWorker\Task
{
    public function run($a, $b)
    {
        returns $a + $b;
    }
}

然后,任务可以这样使用

$task = new AddTask(3, 5);
$result = $task();

这将使$result等于8。

就像一个函数一样,任务可以返回一个值或没有。

当然,上面的例子是本地、同步地执行任务。要异步执行它,我们需要一个队列和一个执行器。

队列

为了同步工作任务,使用队列概念。

有一个队列,有人把任务放进去,然后有工作进程去取并运行它。

任务队列的接口

  • public function queue($task, $captureResult); - 调用以将任务排队执行。有不需要返回结果的任务和需要返回结果的任务。通常不需要返回结果的任务更可取,因为调用代码可以执行其他操作并退出范围或甚至完成执行。为了实现这一点,$captureResult必须为false,在这种情况下,方法不返回任何内容。如果我们需要远程执行多个任务并将它们的结果连接起来,我们可能需要将第二个参数作为true传递,并且queue()方法将返回一个任务标识符,稍后可以使用它来查询和检索任务结果。

  • public function start() - 被可以执行任务的脚本调用。这是一个阻塞方法,它将阻塞,直到队列中有任务。它返回一个RemoteTask,它是任务和其TaskResult的容器。

  • public function end($remoteTask) - 被执行任务的脚本调用。它标记任务已完成,并且任务是一个返回响应的任务,它存储TaskResult。

  • public function getResult($taskId) - 通常由排队执行任务的脚本调用。它只能调用一次,并且会阻塞,直到任务执行完成。

  • public function isQueued|isRunning|isFinished($taskId) - 可以查询任务状态的方法。

  • 公共函数 getQueueSize() 和 getRunningSize() - 可以查询队列当前工作流容量的方法。

目前只有一个队列实现:\ThreadWorker\RedisQueue,并计划实现:AMQPQueue、MySQLQueue。

执行器

执行器封装了一个队列,提供了一个更简单的接口来处理队列任务、执行任务和获取任务结果。

有一个名为 QueueExecutor 的执行器,它有两个方法

  • void execute(Task $task) - 将任务添加到队列
  • QueuedTask submit(Task $task) - 将任务添加到队列,并返回一个 QueuedTask 实例,可以用来查询任务状态和检索任务结果。

让我们看一个例子

$queue = new ThreadWorker\RedisQueue('example');
$executor = new ThreadWorker\QueueExecutor($queue);

$task = new AddTask(3, 5);
$queuedTask = $executor->submit($task);
$result = $queuedTask->getResult()->getValue();

QueueExecutor 被扩展为 RemoteExecutor,用于异步执行任务。工作者的代码如下

$queue = new ThreadWorker\RedisQueue('example');
$worker = new ThreadWorker\RemoteExecutor($queue);

$worker->work();

将 RemoteExecutor 实例作为额外的参数传递给任务的 run() 方法,并可用于排队更多任务。