phluxor / phluxor
A flexible actor model toolkit for PHP, empowering the PHP ecosystem
Requires
- php: ^8.3
- ext-pdo: *
- ext-swoole: ^5.1
- alb/phpketama: ^0.4.0
- google/protobuf: ^3.25.4
- guzzlehttp/promises: ^2.0
- monolog/monolog: ^3.7.0
- open-telemetry/context-swoole: ^1.0.1
- open-telemetry/opentelemetry: ^1.0.0
- pascaldevink/shortuuid: ^4.0.0
- php-http/httplug: ^2.4
- psr/log: ^3.0
- ramsey/uuid: ^4.7.5
- symfony/cache: ^7.0
- symfony/uid: ^7.0
Requires (Dev)
- phpstan/phpstan: ^1.11.10
- phpunit/phpunit: ^10.5.11
- slevomat/coding-standard: ^8.15.0
- squizlabs/php_codesniffer: ^3.10.2
- swoole/ide-helper: 5.1.3
Suggests
- ext-grpc: to use the virtual actor model with gRPC
- ext-protobuf: serialization with protobuf
README
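A flexible actor model toolkit for PHP, empowering the PHP ecosystem.
Requires PHP 8.3 and swoole.
Protocol Buffers is used for message serialization; other serialization formats are not currently supported.
Documentation is under preparation.
Do not use in production.
Sample web application / phluxor-http-application-samples
Demonstrates the use of Phluxor
Demonstrates how to use Phluxor with Event Sourcing and CQRS.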
Documentation
Phluxor is a toolkit for implementing the actor model in PHP.
Installation
$ composer require phluxor/phluxor
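Implemented
- actor model
- actor lifecycle
- supervision
- actor registry
- actor messaging
- become / unbecome
- mailbox / dispatcher
- event stream
- futures
- persistent actors (in memory / MySQL)
- OpenTelemetry support (metrics)
- routers / round robin, broadcast, scatter-gather, etc.
- remote support / remoting is the mechanism by which actors on different nodes communicate with each other.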
Work in progress
- OpenTelemetry support (tracing)
- virtual actor / cluster support
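Supervision
Exception handling is performed by the actor system; an actor can be supervised by its parent actor or by the root actor.
Supervisor strategies: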
OneForOneStrategy
AllForOneStrategy
ExponentialBackoffStrategy
RestartStrategy
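To illustrate how these strategies differ, here is a minimal sketch in plain PHP. It does not use Phluxor's classes: `childrenToRestart` and `backoffDelayMs` are hypothetical helpers that only model the general idea, namely that one-for-one restarts just the failed child, all-for-one restarts every sibling, and exponential backoff doubles the wait after each consecutive failure. The base and maximum delays are assumed values.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: which children a supervisor would restart.
 *
 * @param list<string> $children names of all supervised children
 * @return list<string>
 */
function childrenToRestart(string $strategy, array $children, string $failed): array
{
    return match ($strategy) {
        // Only the child that threw is restarted.
        'one-for-one' => [$failed],
        // A failure in one child restarts all of its siblings as well.
        'all-for-one' => $children,
        default => throw new InvalidArgumentException("unknown strategy: {$strategy}"),
    };
}

/**
 * Hypothetical helper: delay before the next restart attempt.
 * The delay doubles with each consecutive failure and is capped at $maxMs.
 */
function backoffDelayMs(int $failureCount, int $baseMs = 100, int $maxMs = 10_000): int
{
    // failureCount 1 => base, 2 => 2x base, 3 => 4x base, ...
    $delay = $baseMs * (2 ** max(0, $failureCount - 1));

    return (int) min($delay, $maxMs);
}
```

The actual strategy classes listed above may take different parameters; check the Phluxor source before relying on any of these names.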
Easy to use
Like akka http, Phluxor can be used through a simple API.
The example uses mezzio / mezzio-swoole / phluxor.
It works with either swoole or open swoole.
<?php

declare(strict_types=1);

namespace App\ActorSystem;

use App\Message\Add;
use App\Message\Cancel;
use App\Message\Event;
use App\Message\GetEvent;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;

class TicketSeller implements ActorInterface
{
    private int $tickets = 0;
    private string $name = '';
    private string $id = '';

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof Add:
                // change actor state
                $this->name = $msg->name;
                $this->tickets = $msg->tickets;
                // self() may be null, so fall back to an empty string
                $this->id = (string) $context->self()?->protobufPid()->getId();
                break;
            case $msg instanceof GetEvent:
                // reply to the sender, passing the parent as the custom sender
                $context->requestWithCustomSender(
                    $context->sender(),
                    new Event($this->name, $this->tickets),
                    $context->parent()
                );
                break;
            case $msg instanceof Cancel:
                $context->requestWithCustomSender(
                    $context->sender(),
                    new Cancel(),
                    $context->parent()
                );
                // stop this actor once the cancellation has been forwarded
                $context->poison($context->self());
                break;
        }
    }
}
Send a message to an actor.
<?php

declare(strict_types=1);

namespace App\ActorSystem;

use App\Event\EventCreated;
use App\Message\Add;
use App\Message\EventDescription;
use App\Message\EventExists;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Exception\SpawnErrorException;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Props;

class BoxOffice implements ActorInterface
{
    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof EventDescription:
                try {
                    // spawn a child TicketSeller named after the event
                    $result = $context->spawnNamed(
                        Props::fromProducer(fn() => new TicketSeller()),
                        $msg->name
                    );
                    $context->send($result->getRef(), new Add($msg->name, $msg->tickets));
                    $context->respond(new EventCreated($msg->name, $msg->tickets));
                } catch (SpawnErrorException $e) {
                    // a child with this name already exists
                    $context->respond(new EventExists());
                }
                break;
        }
    }
}
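The `App\Message` classes used by `TicketSeller` and `BoxOffice` above are not shown. As a minimal sketch, assuming they are plain immutable value objects whose constructor arguments match the calls in the actors, they could look like the following. These definitions are hypothetical and the sample application may define them differently; they are placed in one file only to keep the sketch short.

```php
<?php

declare(strict_types=1);

namespace App\Message;

// Hypothetical message classes inferred from how the actors above use them.

/** Request to create an event with a number of tickets. */
final class EventDescription
{
    public function __construct(
        public readonly string $name,
        public readonly int $tickets,
    ) {
    }
}

/** Tells a TicketSeller which event it sells and how many tickets it has. */
final class Add
{
    public function __construct(
        public readonly string $name,
        public readonly int $tickets,
    ) {
    }
}

/** Current state of an event, sent in reply to GetEvent. */
final class Event
{
    public function __construct(
        public readonly string $name,
        public readonly int $tickets,
    ) {
    }
}

/** Marker messages without a payload. */
final class GetEvent
{
}

final class Cancel
{
}

final class EventExists
{
}
```

Because the actors dispatch on `instanceof`, each message type is its own class, and payload-free messages such as `GetEvent` can stay empty.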
Become / Unbecome
An actor can change its behavior with the become and unbecome methods.
Example
one and other are behavior methods.
<?php

declare(strict_types=1);

namespace Acme;

use Phluxor\ActorSystem\Behavior;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\ReceiveFunction;

class EchoSetBehaviorActor implements ActorInterface
{
    private Behavior $behavior;

    public function __construct()
    {
        $this->behavior = new Behavior();
        // start with the "one" behavior
        $this->behavior->become(
            new ReceiveFunction(
                fn(ContextInterface $context) => $this->one($context)
            )
        );
    }

    public function receive(ContextInterface $context): void
    {
        // delegate every message to the current behavior
        $this->behavior->receive($context);
    }

    public function one(ContextInterface $context): void
    {
        if ($context->message() instanceof BehaviorMessage) {
            // switch to the "other" behavior
            $this->behavior->become(
                new ReceiveFunction(
                    fn(ContextInterface $context) => $this->other($context)
                )
            );
        }
    }

    public function other(ContextInterface $context): void
    {
        if ($context->message() instanceof EchoRequest) {
            $context->respond(new EchoResponse());
        }
    }
}
<?php

declare(strict_types=1);

use Acme\BehaviorMessage;
use Acme\EchoSetBehaviorActor;
use Acme\EchoRequest;
use Phluxor\ActorSystem;
use Phluxor\ActorSystem\Props;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

function main(): void
{
    run(function () {
        $system = ActorSystem::create();
        go(function (ActorSystem $system) {
            $pid = $system->root()->spawn(
                Props::fromProducer(
                    fn() => new EchoSetBehaviorActor()
                )
            );
            // switch the actor from "one" to "other"
            $system->root()->send($pid, new BehaviorMessage());
            // ask the actor and read the reply through a future
            $future = $system->root()->requestFuture($pid, new EchoRequest(), 1);
            var_dump($future->result());
            $system->root()->stop($pid);
        }, $system);
    });
}

main();
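The example above only calls become. To show how unbecome can revert to a previous behavior, here is a minimal conceptual sketch in plain PHP. `BehaviorStack` and its method names (`become`, `becomeStacked`, `unbecome`) are hypothetical stand-ins, not Phluxor's `Behavior` class, and handlers take a plain string instead of a context to keep the sketch self-contained.

```php
<?php

declare(strict_types=1);

/** Hypothetical behavior holder that models become / unbecome as a stack. */
final class BehaviorStack
{
    /** @var list<Closure(string): string> */
    private array $stack = [];

    /** Replace every stacked behavior with a single new one. */
    public function become(Closure $handler): void
    {
        $this->stack = [$handler];
    }

    /** Push a behavior on top of the current one so it can be undone later. */
    public function becomeStacked(Closure $handler): void
    {
        $this->stack[] = $handler;
    }

    /** Drop the top behavior, keeping at least the bottom one. */
    public function unbecome(): void
    {
        if (count($this->stack) > 1) {
            array_pop($this->stack);
        }
    }

    /** Dispatch the message to whichever behavior is currently on top. */
    public function receive(string $message): string
    {
        if ($this->stack === []) {
            throw new LogicException('no behavior set');
        }
        $handler = $this->stack[array_key_last($this->stack)];

        return $handler($message);
    }
}

$behavior = new BehaviorStack();
$behavior->become(fn(string $m): string => "one: {$m}");
$first = $behavior->receive('a');   // handled by "one"

$behavior->becomeStacked(fn(string $m): string => "other: {$m}");
$second = $behavior->receive('b');  // handled by "other"

$behavior->unbecome();
$third = $behavior->receive('c');   // back to "one"
```

The actor itself never changes identity; only the function that handles the next message is swapped, which is why state kept in the actor's properties survives a behavior change.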
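Persistent actors
A persistent actor is an actor that can recover from its previous state.
You are free to implement an adapter for whichever persistence database suits your environment.
In-memory and MySQL adapters are implemented by default.
Because Protocol Buffers is used for message serialization, the Protocol Buffers files must be prepared in advance. Below is an example Protocol Buffers file.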
syntax = "proto3";

package Acme.Persistence.ProtoBuf;

option php_namespace = "Acme\\Persistence\\ProtoBuf";
option php_metadata_namespace = "Acme\\Metadata";

message Message {
  string message = 1;
}

message Snapshot {
  string message = 1;
}
For detailed usage of persistence, see the persistent actor sample: Persistence / MySQL
A persistent actor uses the Phluxor\Persistence\Mixin trait and implements Phluxor\Persistence\PersistentInterface, for example:
<?php

declare(strict_types=1);

namespace Acme\Persistence;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\Persistence\Message\RequestSnapshot;
use Phluxor\Persistence\Mixin;
use Phluxor\Persistence\PersistentInterface;
use Acme\Persistence\ProtoBuf\Message;
use Acme\Persistence\ProtoBuf\Snapshot;

class InMemoryTestActor implements ActorInterface, PersistentInterface
{
    use Mixin;

    private string $state = '';

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof RequestSnapshot:
                // store the current state as a snapshot
                $this->persistenceSnapshot(new Snapshot(['message' => $this->state]));
                break;
            case $msg instanceof Snapshot:
                // restore state from a stored snapshot
                $this->state = $msg->getMessage();
                break;
            case $msg instanceof Message:
                // persist new messages only; replayed ones are already stored
                if (!$this->recovering()) {
                    $this->persistenceReceive($msg);
                }
                $this->state = $msg->getMessage();
                break;
            // Query is assumed to be a plain message class in this namespace
            case $msg instanceof Query:
                $context->respond($this->state);
                break;
        }
    }
}
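The recovery flow that the actor above relies on can be sketched in plain PHP: state is rebuilt from the latest snapshot and then from any messages recorded after it. `InMemoryJournal` is a hypothetical class written for this illustration; it is not Phluxor's in-memory provider and does not use Protocol Buffers.

```php
<?php

declare(strict_types=1);

/** Hypothetical in-memory store modelling snapshot plus event replay. */
final class InMemoryJournal
{
    /** @var list<string> */
    private array $events = [];

    /** @var array{index: int, state: string}|null */
    private ?array $snapshot = null;

    public function appendEvent(string $message): void
    {
        $this->events[] = $message;
    }

    public function saveSnapshot(string $state): void
    {
        // Remember how many events this snapshot already covers.
        $this->snapshot = ['index' => count($this->events), 'state' => $state];
    }

    /** Rebuild state: start from the snapshot, then replay newer events. */
    public function recover(): string
    {
        $state = $this->snapshot['state'] ?? '';
        $from = $this->snapshot['index'] ?? 0;

        foreach (array_slice($this->events, $from) as $event) {
            // Same rule as the actor above: the latest message becomes the state.
            $state = $event;
        }

        return $state;
    }
}

$journal = new InMemoryJournal();
$journal->appendEvent('hello');
$journal->saveSnapshot('hello');
$journal->appendEvent('world');

$recovered = $journal->recover();
```

Taking snapshots keeps recovery time bounded, since only the events recorded after the last snapshot need to be replayed.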
More examples can be found in the persistence tests directory.