leroy-merlin-br/laravel-pubsub

Laravel 的 Pub-Sub 抽象

6.0.3 2021-02-25 14:45 UTC

README

Laravel 的 Pub-Sub 抽象。 从 Entanet/laravel-pubsub 分支而来

Author Software License Packagist Version Total Downloads

此包是一个包装器,将 php-pubsub 引入 Laravel。

要支持 Laravel 4,请使用包 https://github.com/Superbalist/laravel4-pubsub

请注意,Laravel 5.3 仅支持到版本 2.0.2。

2.0.3+ 支持 Laravel 5.4 及以上版本

6.0.0 支持 Laravel 8.x

以下适配器得到支持

  • 本地
  • /dev/null
  • Redis
  • Kafka(请参阅以下单独的安装说明)
  • Google Cloud
  • HTTP

安装

composer require leroy-merlin-br/laravel-pubsub

在 app.php 中注册服务提供者

'providers' => [
    // ...
    LeroyMerlin\LaravelPubSub\PubSubServiceProvider::class,
]

在 app.php 中注册外观

'aliases' => [
    // ...
    'PubSub' => LeroyMerlin\LaravelPubSub\PubSubFacade::class,
]

该包有一个默认配置,使用以下环境变量。

PUBSUB_CONNECTION=redis

REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379

KAFKA_BROKERS=localhost
KAFKA_SECURITY_PROTOCOL=kafka-security-protocol
KAFKA_SASL_USERNAME=kafka-sasl-username
KAFKA_SASL_PASSWORD=kafks-sasl-password

GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json

HTTP_PUBSUB_URI=null
HTTP_PUBSUB_SUBSCRIBE_CONNECTION=redis

如果 KAFKA_SECURITY_PROTOCOL 设置为 "SASL_SSL" 或 "SASL_PLAINTEXT",则将使用存储在 KAFKA_SASL_USERNAME & KAFKA_SASL_PASSWORD 中的凭证对 Kafka 服务器/集群进行认证。

要自定义配置文件,请使用 Artisan 发布包配置。

php artisan vendor:publish --provider="LeroyMerlin\LaravelPubSub\PubSubServiceProvider"

然后,您可以在 app/config/pubsub.php 中编辑生成的配置。

Kafka 适配器安装

请注意,尽管该包捆绑了对 php-pubsub-kafka 适配器的支持,但默认情况下不包括该适配器。

这是因为 KafkaPubSubAdapter 依赖于外部依赖项 librdkafka C 库php-rdkafka PECL 扩展。

如果您打算使用此适配器,您需要按照以下 安装说明 安装这些依赖项。

然后,您可以使用以下方式包含适配器

composer require superbalist/php-pubsub-kafka

使用方法

// get the pub-sub manager
$pubsub = app('pubsub');

// note: function calls on the manager are proxied through to the default connection
// eg: you can do this on the manager OR a connection
$pubsub->publish('channel_name', 'message');

// get the default connection
$pubsub = app('pubsub.connection');
// or
$pubsub = app(\LeroyMerlin\LaravelPubSub\Contracts\AdapterInterface::class);

// get a specific connection
$pubsub = app('pubsub')->connection('redis');

// publish a message
// the message can be a string, array, bool, object - anything which can be json encoded
$pubsub->publish('channel_name', 'this is where your message goes');
$pubsub->publish('channel_name', ['key' => 'value']);
$pubsub->publish('channel_name', true);

// publish multiple messages
$messages = [
    'message 1',
    'message 2',
];
$pubsub->publishBatch('channel_name', $messages);

// subscribe to a channel
$pubsub->subscribe('channel_name', function ($message) {
    var_dump($message);
});

// all the above commands can also be done using the facade
PubSub::connection('kafka')->publish('channel_name', 'Hello World!');

PubSub::connection('kafka')->subscribe('channel_name', function ($message) {
    var_dump($message);
});

创建一个订阅者

该包包含一个辅助命令 php artisan make:subscriber MyExampleSubscriber,用于生成新的订阅者命令类。

许多 pub-sub 适配器都包含阻塞的 subscribe() 调用,因此这些命令最好作为作为 supervisor 进程运行的守护进程运行。

此生成命令将创建文件 app/Console/Commands/MyExampleSubscriber.php,其中将包含

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use LeroyMerlin\LaravelPubSub\Contracts\AdapterInterface;

class MyExampleSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var AdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param AdapterInterface $pubsub
     */
    public function __construct(AdapterInterface $pubsub)
    {
        parent::__construct();

        $this->pubsub = $pubsub;
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}

Kafka 订阅者

对于使用 php-pubsub-kafka 适配器的订阅者,您可能希望为每个订阅者更改 consumer_group_id

为此,您需要使用 PubSubConnectionFactory 为每个订阅者创建新的连接。这是因为一旦创建连接,就无法更改 consumer_group_id

以下是如何做到这一点的示例

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use LeroyMerlin\LaravelPubSub\PubSubConnectionFactory;
use Superbalist\PubSub\AdapterInterface;

class MyExampleKafkaSubscriber extends Command
{
    /**
     * The name and signature of the subscriber command.
     *
     * @var string
     */
    protected $signature = 'subscriber:name';

    /**
     * The subscriber description.
     *
     * @var string
     */
    protected $description = 'PubSub subscriber for ________';

    /**
     * @var AdapterInterface
     */
    protected $pubsub;

    /**
     * Create a new command instance.
     *
     * @param PubSubConnectionFactory $factory
     */
    public function __construct(PubSubConnectionFactory $factory)
    {
        parent::__construct();

        $config = config('pubsub.connections.kafka');
        $config['consumer_group_id'] = self::class;
        $this->pubsub = $factory->make('kafka', $config);
    }

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $this->pubsub->subscribe('channel_name', function ($message) {

        });
    }
}

添加自定义驱动程序

请参阅 php-pubsub 文档中的 编写适配器

要包含您的自定义驱动程序,您可以使用 extend() 函数。

$manager = app('pubsub');
$manager->extend('custom_connection_name', function ($config) {
    // your callable must return an instance of the AdapterInterface
    return new MyCustomPubSubDriver($config);
});

// get an instance of your custom connection
$pubsub = $manager->connection('custom_connection_name');