leroy-merlin-br / laravel-pubsub
Laravel 的 Pub-Sub 抽象
Requires
- php: >=7.4.0
- ext-json: *
- google/cloud: ^0.150.0
- guzzlehttp/guzzle: ^7.2
- illuminate/console: ^8.0
- illuminate/support: ^8.0
- predis/predis: ^1.1
Requires (Dev)
- mockery/mockery: ^1.4.2
- phpunit/phpunit: ^9.0
Conflicts
- dev-master
- 6.0.3
- 6.0.2
- 6.0.1
- 6.0.0
- 5.0.0
- 4.0.0
- 3.2.1
- 3.2.0
- 3.1.2
- 3.1.1
- 3.1.0
- 3.0.0
- 2.0.5
- 2.0.4
- 2.0.3
- 2.0.2
- 2.0.1
- 2.0.0
- 1.0.2
- 1.0.1
- 1.0.0
- dev-http-adapter-tests
- dev-doc/redis-test-explain
- dev-add-kafka-adapter
- dev-Laravel-7
- dev-Laravel_6_support
- dev-bugfix_rdkafka_v4_support
- dev-SASL_config_via_env
- dev-SASL-support
- dev-laravel-54-compat
This package is auto-updated.
Last update: 2024-08-25 22:15:32 UTC
README
Laravel 的 Pub-Sub 抽象。 从 Entanet/laravel-pubsub 分支而来
此包是一个包装器,将 php-pubsub 引入 Laravel。
要支持 Laravel 4,请使用包 https://github.com/Superbalist/laravel4-pubsub
请注意,Laravel 5.3 仅支持到版本 2.0.2。
2.0.3+ 支持 Laravel 5.4 及以上版本。
6.0.0 支持 Laravel 8.x。
以下适配器得到支持
- 本地
- /dev/null
- Redis
- Kafka(请参阅以下单独的安装说明)
- Google Cloud
- HTTP
安装
composer require leroy-merlin-br/laravel-pubsub
在 app.php 中注册服务提供者
'providers' => [ // ... LeroyMerlin\LaravelPubSub\PubSubServiceProvider::class, ]
在 app.php 中注册外观
'aliases' => [ // ... 'PubSub' => LeroyMerlin\LaravelPubSub\PubSubFacade::class, ]
该包有一个默认配置,使用以下环境变量。
PUBSUB_CONNECTION=redis
REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379
KAFKA_BROKERS=localhost
KAFKA_SECURITY_PROTOCOL=kafka-security-protocol
KAFKA_SASL_USERNAME=kafka-sasl-username
KAFKA_SASL_PASSWORD=kafks-sasl-password
GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json
HTTP_PUBSUB_URI=null
HTTP_PUBSUB_SUBSCRIBE_CONNECTION=redis
如果 KAFKA_SECURITY_PROTOCOL 设置为 "SASL_SSL" 或 "SASL_PLAINTEXT",则将使用存储在 KAFKA_SASL_USERNAME & KAFKA_SASL_PASSWORD 中的凭证对 Kafka 服务器/集群进行认证。
要自定义配置文件,请使用 Artisan 发布包配置。
php artisan vendor:publish --provider="LeroyMerlin\LaravelPubSub\PubSubServiceProvider"
然后,您可以在 app/config/pubsub.php
中编辑生成的配置。
Kafka 适配器安装
请注意,尽管该包捆绑了对 php-pubsub-kafka 适配器的支持,但默认情况下不包括该适配器。
这是因为 KafkaPubSubAdapter 依赖于外部依赖项 librdkafka C 库
和 php-rdkafka
PECL 扩展。
如果您打算使用此适配器,您需要按照以下 安装说明 安装这些依赖项。
然后,您可以使用以下方式包含适配器
composer require superbalist/php-pubsub-kafka
使用方法
// get the pub-sub manager $pubsub = app('pubsub'); // note: function calls on the manager are proxied through to the default connection // eg: you can do this on the manager OR a connection $pubsub->publish('channel_name', 'message'); // get the default connection $pubsub = app('pubsub.connection'); // or $pubsub = app(\LeroyMerlin\LaravelPubSub\Contracts\AdapterInterface::class); // get a specific connection $pubsub = app('pubsub')->connection('redis'); // publish a message // the message can be a string, array, bool, object - anything which can be json encoded $pubsub->publish('channel_name', 'this is where your message goes'); $pubsub->publish('channel_name', ['key' => 'value']); $pubsub->publish('channel_name', true); // publish multiple messages $messages = [ 'message 1', 'message 2', ]; $pubsub->publishBatch('channel_name', $messages); // subscribe to a channel $pubsub->subscribe('channel_name', function ($message) { var_dump($message); }); // all the above commands can also be done using the facade PubSub::connection('kafka')->publish('channel_name', 'Hello World!'); PubSub::connection('kafka')->subscribe('channel_name', function ($message) { var_dump($message); });
创建一个订阅者
该包包含一个辅助命令 php artisan make:subscriber MyExampleSubscriber
,用于生成新的订阅者命令类。
许多 pub-sub 适配器都包含阻塞的 subscribe()
调用,因此这些命令最好作为作为 supervisor 进程运行的守护进程运行。
此生成命令将创建文件 app/Console/Commands/MyExampleSubscriber.php
,其中将包含
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use LeroyMerlin\LaravelPubSub\Contracts\AdapterInterface; class MyExampleSubscriber extends Command { /** * The name and signature of the subscriber command. * * @var string */ protected $signature = 'subscriber:name'; /** * The subscriber description. * * @var string */ protected $description = 'PubSub subscriber for ________'; /** * @var AdapterInterface */ protected $pubsub; /** * Create a new command instance. * * @param AdapterInterface $pubsub */ public function __construct(AdapterInterface $pubsub) { parent::__construct(); $this->pubsub = $pubsub; } /** * Execute the console command. */ public function handle() { $this->pubsub->subscribe('channel_name', function ($message) { }); } }
Kafka 订阅者
对于使用 php-pubsub-kafka
适配器的订阅者,您可能希望为每个订阅者更改 consumer_group_id
。
为此,您需要使用 PubSubConnectionFactory
为每个订阅者创建新的连接。这是因为一旦创建连接,就无法更改 consumer_group_id
。
以下是如何做到这一点的示例
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use LeroyMerlin\LaravelPubSub\PubSubConnectionFactory; use Superbalist\PubSub\AdapterInterface; class MyExampleKafkaSubscriber extends Command { /** * The name and signature of the subscriber command. * * @var string */ protected $signature = 'subscriber:name'; /** * The subscriber description. * * @var string */ protected $description = 'PubSub subscriber for ________'; /** * @var AdapterInterface */ protected $pubsub; /** * Create a new command instance. * * @param PubSubConnectionFactory $factory */ public function __construct(PubSubConnectionFactory $factory) { parent::__construct(); $config = config('pubsub.connections.kafka'); $config['consumer_group_id'] = self::class; $this->pubsub = $factory->make('kafka', $config); } /** * Execute the console command. */ public function handle() { $this->pubsub->subscribe('channel_name', function ($message) { }); } }
添加自定义驱动程序
请参阅 php-pubsub 文档中的 编写适配器。
要包含您的自定义驱动程序,您可以使用 extend()
函数。
$manager = app('pubsub'); $manager->extend('custom_connection_name', function ($config) { // your callable must return an instance of the AdapterInterface return new MyCustomPubSubDriver($config); }); // get an instance of your custom connection $pubsub = $manager->connection('custom_connection_name');