php mqtt client

<?php

/* phpMQTT: a small MQTT client that speaks the "MQIsdp" protocol (version byte 3)
   over a plain TCP stream socket. It can connect (optionally with a will and
   username/password), publish, subscribe and run a receive/keepalive loop that
   dispatches incoming PUBLISH packets to per-topic callbacks. */
class phpMQTT {

    private $socket;            /* holds the socket */
    private $msgid = 1;         /* counter for message id */
    public $keepalive = 10;     /* keepalive timer in seconds (sent in CONNECT, used by proc()) */
    public $timesinceping;      /* host unix time of the last received packet, used to detect disconnects */
    public $topics = array();   /* currently subscribed topics: filter => array("qos"=>..., "function"=>...) */
    public $debug = false;      /* should output debug messages */
    public $address;            /* broker address */
    public $port;               /* broker port */
    public $clientid;           /* client id sent to broker */
    public $will;               /* stores the will of the client (keys: topic, content, qos, retain) */
    private $username;          /* stores username */
    private $password;          /* stores password */

    /* constructor: just records the broker details, no connection is opened here */
    public function __construct($address, $port, $clientid){
        $this->broker($address, $port, $clientid);
    }

    /* sets the broker details */
    public function broker($address, $port, $clientid){
        $this->address = $address;
        $this->port = $port;
        $this->clientid = $clientid;
    }

