v-krivenko/laravel-rabbitmq

为 Laravel 简单的 rabbitmq 集成

1.0.2 2024-02-15 17:35 UTC

This package is auto-updated.

Last update: 2024-09-15 19:02:19 UTC


README

Build Status Scrutinizer Code Quality Code Coverage Total Downloads

Laravel RabbitMQ

基于发布-订阅模式(订阅者为消费者)的简单 rabbitmq 库

目录

  1. 安装

  2. 配置

    2.1. 连接

    2.2. 队列

    2.3. 交换器

    2.4. 发布者

    2.5. 消费者

  3. 使用方法

    3.1. 发布消息

    3.2. 消费消息

    3.3. 可用 CLI 命令

    3.4. 自定义消息处理器

  4. 贡献

1. 安装

运行

composer require needle-project/laravel-rabbitmq

对于 Laravel 5.5 或更高版本,库应通过 包发现 自动加载。

对于低于 5.5 的 Laravel 版本,您需要将服务提供者添加到 app.php

<?php

return [
    // ...
    'providers' => [
        // ...
        NeedleProject\LaravelRabbitMq\Providers\ServiceProvider::class,
    ],
    // ...
];

2. 配置

  • 在您的 Laravel 配置目录内创建一个名为 laravel_rabbitmq.php 的新文件。(或者使用 artisan vendor:publish - 更多信息请参见 这里
  • 根据您的需求填写配置。

配置文件有 5 个主要节点:连接交换器队列发布者消费者

它们按以下模式使用:配置流程

示例配置

return [
    'connections' => [
        'connectionA' => [/** Connection A attributes */],
        'connectionB' => [/** Connection B attributes */],
    ],
    'exchanges' => [
        'exchangeA' => [
            // Tells that the exchange will use the connection A
            'connection' => 'connectionA',
            /** Exchange A Attributes */
        ],
        'exchangeB' => [
            // Tells that the exchange will use the connection B
            'connection' => 'connectionB',
            /** Exchange B Attributes */
        ]
    ],
    'queues' => [
        'queueA' => [
            // Tells that the queue will use the connection alias A
            'connection' => 'connectionA',
            /** Queue A Attributes */
        ]
    ],
    'publishers' => [
        'aPublisherName' => /** will publish to exchange defined by alias */ 'exchangeA'
    ],
    'consumers' => [
        'aConsumerName' => [
            // will read messages from
            'queue' => 'queueA',
            // and will send the for processing to an "NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface"
            'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
        ]
    ]
]

2.1. 连接

连接属性

  • 所有属性都是可选的,如果没有定义,将使用默认值。

2.2. 队列

队列主要节点

队列属性

示例 1

[
	['exchange' => 'first.exchange', 'routing_key' => '*'],
	['exchange' => 'second.exchange', 'routing_key' => 'foo_bar'],
]

2.3. 交换器

交换器主要节点

交换器属性

示例 2

[
	['queue' => 'first.exchange', 'routing_key' => '*'],
	['queue' => 'second.exchange', 'routing_key' => 'foo_bar'],
]

2.4. 发布者

发布者将消息推送到一个 交换器(但它也可以将其推送到队列)。定义发布者

'publishers' => [
	'myFirstPublisher' => 'echangeAliasName',
	'mySecondPublisher' => 'queueAliasName'
	// and many as you need
]

2.5. 消费者

消费者将从队列中获取消息。定义消费者

'consumers' => [
    'myConsumerName' => [
        'queue' => 'queueAliasName',
        'prefetch_count' => 1,
        'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
    ]
]

3. 使用方法

配置完成后,您应该得到一个类似于以下内容的配置文件 laravel_rabbitmq.php

return [
    'connections' => [
        'connectionA' => [],
    ],
    'exchanges' => [
        'exchangeA' => [
            'connection' => 'connectionA',
			'name' => 'foo_bar',
			'attributes' => [
				'exchange_type' => 'topic'
			]
        ]
	],
    'queues' => [
        'queueB' => [
            'connection' => 'connectionA',
            'name' => 'foo_bar_listener',
			'attributes' => [
				'bind' => [
                    ['exchange' => 'foo_bar', 'routing_key' => '*']
                ]
			]
        ]
    ],
    'publishers' => [
        'aPublisherName' => 'exchangeA'
    ],
    'consumers' => [
        'aConsumerName' => [
            'queue' => 'queueB',
            'message_processor' => \NeedleProject\LaravelRabbitMq\Processor\CliOutputProcessor::class
        ]
    ]
]

3.1. 发布消息

代码示例

<?php
/**
 * @var $app \Illuminate\Contracts\Container\Container
 * @var $publisher \NeedleProject\LaravelRabbitMq\PublisherInterface 
 */
$publisher = $app->makeWith(PublisherInterface::class, ['aPublisherName']);
$message = [
    'title' => 'Hello world',
    'body' => 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.',
];
$routingKey = '*';
$publisher->publish(json_encode($message), /* optional */$routingKey);

可选,有一个命令可以用来发布消息。

php artisan rabbitmq:publish aPublisherName MyMessage

注意:目前,CLI 中的路由键不支持。

3.2. 消费消息

应通过在守护程序模式下运行命令来消费消息。虽然 PHP 不旨在这样做,但您可以使用 supervisor 来实现。

消费者的 流程 相对简单:CLI 消费者 -> 获取消息 -> 将其传递给配置中的 message_processor 键。

消息处理器是一个实现了 NeedleProject\LaravelRabbitMq\Processor 接口的类。如果您不想处理确认,可以扩展 \NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor,它需要实现 processMessage(AMQPMessage $message): bool 方法。

您使用的 message_processor 键是由 Laravel 的 app 命令运行来构建类的。

启动消息消费者/监听器

php artisan rabbitmq:consume aConsumerName

运行带有限制的消费者(当达到任一限制时将停止)

php artisan rabbitmq:consume aConsumerName --time=60 --messages=100 --memory=64

这表示如果消费者运行了1分钟或消费了100条消息或已达到64MB的内存使用量,则消费者将停止。

3.3. 可用命令

当运行 php artisan 时,将出现一个新的命名空间

3.4. 自定义消息处理器

目前,您可以选择实现 MessageProcessorInterface 类或扩展 AbstractMessageProcessor

当使用 AbstractMessageProcessor 时,您将能够访问额外的 API,这些 API 可以用于您的 processMessage() 中。

protected function ack(AMQPMessage $message);
protected function nack(AMQPMessage $message, bool $redeliver = true);

4. 贡献

您可以通过提交拉取请求或在 Github 上报告任何问题来自由地贡献。在项目的当前阶段,尚未定义任何贡献流程。