huilin/rocketmq-client-php

rocketmq-client-php

dev-master 2022-10-19 13:23 UTC

This package is auto-updated.

Last update: 2024-09-19 18:53:22 UTC


README

A PHP client for Apache RocketMQ. (If you run into problems, you can send feedback via WeChat f124816; mention "RocketMQ Php Client" in the note.)

Dependencies

Installation

  1. Install rocketmq-client-cpp. (dist/Dockerfile is an example of building the librocketmq.so file.)
  2. Run the following shell commands:
    cp bin/librocketmq.so /usr/lib/
    mkdir /usr/include/rocketmq/
    cp include/* /usr/include/rocketmq/
  3. Install php-cpp.
  4. Download and build rocketmq-client-php:
    git clone https://github.com/lpflpf/rocketmq-client-php
    cd rocketmq-client-php
    make && make install
  5. Update the php.ini file, adding the line extension=rocketmq.so.
  6. Try running the examples in the example directory.
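
After editing php.ini, it can help to confirm that PHP actually picked up the extension before running the examples. The following is a minimal sketch using only built-in PHP functions; the helper name checkExtension is hypothetical, and the extension name "rocketmq" is an assumption based on the rocketmq.so file name.

```php
<?php
// Hypothetical helper (not part of rocketmq-client-php): report whether
// a PHP extension is loaded and, if not, which php.ini file is in use.
function checkExtension(string $name): string
{
    if (extension_loaded($name)) {
        return "$name extension is loaded";
    }
    // php_ini_loaded_file() returns false when no php.ini was loaded.
    $ini = php_ini_loaded_file() ?: "no php.ini found";
    return "$name extension is NOT loaded; check $ini";
}

// Assumption: the extension registers itself under the name "rocketmq".
echo checkExtension("rocketmq") . "\n";
```

If the extension is reported as not loaded, the printed path shows which php.ini the CLI is reading, which may differ from the one used by a web server.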

Usage

See the autocompletion file.

Examples

Producer Example

<?php
namespace RocketMQ;

$instanceName = "MessageQueue";

$producer = new Producer($instanceName);
$producer->setInstanceName($instanceName);
$producer->setNamesrvAddr("127.0.0.1:9876"); // name server address
$producer->start();

// Send 10000 messages to TopicTest and print the status of each send.
for ($i = 0; $i < 10000; $i++) {
    $message = new Message("TopicTest", "*", "hello world $i");
    $sendResult = $producer->send($message);
    echo $sendResult->getSendStatus() . "\n";
}

Pull Consumer Example

It is a good idea to save the offset locally.

<?php
namespace RocketMQ;

$consumer = new PullConsumer("pullTestGroup");
$consumer->setInstanceName("testGroup");
$consumer->setTopic("TopicTest");
$consumer->setNamesrvAddr("127.0.0.1:9876");
$consumer->start();
$queues = $consumer->getQueues();

// Drain each queue in turn, starting from offset 0.
foreach ($queues as $queue) {
    $newMsg = true;
    $offset = 0;
    while ($newMsg) {
        // Pull a batch (here at most 8 messages) starting at $offset.
        $pullResult = $consumer->pull($queue, "*", $offset, 8);

        switch ($pullResult->getPullStatus()) {
            case PullStatus::FOUND:
                foreach ($pullResult as $key => $val) {
                    echo $val->getMessage()->getBody() . "\n";
                }
                // Advance past the messages just read.
                $offset += count($pullResult);
                break;
            default:
                // Any other status: stop reading this queue.
                $newMsg = false;
                break;
        }
    }
}
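
The note above about saving the offset locally could be sketched as follows. This is only an illustration in plain PHP, not part of rocketmq-client-php: the helpers loadOffsets and saveOffsets, the JSON file location, and the queue key "TopicTest:queue-0" are all hypothetical, and how a queue object is turned into a stable key is left open.

```php
<?php
// Hypothetical helpers for persisting pull offsets in a local JSON file.

/** Load saved offsets (queue key => next offset); empty if no file yet. */
function loadOffsets(string $path): array
{
    if (!is_file($path)) {
        return [];
    }
    $data = json_decode((string) file_get_contents($path), true);
    return is_array($data) ? $data : [];
}

/** Save offsets by writing a temp file and renaming it over the target. */
function saveOffsets(string $path, array $offsets): void
{
    $tmp = $path . ".tmp";
    file_put_contents($tmp, json_encode($offsets), LOCK_EX);
    rename($tmp, $path);
}

// Assumed file location; choose a durable path in real use.
$path = sys_get_temp_dir() . "/rocketmq-offsets-example.json";
$offsets = loadOffsets($path);

$key = "TopicTest:queue-0";       // hypothetical key identifying one queue
$offset = $offsets[$key] ?? 0;    // resume where the last run stopped

// ... pull from $offset and process the batch here ...
$processed = 8;                   // stand-in for count($pullResult)

$offsets[$key] = $offset + $processed;
saveOffsets($path, $offsets);
echo "next offset for $key: {$offsets[$key]}\n";
```

Writing to a temporary file and renaming it means a crash in the middle of a save should not leave a truncated offsets file behind. In the pull loop, the starting value of $offset would come from the loaded array instead of 0, and the save would happen after each processed batch.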

Push Consumer Example

<?php
namespace RocketMQ;

$consumer = new PushConsumer("testGroup");
$consumer->setInstanceName("testGroup");
$consumer->setNamesrvAddr("127.0.0.1:9876");
$consumer->setThreadCount(10);
$consumer->setListenerType(MessageListenerType::LISTENER_ORDERLY);

// Count received messages; $count is captured by reference so the
// callback can update it.
$count = 0;
$consumer->setCallback(function ($msg) use (&$count) {
    echo $msg->getMessage()->getBody() . "\n";
    $count++;
});
$consumer->subscribe("TopicTest", "*");
$consumer->start();
$consumer->shutdown();

Docker

  1. Build the rocketmq-client-cpp image from the repository root (the Dockerfile is in dist/).
docker build -t rocketmq-client-cpp:1.2.2 ./dist
  2. Build the rocketmq-client-php image.
docker build -t rocketmq-client-php:1.0.0 .

TODO

  1. Manual offset commit.
  2. Log handling. (rocketmq-client-cpp does not support specifying a log file.)
  3. Documentation.