awd-studio/service-buses

这种模式(如:命令总线、查询总线和事件总线)在一个包中的实现;由依赖注入容器驱动

v4.0.3 2023-08-10 15:53 UTC

README

一个简单的库,用于在PHP项目中实现类似CQRS的模式。

Build status Coverage Status

功能

  • 消息或处理器无需扩展或实现任何额外的抽象。
  • 处理器可以是任何callable项。
  • 处理器可以订阅任何父类或事件实现的任何事件。
  • 包含一个装饰器,用于将处理器注册为通过PSR-11容器处理的服务的处理器。
  • 包含一个装饰器,通过消息处理器的类型提示自动订阅处理器。
  • 提供现成的总线模式,如Command BusQuery BusEvent Bus实现。

内容

开始使用

要求

  • PHP 8.2+
  • PSR-11 - 兼容容器(可选)

安装

composer require awd-studio/service-buses

处理消息

消息只是一个简单的PHP对象。

它可以包含您需要的任何数据,但通常,最好提供一些不可变的消息,这些消息可以被序列化。

<?php

class MyMessage {}

无论如何,您都可以扩展或实现您需要的任何内容。

<?php

interface MessageInterface {}

abstract class ParentMessage {}

final class MyMessage extends ParentMessage implements MessageInterface {}

处理器定位器是一个处理器存储库。

有了它们,我们可以将处理器分配给特定的消息。库提供了一些处理器定位器,例如,一个用于在内存中存储处理器的定位器

<?php

use AwdStudio\Bus\Handler\InMemoryHandlerLocator;

$handlers = new InMemoryHandlerLocator();

// To assign a handler we can call a method `add`.
// As a "messageId" we send the FCQN of a message that we assign on.
// A handler must be any callable PHP-item. 
$handlers->add(\stdClass::class, static function (\stdClass $message): void {});

// Now, we've got a handler that handles a message of type "stdClass".
// But, we can add more than one handler per message. 
// Actually, it's not limited, but keep in mind the patterns
// such Command-bus or Query-bus that suppose to use the only one handler
// per a message that they handle.
// So, we can add more handlers to same message, for example a callable object:
$handler = new class {
    public function __invoke(\stdClass $message): void {}
};
$handlers->add(\stdClass::class, $handler);

// So now, we have 2 handlers that are going to be released 
// when somebody tries get them:
$handlers->get(\stdClass::class);

// To check if there are some handlers for certain message 
// there is a method `has`:
$handlers->has(\stdClass::class); // true|false

要处理一个消息,需要调用总线。例如,我们有一个扩展SimpleBus的总线。

我们将使用一个

<?php

use AwdStudio\Bus\Handler\InMemoryHandlerLocator;

// We need to use a handler locator, from which a bus will get handlers
$bus = new class(new InMemoryHandlerLocator()) extends \AwdStudio\Bus\SimpleBus {
    // We need to provide a method that will handle our message
    public function handle(object $message): void 
    {
        // Our parent allows us to iterate all handlers 
        // that assigned to certain message
        foreach ($this->handleAll($message) as $result) {
            echo $result;
        }
    }
};

// To use a bus, we call a provided method:
$bus->handle(new \stdClass());

预定义总线

有一些预定义的总线

  • \AwdStudio\Command\CommandBus (命令总线模式CQRS中的C)

    • \AwdStudio\Command\SimpleCommandBus - 通过单个处理器处理命令。
  • \AwdStudio\Query\QueryBus (查询总线模式CQRS中的Q)

    • \AwdStudio\Query\SimpleQueryBus - 通过单个处理器处理查询。
  • \AwdStudio\Event\EventBus 观察者-订阅者模式

    • \AwdStudio\Event\SimpleEventBus - 将事件分发到每个订阅者(可以是>= 0)。

命令总线

<?php

use AwdStudio\Bus\Handler\InMemoryHandlerLocator;
use AwdStudio\Command\SimpleCommandBus;

class MyCommand {
    // Messages might be any of PHP class.
    // No any of implementation or extending required.
}

$handlers = new InMemoryHandlerLocator();
// Register a handler. It can be any callable thing.
$handlers->add(MyCommand::class, static function (MyCommand $command): void {});

$bus = new SimpleCommandBus($handlers);

$bus->handle(new MyCommand());

查询总线

<?php

use AwdStudio\Bus\Handler\InMemoryHandlerLocator;
use AwdStudio\Query\SimpleQueryBus;

class MyQuery {
    // Messages might be any of PHP class.
    // No any of implementation or extending required.
}

$handlers = new InMemoryHandlerLocator();
// Register a handler. It can be any callable thing.
$handlers->add(MyQuery::class, static function (MyQuery $query): string {
    return 'foo';
});

$bus = new SimpleQueryBus($handlers);

$result = $bus->handle(new MyQuery());

// Result will be:
// -> prefix foo suffix

事件总线

<?php

use AwdStudio\Bus\Handler\InMemoryHandlerLocator;
use AwdStudio\Event\SimpleEventBus;

class MyEvent {
    // Messages might be any of PHP class.
    // No any of implementation or extending required.
}

$subscribers = new InMemoryHandlerLocator();
// Register a handler. It can be any callable thing.
$subscribers->add(MyEvent::class, static function (MyEvent $event): void {});
// As the event-bus pattern allows to provide any amount of subscribers
// we cah add more of them:
$subscribers->add(MyEvent::class, static function (MyEvent $event): void {});

