MQTT协议实现Eclipse Paho学习总结二

一、概述

前一篇博客(MQTT协议实现Eclipse Paho学习总结一) 写了一些MQTT协议相关的一些概述和其实现Eclipse Paho的报文类别,同时对心跳包进行了分析。这篇文章,在不涉及MQTT逻辑实现的基础之上分析一下Eclipse Paho中Socket通信的实现,这里我们主要阐述其采用Java同步技术将同步的Socket通信异步化的过程。

二、上菜

先看一下在org.eclipse.paho.client.mqttv3.internal有两个类,CommsSender,CommsReceiver,通过名字我们知道这个这两个类是关于客户端发送消息和接收消息的两个类。上源码分析。

2.1 CommsSender

我们先来看一下CommsSender的run方法。

[java] view plaincopy

  1. public void run() {
  2. final String methodName = "run";
  3. MqttWireMessage message = null;
  4. while (running && (out != null)) {//超级无限循环
  5. try {
  6. message = clientState.get();//主要看这里获取message时进行了阻塞,即clientState.get()方法没有获得消息的时候,代码一直处理阻塞状态,不会一直无限循环!
  7. if (message != null) {
  8. //@TRACE 802=network send key={0} msg={1}
  9. log.fine(className,methodName,"802", new Object[] {message.getKey(),message});
  10. if (message instanceof MqttAck) {
  11. out.write(message);
  12. out.flush();
  13. } else {
  14. MqttToken token = tokenStore.getToken(message);
  15. // While quiescing the tokenstore can be cleared so need
  16. // to check for null for the case where clear occurs
  17. // while trying to send a message.
  18. if (token != null) {
  19. synchronized (token) {//使用了同步,防止一次性多个写操作。
  20. out.write(message);
  21. out.flush();
  22. clientState.notifySent(message);//通知已经发送了一个消息
  23. }
  24. }
  25. }
  26. } else { // null message
  27. //@TRACE 803=get message returned null, stopping}
  28. log.fine(className,methodName,"803");
  29. running = false;
  30. }
  31. } catch (MqttException me) {
  32. handleRunException(message, me);
  33. } catch (Exception ex) {
  34. handleRunException(message, ex);
  35. }
  36. } // end while
  37. //@TRACE 805=<
  38. log.fine(className, methodName,"805");
  39. }

点击message = clientState.get();中get()方法进入之后,方法的部分内容如下:

