markkimsal / amphp-mqtt
Amp的异步MQTT客户端
1.3.0
2018-05-23 21:12 UTC
Requires
- amphp/amp: ^2
- amphp/socket: ^0.10
- amphp/uri: ^0.1
- evenement/evenement: ~3.0
This package is not auto-updated.
Last update: 2024-09-20 21:18:03 UTC
README
markkimsal/amphp-mqtt
是基于Amp的PHP异步MQTT客户端。
安装
composer require markkimsal/amphp-mqtt
此项目不需要任何PHP扩展。
用法
每个发布或订阅操作都会返回一个 Amp\Promise
,您可以在它解析时做出反应。消息通过 ->on('message')
接收,并接受一个只有一个参数的回调函数:Packet\Publish
类。 Packet\Publish
用于发送和接收消息。
在收到连接确认包之前,您可以开始发送和订阅,系统会排队您的数据包,但仍然立即返回一个 Amp\Promise
。
QoS 0 数据包在发送后会立即解析,因为它们不会从服务器收到任何确认。
QoS 1 数据包在客户端收到 Puback 包后会解析。
QoS 2 数据包在客户端收到 Pubcomp 包后会解析。客户端将自动根据规范用 Pubrel 响应 Pubrec。
支持通过使用 "tls://" 前缀到服务器的URI来使用 TLSv1.2。不会执行对等验证。
支持使用连接URL的参数进行用户名和密码认证。
<?php include('vendor/autoload.php'); use \Amp\Loop; use \MarkKimsal\Mqtt\Client; Loop::run( function($w) { $client = new Client('tcp://172.17.0.1:1883?topics=foo,bar&clientId=abc123'); $p = $client->connect(); $p2 = $client->subscribe('test/', function($err, $resp) { echo "***** SUBSCRIBE Resolved *******\n"; var_dump($err); var_dump($resp); }); $p->onResolve(function($err, $resp) use($p, $client){ echo "****** CONNECT Resolved ********\n"; }); $p2->onResolve(function($err, $res) { echo "***** SUBSCRIBE Resolved in a different way *******\n"; var_dump($err); var_dump($res); }); $client->on('message', function($publishPacket) { echo "****** got a message on topic: [".$publishPacket->getTopic()."] ***** \n"; echo $publishPacket->getMessage()."\n"; }); Loop::repeat(1000, function() use($client){ $client->publish('Current time is: '.date('H:i:s'), 'time', 0, function($err, $result) { if (!$err) { echo "***** Socket fired off Publish Packet with qos 0 *****\n"; } }); }); });
阻塞模式(与同步代码集成)
您可以在同步后端中使用此库等待发送QoS 0、1或2的数据包。
<?php include('vendor/autoload.php'); $client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883'); $pconn = $client->connect(); $p0 = $client->publishRetain('QoS0 Current time is: '.date('H:i:s'), 'time', 0, function($err, $result) { if (!$err) { echo "***** Socket fired off Publish Packet with qos 0 *****\n"; } }); $p1 = $client->publish('QoS1 Current time is: '.date('H:i:s'), 'time', 1, function($err, $result) { if (!$err) { echo "***** Got Publish Ack with qos 1 *****\n"; } }); $p2 = $client->publish('QoS2 Current time is: '.date('H:i:s'), 'time', 2, function($err, $result) { if (!$err) { echo "***** Got Publish Ack with qos 2 *****\n"; } }); Amp\Promise\wait($p0); Amp\Promise\wait($p1); Amp\Promise\wait($p2); return;
手动发布确认
您可以通过禁用自动确认功能来控制何时确认QoS 1和2的发布消息。否则,在 on('message')
处理器运行后,将发送适当的确认包。
<?php include('vendor/autoload.php'); $client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883?topics=foo,bar&clientId=abc123'); $client->disableAutoAck(); $p2 = $client->subscribe('test/#', function($err, $resp) { echo "***** SUBSCRIBE Resolved *******\n"; }); $p = $client->connect(); $client->on('message', function($publishPacket) use($client) { if ($publishPacket->isDup()) { echo "****** got a DUP on topic: [".$publishPacket->getTopic()."] ***** \n"; echo $publishPacket->getMessage()."\n"; } else { echo "****** got a message on topic: [".$publishPacket->getTopic()."] ***** \n"; echo $publishPacket->getMessage()."\n"; } //save message with durability here $client->acknowledge($publishPacket); });
干净会话
您可以通过将 cleanSession
添加为URL参数来使用干净会话连接。
您必须使用干净会话或提供客户端ID。
如果您不提供 clientId,将自动为您创建干净会话。
include('vendor/autoload.php'); $client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883?cleanSession');