spiral/roadrunner-jobs

RoadRunner 队列(作业)插件 API 库

v4.6.0 2024-07-29 08:02 UTC

README

RoadRunner Jobs 插件

PHP Version Require Latest Stable Version phpunit psalm Codecov Total Downloads StyleCI

此存储库包含使用 RoadRunner Jobs 插件的 PHP 桥接代码库。

安装

要安装应用程序服务器和作业代码库

composer require spiral/roadrunner-jobs

您可以使用方便的安装程序下载最新可用的兼容版本的 RoadRunner 组装

composer require spiral/roadrunner-cli --dev
vendor/bin/rr get

配置

首先,您需要将至少一个作业适配器添加到您的 RoadRunner 配置中。例如,以下配置是可行的

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  consume: [ "local" ]
  pipelines:
    local:
      driver: memory
      config:
        priority: 10
        prefetch: 10000

注意文档页面上了解所有可用的驱动程序。

启动服务器后,将使用此配置,将有一个名为 local 的驱动程序可供您使用。

用法

生产者

以下代码将允许从 RoadRunner 服务器写入和读取任意值。

<?php

use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\Goridge\RPC\RPC;

require __DIR__ . '/vendor/autoload.php';

// Jobs service
$jobs = new Jobs(RPC::create('tcp://127.0.0.1:6001'));

// Select "local" pipeline from jobs
$queue = $jobs->connect('local');

// Create task prototype with default headers
$task = $queue->create('ping', '{"site": "https://example.com"}') // Create task with "echo" name
    ->withHeader('attempts', 4) // Number of attempts to execute the task
    ->withHeader('retry-delay', 10); // Delay between attempts

// Push "echo" task to the queue
$task = $queue->dispatch($task);

var_dump($task->getId() . ' has been queued');

消费者

消费者处理来自 RoadRunner 服务器的任务,并根据处理结果做出响应

  • ack - 用于正确认认。
  • nack - 用于负确认。
  • requeue - 用于重新排队任务。

nack 方法的行为取决于队列驱动程序的实现。它可以接受一个额外的参数 redelivery;如果传递并设置为 true,则任务将被重新排队。但是,并非所有驱动程序都支持此功能。如果没有传递 redelivery 参数,设置为 false,或者队列驱动程序的实现不支持它,则任务不会被重新排队。

$task->nack(message: $reason, redelivery: true);

requeue 方法由 RoadRunner 实现,不依赖于队列驱动程序。它允许您将任务重新发送到 队列末尾 并向任务添加额外的标题。

$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);

nackrequeue 方法可以指定重新排队任务的 延迟。为此,请调用 withDelay 方法并在调用 nackrequeue 方法之前传递所需的值。

$task->withDelay(10)->requeue($exception);
<?php

use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;

require __DIR__ . '/vendor/autoload.php';

$consumer = new Spiral\RoadRunner\Jobs\Consumer();

/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    try {
        $name = $task->getName(); // "ping"
        $queue = $task->getQueue(); // "local"
        $driver = $task->getDriver(); // "memory"
        $payload = $task->getPayload(); // {"site": "https://example.com"}
    
        // Process task

        $task->ack();
    } catch (\Throwable $e) {
        $task->requeue($e);
    }
}
try Spiral Framework

许可协议

MIT 许可协议 (MIT)。有关更多信息,请参阅 LICENSE。由 Spiral Scout 维护。