    /* connects to the broker
        inputs: $clean:    should the client send a clean session flag
                $will:     optional array with the keys topic, content, qos, retain
                $username: optional username
                $password: optional password
        returns: true when the broker accepted the connection, false otherwise */
    public function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){

        if($will) $this->will = $will;
        if($username) $this->username = $username;
        if($password) $this->password = $password;

        $address = gethostbyname($this->address);
        $this->socket = fsockopen($address, $this->port, $errno, $errstr, 60);

        if (!$this->socket ) {
            error_log("fsockopen() $errno, $errstr \n");
            return false;
        }

        stream_set_timeout($this->socket, 5);
        stream_set_blocking($this->socket, 0);

        // Decide once which optional fields are present so that the flag byte
        // and the payload can never disagree with each other.
        $haswill = ($this->will != NULL);
        $hasusername = ($this->username != NULL);
        $haspassword = ($this->password != NULL);

        $i = 0;
        $buffer = "";

        // Variable header: length-prefixed protocol name "MQIsdp" + version 3
        $buffer .= chr(0x00) . chr(0x06) . "MQIsdp" . chr(0x03);
        $i += 9;

        // Connect flags
        $var = 0;
        if($clean) $var += 2;

        //Add will info to header
        if($haswill){
            $var += 4; // Set will flag
            $willqos = isset($this->will['qos']) ? (int)$this->will['qos'] : 0;
            $var += ($willqos << 3); //Set will qos
            if(!empty($this->will['retain'])) $var += 32; //Set will retain
        }

        if($hasusername) $var += 128; //Add username to header
        if($haspassword) $var += 64;  //Add password to header

        $buffer .= chr($var); $i++;

        //Keep alive
        $buffer .= chr($this->keepalive >> 8); $i++;
        $buffer .= chr($this->keepalive & 0xff); $i++;

        $buffer .= $this->strwritestring($this->clientid,$i);

        //Adding will to payload
        if($haswill){
            $buffer .= $this->strwritestring($this->will['topic'],$i);
            $buffer .= $this->strwritestring($this->will['content'],$i);
        }

        if($hasusername) $buffer .= $this->strwritestring($this->username,$i);
        if($haspassword) $buffer .= $this->strwritestring($this->password,$i);

        // Fixed header: CONNECT (0x10) followed by the variable-length
        // "remaining length" so packets longer than 127 bytes are encoded correctly.
        $head = chr(0x10) . $this->setmsglength($i);

        fwrite($this->socket, $head);
        fwrite($this->socket, $buffer);

        // The reply is expected to be 4 bytes; the checks below look at the
        // packet type in byte 0 and the return code in byte 3.
        $string = $this->read(4);

        if(strlen($string) < 4){
            error_log("Connection failed! (no CONNACK received)\n");
            return false;
        }

        if(ord($string[0])>>4 == 2 && $string[3] == chr(0)){
            if($this->debug) echo "Connected to Broker\n";
        }else{
            error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
                                    ord($string[0]),ord($string[3])));
            return false;
        }

        $this->timesinceping = time();

        return true;
    }

    /* read: reads in so many bytes
        inputs: $int: number of bytes wanted
                $nb:  when true, do a single fread and return whatever is
                      available right now (possibly nothing)
        returns: the bytes read; when $nb is false this keeps reading until
                 $int bytes arrived or the socket reports end-of-file */
    public function read($int = 8192, $nb = false){

        if($nb){
            return fread($this->socket, $int);
        }

        $string = "";
        $togo = $int;

        while (!feof($this->socket) && $togo > 0) {
            $fread = fread($this->socket, $togo);
            if($fread === false){
                // read error: give up instead of looping forever
                break;
            }
            if($fread === ""){
                // socket is non-blocking; back off briefly instead of spinning
                usleep(1000);
                continue;
            }
            $string .= $fread;
            $togo = $int - strlen($string);
        }

        return $string;
    }

    /* subscribe: subscribes to topics
        inputs: $topics: array of filter => array("qos"=>int, "function"=>callback)
                $qos:    qos bits placed in the fixed header of the SUBSCRIBE packet */
    public function subscribe($topics, $qos = 0){
        $i = 0;
        $buffer = "";
        $id = $this->msgid;
        $buffer .= chr($id >> 8);  $i++;
        $buffer .= chr($id % 256);  $i++;

        foreach($topics as $key => $topic){
            $buffer .= $this->strwritestring($key,$i);
            $buffer .= chr($topic["qos"]);  $i++;
            $this->topics[$key] = $topic;
        }

        $cmd = 0x80;
        //$qos
        $cmd += ($qos << 1);

        // variable-length remaining length, so many/long filters still encode correctly
        $head = chr($cmd) . $this->setmsglength($i);

        fwrite($this->socket, $head);
        fwrite($this->socket, $buffer, $i);

        // consume the acknowledgement: 2 header bytes, then as many bytes as
        // the second header byte announces
        $string = $this->read(2);

        $bytes = ord(substr($string,1,1));
        $string = $this->read($bytes);
    }

    /* ping: sends a keep alive ping */
    public function ping(){
        $head = chr(0xc0) . chr(0x00);
        fwrite($this->socket, $head, 2);
        if($this->debug) echo "ping sent\n";
    }

    /* disconnect: sends a proper disconnect cmd */
    public function disconnect(){
        $head = chr(0xe0) . chr(0x00);
        fwrite($this->socket, $head, 2);
    }

    /* close: sends a proper disconnect, then closes the socket */
    public function close(){
        $this->disconnect();
        fclose($this->socket);
    }

    /* publish: publishes $content on a $topic
        inputs: $qos:    when non-zero a message id is added to the packet
                $retain: when truthy the retain bit is set */
    public function publish($topic, $content, $qos = 0, $retain = 0){

        $i = 0;
        $buffer = "";

        $buffer .= $this->strwritestring($topic,$i);

        if($qos){
            $id = $this->msgid++;
            // message ids are written as 16 bits; wrap before they overflow
            if($this->msgid > 0xffff) $this->msgid = 1;
            $buffer .= chr($id >> 8);  $i++;
            $buffer .= chr($id % 256);  $i++;
        }

        $buffer .= $content;
        $i += strlen($content);

        $cmd = 0x30;
        if($qos) $cmd += $qos << 1;
        if($retain) $cmd += 1;

        $head = chr($cmd) . $this->setmsglength($i);

        fwrite($this->socket, $head, strlen($head));
        fwrite($this->socket, $buffer, $i);

    }

    /* topicfilterregex: turns a subscription filter into an anchored regex.
        A level that is exactly "+" matches any run of characters without "/",
        a level that is exactly "#" matches anything; every other level is
        matched literally (regex metacharacters are escaped). */
    private function topicfilterregex($filter){
        $parts = array();
        foreach(explode("/", (string)$filter) as $level){
            if($level === "+"){
                $parts[] = '[^\/]*';
            }elseif($level === "#"){
                $parts[] = '.*';
            }else{
                $parts[] = preg_quote($level, '/');
            }
        }
        return '/^' . implode('\/', $parts) . '$/';
    }

    /* message: processes a received PUBLISH body
        inputs: $msg: 2-byte topic length, the topic, then the payload.
        Every subscribed filter that matches the topic has its "function"
        callback invoked with ($topic, $payload).
        NOTE(review): everything after the topic is handed over as payload, so
        a packet that carries a message id after the topic would include
        those two bytes in the payload - confirm against the qos in use. */
    public function message($msg){
        if(strlen($msg) < 2){
            // too short to even hold the topic length
            return;
        }

        $tlen = (ord($msg[0])<<8) + ord($msg[1]);
        $topic = substr($msg,2,$tlen);
        $msg = substr($msg,($tlen+2));
        $found = 0;

        foreach($this->topics as $key=>$top){
            if( preg_match($this->topicfilterregex($key), $topic) ){
                // is_callable accepts function names as before, plus closures
                // and array callables
                if(isset($top['function']) && is_callable($top['function'])){
                    call_user_func($top['function'],$topic,$msg);
                    $found = 1;
                }
            }
        }

        if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
    }

    /* reconnect: closes the current socket (if any), connects again without
        the clean flag and re-subscribes to the stored topics.
        returns: true on success, false when the connection could not be made */
    private function reconnect(){
        if(is_resource($this->socket)){
            fclose($this->socket);
        }

        if(!$this->connect(false)){
            return false;
        }

        if(count($this->topics)){
            $this->subscribe($this->topics);
        }

        return true;
    }

    /* proc: one iteration of the processing loop for an "always on" client
        inputs: $loop: when true, sleep 100ms if nothing was waiting on the
                       socket; pass false when the caller does its own pacing
        returns: 1 normally, 0 when a required reconnect failed */
    public function proc( $loop = true){

        if(!is_resource($this->socket) || feof($this->socket)){
            if($this->debug) echo "eof receive going to reconnect for good measure\n";
            if(!$this->reconnect()){
                return 0;
            }
        }

        $byte = $this->read(1, true);

        if($byte === false || $byte === ""){
            if($loop){
                usleep(100000);
            }

        }else{

            // upper nibble of the first byte is the packet type
            $cmd = (int)(ord($byte)/16);
            if($this->debug) echo "Recevid: $cmd\n";

            // decode the variable-length "remaining length"
            $multiplier = 1;
            $value = 0;
            do{
                $digit = ord($this->read(1));
                $value += ($digit & 127) * $multiplier;
                $multiplier *= 128;
            }while (($digit & 128) != 0);

            if($this->debug) echo "Fetching: $value\n";

            // blocking read: the whole packet body is needed before dispatching
            $string = "";
            if($value)
                $string = $this->read($value);

            if($cmd){
                switch($cmd){
                    case 3:
                        $this->message($string);
                    break;
                }

                $this->timesinceping = time();
            }
        }

        if($this->timesinceping < (time() - $this->keepalive )){
            if($this->debug) echo "not found something so ping\n";
            $this->ping();
        }

        if($this->timesinceping<(time()-($this->keepalive*2))){
            if($this->debug) echo "not seen a package in a while, disconnecting\n";
            if(!$this->reconnect()){
                return 0;
            }
        }

        return 1;
    }

    /* getmsglength: decodes a variable-length value (7 bits per byte, top bit
        set means "more bytes follow") from $msg starting at offset $i;
        $i is advanced past the bytes consumed */
    public function getmsglength(&$msg, &$i){

        $multiplier = 1;
        $value = 0 ;
        do{
          $digit = ord($msg[$i]);
          $value += ($digit & 127) * $multiplier;
          $multiplier *= 128;
          $i++;
        }while (($digit & 128) != 0);

        return $value;
    }

    /* setmsglength: encodes $len in the variable-length format read by
        getmsglength (7 bits per byte, least significant group first) */
    public function setmsglength($len){
        $string = "";
        do{
          $digit = $len % 128;
          $len = $len >> 7;
          // if there are more digits to encode, set the top bit of this digit
          if ( $len > 0 )
            $digit = ($digit | 0x80);
          $string .= chr($digit);
        }while ( $len > 0 );
        return $string;
    }

    /* strwritestring: returns $str prefixed with its 2-byte big-endian length
        and adds the number of bytes produced to the running counter $i */
    public function strwritestring($str, &$i){
        $len = strlen($str);
        $msb = $len >> 8;
        $lsb = $len % 256;
        $ret = chr($msb);
        $ret .= chr($lsb);
        $ret .= $str;
        $i += ($len+2);
        return $ret;
    }

    /* printstr: debugging aid, prints every byte of $string as
        index, binary, hex and (when above 31) the character itself */
    public function printstr($string){
        $strlen = strlen($string);
        for($j=0;$j<$strlen;$j++){
            $num = ord($string[$j]);
            if($num > 31)
                $chr = $string[$j]; else $chr = " ";
            printf("%4d: %08b : 0x%02x : %s \n",$j,$num,$num,$chr);
        }
    }
}

