roiwk/rabbitmq

RabbitMQ asynchronous (workerman) and synchronous client, with producer and consumer

v1.3.0 2024-08-07 03:00 UTC

This package is auto-updated.

Last update: 2024-09-07 03:13:43 UTC


README

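rabbitmq: asynchronous (workerman) and synchronous PHP client, with producer and consumer

rabbitmq is a PHP client that provides producers and consumers in both asynchronous (workerman) and synchronous modes.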

Dependencies

php >= 8.0

Installation

composer require roiwk/rabbitmq

Usage

All examples

Config example

// Config format
$config = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'vhost' => '/',
    'mechanism' => 'AMQPLAIN',
    'user' => 'username',
    'password' => 'password',
    'timeout' => 10,
    'heartbeat' => 60,
    'heartbeat_callback' => function(){},
    'error_callback'     => null,
];
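In deployments it is often convenient to fill the same array from environment variables instead of hard-coding credentials. The following is a minimal sketch under that assumption: the helper name `rabbitmqConfigFromEnv` and the `RABBITMQ_*` variable names are hypothetical and not part of roiwk/rabbitmq; only the array keys come from the config format above.

```php
<?php
// Hypothetical helper: builds the config array shown above from an array of
// environment variables. The RABBITMQ_* names are assumptions for illustration.
function rabbitmqConfigFromEnv(array $env): array
{
    return [
        'host'      => $env['RABBITMQ_HOST'] ?? '127.0.0.1',
        'port'      => (int) ($env['RABBITMQ_PORT'] ?? 5672),
        'vhost'     => $env['RABBITMQ_VHOST'] ?? '/',
        'mechanism' => 'AMQPLAIN',
        'user'      => $env['RABBITMQ_USER'] ?? 'username',
        'password'  => $env['RABBITMQ_PASSWORD'] ?? 'password',
        'timeout'   => (int) ($env['RABBITMQ_TIMEOUT'] ?? 10),
        'heartbeat' => (int) ($env['RABBITMQ_HEARTBEAT'] ?? 60),
        'heartbeat_callback' => function () {},
        'error_callback'     => null,
    ];
}

// getenv() without arguments returns all environment variables as an array.
$config = rabbitmqConfigFromEnv(getenv());
```

Any variable that is not set falls back to the default value from the config format, so the resulting `$config` can be passed to the examples that follow unchanged.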

Publisher example

// sync Publisher
Roiwk\Rabbitmq\Producer::connect($config)->publishSync('Hello World!', '', '', 'hello');

// async Publisher (workerman)

use Workerman\Worker;

$worker = new Worker();
$worker->onWorkerStart = function() use($config) {
    Roiwk\Rabbitmq\Producer::connect($config)->publishAsync('Hello World!', '', '', 'hello');
};
Worker::runAll();

Consumer example

// sync Consumer

use Bunny\AbstractClient;
use Bunny\Channel;
use Bunny\Message;
use Roiwk\Rabbitmq\AbstractConsumer;

// style 1:
$client = new Roiwk\Rabbitmq\Client($config, null, '', '', 'hello');
$client->syncProcess(function(Message $message, Channel $channel, AbstractClient $client){
    echo " [x] Received ", $message->content, "\n";
    $channel->ack($message); // Bunny's Channel::ack() takes the message to acknowledge
});

// style 2:
$consumer = new class ($config) extends AbstractConsumer {
    protected bool $async = false;
    protected string $queue = 'hello';
    protected array $consume = [
        'noAck' => true,
    ];
    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
};
$consumer->onWorkerStart(null);

// async Consumer (workerman)

use Workerman\Worker;
use Bunny\AbstractClient;
use Bunny\Channel;
use Bunny\Message;
use Roiwk\Rabbitmq\AbstractConsumer;

$worker = new Worker();

$consumer = new class ($config) extends AbstractConsumer {

    protected bool $async = true;

    protected string $queue = 'hello';

    protected array $consume = [
        'noAck' => true,
    ];

    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
};

$worker->onWorkerStart = [$consumer, 'onWorkerStart'];
Worker::runAll();
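The examples above publish a plain string and read it back from `$message->content`. When a message needs to carry structured data, one common approach is to JSON-encode it before publishing and decode it in the consumer. The sketch below assumes that approach; `encodePayload` and `decodePayload` are hypothetical helpers, not part of roiwk/rabbitmq or bunny.

```php
<?php
// Hypothetical helpers (not part of roiwk/rabbitmq): serialize a payload to a
// JSON string for publishing, and parse it again on the consumer side.
function encodePayload(array $payload): string
{
    // JSON_THROW_ON_ERROR raises JsonException instead of returning false.
    return json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

function decodePayload(string $body): array
{
    $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($decoded)) {
        throw new InvalidArgumentException('Expected a JSON object or array');
    }
    return $decoded;
}

// Round trip without a broker, to show the shape of the data.
$body = encodePayload(['task' => 'resize', 'id' => 42]);
$data = decodePayload($body);
echo $data['task'], ' #', $data['id'], "\n"; // prints "resize #42"
```

With the publisher examples, the encoded string would take the place of 'Hello World!'; inside a consumer, `decodePayload($message->content)` would recover the array.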

Advanced usage

Custom process in webman: consumer

1. process.php

'hello-rabbitmq' => [
    'handler' => app\queue\rabbitmq\Hello::class,
    'count'   => 1,
    'constructor' => [
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2. app\queue\rabbitmq\Hello.php

namespace app\queue\rabbitmq;

use Roiwk\Rabbitmq\AbstractConsumer;
use Roiwk\Rabbitmq\Producer;
use Bunny\Channel;
use Bunny\Message;
use Bunny\AbstractClient;

class Hello extends AbstractConsumer
{
    protected bool $async = true;

    protected string $queue = 'hello';

    protected array $consume = [
        'noAck' => true,
    ];

    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
}

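Custom process in webman: grouped consumers

Similar to the webman-queue plugin: grouped consumers are placed in the same directory and share one worker, with multiple processes handling them.

1. process.php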

'hello-rabbitmq' => [
    'handler' => Roiwk\Rabbitmq\GroupConsumers::class,
    'count'   => 2,
    'constructor' => [
        'consumer_dir' => app_path().'/queue/rabbitmq',
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2. In the app_path().'/queue/rabbitmq' directory, create PHP files whose classes extend Roiwk\Rabbitmq\AbstractConsumer, in the same way as app\queue\rabbitmq\Hello.php above.

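Notes

  1. !!! The asynchronous mode of this library works only in a workerman environment; the synchronous mode works in any environment. If you need asynchronous operation in another environment, use the bunny client directly.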