$bus = new SimpleEventBus($subscribers);

$bus->handle(new MyEvent());

// After that, the event is delivered to each subscriber.

订阅父类

该库允许订阅不仅限于某个特定的类,还可以是其所有父类 - 无论是父类还是任何级别的实现。

<?php

use AwdStudio\Bus\Handler\ParentsAwareClassHandlerRegistry;
use AwdStudio\Bus\Handler\PsrContainerClassHandlerRegistry;
use Psr\Container\ContainerInterface;

class MyPsr11Container implements ContainerInterface {}

interface Foo {}
abstract class Bar {}
final class Baz extends Bar implements Foo {}

class Handler
{
    // You can subscribe on any of level
    public function __invoke(Foo $message): void {}
    // ..or
    public function __invoke(Bar $message): void {}
    // ..or
    public function __invoke(Baz $message): void {}
}

$handlerRegistry = new ParentsAwareClassHandlerRegistry(new PsrContainerClassHandlerRegistry(new MyPsr11Container()));

将服务用作处理器

当然,仅将回调作为处理器来解析并不是构建项目的便捷方式。幸运的是,我们有一些标准,如PSR-11,用于实现诸如DIP之类的常见用例。而且,该库提供了使用这些容器作为服务定位器的功能,以便解析处理器作为DI。

要使用它,有一个处理器定位器的装饰器,可以用于使用FCQN注册处理器。作为依赖项,它接受任何Psr\Container\ContainerInterface,它应该解析处理器。

<?php

use AwdStudio\Bus\Handler\PsrContainerClassHandlerRegistry;
use AwdStudio\Bus\SimpleBus;
use Psr\Container\ContainerInterface;

class MyPsr11Container implements ContainerInterface
{
    private $dependencies;

    public function __construct(array $dependencies)
    {
        $this->dependencies = $dependencies;
    }

    public function has($id): bool
    {
        return \in_array($id, $this->dependencies, true);
    }

    public function get($id): object
    {
        return $id();
    }
}

class StdClassHandler
{
    public function __invoke(\stdClass $message): void
    {
        $message->foo = 'foo';
    }
}

$serviceLocator = new MyPsr11Container([StdClassHandler::class]);
$handlerRegistry = new PsrContainerClassHandlerRegistry($serviceLocator);

// To assign a handler use a defined method:
$handlerRegistry->register(\stdClass::class, StdClassHandler::class);

// And pass them as a handler-locator to a bus
$bus = new class ($handlerRegistry) extends SimpleBus {
    public  function handle(object $message): void 
    {
        foreach ($this->handleAll($message) as $result) {
            echo $result;
        }
    }
};

// After that, you can call handling as usual:
$bus->handle(new \stdClass()); // The handler will be executed

自动注册服务

甚至还有一个装饰器可以自动订阅回调,通过它们的签名,这些签名应该包含一个类型提示作为第一个参数。

<?php

use AwdStudio\Bus\Handler\AutoRegisterHandlersRegistryClass;
use AwdStudio\Bus\Handler\PsrContainerClassHandlerRegistry;

$psrRegistry = new PsrContainerClassHandlerRegistry(new  MyPsr11Container());
$autoRegistry = new AutoRegisterHandlersRegistryClass($psrRegistry);

// Now, you can add a callback to assign a handler automatically.
// Just be sure, that it has a correct type-hint of a message that it handles.
$handler = static function (\stdClass $message): void { };
$autoRegistry->autoAdd($handler); // It will be called within the stdClass' messages.

// And this is not all it can! 
// If you use services as handlers - you also can register them automatically. 
// Suppose we have this handler, that can be resolved from our container:
class Handler {
    public function __invoke(\stdClass $message): void { }
}

// We can register it like so:
$autoRegistry->autoRegister(Handler::class);

// That's all..

使用您自己的处理方法

如果您不喜欢可调用服务,或者以某种方式需要使用通过不同方法处理的处理器,这根本不是问题。

在注册时只需传递方法名称即可

<?php

use AwdStudio\Bus\Handler\PsrContainerClassHandlerRegistry;

class Handler {
    public function handle(\stdClass $message): void { }
}

// Any registry can manage with it out of the box
$psrRegistry = new PsrContainerClassHandlerRegistry(new  MyPsr11Container());
$psrRegistry->register(\stdClass::class, Handler::class, 'handle');
// The 3rd argument tells which method is in charge of handling.

定义自定义总线

要定义您自己的总线,您可以扩展预定义的任何一个。您有两个选项

<?php

use AwdStudio\Bus\SimpleBus;

class MyBus extends SimpleBus
{
    public function handle(object $message): string 
    {
        $result = '';
        foreach ($this->handleAll($message) as $handled) {
            $result .= $handled;
        }
    
        return $result;
    }
}

SimpleBus允许您仅使用句柄来处理消息。

<?php

use AwdStudio\Bus\SimpleBus;

class MyBus extends SimpleBus
{
    public function handle(object $message): string 
    {
        $result = '';
        foreach ($this->handleMessage($message) as $chain) {
            $result .= $chain();
        }
    
        return $result;
    }
}

测试

composer setup-dev
composer test