mqtt client api: 阻塞API

fusesource版本:mqtt-client-1.11.jar
下载地址:https://github.com/fusesource/mqtt-client

fusesource提供三种mqtt client api: 阻塞API,基于Futur的API和回调API。其中,回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。

  1 import org.fusesource.hawtbuf.Buffer;
  2 import org.fusesource.hawtbuf.UTF8Buffer;
  3 import org.fusesource.hawtdispatch.Dispatch;
  4 import org.fusesource.hawtdispatch.DispatchQueue;
  5 import org.fusesource.mqtt.client.BlockingConnection;
  6 import org.fusesource.mqtt.client.Callback;
  7 import org.fusesource.mqtt.client.CallbackConnection;
  8 import org.fusesource.mqtt.client.FutureConnection;
  9 import org.fusesource.mqtt.client.Listener;
 10 import org.fusesource.mqtt.client.MQTT;
 11 import org.fusesource.mqtt.client.Message;
 12 import org.fusesource.mqtt.client.QoS;
 13 import org.fusesource.mqtt.client.Topic;
 14 import org.fusesource.mqtt.client.Tracer;
 15 import org.fusesource.mqtt.codec.MQTTFrame;
 16 public class MqttClient {
 17 public static void main(String[] args)
 18 {
 19  try {
 20    MQTT mqtt=new MQTT();
 21
 22    //MQTT设置说明
 23    mqtt.setHost("tcp://10.1.58.191:1883");
 24    mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
 25    mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
 26    mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
 27    mqtt.setUserName("admin");//服务器认证用户名
 28    mqtt.setPassword("admin");//服务器认证密码
 29
 30    mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
 31    mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息
 32    mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
 33    mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true
 34    mqtt.setVersion("3.1.1");
 35
 36    //失败重连接设置说明
 37    mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
 38    mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
 39    mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms
 40    mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms
 41    mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2
 42
 43    //Socket设置说明
 44          mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k)
 45          mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k)
 46          mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输
 47
 48          //带宽限制设置说明
 49          mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
 50          mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制
 51
 52          //选择消息分发队列
 53          mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法
 54
 55          //设置跟踪器
 56    mqtt.setTracer(new Tracer(){
 57              @Override
 58              public void onReceive(MQTTFrame frame) {
 59                  System.out.println("recv: "+frame);
 60              }
 61              @Override
 62              public void onSend(MQTTFrame frame) {
 63                  System.out.println("send: "+frame);
 64              }
 65              @Override
 66              public void debug(String message, Object... args) {
 67                  System.out.println(String.format("debug: "+message, args));
 68              }
 69          });
 70
 71
 72
 73          //使用回调式API
 74    final CallbackConnection callbackConnection=mqtt.callbackConnection();
 75
 76    //连接监听
 77    callbackConnection.listener(new Listener() {
 78
 79    //接收订阅话题发布的消息
 80    @Override
 81    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
 82     System.out.println("=============receive msg================"+new String(payload.toByteArray()));
 83      onComplete.run();
 84    }
 85
 86    //连接失败
 87    @Override
 88    public void onFailure(Throwable value) {
 89     System.out.println("===========connect failure===========");
 90     callbackConnection.disconnect(null);
 91    }
 92
 93       //连接断开
 94    @Override
 95    public void onDisconnected() {
 96     System.out.println("====mqtt disconnected=====");
 97
 98    }
 99
100    //连接成功
101    @Override
102    public void onConnected() {
103     System.out.println("====mqtt connected=====");
104
105    }
106   });
107
108
109
110    //连接
111    callbackConnection.connect(new Callback() {
112
113      //连接失败
114        public void onFailure(Throwable value) {
115            System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
116        }
117        // 连接成功
118        public void onSuccess(Void v) {
119            //订阅主题
120            Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
121            callbackConnection.subscribe(topics, new Callback<byte[]>() {
122                //订阅主题成功
123             public void onSuccess(byte[] qoses) {
124                    System.out.println("========订阅成功=======");
125                }
126             //订阅主题失败
127                public void onFailure(Throwable value) {
128                 System.out.println("========订阅失败=======");
129                 callbackConnection.disconnect(null);
130                }
131            });
132
133
134             //发布消息
135            callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
136                public void onSuccess(Void v) {
137                  System.out.println("===========消息发布成功============");
138                }
139                public void onFailure(Throwable value) {
140                 System.out.println("========消息发布失败=======");
141                 callbackConnection.disconnect(null);
142                }
143            });
144
145        }
146    });
147
148
149
150    while(true)
151    {
152
153    }
154
155
156  } catch (Exception e) {
157   e.printStackTrace();
158  }
159
160 }
161 }

原文地址:https://www.cnblogs.com/endv/p/11037424.html

时间: 2024-11-06 07:17:57

mqtt client api: 阻塞API的相关文章

{&quot;ret&quot;:100029,&quot;msg&quot;:&quot;client request&#39;s api name is not existed&quot;}

AFNetworking对Path的定义问题: 如果把这一串:https://graph.qq.com/user/get_user_init 定义成baseURL 那么后面加的任何Path,都会在Path前面加上一个/ 这个跟NSURL有关系,AF的baseURL是用NSURL实现的,NSURL会把你给的字符串拆分成scheme,host,path,query这些东西 然后AF再处理这个NSURL,而NSURL并不仅仅是一个字符串 给 getPath加参数就行了: {"ret":100

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

mqtt client python example

This is a simple example showing how to use the [Paho MQTT Python client](https://eclipse.org/paho/clients/python/) to send data to Azure IoT Hub. You need to assemble the rights credentials and configure TLS and the MQTT protocol version appropriate

[译]API网关(API Gateway)

This a translation of an article ( http://microservices.io/patterns/apigateway.html) originally written and copyrighted by Chris Richardson ( http://twitter.com/crichardson). 模式:API网关 背景 我们假设你使用微服务模式创建一个在线商店,并正在实现商品详情页面.你需要开发多个版本的商品详情用户界面: 用于桌面和手机浏览器

Elasticsearch Index API & Aggregations API & Query DSL

这篇小菜给大家演示和讲解一些Elasticsearch的API,如在工作中用到时,方便查阅. 一.Index API 创建索引库 curl -XPUT 'http://127.0.0.1:9200/test_index/' -d '{     "settings" : {       "index" : {       "number_of_shards" : 3,       "number_of_replicas" : 1

web api写api接口时返回

web api写api接口时默认返回的是把你的对象序列化后以XML形式返回,那么怎样才能让其返回为json呢,下面就介绍两种方法: 方法一:(改配置法) 找到Global.asax文件,在Application_Start()方法中添加一句: 复制代码 代码如下: GlobalConfiguration.Configuration.Formatters.XmlFormatter.SupportedMediaTypes.Clear(); 修改后: 复制代码 代码如下: protected void

jquery api 常见api 效果操作例子

addClass_removeClass_toggleClass_hasClass.html 1 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> 2 <html> 3 <head> 4 <title>method_1.html</title> 5 <meta http-equiv="content-type" conten

jquery api 常见api 元素操作例子

append_prepend.html 1 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> 2 <html> 3 <head> 4 <title>method_1.html</title> 5 <meta http-equiv="content-type" content="text/html; charset=