needle-project/laravel-rabbitmq

为Laravel提供简单的RabbitMQ集成

v0.5.0 2024-03-22 14:12 UTC

README

Scrutinizer Code Quality Code Coverage Total Downloads

Laravel RabbitMQ

一个简单的基于发布-订阅模式的RabbitMQ库,其中订阅者是消费者

目录

  1. 安装

  2. 配置

    2.1. 连接

    2.2. 队列

    2.3. 交换机

    2.4. 发布者

    2.5. 消费者

  3. 使用方法

    3.1. 发布消息

    3.2. 消费消息

    3.3. 可用的CLI命令

    3.4. 自定义消息处理器

  4. 示例

  5. 贡献

    5.1 本地开发

    5.2 需要的帮助

  6. 特别感谢

1. 安装

运行

composer require needle-project/laravel-rabbitmq

对于5.5或更高版本的Laravel,库应该通过包发现自动加载。

对于低于5.5版本的Laravel,您需要将服务提供者添加到app.php

<?php

return [
    // ...
    'providers' => [
        // ...
        NeedleProject\LaravelRabbitMq\Providers\ServiceProvider::class,
    ],
    // ...
];

2. 配置

  • 在Laravel的配置目录中创建一个名为laravel_rabbitmq.php的新文件。 (或使用artisan vendor:publish - 更多信息这里)
  • 根据您的需求填写配置。

配置文件有5个主要节点:连接交换机队列发布者消费者

它们在以下模式下使用:配置流程

示例配置

return [
    'connections' => [
        'connectionA' => [/** Connection A attributes */],
        'connectionB' => [/** Connection B attributes */],
    ],
    'exchanges' => [
        'exchangeA' => [
            // Tells that the exchange will use the connection A
            'connection' => 'connectionA',
            /** Exchange A Attributes */
        ],
        'exchangeB' => [
            // Tells that the exchange will use the connection B
            'connection' => 'connectionB',
            /** Exchange B Attributes */
        ]
    ],
    'queues' => [
        'queueA' => [
            // Tells that the queue will use the connection alias A
            'connection' => 'connectionA',
            /** Queue A Attributes */
        ]
    ],
    'publishers' => [
        'aPublisherName' => /** will publish to exchange defined by alias */ 'exchangeA'
    ],
    'consumers' => [
        'aConsumerName' => [
            // will read messages from
            'queue' => 'queueA',
            // and will send the for processing to an "NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface"
            'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
        ]
    ]
]

2.1. 连接

连接属性

  • 所有属性都是可选的,如果没有定义,将使用默认值。

2.2. 队列

队列主要节点

队列属性

示例 1

[
	['exchange' => 'first.exchange', 'routing_key' => '*'],
	['exchange' => 'second.exchange', 'routing_key' => 'foo_bar'],
]

2.3. 交换机

交换机主要节点

交换机属性

示例 2

[
	['queue' => 'first.exchange', 'routing_key' => '*'],
	['queue' => 'second.exchange', 'routing_key' => 'foo_bar'],
]

2.4. 发布者

发布者向一个交换机(也可以向队列)推送消息。定义发布者

'publishers' => [
	'myFirstPublisher' => 'echangeAliasName',
	'mySecondPublisher' => 'queueAliasName'
	// and many as you need
]

2.5. 消费者

消费者将从队列中获取消息。定义消费者

'consumers' => [
    'myConsumerName' => [
        'queue' => 'queueAliasName',
        'prefetch_count' => 1,
        'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
    ]
]

3. 使用方法

配置完成后,您应该得到一个类似于以下内容的配置文件laravel_rabbitmq.php

return [
    'connections' => [
        'connectionA' => [],
    ],
    'exchanges' => [
        'exchangeA' => [
            'connection' => 'connectionA',
			'name' => 'foo_bar',
			'attributes' => [
				'exchange_type' => 'topic'
			]
        ]
	],
    'queues' => [
        'queueB' => [
            'connection' => 'connectionA',
            'name' => 'foo_bar_listener',
			'attributes' => [
				'bind' => [
                    ['exchange' => 'foo_bar', 'routing_key' => '*']
                ]
			]
        ]
    ],
    'publishers' => [
        'aPublisherName' => 'exchangeA'
    ],
    'consumers' => [
        'aConsumerName' => [
            'queue' => 'queueB',
            'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
        ]
    ]
]

3.1. 发布消息

代码中的使用示例

<?php
/**
 * @var $app \Illuminate\Contracts\Container\Container
 * @var $publisher \NeedleProject\LaravelRabbitMq\PublisherInterface 
 */
$publisher = $app->makeWith(PublisherInterface::class, ['aPublisherName']);
$message = [
    'title' => 'Hello world',
    'body' => 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.',
];
$routingKey = '*';
$publisher->publish(json_encode($message), /* optional */$routingKey);

可选,有一个命令可以用来发布消息。

php artisan rabbitmq:publish aPublisherName MyMessage

注意:目前,CLI中的路由键不受支持。

3.2. 消费消息

消费消息应该通过运行守护模式下的命令来完成。虽然PHP不适合做这个,但您可以使用supervisor来完成。

消费者的流程相当简单:CLI消费者 -> 获取消息 -> 将其传递给配置中的message_processor键。

消息处理器是一个实现了NeedleProject\LaravelRabbitMq\Processor接口的类。如果您不希望处理确认,可以扩展\NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor,这需要实现processMessage(AMQPMessage $message): bool方法。

message_processor键由laravel的app命令运行以构建类。

启动消息消费者/监听器

php artisan rabbitmq:consume aConsumerName

运行有限制的消费者(当达到其中一个限制时将停止)

php artisan rabbitmq:consume aConsumerName --time=60 --messages=100 --memory=64

这告诉消费者如果运行了1分钟或消费了100条消息或已达到64MB的内存使用量,则停止。

3.3. 可用命令

当运行php artisan时,将出现一个新的命名空间

3.4. 自定义消息处理器

目前,有两种方式可以实现消息处理器:实现MessageProcessorInterface类或扩展AbstractMessageProcessor

当使用AbstractMessageProcessor时,您将可以访问额外的API,这些API可以在您的processMessage()中使用。

protected function ack(AMQPMessage $message);
protected function nack(AMQPMessage $message, bool $redeliver = true);

5. 贡献

您可以通过提交pull请求或报告任何在Github上的问题来贡献。在项目的当前阶段,没有定义任何贡献流程。

5.1 本地开发

运行composer install(使用ignore-platform-reqs以避免缺少扩展)

 docker run --rm -v $(pwd):/app jitesoft/phpunit:8.1 composer install --ignore-platform-req=ext-sockets

通过Docker运行单元测试

docker run --rm -v $(pwd):/app jitesoft/phpunit:8.1 phpunit --configuration phpunit.xml

5.2. 需要的帮助

该项目需要帮助的多个主题

  • CI管道:需要配置scrutinizer(或任何其他工具),以覆盖运行所有支持的PHP版本和Laravel框架版本的测试
  • 文档:任何有助于简化库使用的改进都受欢迎
  • 示例:一个证明库在不同实际场景示例的示例部分

6. 特殊的“感谢”

特殊的“感谢”献给库的贡献者