needle-project / laravel-rabbitmq
为Laravel提供简单的RabbitMQ集成
Requires
- php: 7.*|8.*
- php-amqplib/php-amqplib: >=2.7|3.*
Requires (Dev)
- laravel/framework: 7.*|8.*|9.*|10.*
- phpcompatibility/php-compatibility: ^9.3
- phpmd/phpmd: @stable
- phpstan/phpstan: ^1.10
- phpunit/phpunit: 9.*|10.*
- squizlabs/php_codesniffer: 3.7.*
README
Laravel RabbitMQ
一个简单的基于发布-订阅模式的RabbitMQ库,其中订阅者是消费者
目录
-
2.1. 连接
2.2. 队列
2.3. 交换机
2.4. 发布者
2.5. 消费者
-
3.1. 发布消息
3.2. 消费消息
3.3. 可用的CLI命令
3.4. 自定义消息处理器
-
5.1 本地开发
5.2 需要的帮助
1. 安装
运行
composer require needle-project/laravel-rabbitmq
对于5.5或更高版本的Laravel,库应该通过包发现自动加载。
对于低于5.5版本的Laravel,您需要将服务提供者添加到app.php
<?php return [ // ... 'providers' => [ // ... NeedleProject\LaravelRabbitMq\Providers\ServiceProvider::class, ], // ... ];
2. 配置
- 在Laravel的配置目录中创建一个名为
laravel_rabbitmq.php
的新文件。 (或使用artisan vendor:publish
- 更多信息这里) - 根据您的需求填写配置。
配置文件有5个主要节点:连接、交换机、队列、发布者、消费者。
示例配置
return [ 'connections' => [ 'connectionA' => [/** Connection A attributes */], 'connectionB' => [/** Connection B attributes */], ], 'exchanges' => [ 'exchangeA' => [ // Tells that the exchange will use the connection A 'connection' => 'connectionA', /** Exchange A Attributes */ ], 'exchangeB' => [ // Tells that the exchange will use the connection B 'connection' => 'connectionB', /** Exchange B Attributes */ ] ], 'queues' => [ 'queueA' => [ // Tells that the queue will use the connection alias A 'connection' => 'connectionA', /** Queue A Attributes */ ] ], 'publishers' => [ 'aPublisherName' => /** will publish to exchange defined by alias */ 'exchangeA' ], 'consumers' => [ 'aConsumerName' => [ // will read messages from 'queue' => 'queueA', // and will send the for processing to an "NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface" 'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class ] ] ]
2.1. 连接
连接属性
- 所有属性都是可选的,如果没有定义,将使用默认值。
2.2. 队列
队列主要节点
队列属性
示例 1
[ ['exchange' => 'first.exchange', 'routing_key' => '*'], ['exchange' => 'second.exchange', 'routing_key' => 'foo_bar'], ]
2.3. 交换机
交换机主要节点
交换机属性
示例 2
[ ['queue' => 'first.exchange', 'routing_key' => '*'], ['queue' => 'second.exchange', 'routing_key' => 'foo_bar'], ]
2.4. 发布者
发布者向一个交换机(也可以向队列)推送消息。定义发布者
'publishers' => [ 'myFirstPublisher' => 'echangeAliasName', 'mySecondPublisher' => 'queueAliasName' // and many as you need ]
2.5. 消费者
消费者将从队列中获取消息。定义消费者
'consumers' => [ 'myConsumerName' => [ 'queue' => 'queueAliasName', 'prefetch_count' => 1, 'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class ] ]
3. 使用方法
配置完成后,您应该得到一个类似于以下内容的配置文件laravel_rabbitmq.php
return [ 'connections' => [ 'connectionA' => [], ], 'exchanges' => [ 'exchangeA' => [ 'connection' => 'connectionA', 'name' => 'foo_bar', 'attributes' => [ 'exchange_type' => 'topic' ] ] ], 'queues' => [ 'queueB' => [ 'connection' => 'connectionA', 'name' => 'foo_bar_listener', 'attributes' => [ 'bind' => [ ['exchange' => 'foo_bar', 'routing_key' => '*'] ] ] ] ], 'publishers' => [ 'aPublisherName' => 'exchangeA' ], 'consumers' => [ 'aConsumerName' => [ 'queue' => 'queueB', 'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class ] ] ]
3.1. 发布消息
代码中的使用示例
<?php /** * @var $app \Illuminate\Contracts\Container\Container * @var $publisher \NeedleProject\LaravelRabbitMq\PublisherInterface */ $publisher = $app->makeWith(PublisherInterface::class, ['aPublisherName']); $message = [ 'title' => 'Hello world', 'body' => 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.', ]; $routingKey = '*'; $publisher->publish(json_encode($message), /* optional */$routingKey);
可选,有一个命令可以用来发布消息。
php artisan rabbitmq:publish aPublisherName MyMessage
注意:目前,CLI中的路由键不受支持。
3.2. 消费消息
消费消息应该通过运行守护模式下的命令来完成。虽然PHP不适合做这个,但您可以使用supervisor来完成。
消费者的流程相当简单:CLI消费者 -> 获取消息 -> 将其传递给配置中的message_processor
键。
消息处理器是一个实现了NeedleProject\LaravelRabbitMq\Processor
接口的类。如果您不希望处理确认,可以扩展\NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor
,这需要实现processMessage(AMQPMessage $message): bool
方法。
message_processor
键由laravel的app
命令运行以构建类。
启动消息消费者/监听器
php artisan rabbitmq:consume aConsumerName
运行有限制的消费者(当达到其中一个限制时将停止)
php artisan rabbitmq:consume aConsumerName --time=60 --messages=100 --memory=64
这告诉消费者如果运行了1分钟或消费了100条消息或已达到64MB的内存使用量,则停止。
3.3. 可用命令
当运行php artisan
时,将出现一个新的命名空间
3.4. 自定义消息处理器
目前,有两种方式可以实现消息处理器:实现MessageProcessorInterface
类或扩展AbstractMessageProcessor
。
当使用AbstractMessageProcessor
时,您将可以访问额外的API,这些API可以在您的processMessage()
中使用。
protected function ack(AMQPMessage $message); protected function nack(AMQPMessage $message, bool $redeliver = true);
5. 贡献
您可以通过提交pull请求或报告任何在Github上的问题来贡献。在项目的当前阶段,没有定义任何贡献流程。
5.1 本地开发
运行composer install(使用ignore-platform-reqs以避免缺少扩展)
docker run --rm -v $(pwd):/app jitesoft/phpunit:8.1 composer install --ignore-platform-req=ext-sockets
通过Docker运行单元测试
docker run --rm -v $(pwd):/app jitesoft/phpunit:8.1 phpunit --configuration phpunit.xml
5.2. 需要的帮助
该项目需要帮助的多个主题
- CI管道:需要配置scrutinizer(或任何其他工具),以覆盖运行所有支持的PHP版本和Laravel框架版本的测试
- 文档:任何有助于简化库使用的改进都受欢迎
- 示例:一个证明库在不同实际场景示例的示例部分
6. 特殊的“感谢”
特殊的“感谢”献给库的贡献者。