?>

publish.php

<?php

require("./phpMQTT.php");

/* Example publisher: connects to the broker on localhost:1883 as client
   "hacker", publishes one timestamped greeting on the "hello" topic with
   qos 0 and then closes the connection. Does nothing if connecting fails. */
$mqtt = new phpMQTT("localhost", 1883, "hacker");

$connected = $mqtt->connect();

if ($connected) {
    $greeting = "Hello World! at ".date("r");
    $mqtt->publish("hello", $greeting, 0);
    $mqtt->close();
}

?>

subscribe.php

<?php

require("./phpMQTT.php");

/* Example subscriber: connects to the broker on localhost:1883 as client
   "musikar", subscribes to the "hello" topic and prints every message that
   arrives. Exits with status 1 when the connection cannot be established. */
$mqtt = new phpMQTT("localhost", 1883, "musikar");

if(!$mqtt->connect()){
    exit(1);
}

/* filter => qos to request and the name of the callback to invoke */
$topics = array();
$topics['hello'] = array("qos"=>0, "function"=>"procmsg");
$mqtt->subscribe($topics,0);

/* each proc() call handles at most one incoming packet and the keepalive */
while($mqtt->proc()){

}

$mqtt->close();

/* procmsg: callback for the "hello" subscription, prints the receive time,
   the topic and the payload */
