uchamb / faktory-worker
Faktory 客户端和工作者模式的 PHP 实现
dev-master
2022-03-15 18:11 UTC
Requires
- php: ^7.1
- ext-sockets: *
This package is not auto-updated.
Last update: 2024-09-26 04:07:33 UTC
README
此项目是一个正在进行中的项目,欢迎提出建议和 pull requests。
这是 Faktory 工作者服务器的客户端实现。
安装
此项目尚未推送到 Packagist,因此您需要将额外的存储库添加到 Composer 设置中。
{ "repositories": [ { "type": "vcs", "url": "https://github.com/camuthig/faktory_worker_php" } ] }
然后,您可以要求使用此库
composer require camuthig/faktory-worker
用法
目前,此库不对工作者本身的实现方式做出任何假设。这意味着提供的只是向 Faktory 服务器发送消息并再次接收消息的基本构建块。
生产消息很简单。给定一个 ProducerInterface
的实例,您可以调用 push
函数。
<?php $connection = new \Camuthig\Faktory\Connection('tcp://127.0.0.1', 7419); $producer = new \Camuthig\Faktory\Producer($connection); $producer->push(new \Camuthig\Faktory\WorkUnit(uniqid(), 'example', []));
消费消息稍微复杂一些。此库提供了连接到服务器和构建所需功能所需的最小化功能。 随着项目的成熟,计划更好地支持消费 WorkUnits。
<?php // Configure the connection with worker properties. At least the `wid` should be provided. $connection = new \Camuthig\Faktory\Connection('127.0.0.1', 7419, [ 'wid' => uniqid(), 'labels' => ['php'], ]); $consumer = new \Camuthig\Faktory\Consumer($connection); $status = null; $interrupt = null; pcntl_signal(SIGINT, function ($signo, $signinfo) use (&$interrupt) { $interrupt = true; }); while (true) { if ($interrupt) { // Interruption should gracefully shutdown the process echo "Stopping consumer...\n"; $consumer->end(); exit(0); } if ($status === \Camuthig\Faktory\ConsumerInterface::TERMINATE) { // The server can send a signal to stop processing at any time. echo "Server requested consumer termination.\n"; exit(0); } elseif ($status === \Camuthig\Faktory\ConsumerInterface::QUIET) { // If the server requests to worker to be "quiet" we must stop processing. echo "Server requested consumer to go quiet.\n"; } else { $workUnit = $consumer->fetch(); if (!$workUnit) { sleep(5); continue; } echo "Received work unit " . $workUnit->getJobId() . "\n"; $consumer->ack($workUnit); } sleep(1); $status = $consumer->beat(); }
演示
可以在本地环境中快速运行工具的基本演示,其中已安装 PHP 和必要的扩展。
首先,启动 Faktory 服务器
docker-compose up -d
准备本地环境
composer dump
然后,在一个终端中启动生产者
php example/producer.php
这将开始输出类似
Pushing job with ID 5b1b202b6b5c4
Pushing job with ID 5b1b202c6be70
Pushing job with ID 5b1b202d6d780
Pushing job with ID 5b1b202e6f020
Pushing job with ID 5b1b202f6ff2f
Pushing job with ID 5b1b20307132a
Pushing job with ID 5b1b203172be3
Pushing job with ID 5b1b2032740bf
Pushing job with ID 5b1b203374ad7
最后,在第二个终端中启动消费者
php example/consumer.php
您应该开始看到处理中的作业
Received work unit 5b1b1f3a6fa7c
Received work unit 5b1b1f3b7120b
Received work unit 5b1b1f3c72601
Received work unit 5b1b1f3d7356d
Received work unit 5b1b1f3e74ca6
Received work unit 5b1b1f3f7668b
Received work unit 5b1b1f407758d
Received work unit 5b1b1f4178bd6
Received work unit 5b1b1f427a57e
Received work unit 5b1b1f437bebe