[java] view plaincopy

  1. synchronized (queueLock) {
  2. while (result == null) {
  3. if (pendingMessages.isEmpty() && pendingFlows.isEmpty()) {
  4. try {
  5. long ttw = getTimeUntilPing();
  6. //@TRACE 644=nothing to send, wait for {0} ms
  7. log.fine(className,methodName, "644", new Object[] {new Long(ttw)});
  8. queueLock.wait(getTimeUntilPing());//如果pendingMessages队列和pendingFlows队列为空,则放弃queueLock锁,等待,而这个等待时间是有限的,如果长时间没有发送消息,同时等待的时间超过了心跳包发送的时间,那么就往下执行,根据实际情况发送心跳包或者消息。
  9. } catch (InterruptedException e) {
  10. }
  11. }

ClientState中还有一个send()方法,部分内容如下:

[java] view plaincopy

  1. if (message instanceof MqttPublish) {
  2. synchronized (queueLock) {
  3. if (actualInFlight >= this.maxInflight) {
  4. //@TRACE 613= sending {0} msgs at max inflight window
  5. log.fine(className, methodName, "613", new Object[]{new Integer(actualInFlight)});
  6. throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
  7. }
  8. MqttMessage innerMessage = ((MqttPublish) message).getMessage();
  9. //@TRACE 628=pending publish key={0} qos={1} message={2}
  10. log.fine(className,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});
  11. switch(innerMessage.getQos()) {
  12. case 2:
  13. outboundQoS2.put(new Integer(message.getMessageId()), message);
  14. persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
  15. break;
  16. case 1:
  17. outboundQoS1.put(new Integer(message.getMessageId()), message);
  18. persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
  19. break;
  20. }
  21. tokenStore.saveToken(token, message);
  22. pendingMessages.addElement(message);
  23. queueLock.notifyAll();//通知get方法,我已经有消息放入队列了!!!
  24. }

总 的过程如下:send方法将消息放入到pendingMessages队列和pendingFlows当中同时发送消息唤醒等待中的线程,get等待 pendingMessages队列和pendingFlows中的消息,同时等待唤醒,如果有消息放入,同时被唤醒,那么就执行发送消息的操作。这个过 程是不是跟操作系统当中的生产者-消费者的关系一样呢!!!

2.2 CommsReceiver

再来看一下CommsReceiver的run()方法

[java] view plaincopy

  1. public void run() {
  2. final String methodName = "run";
  3. MqttToken token = null;
  4. //在这里,因为客户端无法判断,服务器什么时候能够发消息过来,因此只能采用无限循环的方式,不断的去判断是否有新消息发送过来。
  5. while (running && (in != null)) {//超级无限循环
  6. try {
  7. //@TRACE 852=network read message
  8. log.fine(className,methodName,"852");
  9. MqttWireMessage message = in.readMqttWireMessage();// 这里,因为socket.getInputStream()一直在阻塞,如果没有消息是读不到message的,因此在这里的while循环也没有无限制 的运行下去,只有在有消息的时候才往下走。socket默认是阻塞的,就是在读的时候如果读不到资源就会一直等待,直到超时(如果设置了超时时间的话), 如果服务端和客户端都在读的话而没有写的话就会一直阻塞。你可以使用SocketChannel,设置socket的通道,使其变成非阻塞的。
  10. if (message instanceof MqttAck) {//判断是否是确认包
  11. token = tokenStore.getToken(message);
  12. if (token!=null) {
  13. synchronized (token) {
  14. // Ensure the notify processing is done under a lock on the token
  15. // This ensures that the send processing can complete  before the
  16. // receive processing starts! ( request and ack and ack processing
  17. // can occur before request processing is complete if not!
  18. clientState.notifyReceivedAck((MqttAck)message);
  19. }
  20. } else {
  21. // It its an ack and there is no token then something is not right.
  22. // An ack should always have a token assoicated with it.
  23. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
  24. }
  25. } else {
  26. // A new message has arrived,一个新消息过来。
  27. clientState.notifyReceivedMsg(message);//点击进入之后
  28. }
  29. }
  30. catch (MqttException ex) {
  31. //@TRACE 856=Stopping, MQttException
  32. log.fine(className,methodName,"856",null,ex);
  33. running = false;
  34. // Token maybe null but that is handled in shutdown
  35. clientComms.shutdownConnection(token, ex);
  36. }
  37. catch (IOException ioe) {
  38. //@TRACE 853=Stopping due to IOException
  39. log.fine(className,methodName,"853");
  40. running = false;
  41. // An EOFException could be raised if the broker processes the
  42. // DISCONNECT and ends the socket before we complete. As such,
  43. // only shutdown the connection if we‘re not already shutting down.
  44. if (!clientComms.isDisconnecting()) {
  45. clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
  46. } // else {
  47. }
  48. }
  49. //@TRACE 854=<
  50. log.fine(className,methodName,"854");
  51. }

我们点击进入clientState.notifyReceivedMsg(message);方法,部分代码如下:

[java] view plaincopy

  1. if (message instanceof MqttPublish) {
  2. MqttPublish send = (MqttPublish) message;
  3. switch (send.getMessage().getQos()) {
  4. case 0:
  5. case 1:
  6. if (callback != null) {
  7. callback.messageArrived(send);
  8. }
  9. break;

我们点击进入callback.messageArrived(send);方法,

[java] view plaincopy

  1. public void messageArrived(MqttPublish sendMessage) {
  2. final String methodName = "messageArrived";
  3. if (mqttCallback != null) {
  4. // If we already have enough messages queued up in memory, wait
  5. // until some more queue space becomes available. This helps
  6. // the client protect itself from getting flooded by messages
  7. // from the server.
  8. synchronized (spaceAvailable) {
  9. if (!quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
  10. try {
  11. // @TRACE 709=wait for spaceAvailable
  12. log.fine(className, methodName, "709");
  13. spaceAvailable.wait();
  14. } catch (InterruptedException ex) {
  15. }
  16. }
  17. }
  18. if (!quiescing) {
  19. messageQueue.addElement(sendMessage);
  20. // Notify the CommsCallback thread that there‘s work to do...
  21. synchronized (workAvailable) {
  22. // @TRACE 710=new msg avail, notify workAvailable
  23. log.fine(className, methodName, "710");
  24. workAvailable.notifyAll();
  25. }
  26. }
  27. }
  28. }

在 这里,同样使用了生产者-消费者模式,在run方法里,我们可以看到其调用了handleMessage,在这个方法里面调用了 mqttCallback.messageArrived(destName, publishMessage.getMessage());接口回调。

时间: 2024-08-29 20:37:17

MQTT协议实现Eclipse Paho学习总结二的相关文章

MQTT协议实现Eclipse Paho学习总结

转载自:http://xiaoxinzhou.blog.163.com/blog/static/20704538620145411306821/ 一.概述 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放.简单.轻量.易于实现.这些特点使它适用于受限环境.例如,但不仅限于此: 网络代价昂贵,带宽低.不可靠. 在嵌入设备中运行,处理器和内存资源有限. 该协议的特点有: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合. 对负载内容屏蔽的消息传输. 使

MQTT协议及推送服务(二)

MQTT简介 MQTT全称叫做Message Queuing Telemetry Transport,意为消息队列遥测传输,是IBM开发的一个即时通讯协议.由于其维护一个长连接以轻量级低消耗著称,所以常用于移动端消息推送服务开发. MQTT特性 MQTT具有如下特性: 使用发布/订阅消息模式,提供一对多消息发布: 对负载内容屏蔽的消息传输: 使用TCP/IP进行网络连接: 主流的MQTT是基于TCP进行连接的,同样也有UDP版本的MQTT,但是不太常用,叫做MQTT-SN. 具有三种消息发布服务

[3] MQTT,mosquitto,Eclipse Paho---如何使用 Eclipse Paho MQTT工具来发送订阅MQTT消息?

在上两节,笔者主要介绍了 MQTT,mosquitto,Eclipse Paho的基本概念已经如何安装mosquitto. 在这个章节我们就来看看如何用 Eclipse Paho MQTT工具来发送接收MQTT消息.Eclipse Paho MQTT工具是一个基于Java的Eclipse桌面客户端程序,其底层的和MQTT服务器进行的交互的java类库就是Eclipse Paho java库.假设我们在本机(127.0.0.1)已经启动了一个mosquitto MQTT服务器,其端口为1883.如

Java Web学习(二) Eclipse的配置

Java Web学习(二) Eclipse的配置 一.下载Eclipse 1.进入Eclipse官网,进行下载 上图,下载Eclipse IDE for JaveEE Developers 版本,然后根据windows系统32位或64位,进行选择(建议64位).如果你的机器内存过小,可以选择Eclipse的旧版本:Eclipse Indigo .Eclipse Juno . Eclipse Kepler 等版本. 2.解压缩安装 打开压缩包,将里面的Eclipse 拖出到指定位置,进行解压缩.

物联网协议测评平台——吊兰使用指南-MQTT协议 Python Paho

在忙了一个周末之后,还是小有成果的.发布了一个简单的物联网平台: http://mqtt.phodal.com,简单地写一下使用指南. CoAP协议 使用Libcoap的话可以用 <code style="box-sizing: border-box; font-family: Consolas, Menlo, Monaco, 'Courier New', monospace; font-size: 1em; padding: 0px; color: inherit; border-top

CCNA学习笔记二——VTP协议

VTP协议:VLAN Trunk Protocol 从一个控制点,维护整个网络上VLAN的添加,删除和重命名工作 VTP域:相同的域名,通过Trunk相互连接的一组交换机 VTP模式: 服务器模式(Server):默认 客户机模式(Client) 透明模式(Transparent) VTP通告: 客户机通告请求-获取VLAN信息 交换机重新启动 VTP域名变更后 交换机接收到了配置修订号大的汇总通告 服务器的通告响应-发送VLAN信息 汇总通告-用于通知邻接的Catalyst交换机目前的VTP域

MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用

0 前言 MQTT是IBM开发的一个即时通讯协议.MQTT是面向M2M和物联网的连接协议,采用轻量级发布和订阅消息传输机制.Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用. 若初次接触MQTT协议,可先理解以下概念: [MQTT协议特点]--相比于RESTful架构的物联网系统,MQTT协议借助消息推送功能,可以更好地实现远程控制. [MQTT协议角色]--在RESTful架构的物联网

马哥学习笔记二十五——ISCSI协议,架构及其安装配置

ISCSI监听在tcp/3260端口 iSCSI Target:iscsi-target-utils 客户端认正方式: 1.基于IP 2.基于用户,CHAP tgtadm:命令行工具,模式化命令 --mode 常用模式:target,logicalunit,account target --op new.delete.show.update.bind.unbind logicalunit --op new.delete account --op new.delete.bind.unbind --

org.eclipse.paho.client mqtt客户端连接超时

客户端订阅mqtt 的时候 显示超时.第二次重起服务可能就好了,但是可能会丢消息. 报错如下 org.eclipse.paho.client.mqttv3.MqttException: 等待来自服务器的响应时超时 at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31) ~[org.eclipse.paho.client.mqttv3-1.2.1.