function procmsg($topic,$msg){
        echo "Msg Recieved: ".date("r")."\nTopic:{$topic}\n$msg\n";
}

?>
时间: 2024-10-28 19:40:34

php mqtt client的相关文章

mqtt client python example

This is a simple example showing how to use the [Paho MQTT Python client](https://eclipse.org/paho/clients/python/) to send data to Azure IoT Hub. You need to assemble the right credentials and configure TLS and the MQTT protocol version appropriate

python mqtt client publish操作

使用Python库paho.mqtt.client 模拟mqtt client 连接broker,publish topic. #-*-coding:utf-8-*- import paho.mqtt.client as mqtt class mqttHandle(object): def __init__(self,mqtt_info): self.mqtt_info=mqtt_info def on_connect(client, userdata, flags, rc): print("C

python 实现MQTT Client

应用Python 实现MQTT Client,主要代码如下: #coding:utf-8 #!/usr/bin/python3 import json import os import binascii import asn1tools import sys import paho.mqtt.client as mqtt import requests import logging from enum import Enum from queue import Queue __all__ = [

mqtt client api: 阻塞API

fusesource版本:mqtt-client-1.11.jar下载地址:https://github.com/fusesource/mqtt-client fusesource提供三种mqtt client api: 阻塞API,基于Future的API和回调API.其中,回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装. 我们下面就简单介绍一下回调API的使用方法. 1 import org.fusesource.hawtbuf.Buffer; 2 import org.f

MQTT学习笔记——树莓派MQTT客户端 使用Mosquitto和paho-python

0 前言 本文说明如何在树莓派上安装Mosquitto.本文通过两个简单的例子说明树莓派中如何使用MQTT协议实现消息订阅,这些例子包括Mosquitto_sub指令实现消息订阅和paho-python扩展库实现GPIO端口的远程控制.本文中使用了两个工具--Mosquitto paho-python,其中Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的消息通信简单易用:另外,paho-python是一个

MQTT 消息 发布 订阅

当连接向一个mqtt服务器时,clientId必须是唯一的.设置一样,导致client.setCallback总是走到 connectionLost回调.报connection reset.调查一天才发现是clientid重复导致. client = new MqttAsyncClient(serverURIString, "client-id"); clientId是用来保存会话信息. MqttConnectOptions options = new MqttConnectOptio

Linkit 7688 DUO(六) 加入MQTT物联网协议

Linkit 系列博文: 联发科Linkit 7688 (一) 上手及在Mac下搭建OpenWrt交叉编译环境,C语言编译Hello,World 联发科Linkit 7688 (二)GPIO基本操作与C语言编程 联发科Linkit 7688 DUO(三): 通过 Arduino 控制外设和传感器 Linkit 7688 DUO(四): 接上各种Arduino传感器和模块--基础篇 Linkit 7688 DUO(五) 接上各种Arduino传感器和模块-扩展篇 Linkit 7688 DUO(六

IBM MQTT协议基本资料

原文链接:http://37iot.com/topic/5565a193d2dd2fc71da04f7c 初次听说MQTT还是在学习Android消息推送的时候,只是粗浅的了解,未曾深究.最近专注于IOT开发,在信息传递方面自然就想到了它. 相关资料: MQ 遥测传输 (MQTT) V3.1 协议规范:http://www.ibm.com/developerworks/cn/webservices/ws-mqtt/index.html 开源的mosquitto项目地址:http://mosqui

MQTT学习笔记——Yeelink MQTT服务 使用mqtt.js和paho-mqtt

0 前言 2014年8月yeelink推出基于MQTT协议的开关类型设备控制API,相比于基于HTTP RESTful的轮询方式,通过订阅相关主题消息,可以远程控制类应用实时性更好.本文使用两种方式实现开关类型设备的远程控制,一种是基于nodeJS的MQTT.js扩展库,另一种是基于python的paho-mqtt扩展库. [相关博文--MQTT] [MQTT学习笔记--MQTT协议体验 Mosquitto安装和使用] [MQTT学习笔记--树莓派MQTT客户端 使用Mosquitto和paho