roslov/queue-bundle

Queue bundle

Installs: 229

Dependents: 0

Suggesters: 0

Security: 0

Stars: 1

Watchers: 1

Forks: 0

Open Issues: 2

Type: symfony-bundle

1.1.0 2024-05-28 16:17 UTC

This package is auto-updated.

Last update: 2024-08-28 16:46:07 UTC


README

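This bundle provides classes for working with RabbitMQ.

It is based on the RabbitMQ bundle.

Requirements

  • PHP 7.4 or higher
  • Symfony 3.4 or higher
  • Doctrine bundle (optional)
  • MySQL database (optional)

To do

  • RPC client: allow multiple request calls
  • Doctrine: add automatic migration
  • Doctrine: add automatic entity setup
  • Tests: add tests

Installation and usage

Default bundle configuration

The bundle can be installed with Composer: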

composer require roslov/queue-bundle

Then change the default settings by creating config/packages/roslov_queue.yaml with the following content:

# config/packages/roslov_queue.yaml
roslov_queue:
  # Microservice name. This value will be used as a source of your published message
  service_name: my_service
  # Set this value to `true` if you're using the SSL connection to RabbitMQ (for example, in AWS)
  ssl_enabled: false
  # Whether RabbitMQ bundle v1.x is used. It should be set to `false` for RabbitMQ bundles v2+
  legacy_rabbitmq_bundle: false
  # PSR-3 logger service
  logger: logger
  # Entity manager service. If you do not produce messages, set it to `null` (`~`)
  entity_manager: doctrine.orm.default_entity_manager
  # Event processor
  event_processor:
    # Whether event processor is enabled. If disabled, no events will be sent or saved
    enabled: false
    # Whether event processor uses instant delivery. If disabled, the event processor is used as transactional outbox
    instant_delivery: true
    # Delayed delivery subscriber. If disabled, the events will be stored but not sent (useful for tests)
    delayed_delivery_subscriber: true
  # RPC client
  rpc_client:
    # Whether RPC client should be created
    enabled: false
    # RabbitMQ connection
    connection: default
  # RPC server
  rpc_server:
    # Whether RPC server should be created
    enabled: false
    # RabbitMQ connection
    connection: default
    # Exchange name
    exchange: rpc_exchange
    # Setup callable. If you need to run some processes before running each handler (like DB connection refresh), add
    # the callable service here
    setup: ~
    # Handlers
    handlers: []
      # Put your handlers here:
      # App\Dto\Queue\GetUserCommand: App\Rpc\UserHandler
  # Message type to payload mapping. Extend this array with your payloads
  payload_mapping:
    Error: Roslov\QueueBundle\Dto\Error
    Trigger: Roslov\QueueBundle\Dto\Trigger
    Response.Empty: Roslov\QueueBundle\Dto\EmptyResponse
    Exception.Thrown: Roslov\QueueBundle\Dto\ExceptionThrown
    # Put your payloads here
  # By default, exception_subscriber is turned off
  exception_subscriber:
    # Whether exception subscriber should be enabled. If enabled, `exception_sender.exchange_options` is required
    enabled: false
    # Exception validator callable. If you need to check whether exception subscriber should execute its code with
    # the given exception, add the callable service here. It must return `true` if the exception is OK and the
    # notification should be sent, or `false` if passed exception is not OK and should not be notified.
    # Check the example of exception validator class below
    exception_validator: ~
  # Exception sender
  exception_sender:
    # RabbitMQ connection
    connection: default
    # Put exchange options here. This option is required if you either enabled `exception_subscriber` or use this sender
    # manually
    exchange_options: { name: 'exchange_name', type: topic }

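RabbitMQ configuration

This bundle also installs the RabbitMQ bundle, so the RabbitMQ bundle must be configured first. Follow its documentation. For example: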

# config/packages/old_sound_rabbit_mq.yaml
old_sound_rabbit_mq:
  # RabbitMQ connection config
  connections:
    default:
      url: '%env(RABBITMQ_URL)%'
      lazy: true
      connection_timeout: 5
      read_write_timeout: 60
      keepalive: false
      heartbeat: 30
      # Use this parameter only if you need to use SSL connection to RabbitMQ.
      # For RabbitMQ bundle v2.11.1 or older, use `roslov_queue.rabbitmq.connection_params_provider`
      connection_parameters_provider: roslov_queue.rabbitmq.simple_ssl_context_provider
  # Producers (if used)
  producers:
    user_created:
      class: App\Producer\UserCreatedProducer
      connection: default
      exchange_options: { name: 'user', type: topic, auto_delete: false, durable: true }
      enable_logger: true
    # ...other producers
  # Multiple consumers
  multiple_consumers:
    main:
      connection: default
      exchange_options: { name: 'main', type: direct, auto_delete: false, durable: true }
      enable_logger: true
      queues:
        user-created:
          name: user_created
          routing_keys:
            - user-created
          callback: App\Consumer\UserCreatedConsumer
        # other consumers

Consumers and producers

Create the DTOs that will be used in consumers and producers, and add them to roslov_queue.payload_mapping (see the example).
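A DTO can be a plain class with accessors. The sketch below assumes the shape implied by the producer facade example in this README (new UserCreated() followed by setId()). The id field is the only one shown there, and whether the serializer needs anything beyond getters and setters is not stated here, so treat this as an assumption.

```php
<?php

declare(strict_types=1);

// In a real project this class would live in a namespace such as App\Dto\Queue.

/**
 * Payload of the "user created" event (illustrative shape).
 */
final class UserCreated
{
    /** User identifier; null until it is set */
    private ?int $id = null;

    public function getId(): ?int
    {
        return $this->id;
    }

    public function setId(int $id): void
    {
        $this->id = $id;
    }
}

// Build a payload the same way the facade example does.
$payload = new UserCreated();
$payload->setId(123);
```

The class would then be listed under roslov_queue.payload_mapping with a message type of your choice, for example User.Created: App\Dto\Queue\UserCreated (the type name here is illustrative).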

Create a consumer that uses Roslov\QueueBundle\Serializer\MessagePayloadSerializer as the serializer:

<?php

declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Roslov\QueueBundle\Serializer\MessagePayloadSerializer;

final class UserCreatedConsumer implements ConsumerInterface
{
    public function __construct(private MessagePayloadSerializer $serializer)
    {
    }

    public function execute(AMQPMessage $msg): int
    {
        // Restore connections to DB if needed...
        // Refresh entity manager if used (`$this->em->clear()`)...

        $dto = $this->serializer->deserialize($msg->getBody());
        // `$dto` will be automatically detected based on payload type.

        // Process DTO...

        return ConsumerInterface::MSG_ACK;
    }
}

Add your consumer to old_sound_rabbit_mq.consumers or old_sound_rabbit_mq.multiple_consumers.

Create a producer that extends Roslov\QueueBundle\Producer\BaseProducer and implements getRoutingKey():

<?php

declare(strict_types=1);

namespace App\Producer;

use Roslov\QueueBundle\Producer\BaseProducer;

final class UserCreatedProducer extends BaseProducer
{
    protected function getRoutingKey(): string
    {
        return 'user-created';
    }
}

Add your producer to old_sound_rabbit_mq.producers.

Create a wrapper that keeps all producer calls in one place by extending BaseProducerFacade and injecting EventProcessor:

<?php

declare(strict_types=1);

namespace App\Producer;

use App\Dto\Queue\UserCreated;
use Roslov\QueueBundle\Processor\EventProcessor;
use Roslov\QueueBundle\Producer\BaseProducerFacade;

/**
 * Keeps all calls to producers.
 */
final class ProducerFacade extends BaseProducerFacade
{
    public function __construct(
        EventProcessor $eventProcessor,
        // Inject other services
    ) {
        parent::__construct($eventProcessor);
    }

    public function sendUserCreatedEvent(int $userId): void
    {
        $payload = new UserCreated();
        $payload->setId($userId);
        $this->send('user_created', $payload);
    }
}

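Events are stored in the database and sent on kernel terminate or after a message is consumed. Therefore, a database table for events must be created. Currently, only Doctrine and MySQL are supported: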

CREATE TABLE event (
    id bigint(20) AUTO_INCREMENT NOT NULL,
    microtime double(16,6) NOT NULL COMMENT 'Unix timestamp with microseconds',
    producer_name varchar(64) NOT NULL COMMENT 'Producer name',
    body varchar(4096) NOT NULL COMMENT 'Full message body',
    created_at timestamp NOT NULL DEFAULT current_timestamp COMMENT 'Creation timestamp',
    updated_at timestamp NOT NULL DEFAULT current_timestamp ON UPDATE current_timestamp
        COMMENT 'Update timestamp',
    PRIMARY KEY (id)
) COMMENT = 'Events (transactional outbox)';

Then add the Event entity to the Doctrine configuration:

# config/packages/doctrine.yaml
doctrine:
  orm:
    mappings:
      RoslovQueue:
        is_bundle: false
        type: annotation
        dir: '%kernel.project_dir%/vendor/roslov/queue-bundle/src/Entity'
        prefix: Roslov\QueueBundle\Entity
        alias: RoslovQueue

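Now you can send events via $producerFacade->sendUserCreatedEvent(123).

The best way to use the event processor is inside a transaction, which follows the transactional outbox pattern. Call the producer facade somewhere in your code, then flush all events at the end of the transaction: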

$this->em->getConnection()->beginTransaction();
try {
    // Your code...
    $producerFacade->sendUserCreatedEvent(123); // Creating an event. The event will be stored in memory.
                                                // We cannot store it in DB right now because this code may be used in
                                                // Doctrine lifecycle callbacks.
    // Your code...
    $this->eventProcessor->flush(); // All events are being stored in DB.
                                    // This should be done right before committing. Otherwise, you may lose your events.
                                    // All events will be sent to RabbitMQ on kernel terminate or on message consume.
    $this->em->getConnection()->commit();
} catch (Throwable $e) {
    $this->em->getConnection()->rollBack();
    throw $e;
}
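To make the ordering in the snippet above concrete, here is a toy in-memory model of the outbox flow. It is not the bundle's EventProcessor: the ToyOutbox class and its save(), flush(), deliver(), and getSent() methods are hypothetical stand-ins for "keep in memory", "store in DB before commit", and "send after the request".

```php
<?php

declare(strict_types=1);

/**
 * Toy model of a transactional outbox (hypothetical, for illustration only).
 */
final class ToyOutbox
{
    /** @var string[] Events created during the current unit of work */
    private array $pending = [];

    /** @var string[] Stand-in for rows in the `event` table */
    private array $stored = [];

    /** @var string[] Stand-in for messages published to the broker */
    private array $sent = [];

    /** Keeps the event in memory only. */
    public function save(string $event): void
    {
        $this->pending[] = $event;
    }

    /** Moves in-memory events to storage; meant to run right before commit. */
    public function flush(): void
    {
        $this->stored = array_merge($this->stored, $this->pending);
        $this->pending = [];
    }

    /** Publishes everything that was stored, then empties the storage. */
    public function deliver(): void
    {
        $this->sent = array_merge($this->sent, $this->stored);
        $this->stored = [];
    }

    /** @return string[] */
    public function getSent(): array
    {
        return $this->sent;
    }
}

$outbox = new ToyOutbox();
$outbox->save('user-created:123');

// Delivery before flush finds nothing in storage, so nothing is sent.
$outbox->deliver();
$sentBeforeFlush = $outbox->getSent();

// After flush the event is stored, and the next delivery publishes it.
$outbox->flush();
$outbox->deliver();
$sentAfterFlush = $outbox->getSent();
```

In this model an event that is saved but never flushed is never delivered, which is why the flush belongs right before the commit.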

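If you produce messages, do not forget to enable the event processor in roslov_queue.event_processor.enabled.

Note that transactional outbox support is disabled by default. To enable it, set roslov_queue.event_processor.instant_delivery to false.

Some microservices may not need the transactional outbox, so events can be sent immediately. In this case, set roslov_queue.event_processor.instant_delivery to true, so that both BaseProducerFacade::send() and EventProcessor::save() send the event immediately (without saving it to the database first). This is the default behavior.

For automated tests, you can disable roslov_queue.event_processor.delayed_delivery_subscriber. In this case, events will be stored in the database but not sent, so you can test whether the events were created. Note that this does not work if instant delivery is enabled, because events are then sent immediately.

RPC server and client

If you need remote procedure calls (RPC), enable roslov_queue.rpc_client.enabled on the client service, and set roslov_queue.rpc_server.enabled and roslov_queue.rpc_server.exchange on the server service: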

# config/packages/roslov_queue.yaml
roslov_queue:
  rpc_client:
    enabled: true
  rpc_server:
    enabled: true
    exchange: rpc_exchange

Example of RPC client usage:

<?php

declare(strict_types=1);

namespace App\Queue;

use App\Dto\Queue\GetUserCommand;
use App\Dto\Queue\User;
use Psr\Log\LoggerInterface;
use Roslov\QueueBundle\Dto\Error;
use Roslov\QueueBundle\Exception\UnknownErrorException;
use Roslov\QueueBundle\Rpc\ClientInterface;

final class UserProvider
{
    private const EXCHANGE_NAME = 'rpc.main';

    private const USER_NOT_FOUND = 'UserNotFound';

    public function __construct(private ClientInterface $client, private LoggerInterface $logger)
    {
    }

    public function getUser(int $id): ?User
    {
        $command = new GetUserCommand();
        $command->setId($id);

        /** @var User|Error $user */
        $user = $this->client->call($command, self::EXCHANGE_NAME);

        if ($user instanceof User) {
            $this->logger->info("The details for the user with id \"$id\" have been received.");
            return $user;
        }
        if ($user instanceof Error && $user->getType() === self::USER_NOT_FOUND) {
            $this->logger->info("The user with id \"$id\" does not exist on the remote server.");
            return null;
        }
        throw new UnknownErrorException('Unknown error happened.');
    }
}

For the RPC server, add handlers that process commands and return results:

# config/packages/roslov_queue.yaml
roslov_queue:
  rpc_server:
    handlers:
      App\Dto\Queue\GetUserCommand: App\Rpc\UserHandler
      # Other handlers...

Example of an RPC server handler:

<?php

declare(strict_types=1);

namespace App\Rpc;

use App\Dto\Queue\GetUserCommand;
use InvalidArgumentException;
use Roslov\QueueBundle\Dto\Error;
use Roslov\QueueBundle\Rpc\HandlerInterface;

final class UserHandler implements HandlerInterface
{
    private const USER_NOT_FOUND = 'UserNotFound';

    public function handle(object $command): object
    {
        if (!$command instanceof GetUserCommand) {
            throw new InvalidArgumentException(sprintf(
                'Command "%s" is not supported. The handler supports "%s" only.',
                $command::class,
                GetUserCommand::class
            ));
        }

        // Search for a user
        $user = $this->findUser($command->getId()); // Your code for getting a user

        if ($user === null) {
            $error = new Error();
            $error->setType(self::USER_NOT_FOUND);
            $error->setMessage('User not found.');
            return $error;
        }
        return $user;
    }
}

To run the RPC server, use:

bin/console rabbitmq:rpc-server roslov_queue

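Exception events

This component can automatically send events about thrown exceptions.

Note that exception_subscriber is disabled by default. To enable it, set roslov_queue.exception_subscriber.enabled to true.

The exception subscriber uses the routing key exception-thrown.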

Example of an exception validator class that can be passed to the roslov_queue.exception_subscriber.exception_validator option:

<?php
final class ExceptionValidator
{
    /**
     * Returns `true` if notification about exception SHOULD BE sent.
     *
     * In this case, we notify about all exceptions except `UserNotFoundException`.
     *
     * @param \Throwable $exception The exception that must be validated
     * @return bool Validation result
     */
    public function __invoke(\Throwable $exception): bool
    {
        return !$exception instanceof \App\Exception\UserNotFoundException;
    }
}
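Because the validator is an invokable class, its rule can be checked in isolation. The sketch below redefines a validator with the same logic next to a stand-in UserNotFoundException (both names are local to this example) and calls it like any other callable. How the bundle invokes the validator internally is not shown in this README.

```php
<?php

declare(strict_types=1);

/** Stand-in for App\Exception\UserNotFoundException (local to this sketch). */
final class UserNotFoundException extends RuntimeException
{
}

/** Same rule as the validator above: notify about everything except UserNotFoundException. */
final class SkipUserNotFoundValidator
{
    public function __invoke(Throwable $exception): bool
    {
        return !$exception instanceof UserNotFoundException;
    }
}

$validator = new SkipUserNotFoundValidator();

// Any other exception should produce a notification.
$notifyGeneric = $validator(new LogicException('Unexpected state.'));

// The excluded exception type should not.
$notifyNotFound = $validator(new UserNotFoundException('User 123 not found.'));
```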

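If you want to send an exception event manually, use \Roslov\QueueBundle\Sender\ExceptionSender::sendExceptionThrownEvent().

Resending messages

If something goes wrong and you need to send the same message to the same queue again, use return ConsumerInterface::MSG_SINGLE_NACK_REQUEUE; instead of return ConsumerInterface::MSG_ACK; in your consumer.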
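One way to structure that decision is to map a retryable failure to the requeue result and everything else to an acknowledgement. The sketch below is self-contained, so it uses local stand-in constants instead of the real ConsumerInterface constants. The TemporaryFailure exception and the consumeResult() helper are hypothetical names introduced for illustration.

```php
<?php

declare(strict_types=1);

// Local stand-ins for ConsumerInterface::MSG_ACK and
// ConsumerInterface::MSG_SINGLE_NACK_REQUEUE; use the real constants in a consumer.
const MSG_ACK = 1;
const MSG_SINGLE_NACK_REQUEUE = 2;

/** Hypothetical exception type marking a failure that is worth retrying. */
final class TemporaryFailure extends RuntimeException
{
}

/**
 * Runs the processing callable and maps its outcome to a consumer result code.
 */
function consumeResult(callable $process): int
{
    try {
        $process();
    } catch (TemporaryFailure $e) {
        // Ask the broker to put the message back into the same queue.
        return MSG_SINGLE_NACK_REQUEUE;
    }

    return MSG_ACK;
}

$ok = consumeResult(static function (): void {
    // Processing succeeded.
});

$retry = consumeResult(static function (): void {
    throw new TemporaryFailure('Database is temporarily unavailable.');
});
```

In a real consumer the same try/catch would sit inside execute(), returning the ConsumerInterface constants directly.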

Testing

Unit tests

The bundle is tested with PHPUnit. To run the tests:

./vendor/bin/phpunit

Code style analysis

The code style is analyzed with PHP_CodeSniffer and the PSR-12 Ext coding standard. To run the code style analysis:

./vendor/bin/phpcs --extensions=php --colors --standard=PSR12Ext --ignore=vendor/* -p -s .