spiral / roadrunner-jobs
RoadRunner 队列(作业)插件 API 库
v4.6.0
2024-07-29 08:02 UTC
Requires
- php: >=8.1
- ext-json: *
- ramsey/uuid: ^3 || ^4
- roadrunner-php/roadrunner-api-dto: ^1.0
- spiral/goridge: ^4.0
- spiral/roadrunner: ^2023.1 || ^2024.1
- spiral/roadrunner-worker: ^3.0
Requires (Dev)
- google/protobuf: ^3.17 || ^4.0
- phpunit/phpunit: ^10.0
- roave/security-advisories: dev-master
- vimeo/psalm: >=5.8
README
RoadRunner Jobs 插件
此存储库包含使用 RoadRunner Jobs 插件的 PHP 桥接代码库。
安装
要安装应用程序服务器和作业代码库
composer require spiral/roadrunner-jobs
您可以使用方便的安装程序下载最新可用的兼容版本的 RoadRunner 组装
composer require spiral/roadrunner-cli --dev vendor/bin/rr get
配置
首先,您需要将至少一个作业适配器添加到您的 RoadRunner 配置中。例如,以下配置是可行的
rpc: listen: tcp://127.0.0.1:6001 server: command: php consumer.php relay: pipes jobs: consume: [ "local" ] pipelines: local: driver: memory config: priority: 10 prefetch: 10000
注意 在文档页面上了解所有可用的驱动程序。
启动服务器后,将使用此配置,将有一个名为 local
的驱动程序可供您使用。
用法
生产者
以下代码将允许从 RoadRunner 服务器写入和读取任意值。
<?php use Spiral\RoadRunner\Jobs\Jobs; use Spiral\Goridge\RPC\RPC; require __DIR__ . '/vendor/autoload.php'; // Jobs service $jobs = new Jobs(RPC::create('tcp://127.0.0.1:6001')); // Select "local" pipeline from jobs $queue = $jobs->connect('local'); // Create task prototype with default headers $task = $queue->create('ping', '{"site": "https://example.com"}') // Create task with "echo" name ->withHeader('attempts', 4) // Number of attempts to execute the task ->withHeader('retry-delay', 10); // Delay between attempts // Push "echo" task to the queue $task = $queue->dispatch($task); var_dump($task->getId() . ' has been queued');
消费者
消费者处理来自 RoadRunner 服务器的任务,并根据处理结果做出响应
ack
- 用于正确认认。nack
- 用于负确认。requeue
- 用于重新排队任务。
nack
方法的行为取决于队列驱动程序的实现。它可以接受一个额外的参数 redelivery;如果传递并设置为 true,则任务将被重新排队。但是,并非所有驱动程序都支持此功能。如果没有传递 redelivery 参数,设置为 false,或者队列驱动程序的实现不支持它,则任务不会被重新排队。
$task->nack(message: $reason, redelivery: true);
requeue
方法由 RoadRunner 实现,不依赖于队列驱动程序。它允许您将任务重新发送到 队列末尾 并向任务添加额外的标题。
$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);
nack
和 requeue
方法可以指定重新排队任务的 延迟。为此,请调用 withDelay
方法并在调用 nack
或 requeue
方法之前传递所需的值。
$task->withDelay(10)->requeue($exception);
<?php use Spiral\RoadRunner\Jobs\Consumer; use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface; require __DIR__ . '/vendor/autoload.php'; $consumer = new Spiral\RoadRunner\Jobs\Consumer(); /** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */ while ($task = $consumer->waitTask()) { try { $name = $task->getName(); // "ping" $queue = $task->getQueue(); // "local" $driver = $task->getDriver(); // "memory" $payload = $task->getPayload(); // {"site": "https://example.com"} // Process task $task->ack(); } catch (\Throwable $e) { $task->requeue($e); } }
许可协议
MIT 许可协议 (MIT)。有关更多信息,请参阅 LICENSE
。由 Spiral Scout 维护。