vkoori/amqp

关于此包的最新版本(1.0.3)没有可用的许可证信息。

为Laravel/Lumen提供的AMQP驱动程序。支持RabbitMQ。

1.0.3 2023-05-19 17:04 UTC

This package is auto-updated.

Last update: 2024-09-19 20:10:04 UTC


README

安装

composer require vkoori/amqp

添加服务提供者

在Laravel应用程序中,将以下行添加到您的 config/app.php 文件。

'providers' => [
    //...
    \Kooriv\MessageBroker\Providers\AMQP::class,
    \Kooriv\MessageBroker\Providers\Config::class
]

在Lumen应用程序中,将以下行添加到您的 bootstrap/app.php 文件。

$app->register(\Kooriv\MessageBroker\Providers\AMQP::class);
$app->register(\Kooriv\MessageBroker\Providers\Config::class);

生成配置

在Laravel应用程序中,您可以使用以下命令。

php artisan vendor:publish --provider="Kooriv\MessageBroker\Providers\Config" --tag="config"

在Lumen应用程序中,创建具有以下内容的 config/amqp.php 文件。

<?php

use Kooriv\MessageBroker\RabbitMQ\Enum\Connections;
use Kooriv\MessageBroker\Event\Consumers;
use Kooriv\MessageBroker\Enum\ExchangeType;

return [

    'driver' => env('AMQP_DRIVER', 'rabbitMQ'),
    'events' => Consumers::class,

    'rabbitMQ' => [
        'connection_type' => Connections::STREAM,
        'hosts' => [
            [
                'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                'port' => env('RABBITMQ_PORT', 5672),
                'user' => env('RABBITMQ_USER', 'guest'),
                'password' => env('RABBITMQ_PASSWORD', 'guest'),
                'vhost' => env('RABBITMQ_VHOST', '/'),
            ]
        ],
        'ssl_options' => [
            'cafile' => env('RABBITMQ_SSL_CA_FILE', null),
            'local_cert' => env('RABBITMQ_SSL_PUBLIC_FILE', null),
            'local_pk' => env('RABBITMQ_SSL_PRIVATE_FILE', null),
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
            'verify_peer_name' => env('RABBITMQ_SSL_PASSPHRASE', false),
        ],
    ],

    // if you want to publish failed jobs in to the message broker, use the `failed_jobs` key
    'failed_jobs' => [
        'queueName' => env(key:'APP_NAME', default: 'amqp') . '_failed_job',
        'exchangeName' =>  env(key:'APP_NAME', default: 'amqp') . '_failed_job',
        'exchangeType' => ExchangeType::TOPIC,
        'routing_keys' => [
            env(key:'APP_NAME', default: 'amqp').'amqp_job.failed'
        ]
    ],
];

开始使用

在消息代理中发布消息

您可以通过依赖注入或使用 app(AMQP::class) 来访问 AMQP

<?php

namespace App\Http\Controllers\AMQP;

use App\Http\Controllers\AMQP\Events\FirstQueue;
use Kooriv\MessageBroker\Contract\AMQP;

class GetStarted
{
    public function publish(AMQP $amqp)
    {
        $amqp
        ->publisher()
        ->dispatch(message: "test message", properties: [])
        ->onQueue(queue: new FirstQueue);
    }
}

您需要创建具有以下内容的 FirstQueue

<?php

namespace App\Http\Controllers\AMQP\Events;

use App\Http\Controllers\AMQP\Listeners\FirstJob;
use Kooriv\MessageBroker\Enum\ExchangeType;
use Kooriv\MessageBroker\Event\PubSub;

class FirstQueue extends PubSub
{
    protected string $queueName = 'queueName';
    protected string $exchangeName = 'exchangeName';
    protected ?ExchangeType $exchangeType = ExchangeType::TOPIC;
    protected array $routing_keys = [
        'payment_service.invoice.paid'
    ];
    protected array $callbacks=[
        FirstJob::class
    ];

}

注意

如果您不需要消费此事件,请不要设置 $callbacks

订阅消息并运行它们

注意

首先,需要更新 config/amqp.php 文件中的 events 键,并创建具有相同路径的类。

<?php

return [
    ...
    'events' => \App\Http\Controllers\AMQP\Consumers\Consumers::class,
    ...
];
<?php

namespace App\Http\Controllers\AMQP\Consumers;

use App\Http\Controllers\AMQP\Events\FirstQueue;
use Kooriv\MessageBroker\Event\Consumers as EventConsumers;

class Consumers extends EventConsumers
{
    protected array $pubSub = [
        FirstQueue::class
    ];
}

对于订阅,我们需要在 FirstQueue 类的 callback 方法中创建类。这些类应扩展 Kooriv\MessageBroker\Job 类。

<?php

namespace App\Http\Controllers\AMQP\Listeners;

use Kooriv\MessageBroker\Contract\MainJob;
use Kooriv\MessageBroker\Job;

class FirstJob extends Job
{
	public function payload(MainJob $event)
	{
		dump(
			$event->getAMQP(),
			$event->getBody(),
			// $event->getChannel(),
			$event->getConsumerTag(),
			$event->getExchange(),
			$event->getRoutingKey(),
			$event->get_properties()
		);
	}
}

输入以下命令以运行订阅。

php artisan consume

驱动程序支持

  1. RabbitMQ

即将推出

  • Kafka
  • Pulsar