vkoori / amqp
关于此包的最新版本(1.0.3)没有可用的许可证信息。
为Laravel/Lumen提供的AMQP驱动程序。支持RabbitMQ。
1.0.3
2023-05-19 17:04 UTC
Requires
- php: >=8.1
- ext-json: *
- php-amqplib/php-amqplib: ^3.5
Requires (Dev)
- laravel/lumen-framework: ^9.0|^10.0
- phpunit/phpunit: ^9.3
README
安装
composer require vkoori/amqp
添加服务提供者
在Laravel应用程序中,将以下行添加到您的 config/app.php
文件。
'providers' => [ //... \Kooriv\MessageBroker\Providers\AMQP::class, \Kooriv\MessageBroker\Providers\Config::class ]
在Lumen应用程序中,将以下行添加到您的 bootstrap/app.php
文件。
$app->register(\Kooriv\MessageBroker\Providers\AMQP::class); $app->register(\Kooriv\MessageBroker\Providers\Config::class);
生成配置
在Laravel应用程序中,您可以使用以下命令。
php artisan vendor:publish --provider="Kooriv\MessageBroker\Providers\Config" --tag="config"
在Lumen应用程序中,创建具有以下内容的 config/amqp.php
文件。
<?php use Kooriv\MessageBroker\RabbitMQ\Enum\Connections; use Kooriv\MessageBroker\Event\Consumers; use Kooriv\MessageBroker\Enum\ExchangeType; return [ 'driver' => env('AMQP_DRIVER', 'rabbitMQ'), 'events' => Consumers::class, 'rabbitMQ' => [ 'connection_type' => Connections::STREAM, 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ] ], 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CA_FILE', null), 'local_cert' => env('RABBITMQ_SSL_PUBLIC_FILE', null), 'local_pk' => env('RABBITMQ_SSL_PRIVATE_FILE', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'verify_peer_name' => env('RABBITMQ_SSL_PASSPHRASE', false), ], ], // if you want to publish failed jobs in to the message broker, use the `failed_jobs` key 'failed_jobs' => [ 'queueName' => env(key:'APP_NAME', default: 'amqp') . '_failed_job', 'exchangeName' => env(key:'APP_NAME', default: 'amqp') . '_failed_job', 'exchangeType' => ExchangeType::TOPIC, 'routing_keys' => [ env(key:'APP_NAME', default: 'amqp').'amqp_job.failed' ] ], ];
开始使用
在消息代理中发布消息
您可以通过依赖注入或使用 app(AMQP::class)
来访问 AMQP
。
<?php namespace App\Http\Controllers\AMQP; use App\Http\Controllers\AMQP\Events\FirstQueue; use Kooriv\MessageBroker\Contract\AMQP; class GetStarted { public function publish(AMQP $amqp) { $amqp ->publisher() ->dispatch(message: "test message", properties: []) ->onQueue(queue: new FirstQueue); } }
您需要创建具有以下内容的 FirstQueue
。
<?php namespace App\Http\Controllers\AMQP\Events; use App\Http\Controllers\AMQP\Listeners\FirstJob; use Kooriv\MessageBroker\Enum\ExchangeType; use Kooriv\MessageBroker\Event\PubSub; class FirstQueue extends PubSub { protected string $queueName = 'queueName'; protected string $exchangeName = 'exchangeName'; protected ?ExchangeType $exchangeType = ExchangeType::TOPIC; protected array $routing_keys = [ 'payment_service.invoice.paid' ]; protected array $callbacks=[ FirstJob::class ]; }
注意
如果您不需要消费此事件,请不要设置
$callbacks
订阅消息并运行它们
注意
首先,需要更新
config/amqp.php
文件中的events
键,并创建具有相同路径的类。
<?php return [ ... 'events' => \App\Http\Controllers\AMQP\Consumers\Consumers::class, ... ];
<?php namespace App\Http\Controllers\AMQP\Consumers; use App\Http\Controllers\AMQP\Events\FirstQueue; use Kooriv\MessageBroker\Event\Consumers as EventConsumers; class Consumers extends EventConsumers { protected array $pubSub = [ FirstQueue::class ]; }
对于订阅,我们需要在 FirstQueue
类的 callback
方法中创建类。这些类应扩展 Kooriv\MessageBroker\Job
类。
<?php namespace App\Http\Controllers\AMQP\Listeners; use Kooriv\MessageBroker\Contract\MainJob; use Kooriv\MessageBroker\Job; class FirstJob extends Job { public function payload(MainJob $event) { dump( $event->getAMQP(), $event->getBody(), // $event->getChannel(), $event->getConsumerTag(), $event->getExchange(), $event->getRoutingKey(), $event->get_properties() ); } }
输入以下命令以运行订阅。
php artisan consume
驱动程序支持
- RabbitMQ
即将推出
- Kafka
- Pulsar