markkimsal/amphp-mqtt

Amp的异步MQTT客户端

1.3.0 2018-05-23 21:12 UTC

This package is not auto-updated.

Last update: 2024-09-20 21:18:03 UTC


README

markkimsal/amphp-mqtt 是基于Amp的PHP异步MQTT客户端。

安装

composer require markkimsal/amphp-mqtt

此项目不需要任何PHP扩展。

用法

每个发布或订阅操作都会返回一个 Amp\Promise,您可以在它解析时做出反应。消息通过 ->on('message') 接收,并接受一个只有一个参数的回调函数:Packet\Publish 类。 Packet\Publish 用于发送和接收消息。

在收到连接确认包之前,您可以开始发送和订阅,系统会排队您的数据包,但仍然立即返回一个 Amp\Promise

QoS 0 数据包在发送后会立即解析,因为它们不会从服务器收到任何确认。

QoS 1 数据包在客户端收到 Puback 包后会解析。

QoS 2 数据包在客户端收到 Pubcomp 包后会解析。客户端将自动根据规范用 Pubrel 响应 Pubrec。

支持通过使用 "tls://" 前缀到服务器的URI来使用 TLSv1.2。不会执行对等验证。

支持使用连接URL的参数进行用户名和密码认证。

<?php
include('vendor/autoload.php');
use \Amp\Loop;
use \MarkKimsal\Mqtt\Client;

Loop::run( function($w) {

	$client = new Client('tcp://172.17.0.1:1883?topics=foo,bar&clientId=abc123');

	$p = $client->connect();

	$p2 = $client->subscribe('test/', function($err, $resp) {
		echo "***** SUBSCRIBE Resolved *******\n";
		var_dump($err);
		var_dump($resp);
	});

	$p->onResolve(function($err, $resp) use($p, $client){
		echo "****** CONNECT Resolved ********\n";
	});

	$p2->onResolve(function($err, $res) {
		echo "***** SUBSCRIBE Resolved in a different way *******\n";
		var_dump($err);
		var_dump($res);
	});

	$client->on('message', function($publishPacket) {
		echo "****** got a message on topic: [".$publishPacket->getTopic()."] ***** \n";
		echo $publishPacket->getMessage()."\n";
	});

	Loop::repeat(1000, function() use($client){
		$client->publish('Current time is: '.date('H:i:s'), 'time', 0, function($err, $result) {
			if (!$err) {
				echo "***** Socket fired off Publish Packet with qos 0 *****\n";
			}
		});
	});
});

阻塞模式(与同步代码集成)

您可以在同步后端中使用此库等待发送QoS 0、1或2的数据包。

<?php
	include('vendor/autoload.php');

	$client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883');

	$pconn = $client->connect();

	$p0 = $client->publishRetain('QoS0 Current time is: '.date('H:i:s'), 'time', 0, function($err, $result) {
		if (!$err) {
			echo "***** Socket fired off Publish Packet with qos 0 *****\n";
		}
	});

	$p1 = $client->publish('QoS1 Current time is: '.date('H:i:s'), 'time', 1, function($err, $result) {
		if (!$err) {
			echo "***** Got Publish Ack with qos 1 *****\n";
		}
	});

	$p2 = $client->publish('QoS2 Current time is: '.date('H:i:s'), 'time', 2, function($err, $result) {
		if (!$err) {
			echo "***** Got Publish Ack with qos 2 *****\n";
		}
	});


	Amp\Promise\wait($p0);
	Amp\Promise\wait($p1);
	Amp\Promise\wait($p2);
	return;

手动发布确认

您可以通过禁用自动确认功能来控制何时确认QoS 1和2的发布消息。否则,在 on('message') 处理器运行后,将发送适当的确认包。

<?php
	include('vendor/autoload.php');

	$client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883?topics=foo,bar&clientId=abc123');
	$client->disableAutoAck();

	$p2 = $client->subscribe('test/#', function($err, $resp) {
		echo "***** SUBSCRIBE Resolved *******\n";
	});


	$p = $client->connect();

	$client->on('message', function($publishPacket) use($client) {
		if ($publishPacket->isDup()) {
			echo "****** got a DUP on topic: [".$publishPacket->getTopic()."] ***** \n";
			echo $publishPacket->getMessage()."\n";
		} else {
			echo "****** got a message on topic: [".$publishPacket->getTopic()."] ***** \n";
			echo $publishPacket->getMessage()."\n";
		}

		//save message with durability here

		$client->acknowledge($publishPacket);
	});

干净会话

您可以通过将 cleanSession 添加为URL参数来使用干净会话连接。

您必须使用干净会话或提供客户端ID。

如果您不提供 clientId,将自动为您创建干净会话。

	include('vendor/autoload.php');

	$client = new MarkKimsal\Mqtt\Client('tcp://172.17.0.1:1883?cleanSession');