jesusslim / mqttclient
PHP mqtt 客户端
1.52
2019-05-21 10:00 UTC
Requires
- php: >=5.6.0
- jesusslim/pinject: 1.2.3
README
PHP mqtt 客户端
用法
[英文] 中文
安装
安装
composer require jesusslim/mqttclient
如果你的 composer 不允许 dev-master,请添加以下配置
"minimum-stability": "dev"
到你的 composer.json 文件中。
需求
swoole 2.0.8+(swoole.so)
mosquitto.so
示例
MqttClient 有两种实现可供使用:一种基于 swoole(请参阅 swoole 示例,已弃用),另一种基于 mosquitto(请参阅 mosquitto 示例)。
区别
- 需求区别。
- Swoole 存在拆包错误和崩溃问题。我们需要自行处理这些异常,例如监控进程 ID,在进程崩溃或收到错误的数据包时重启。
- Swoole 客户端可以在一个客户端中订阅和发布,使用 mosquitto 需要两个客户端。
- Swoole 和 mosquitto 之间有一些语法差异。
基于 mosquitto 的示例
定义你的 logger
/**
 * Minimal example logger: writes every log entry to standard output.
 */
class Logger implements \mqttclient\src\swoole\MqttLogInterface
{
    /**
     * Print one log entry as "<type> : <content> " terminated by CRLF.
     *
     * @param mixed $type    Value printed before the " : " separator.
     * @param mixed $content Value printed after the " : " separator.
     * @param array $params  Accepted for interface compatibility; not used here.
     */
    public function log($type, $content, $params = [])
    {
        $line = "$type : $content \r\n";
        echo $line;
    }
}
使用 MqttClient
// Broker endpoint. The third constructor argument (10017) is presumably the
// client id -- confirm against the MqttClient constructor.
$host = '127.0.0.1';
$port = 1883;
$mqtt = new \mqttclient\src\mosquitto\MqttClient($host, $port, 10017);

// Credentials, keep-alive and logging.
$mqtt->setAuth('username', 'password');
$mqtt->setKeepAlive(60);
$mqtt->setLogger(new Logger());

// Reconnect policy: maximum number of retries on error, then the retry interval.
$mqtt->setMaxReconnectTimesWhenError(360 * 12);
$mqtt->setReconnectInterval(10);

// Topic handlers. Their parameters can be any data mapped into the IoC container.
$onSlim = function ($msg) {
    echo "I receive:".$msg."\r\n";
};
// NOTE(review): this handler type-hints the swoole MqttClient although the
// client built above is the mosquitto one -- verify which class the
// container actually provides here.
$onSlim3 = function (\mqttclient\src\swoole\MqttClient $client, $msg) {
    echo "I receive:".$msg." for slim3 \r\n";
    echo $client->getClientId();
};
$mqtt->setTopics([
    new \mqttclient\src\subscribe\Topic('test/slim', $onSlim),
    new \mqttclient\src\subscribe\Topic('test/slim3', $onSlim3),
]);

// Register a callback for the SOCKET_CONNECT trigger.
$onSocketConnect = function () {
    //do something
};
$mqtt->on(\mqttclient\src\consts\ClientTriggers::SOCKET_CONNECT, $onSocketConnect);

$mqtt->start();
发送者
// Broker endpoint. The third constructor argument (10017) is presumably the
// client id -- confirm against the MqttSender constructor.
$host = '127.0.0.1';
$port = 1883;
$sender = new \mqttclient\src\mosquitto\MqttSender($host, $port, 10017);

// Credentials, keep-alive and logging.
$sender->setAuth('username', 'password');
$sender->setKeepAlive(60);
$sender->setLogger(new Logger());

// Reconnect policy: maximum number of retries on error, then the retry interval.
$sender->setMaxReconnectTimesWhenError(360 * 12);
$sender->setReconnectInterval(10);

// Queue is not defined in this example; you must supply your own class.
$queue = new Queue();
$sender->setQueue($queue);

$sender->start();
它需要一个实现 mqttclient\src\mosquitto\MqttSendingQueue 的队列,以循环获取需要发送的消息。
基于 swoole 的示例(已弃用)
定义你的 logger
/**
 * Minimal example logger: writes every log entry to standard output.
 */
class Logger implements \mqttclient\src\swoole\MqttLogInterface
{
    /**
     * Print one log entry as "<type> : <content> " terminated by CRLF.
     *
     * @param mixed $type    Value printed before the " : " separator.
     * @param mixed $content Value printed after the " : " separator.
     * @param array $params  Accepted for interface compatibility; not used here.
     */
    public function log($type, $content, $params = [])
    {
        $line = "$type : $content \r\n";
        echo $line;
    }
}
定义你的临时存储(使用 Redis/Memory/...)
/**
 * In-memory example implementation of the temporary storage.
 *
 * Entries are kept in a nested array addressed by message type, key and
 * sub key. Nothing is persisted, so the data lives only as long as this object.
 */
class Store implements \mqttclient\src\swoole\TmpStorageInterface
{
    /** @var array Nested map: [$message_type][$key][$sub_key] => stored data. */
    private $data = [];

    /**
     * Store a value under the given message type / key / sub key.
     *
     * @param mixed $message_type First-level index.
     * @param mixed $key          Second-level index.
     * @param mixed $sub_key      Third-level index.
     * @param mixed $data         Value to store.
     * @param int   $expire       Ignored by this in-memory example; entries never expire.
     */
    public function set($message_type, $key, $sub_key, $data, $expire = 3600)
    {
        $this->data[$message_type][$key][$sub_key] = $data;
    }

