phluxor/phluxor

PHP中灵活的actor模型工具包,赋予PHP生态系统力量

维护者

详细信息

github.com/ytake/phluxor

源代码

问题

资助包维护!
ytake

0.1.10 2024-09-07 14:05 UTC

README

PHP中灵活的actor模型工具包,赋予PHP生态系统力量。

需要PHP 8.3和swoole。

并使用Protocol Buffers进行消息序列化。/ 目前不支持其他序列化格式。

文档正在准备中。

请勿在生产环境中使用。

示例网络应用程序 / phluxor-http-application-samples
演示Phluxor的使用
演示如何使用Phluxor与Event Sourcing和CQRS。
文档

Phluxor是用于在PHP中实现actor模型的工具包。
需要PHP 8.3和swoole。
使用Protocol Buffers进行消息序列化。
其他序列化格式目前不支持。

安装

$ composer require phluxor/phluxor

已实现

  • actor模型
  • actor生命周期
  • 监督
  • actor注册
  • actor消息传递
  • 成为/不再是
  • 邮箱/分配器
  • 事件流
  • 未来
  • 持久actor(内存/MySQL)
  • 支持OpenTelemetry(指标)
  • 路由器/轮询,广播,散播-收集等。
  • 支持远程 / 远程是不同节点上的actor之间相互通信的机制。

正在进行中

  • 支持OpenTelemetry(跟踪)
  • 虚拟actor/集群支持

监督

异常处理由actor系统执行,actor可以由父actor或根actor监督。

  • OneForOneStrategy
  • AllForOneStrategy
  • ExponentialBackoffStrategy
  • RestartStrategy

易于使用

就像akka http一样,您可以使用简单的API来使用它。
使用mezzio / mezzio-swoole / phluxor。
并且您可以使用swoole / open swoole。

<?php

declare(strict_types=1);

namespace App\ActorSystem;

use App\Message\Add;
use App\Message\Cancel;
use App\Message\Event;
use App\Message\GetEvent;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;

class TicketSeller implements ActorInterface
{
    private int $tickets = 0;
    private string $name = '';
    private string $id   = '';

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof Add:
                // change actor state
                $this->name    = $msg->name;
                $this->tickets = $msg->tickets;
                $this->id      = $context->self()?->protobufPid()->getId();
                break;
            case $msg instanceof GetEvent:
                $context->requestWithCustomSender(
                    $context->sender(),
                    new Event($this->name, $this->tickets),
                    $context->parent()
                );
                break;
            case $msg instanceof Cancel:
                $context->requestWithCustomSender(
                    $context->sender(),
                    new Cancel(),
                    $context->parent()
                );
                $context->poison($context->self());
                break;
        }
    }
}

向actor发送消息。

<?php

declare(strict_types=1);

namespace App\ActorSystem;

use App\Event\EventCreated;
use App\Message\EventDescription;
use App\Message\EventExists;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Exception\SpawnErrorException;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Props;
use Swoole\Coroutine\WaitGroup;

use function array_merge;
use function sprintf;

class BoxOffice implements ActorInterface
{
    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof EventDescription:
                try {
                    $result = $context->spawnNamed(
                        Props::fromProducer(fn() => new TicketSeller()),
                        $msg->name
                    );
                    $context->send($result->getRef(), new Add($msg->name, $msg->tickets));
                    $context->respond(new EventCreated($msg->name, $msg->tickets));
                } catch (SpawnErrorException $e) {
                    $context->respond(new EventExists());
                }
                break;
            }
        }
    }
}

成为/不再是

actor可以通过becomeunbecome方法改变其行为。

示例

one, other是行为方法。

<?php

declare(strict_types=1);

namespace Acme;

use Phluxor\ActorSystem\Behavior;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\ActorSystem\Message\ReceiveFunction;

class EchoSetBehaviorActor implements ActorInterface
{
    private Behavior $behavior;

    public function __construct()
    {
        $this->behavior = new Behavior();
        $this->behavior->become(
            new ReceiveFunction(
                fn(ContextInterface $context) => $this->one($context)
            )
        );
    }

    public function receive(ContextInterface $context): void
    {
        $this->behavior->receive($context);
    }

    public function one(ContextInterface $context): void
    {
        if ($context->message() instanceof BehaviorMessage) {
            $this->behavior->become(
                new ReceiveFunction(
                    fn(ContextInterface $context) => $this->other($context)
                )
            );
        }
    }

    public function other(ContextInterface $context): void
    {
        if ($context->message() instanceof EchoRequest) {
            $context->respond(new EchoResponse());
        }
    }
}
<?php

declare(strict_types=1);

use Acme\BehaviorMessage;
use Acme\EchoSetBehaviorActor;
use Acme\EchoRequest;
use Phluxor\ActorSystem;
use Phluxor\ActorSystem\ActorContext;
use Phluxor\ActorSystem\Props;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

function main(): void 
{
    run(function () {
        $system = ActorSystem::create();
        go(function (ActorSystem $system) {
            $pid = $system->root()->spawn(
                Props::fromProducer(
                    fn() => new EchoSetBehaviorActor()
                )
            );
            $system->root()->send($pid, new BehaviorMessage());
            $future = $system->root()->requestFuture($pid, new EchoRequest(), 1);
            var_dump($future->result());
            $system->root()->stop($pid);
        }, $system);
    });
}

持久actor

持久actor是可以从以前的状态恢复的actor。
您可以根据您的环境自由实现持久化数据库的适配器。
默认情况下,已经实现了内存和MySQL。
由于使用Protocol Buffers进行消息序列化,因此需要事先准备Protocol Buffers文件。以下是一个示例Protocol Buffers文件。

持久actor是可以从先前状态恢复的actor。
您可以根据环境实现持久化数据库的适配器。
默认情况下,已经实现了内存和MySQL。
由于使用Protocol Buffers进行消息序列化,因此需要事先准备Protocol Buffers文件。

例如,Protocol Buffers文件如下。

syntax = "proto3";

package Acme.Persistence.ProtoBuf;

option php_namespace = "Acme\\Persistence\\ProtoBuf";
option php_metadata_namespace = "Acme\\Metadata";

message Message {
  string message = 1;
}

message Snapshot {
  string message = 1;
}

有关持久化的详细使用方法,请参阅持久actor的示例。持久化/ MySQL

例如,持久actor如下。持久化/ MySQL

使用Phluxor\Persistence\Mixin特性和实现Phluxor\Persistence\PersistentInterface

<?php

declare(strict_types=1);

namespace Acme\Persistence;

use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Message\ActorInterface;
use Phluxor\Persistence\Message\RequestSnapshot;
use Phluxor\Persistence\Mixin;
use Phluxor\Persistence\PersistentInterface;
use Acme\Persistence\ProtoBuf\Message;
use Acme\Persistence\ProtoBuf\Snapshot;

class InMemoryTestActor implements ActorInterface, PersistentInterface
{
    use Mixin;

    private string $state = '';

    public function receive(ContextInterface $context): void
    {
        $msg = $context->message();
        switch (true) {
            case $msg instanceof RequestSnapshot:
                $this->persistenceSnapshot(new TestSnapshot(['message' => $this->state]));
                break;
            case $msg instanceof TestSnapshot:
                $this->state = $msg->getMessage();
                break;
            case $msg instanceof TestMessage:
                if (!$this->recovering()) {
                    $this->persistenceReceive($msg);
                }
                $this->state = $msg->getMessage();
                break;
            case $msg instanceof Query:
                $context->respond($this->state);
                break;
        }
    }
}

更多示例在持久化测试目录中。