roslov / queue-bundle
Queue bundle
Requires
- php: ^7.4|^8.0
- ext-json: *
- ext-mbstring: *
- ext-sockets: *
- doctrine/orm: ^2.7
- php-amqplib/rabbitmq-bundle: ^1.15|^2.10
- roslov/log-obfuscator: ^1.0
- symfony/config: ^3.4|^4.0|^5.0|^6.0|^7.0
- symfony/dependency-injection: ^3.4|^4.0|^5.0|^6.0|^7.0
- symfony/framework-bundle: ^3.4|^4.0|^5.0|^6.0|^7.0
- symfony/property-access: ^3.4|^4.0|^5.0|^6.0|^7.0
- symfony/serializer: ^3.4|^4.0|^5.0|^6.0|^7.0
Requires (Dev)
- phpunit/phpunit: ^9.5
- roslov/psr12ext: ^9.1
README
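This package provides classes for working with RabbitMQ.
It is based on the RabbitMQ bundle.
Requirements
- PHP 7.4 or higher
- Symfony 3.4 or higher
- Doctrine bundle (optional)
- MySQL database (optional)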
TODO
- RPC client: allow multiple request calls
- Doctrine: add automatic migrations
- Doctrine: add automatic entity setup
- Tests: add tests
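Installation and usage
Default bundle configuration
The package can be installed with composer:
    composer require roslov/queue-bundle
Then change the default settings by creating config/packages/roslov_queue.yaml with the following content: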
    # config/packages/roslov_queue.yaml
    roslov_queue:
        # Microservice name. This value will be used as a source of your published message
        service_name: my_service

        # Set this value to `true` if you're using the SSL connection to RabbitMQ (for example, in AWS)
        ssl_enabled: false

        # Whether RabbitMQ bundle v1.x is used. It should be set to `false` for RabbitMQ bundles v2+
        legacy_rabbitmq_bundle: false

        # PSR-3 logger service
        logger: logger

        # Entity manager service. If you do not produce messages, set it to `null` (`~`)
        entity_manager: doctrine.orm.default_entity_manager

        # Event processor
        event_processor:
            # Whether event processor is enabled. If disabled, no events will be sent or saved
            enabled: false
            # Whether event processor uses instant delivery. If disabled, the event processor is used as transactional outbox
            instant_delivery: true
            # Delayed delivery subscriber. If disabled, the events will be stored but not sent (useful for tests)
            delayed_delivery_subscriber: true

        # RPC client
        rpc_client:
            # Whether RPC client should be created
            enabled: false
            # RabbitMQ connection
            connection: default

        # RPC server
        rpc_server:
            # Whether RPC server should be created
            enabled: false
            # RabbitMQ connection
            connection: default
            # Exchange name
            exchange: rpc_exchange
            # Setup callable. If you need to run some processes before running each handler (like DB connection refresh), add
            # the callable service here
            setup: ~
            # Handlers
            handlers: []
            # Put your handlers here:
            # App\Dto\Queue\GetUserCommand: App\Rpc\UserHandler

        # Message type to payload mapping. Extend this array with your payloads
        payload_mapping:
            Error: Roslov\QueueBundle\Dto\Error
            Trigger: Roslov\QueueBundle\Dto\Trigger
            Response.Empty: Roslov\QueueBundle\Dto\EmptyResponse
            Exception.Thrown: Roslov\QueueBundle\Dto\ExceptionThrown
            # Put your payloads here

        # By default, exception_subscriber is turned off
        exception_subscriber:
            # Whether exception subscriber should be enabled. If enabled, `exception_sender.exchange_options` is required
            enabled: false

        # Exception validator callable. If you need to check whether exception subscriber should execute its code with
        # the given exception, add the callable service here. It must return `true` if the exception is OK and the
        # notification should be sent, or `false` if passed exception is not OK and should not be notified.
        # Check the example of exception validator class below
        exception_validator: ~

        # Exception sender
        exception_sender:
            # RabbitMQ connection
            connection: default
            # Put exchange options here. This option is required if you either enabled `exception_subscriber` or use this sender
            # manually
            exchange_options: { name: 'exchange_name', type: topic }
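RabbitMQ configuration
This package also installs the RabbitMQ bundle, so the RabbitMQ bundle must be configured first. Follow its documentation. For example: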
    # config/packages/old_sound_rabbit_mq.yaml
    old_sound_rabbit_mq:
        # RabbitMQ connection config
        connections:
            default:
                url: '%env(RABBITMQ_URL)%'
                lazy: true
                connection_timeout: 5
                read_write_timeout: 60
                keepalive: false
                heartbeat: 30
                # Use this parameter only if you need to use SSL connection to RabbitMQ.
                # For RabbitMQ bundle v2.11.1 or older, use `roslov_queue.rabbitmq.connection_params_provider`
                connection_parameters_provider: roslov_queue.rabbitmq.simple_ssl_context_provider

        # Producers (if used)
        producers:
            user_created:
                class: App\Producer\UserCreatedProducer
                connection: default
                exchange_options: { name: 'user', type: topic, auto_delete: false, durable: true }
                enable_logger: true
            # ...other producers

        # Multiple consumers
        multiple_consumers:
            main:
                connection: default
                exchange_options: { name: 'main', type: direct, auto_delete: false, durable: true }
                enable_logger: true
                queues:
                    user-created:
                        name: user_created
                        routing_keys:
                            - user-created
                        callback: App\Consumer\UserCreatedConsumer
                    # other consumers
Consumers and producers
Create the DTOs that will be used in consumers and producers, and add them to roslov_queue.payload_mapping (see the example).
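As an illustration, a DTO for the `UserCreated` payload used by the producer facade in this README could look like the sketch below. The class shape (a nullable `id` with a getter and a setter) is an assumption; the only thing the README confirms is that the facade calls `setId()` on it.

```php
<?php

declare(strict_types=1);

namespace App\Dto\Queue;

/**
 * Payload of the "user created" message.
 *
 * Hypothetical shape: only `setId()` is confirmed by the facade example.
 */
final class UserCreated
{
    /** User identifier; stays `null` until the producer sets it. */
    private ?int $id = null;

    public function getId(): ?int
    {
        return $this->id;
    }

    public function setId(int $id): void
    {
        $this->id = $id;
    }
}
```

The class then needs its own entry under roslov_queue.payload_mapping, next to the built-in entries such as Error and Trigger. The message type name used as the key is your choice.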
Create a consumer that uses Roslov\QueueBundle\Serializer\MessagePayloadSerializer as the serializer:
    <?php

    declare(strict_types=1);

    namespace App\Consumer;

    use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
    use PhpAmqpLib\Message\AMQPMessage;
    use Roslov\QueueBundle\Serializer\MessagePayloadSerializer;

    final class UserCreatedConsumer implements ConsumerInterface
    {
        public function __construct(private MessagePayloadSerializer $serializer)
        {
        }

        public function execute(AMQPMessage $msg): int
        {
            // Restore connections to DB if needed...
            // Refresh entity manager if used (`$this->em->clear()`)...

            $dto = $this->serializer->deserialize($msg->getBody());
            // `$dto` will be automatically detected based on payload type.

            // Process DTO...

            return ConsumerInterface::MSG_ACK;
        }
    }
Add your consumer to old_sound_rabbit_mq.consumers or old_sound_rabbit_mq.multiple_consumers.
Create a producer that extends Roslov\QueueBundle\Producer\BaseProducer and implements getRoutingKey():
    <?php

    declare(strict_types=1);

    namespace App\Producer;

    use Roslov\QueueBundle\Producer\BaseProducer;

    final class UserCreatedProducer extends BaseProducer
    {
        protected function getRoutingKey(): string
        {
            return 'user-created';
        }
    }
Add your producer to old_sound_rabbit_mq.producers.
Create a wrapper that keeps all producer calls in one place by extending BaseProducerFacade and injecting EventProcessor:
    <?php

    declare(strict_types=1);

    namespace App\Producer;

    use App\Dto\Queue\UserCreated;
    use Roslov\QueueBundle\Processor\EventProcessor;
    use Roslov\QueueBundle\Producer\BaseProducerFacade;

    /**
     * Keeps all calls to producers.
     */
    final class ProducerFacade extends BaseProducerFacade
    {
        public function __construct(
            EventProcessor $eventProcessor,
            // Inject other services
        ) {
            parent::__construct($eventProcessor);
        }

        public function sendUserCreatedEvent(int $userId): void
        {
            $payload = new UserCreated();
            $payload->setId($userId);
            $this->send('user_created', $payload);
        }
    }
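Events are stored in the database and sent on kernel terminate or after a message is consumed. Therefore, a database table for events must be created. Currently, only Doctrine and MySQL are supported: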
    CREATE TABLE event (
        id bigint(20) AUTO_INCREMENT NOT NULL,
        microtime double(16,6) NOT NULL COMMENT 'Unix timestamp with microseconds',
        producer_name varchar(64) NOT NULL COMMENT 'Producer name',
        body varchar(4096) NOT NULL COMMENT 'Full message body',
        created_at timestamp NOT NULL DEFAULT current_timestamp COMMENT 'Creation timestamp',
        updated_at timestamp NOT NULL DEFAULT current_timestamp ON UPDATE current_timestamp COMMENT 'Update timestamp',
        PRIMARY KEY (id)
    ) COMMENT = 'Events (transactional outbox)';
Then add the Event entity to the Doctrine configuration:
    # config/packages/doctrine.yaml
    doctrine:
        orm:
            mappings:
                RoslovQueue:
                    is_bundle: false
                    type: annotation
                    dir: '%kernel.project_dir%/vendor/roslov/queue-bundle/src/Entity'
                    prefix: Roslov\QueueBundle\Entity
                    alias: RoslovQueue
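Now you can send an event with $producerFacade->sendUserCreatedEvent(123).
The best way to use the event processor is inside a transaction, which follows the transactional outbox pattern. Call the producer facade somewhere in your code, then flush all events at the end of the transaction: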
    $this->em->getConnection()->beginTransaction();
    try {
        // Your code...

        $producerFacade->sendUserCreatedEvent(123);
        // Creating an event: the event will be stored in memory.
        // We cannot store it in DB right now because this code may be used in
        // Doctrine lifetime cycles.

        // Your code...

        $this->eventProcessor->flush();
        // All events are being stored in DB.
        // This should be done right before committing. Otherwise, you may lose your events.
        // All events will be sent to RabbitMQ on kernel terminate or on message consume.

        $this->em->getConnection()->commit();
    } catch (Throwable $e) {
        $this->em->getConnection()->rollBack();
        throw $e;
    }
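The three stages described above (event kept in memory, event stored in the database on flush, event sent later) can be pictured with a toy model. `InMemoryOutbox` is a hypothetical class written only for this illustration. It is not the bundle's `EventProcessor`, and it uses plain arrays instead of a database or RabbitMQ.

```php
<?php

declare(strict_types=1);

/**
 * Toy model of a transactional outbox (hypothetical, not part of the bundle).
 */
final class InMemoryOutbox
{
    /** @var string[] Events created in code but not yet persisted */
    private array $pending = [];

    /** @var string[] Events persisted and waiting for delivery */
    private array $stored = [];

    /** Keeps the event in memory only. */
    public function save(string $event): void
    {
        $this->pending[] = $event;
    }

    /** Moves all in-memory events to the store (the DB table in a real setup). */
    public function flush(): void
    {
        $this->stored = array_merge($this->stored, $this->pending);
        $this->pending = [];
    }

    /**
     * Hands all stored events over for sending and empties the store.
     *
     * @return string[] Events that would be published to the broker
     */
    public function deliver(): array
    {
        $sent = $this->stored;
        $this->stored = [];

        return $sent;
    }
}

$outbox = new InMemoryOutbox();
$outbox->save('user_created:123'); // Still only in memory
$outbox->flush();                  // Now stored, as it would be right before commit
$sent = $outbox->deliver();        // Later step, such as kernel terminate
```

In this model, an event that is never flushed is never delivered, which mirrors the warning above about flushing right before the commit.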
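If you produce messages, do not forget to enable the event processor in roslov_queue.event_processor.enabled.
Note that transactional outbox support is disabled by default. To enable it, set roslov_queue.event_processor.instant_delivery to false.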
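A configuration fragment for the transactional outbox mode might look like this. It only uses keys from the default configuration shown earlier. The entity manager service name is the default one and may differ in your project.

```yaml
# config/packages/roslov_queue.yaml
roslov_queue:
    # Needed because events are saved through Doctrine
    entity_manager: doctrine.orm.default_entity_manager
    event_processor:
        # Events are neither saved nor sent while this is `false`
        enabled: true
        # `false` switches from instant delivery to the transactional outbox
        instant_delivery: false
        # Sends the stored events on kernel terminate or on message consume
        delayed_delivery_subscriber: true
```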
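Some microservices may not need the transactional outbox, so events can be sent immediately. In that case, set roslov_queue.event_processor.instant_delivery to true, so that both BaseProducerFacade::send() and EventProcessor::save() send events instantly (without saving them to the database first). This is the default behavior.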
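For automated tests, you can disable roslov_queue.event_processor.delayed_delivery_subscriber. In that case, events are stored in the database but not sent, so you can test whether an event was created. Note that this does not work if instant delivery is enabled, because the events are sent immediately.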
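For a test environment, the override could be sketched as follows. The file location `config/packages/test/roslov_queue.yaml` is an assumption based on the usual Symfony per-environment layout; adjust it to how your project loads environment-specific configuration.

```yaml
# config/packages/test/roslov_queue.yaml (assumed location)
roslov_queue:
    event_processor:
        enabled: true
        # Must stay `false`; with instant delivery the events are sent right away
        instant_delivery: false
        # Events stay in the `event` table, so a test can check that they were created
        delayed_delivery_subscriber: false
```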
RPC server and client
If you need to use remote procedure calls (RPC), enable roslov_queue.rpc_client.enabled on the client service, and enable roslov_queue.rpc_server.enabled and set roslov_queue.rpc_server.exchange on the server service.
    # config/packages/roslov_queue.yaml
    roslov_queue:
        rpc_client:
            enabled: true
        rpc_server:
            enabled: true
            exchange: rpc_exchange
Example of RPC client usage:
    <?php

    declare(strict_types=1);

    namespace App\Queue;

    use App\Dto\Queue\GetUserCommand;
    use App\Dto\Queue\User;
    use Psr\Log\LoggerInterface;
    use Roslov\QueueBundle\Dto\Error;
    use Roslov\QueueBundle\Exception\UnknownErrorException;
    use Roslov\QueueBundle\Rpc\ClientInterface;

    final class UserProvider
    {
        private const EXCHANGE_NAME = 'rpc.main';

        private const USER_NOT_FOUND = 'UserNotFound';

        public function __construct(private ClientInterface $client, private LoggerInterface $logger)
        {
        }

        public function getUser(int $id): ?User
        {
            $command = new GetUserCommand();
            $command->setId($id);
            /** @var User|Error $user */
            $user = $this->client->call($command, self::EXCHANGE_NAME);
            if ($user instanceof User) {
                $this->logger->info("The details for the user with id \"$id\" have been received.");
                return $user;
            }
            if ($user instanceof Error && $user->getType() === self::USER_NOT_FOUND) {
                $this->logger->info("The user with id \"$id\" does not exist on the remote server.");
                return null;
            }
            throw new UnknownErrorException('Unknown error happened.');
        }
    }
For the RPC server, add handlers that process commands and return results:
    # config/packages/roslov_queue.yaml
    roslov_queue:
        rpc_server:
            handlers:
                App\Dto\Queue\GetUserCommand: App\Rpc\UserHandler
                # Other handlers...
Example of an RPC server handler:
    <?php

    declare(strict_types=1);

    namespace App\Rpc;

    use App\Dto\Queue\GetUserCommand;
    use InvalidArgumentException;
    use Roslov\QueueBundle\Dto\Error;
    use Roslov\QueueBundle\Rpc\HandlerInterface;

    final class UserHandler implements HandlerInterface
    {
        private const USER_NOT_FOUND = 'UserNotFound';

        public function handle(object $command): object
        {
            if (!$command instanceof GetUserCommand) {
                throw new InvalidArgumentException(sprintf(
                    'Command "%s" is not supported. The handler supports "%s" only.',
                    $command::class,
                    GetUserCommand::class
                ));
            }

            // Search for a user
            $user = $this->findUser($command->getId()); // Your code for getting a user

            if ($user === null) {
                $error = new Error();
                $error->setType(self::USER_NOT_FOUND);
                $error->setMessage('User not found.');
                return $error;
            }

            return $user;
        }
    }
To run the RPC server, use:
    bin/console rabbitmq:rpc-server roslov_queue
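Exception events
This component can automatically send events about thrown exceptions.
Note that exception_subscriber is disabled by default. To enable it, set roslov_queue.exception_subscriber.enabled to true.
The exception subscriber uses the routing key exception-thrown.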
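A configuration fragment that turns the subscriber on might look like the following. The keys come from the default configuration shown earlier; the exchange name `exceptions` is a placeholder chosen for this sketch.

```yaml
# config/packages/roslov_queue.yaml
roslov_queue:
    exception_subscriber:
        enabled: true
    exception_sender:
        connection: default
        # Required once the subscriber is enabled; `exceptions` is a placeholder name
        exchange_options: { name: 'exceptions', type: topic }
```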
Example of an exception validator class that can be passed to the roslov_queue.exception_validator configuration:
    <?php

    final class ExceptionValidator
    {
        /**
         * Returns `true` if notification about exception SHOULD BE sent.
         *
         * In this case, we notify about all exceptions except `UserNotFoundException`.
         *
         * @param \Throwable $exception The exception that must be validated
         * @return bool Validation result
         */
        public function __invoke(\Throwable $exception): bool
        {
            return !$exception instanceof \App\Exception\UserNotFoundException;
        }
    }
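If several exception types should be skipped, the same idea can be generalized. The sketch below is a hypothetical variant, not a class shipped with the bundle: `IgnoredExceptionsValidator` takes a list of class names and returns `false` for any exception that is an instance of one of them. It relies only on the contract described above, an invokable that receives a `Throwable` and returns a `bool`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical validator that suppresses notifications for listed exception classes.
 */
final class IgnoredExceptionsValidator
{
    /** @var string[] Fully qualified class names that must not be notified */
    private array $ignored;

    /**
     * @param string[] $ignored Exception class names to skip
     */
    public function __construct(array $ignored)
    {
        $this->ignored = $ignored;
    }

    /**
     * Returns `true` if the notification about the exception should be sent.
     */
    public function __invoke(Throwable $exception): bool
    {
        foreach ($this->ignored as $class) {
            // `instanceof` accepts a class name held in a string variable
            if ($exception instanceof $class) {
                return false;
            }
        }

        return true;
    }
}

// Standard exceptions are used here only to keep the sketch self-contained
$validator = new IgnoredExceptionsValidator([InvalidArgumentException::class]);
$shouldNotify = $validator(new RuntimeException('Connection lost.'));
```

Subclasses of an ignored class are skipped as well, because the check uses `instanceof` rather than an exact class name comparison.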
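If you want to send exception events manually, use \Roslov\QueueBundle\Sender\ExceptionSender::sendExceptionThrownEvent().
Re-sending messages
If something went wrong and you need to send the same message to the same queue again, use return ConsumerInterface::MSG_SINGLE_NACK_REQUEUE; instead of return ConsumerInterface::MSG_ACK; in your consumer.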
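Testing
Unit testing
The package is tested with PHPUnit. To run the tests:
    ./vendor/bin/phpunit
Code style analysis
The code style is analyzed with PHP_CodeSniffer and the PSR-12 Ext coding standard. To run the code style analysis:
    ./vendor/bin/phpcs --extensions=php --colors --standard=PSR12Ext --ignore=vendor/* -p -s .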