    /**
     * Fetch a previously stored value.
     *
     * @param mixed $message_type First-level index.
     * @param mixed $key          Second-level index.
     * @param mixed $sub_key      Third-level index.
     * @return mixed The stored value, or null when no such entry exists.
     */
    public function get($message_type, $key, $sub_key)
    {
        // Guard the lookup: reading a missing nested index raises an
        // undefined-index notice (a warning on PHP 8). isset() is used
        // instead of ?? because the package supports PHP 5.6.
        return isset($this->data[$message_type][$key][$sub_key])
            ? $this->data[$message_type][$key][$sub_key]
            : null;
    }

    /**
     * Remove an entry, printing a message when it does not exist.
     *
     * @param mixed $message_type First-level index.
     * @param mixed $key          Second-level index.
     * @param mixed $sub_key      Third-level index.
     */
    public function delete($message_type, $key, $sub_key)
    {
        if (!isset($this->data[$message_type][$key][$sub_key])) {
            echo "storage not found:$message_type $key $sub_key";
        }
        // unset() on a missing entry is a no-op, so no early return is needed.
        unset($this->data[$message_type][$key][$sub_key]);
    }
}
使用 MqttClient
// Broker endpoint. The third constructor argument (10017) is presumably the
// client id -- confirm against the MqttClient constructor.
$host = '127.0.0.1';
$port = 1883;
$mqtt = new \mqttclient\src\swoole\MqttClient($host, $port, 10017);

// Credentials, keep-alive, logging and temporary storage.
$mqtt->setAuth('username', 'password');
$mqtt->setKeepAlive(60);
$mqtt->setLogger(new Logger());
$mqtt->setStore(new Store());

// Enable DNS lookup.
$mqtt->setDnsLookup(true);
// Socket buffer size.
$mqtt->setSocketBufferSize(5 * 1024 * 1024);
// Maximum number of reconnect attempts on error.
$mqtt->setMaxReconnectTimesWhenError(360 * 12);
// Reconnect interval.
$mqtt->setReconnectInterval(10000);

// Topic handlers. Their parameters can be any data mapped into the IoC container.
$onSlim = function ($msg) {
    echo "I receive:".$msg."\r\n";
};
$onSlim3 = function (\mqttclient\src\swoole\MqttClient $client, $msg) {
    echo "I receive:".$msg." for slim3 \r\n";
    echo $client->getClientId();
};
$mqtt->setTopics([
    new \mqttclient\src\subscribe\Topic('test/slim', $onSlim),
    new \mqttclient\src\subscribe\Topic('test/slim3', $onSlim3),
]);

// When the RECEIVE_SUBACK trigger fires, publish a message from inside the handler.
$onSubAck = function (\mqttclient\src\swoole\MqttClient $client) {
    $client->publish('slim/echo', 'GGXX', \mqttclient\src\consts\Qos::ONE_TIME);
};
$mqtt->on(\mqttclient\src\consts\ClientTriggers::RECEIVE_SUBACK, $onSubAck);

$mqtt->connect();
// The third argument is presumably the QoS level -- confirm against publish().
$mqtt->publish('test/slim', 'test qos', 2);
扩展
您也可以使用自己的客户端扩展 MqttClient。
示例
/**
 * Example of extending MqttClient with your own resources (database
 * handlers here) that are then exposed to topic callbacks via the container.
 */
class Client extends MqttClient
{
    /** @var mixed Handler created from $mysql_conf. */
    private $mysql_handler;

    /** @var \MongoClient Handler created from $mongo_conf. */
    private $mongo_handler;

    /**
     * Create the database handlers, then initialise the underlying MQTT client.
     *
     * @param mixed $host       Passed through to the parent constructor.
     * @param mixed $port       Passed through to the parent constructor.
     * @param mixed $client_id  Passed through to the parent constructor.
     * @param mixed $mysql_conf Configuration handed as-is to Mysqli.
     * @param array $mongo_conf Must provide the keys username, password, host, port and db.
     *                          Username and password are expected in raw (not
     *                          percent-encoded) form.
     */
    public function __construct($host, $port, $client_id, $mysql_conf, $mongo_conf)
    {
        // NOTE(review): which Mysqli class this resolves to depends on the
        // enclosing namespace/imports, which are not shown here -- verify that
        // its constructor accepts a single configuration argument.
        $this->mysql_handler = new Mysqli($mysql_conf);

        // Percent-encode the credentials: characters such as '@', ':', '/'
        // or '%' in a username/password would otherwise corrupt the URI.
        $mongo_uri = 'mongodb://'
            . rawurlencode($mongo_conf['username']) . ':' . rawurlencode($mongo_conf['password'])
            . '@' . $mongo_conf['host'] . ':' . $mongo_conf['port']
            . '/' . $mongo_conf['db'];
        $this->mongo_handler = new \MongoClient($mongo_uri);

        parent::__construct($host, $port, $client_id);
    }

    /**
     * Override produceContainer to map your own classes/data/closures into the
     * injector, so they can be used in every publish-receive handler.
     *
     * For example:
     * $client->setTopics([new Topic('test/own',function($mongo,$msg){ $result = $mongo->selectCollection('log_platform','test')->find(['sid' => ['$gte' => intval($msg)]]); })]);
     *
     * @return Injector Container holding this client (under both MqttClient
     *                  and Client class names) plus the 'mysqli' and 'mongo' handlers.
     */
    protected function produceContainer()
    {
        $container = new Injector();
        $container->mapData(MqttClient::class, $this);
        $container->mapData(Client::class, $this);
        $container->mapData('mysqli', $this->mysql_handler);
        $container->mapData('mongo', $this->mongo_handler);
        return $container;
    }
}