binsoul/net-mqtt-client-react

基于 React 的异步 MQTT 客户端

0.7.3 2021-04-12 16:59 UTC

This package is auto-updated.

Last update: 2024-09-13 03:38:55 UTC


README

Latest Version on Packagist Software License Total Downloads Build Status

此包提供了一个基于 React socket 库的异步 MQTT 客户端。所有客户端方法都返回一个 Promise:操作成功时该 Promise 被兑现(resolve),操作失败时被拒绝(reject)。已订阅主题的传入消息通过 "message" 事件传递。

安装

通过 composer

$ composer require binsoul/net-mqtt-client-react

示例

连接到公共 MQTT 代理(broker)并无限期运行。

<?php

use BinSoul\Net\Mqtt\Client\React\ReactMqttClient;
use BinSoul\Net\Mqtt\Connection;
use BinSoul\Net\Mqtt\DefaultMessage;
use BinSoul\Net\Mqtt\DefaultSubscription;
use BinSoul\Net\Mqtt\Message;
use BinSoul\Net\Mqtt\Subscription;
use React\Socket\DnsConnector;
use React\Socket\TcpConnector;

include 'vendor/autoload.php';

// Setup client: one event loop is shared by the TCP connector, the DNS
// resolver (created with '8.8.8.8' as its nameserver argument) and the client.
$loop = \React\EventLoop\Factory::create();
$tcpConnector = new TcpConnector($loop);
$dnsResolver = (new \React\Dns\Resolver\Factory())->createCached('8.8.8.8', $loop);
$connector = new DnsConnector($tcpConnector, $dnsResolver);
$client = new ReactMqttClient($connector, $loop);

// Bind to events
$client->on('open', function () use ($client) {
    // Network connection established: report the endpoint
    $host = $client->getHost();
    $port = $client->getPort();

    printf("Open: %s:%d\n", $host, $port);
});

$client->on('close', function () use ($client, $loop) {
    // Network connection closed: report the endpoint, then stop the loop
    $host = $client->getHost();
    $port = $client->getPort();

    printf("Close: %s:%d\n", $host, $port);

    $loop->stop();
});

$client->on('connect', function (Connection $connection) {
    // Broker connected: print the client id of the connection
    $clientId = $connection->getClientID();

    printf("Connect: client=%s\n", $clientId);
});

$client->on('disconnect', function (Connection $connection) {
    // Broker disconnected: print the client id of the connection
    $clientId = $connection->getClientID();

    printf("Disconnect: client=%s\n", $clientId);
});

$client->on('message', function (Message $message) {
    // Incoming message: print a single line made of the optional
    // duplicate/retained markers, the topic, and the payload trimmed
    // to a width of 50 with '...' as the trim marker.
    printf(
        "Message%s%s: %s => %s\n",
        $message->isDuplicate() ? ' (duplicate)' : '',
        $message->isRetained() ? ' (retained)' : '',
        $message->getTopic(),
        mb_strimwidth($message->getPayload(), 0, 50, '...')
    );
});

$client->on('warning', function (\Exception $e) {
    // Warnings are only reported; the loop keeps running
    printf("Warning: %s\n", $e->getMessage());
});

$client->on('error', function (\Exception $e) use ($loop) {
    // Errors are reported and then the loop is stopped
    printf("Error: %s\n", $e->getMessage());

    $loop->stop();
});

// Connect to broker
$client->connect('test.mosquitto.org')->then(
    function () use ($client) {
        // Reporters shared by the promise chains below
        $reportError = function (\Exception $e) {
            printf("Error: %s\n", $e->getMessage());
        };

        $reportPublish = function (Message $message) {
            printf("Publish: %s => %s\n", $message->getTopic(), $message->getPayload());
        };

        // Subscribe to all topics
        $subscribing = $client->subscribe(new DefaultSubscription('#'));
        $subscribing
            ->then(function (Subscription $subscription) {
                printf("Subscribe: %s\n", $subscription->getFilter());
            })
            ->otherwise($reportError);

        // Publish humidity once
        $publishing = $client->publish(new DefaultMessage('sensors/humidity', '55%'));
        $publishing
            ->then($reportPublish)
            ->otherwise($reportError);

        // Publish a random temperature every 10 seconds
        $randomTemperature = function () {
            return mt_rand(-20, 30);
        };

        $periodic = $client->publishPeriodically(
            10,
            new DefaultMessage('sensors/temperature'),
            $randomTemperature
        );
        $periodic
            ->progress($reportPublish)
            ->otherwise($reportError);
    }
);

// Hand control to the event loop; the 'close' and 'error' listeners stop it
$loop->run();

测试

$ composer test

许可证

MIT 许可证 (MIT)。更多信息请参阅许可证文件。