everlution/message-bus

该软件包最新版本(v1.1.1)没有提供许可证信息。

MessageBus

v1.1.1 2019-06-05 14:56 UTC

This package is auto-updated.

Last update: 2024-09-06 02:56:10 UTC


README

此库为PHP提供了一种eventbus解决方案,用于使微服务相互交互。

安装

通常只需使用以下composer命令

composer require everlution\message-bus

组件

此库提供默认组件,但您可以通过实现适当的接口轻松开发自己的组件。

传输

必须实现 Everlution\MessageBus\Transport\TransportInterface

该组件是实际与“物理”消息总线交互的服务。换句话说,它是使用特定技术的实现。

此库已提供RabbitMQ传输 Everlution\MessageBus\Transport\RabbitMqTransport,使用fanout交换,并在消费者抛出任何异常时重新发布消息。

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Everlution\MessageBus\Transport\RabbitMqTransport;

$connection = new AMQPStreamConnection('rabbitmq_host', 5672, 'user', 'password');
$exchange = 'MessageBus';
$consumerName = 'consumer1';

$transport = new RabbitMqTransport($connection, $exchange, $consumerName);

消息定义

这是将在传输上发布的实际消息。

消息只能是数组,它将使用JSON Schema定义进行验证。实际上,消息定义要求您提供

  • 消息名称(作为区分器)
  • 验证消息有效负载本身的JSON Schema

该库提供了用于测试的 Everlution\MessageBus\MessageDefinition\PingMessageDefinition

<?php

class SmsSendMessage implements MessageDefinitionInterface
{
    public static function getName(): string
    {
        return 'sms_send';
    }

    public function getJsonSchema(): string
    {
        $schema = [
            'type' => 'object',
            'properties' => [
                'messageName' => [
                    'type' => 'string',
                    'enum' => [self::getName()],
                ],
                'number' => [
                    'type' => 'string',
                    'regex' => '$\d*^'
                ],
                'text' => [
                    'type' => 'string',
                    'maxLength' => 255,
                ],
            ],
            'additionalProperties' => false,
            'required' => [
                'messageName',
                'number',
                'text',
            ],
        ];

        return json_encode($schema);
    }
}

因此,为了定义可以在消息总线上发布的消费者,您需要创建消息定义,然后将它们添加到 Protocol

您可以根据需要自由地塑造消息定义,因为此库不强制您遵循任何约束。

协议

Protocol 实际上定义了消息总线所有消费者使用的协议,用于描述可以在 Transport 上发布的所有可用消息的结构。

协议必须实现 $protocol->addMessageDefinition() 方法,您需要为要在环境中使用的每个消息定义调用此方法。

它不仅代表了消息定义的注册表。

此库提供了一个默认的协议 Everlution\MessageBus\Protocol\DefaultProtocol,它基本上允许发布者仅发布与协议中添加的消息定义之一匹配的消息。

use Everlution\MessageBus\Protocol\DefaultProtocol;

$protocol = new DefaultProtocol();
$protocol->addMessageDefinition(new SmsSendMessage());

验证器

此组件负责验证消息与协议。

提供的 Everlution\MessageBus\Validator\DefaultValidator 使用 Opis\JsonSchema 库进行验证,但您可以通过实现 Everlution\MessageBus\Validator\ValidatorInterface 来开发自己的验证服务。

use Everlution\MessageBus\Validator\DefaultValidator;
use Everlution\MessageBus\Validator\ValidatorException;

$validator = new DefaultValidator();

$message = [
    'messageName' => 'sms_send',
    'number' => '0777777777',
    'text' => 'This is my message,
];

try {
    $validator->validate($protocol, $message);
} catch (ValidatorException $e) {
    var_dump($e->getJsonSchema());
    var_dump($e->getData());
    var_dump($e->getErrors());
}

序列化器

序列化器将您想在传输上发布的消息数组明确地转换为字符串,反之亦然。

该库已提供 Everlution\MessageBus\Serializer\JsonSerializer

发布者

发布者必须实现 Everlution\MessageBus\Publisher\PublisherInterface,其任务是在传输上发布消息。

提供的 Everlution\MessageBus\Publisher\DefaultPublisher 执行以下步骤

  1. 验证消息与协议
  2. 序列化消息
  3. 在传输上发布消息
use Everlution\MessageBus\Publisher\DefaultPublisher;

$publisher = new DefaultPublisher($protocol, $validator, $serializer, $transport);

try {
    $publisher->publish($message);
} catch (\Everlution\MessageBus\Validator\ValidatorException $e) {
    var_dump($e->getJsonSchema());
    var_dump($e->getData());
    var_dump($e->getErrors());
}

消费者

消费者必须实现 Everlution\MessageBus\Consumer\ConsumerInterface,并定义应用程序中消费消息的业务逻辑。

此库提供了两个消费者。

回声消费者

这个消费者 Everlution\MessageBus\Consumer\EchoConsumer 仅用于测试目的,基本上会执行 var_dump(message)

use Everlution\MessageBus\Consumer\EchoConsumer;

$consumer = new EchoConsumer($validator, $protocol, $serializer, $transport);

try {
    $consumer->consume($numberOfMessagesToConsumeAntThenExit = 1);
} catch (\Everlution\MessageBus\Validator\ValidatorException $e) {
    var_dump($e->getJsonSchema());
    var_dump($e->getData());
    var_dump($e->getErrors());
}

Symfony事件分发器消费者

这个消费者 Everlution\MessageBus\Consumer\SymfonyEventDispatcherConsumer 与Symfony集成,允许您将普通消息转换为Symfony事件并在您的应用程序中分发。这样,您可以简单地定义一个事件订阅者,并实现所有需要的逻辑。

这种概念将消息总线与应用程序解耦,因为事件订阅者不知道事件是由谁生成的。

模拟器

如果您正在使用这个库,您很可能会在微服务中工作。如果是这样,您可能不希望在本地的开发环境中运行所有微服务以生成消息总线上的消息。此外,生成所需的特定消息可能会更加困难。

模拟器可以帮助您,因为您只需要在本地运行您的微服务,并使用模拟器生成所需的特定消息,就像它们来自其他来源一样。

这个概念非常有用,因为它可以为您节省大量时间,并使您能够只关注该微服务。

此库提供了 Everlution\MessageBus\Simulator\JsonSchemaFakerSimulator,它使用您想要模拟的消息的JSON模式,并使用Faker库生成并发布一个有效消息。这个服务并不覆盖所有您可能定义的JSON模式,但它是一个很好的起点。

您可以通过实现 Everlution\MessageBus\Simulator\SimulatorInterface 来扩展它或实现自己的模拟器。

演示

您可以在 /demo 中找到一些工作示例。

您可以使用以下命令使用Docker运行它们

# running composer
./docker/bin/composer.sh "install"

# docker-compose up which spins up rabbitmq etc
./docker/bin/compose/up.sh

# You can ssh in the php-cli container
./docker/bin/ssh.sh php-cli

# You can now run the demo scripts
php demo/ping-publisher.php
php demo/ping-consumer.php

# When you are done tear everything down and destroy all
./docker/bin/compose/down.sh
./docker/bin/compose/rm.sh