特别提示:本人博客部分有参考网络其他博客,但均是本人亲手编写过并验证通过。如发现博客有错误,请及时提出以免误导其他人,谢谢!欢迎转载,但记得标明文章出处:http://www.cnblogs.com/mao2080/
说明:前面介绍的tcp、ws方式适合Java程序在局域网内使用,不涉及到安全问题。但由于Android手机APP需要通过websocket方式来连接,就必须考虑安全性问题了,这时候就采用了wss+CA证书方式进行认证,而且在数据传输中也是加密的。大致与ws方式相同,只不过是加了证书。
1、Java代码
1 package com.mao.mqtt; 2 3 import java.io.FileInputStream; 4 import java.security.KeyStore; 5 import java.security.cert.CertificateFactory; 6 import java.security.cert.X509Certificate; 7 import java.text.SimpleDateFormat; 8 import java.util.Date; 9 10 import javax.net.ssl.SSLContext; 11 import javax.net.ssl.SSLSocketFactory; 12 import javax.net.ssl.TrustManagerFactory; 13 14 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 15 import org.eclipse.paho.client.mqttv3.MqttCallback; 16 import org.eclipse.paho.client.mqttv3.MqttClient; 17 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 18 import org.eclipse.paho.client.mqttv3.MqttException; 19 import org.eclipse.paho.client.mqttv3.MqttMessage; 20 import org.eclipse.paho.client.mqttv3.MqttTopic; 21 22 /** 23 * 24 * 功能描述:MQTT测试 25 * 创建人: [email protected] 26 * 创建时间:2017年7月4日 下午5:08:59 27 * 修改人: [email protected] 28 * 修改时间:2017年7月4日 下午5:08:59 29 */ 30 public class MQTTTest_wss { 31 32 /**MQTT服务端ip及端口*/ 33 private static String host = "wss://ip:443"; 34 35 /**账号*/ 36 private static String username = "li2080"; 37 38 /**密码*/ 39 private static String password = "123"; 40 41 /**订阅的主题*/ 42 private static String subTopic = "a/b/c"; 43 44 /**clientID*/ 45 private static String clientId = "li2080"; 46 47 /**发布的主题*/ 48 private static String pubTopic = "a/b/c"; 49 50 /**MQTT-Client*/ 51 private static MqttClient client; 52 53 /**证书路径*/ 54 private static String caPath = "E:\\mqtt-demo\\certfile\\CA.crt"; 55 56 /** 57 * @throws InterruptedException 58 * @throws MqttException */ 59 public static void main(String[] args) throws InterruptedException, MqttException { 60 61 // 订阅消息的方法 62 subscribe(); 63 // 64 publish(); 65 } 66 67 /** 68 * 69 * 描述:订阅信息 70 * @author [email protected] 71 * @created 2017年7月4日 下午4:53:47 72 * @since 73 * @return 74 */ 75 public static void subscribe() { 76 try { 77 // 创建MqttClient 78 MQTTTest_wss.getClient().setCallback(new MqttCallback() { 79 80 public void connectionLost(Throwable arg0) { 81 
82 } 83 84 public void messageArrived(String topic, MqttMessage message) throws Exception { 85 System.out.println("MQTT Rece:" + message.toString()); 86 } 87 88 public void deliveryComplete(IMqttDeliveryToken token) { 89 90 } 91 92 }); 93 MQTTTest_wss.getClient().subscribe(subTopic, 0); 94 System.out.println("连接状态:" + client.isConnected()); 95 } catch (Exception e) { 96 e.printStackTrace(); 97 } 98 } 99 100 /** 101 * 102 * 描述:获取MqttClient 103 * @author [email protected] 104 * @created 2017年7月6日 上午9:56:37 105 * @since 106 * @return 107 * @throws MqttException 108 */ 109 public static MqttClient getClient() throws MqttException{ 110 try { 111 if(client == null){ 112 client = new MqttClient(host, clientId); 113 MqttConnectOptions conOptions = new MqttConnectOptions(); 114 conOptions.setUserName(username); 115 conOptions.setPassword(password.toCharArray()); 116 conOptions.setCleanSession(true); 117 conOptions.setSocketFactory(getSSLSocktet(caPath)); 118 client.connect(conOptions); 119 } 120 if(!client.isConnected()){ 121 client.reconnect(); 122 } 123 } catch (Exception e) { 124 e.printStackTrace(); 125 } 126 return client; 127 } 128 129 /** 130 * 131 * 描述:发布信息 132 * @author [email protected] 133 * @throws MqttException 134 * @created 2017年7月4日 下午4:53:32 135 * @since 136 */ 137 public static void publish() throws MqttException { 138 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 139 String sendMsg = "{time:"+sdf.format(new Date())+", content:"+com.lds.iot.common.util.UUIDUtil.getLowerLetterNumber(15)+", from: java console}"; 140 try { 141 MqttTopic topic = MQTTTest_wss.getClient().getTopic(pubTopic); 142 MqttMessage message = new MqttMessage(sendMsg.getBytes()); 143 message.setQos(0); 144 topic.publish(message); 145 System.out.println("MQTT Send:" + sendMsg); 146 } catch (Exception e) { 147 e.printStackTrace(); 148 } 149 } 150 151 /** 152 * 获取SSLSocketFactory 153 * @param caPath 154 * @return 155 * @throws Exception 156 */ 157 public static 
SSLSocketFactory getSSLSocktet(String caPath) throws Exception { 158 CertificateFactory cAf = CertificateFactory.getInstance("X.509"); 159 FileInputStream caIn = new FileInputStream(caPath); 160 X509Certificate ca = (X509Certificate) cAf.generateCertificate(caIn); 161 KeyStore caks = KeyStore.getInstance(KeyStore.getDefaultType()); 162 caks.load(null, null); 163 caks.setCertificateEntry("ca-certificate", ca); 164 TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); 165 tmf.init(caks); 166 caIn.close(); 167 SSLContext context = SSLContext.getInstance("TLSv1"); 168 context.init(null, tmf.getTrustManagers(), null); 169 return context.getSocketFactory(); 170 } 171 172 }
2、Maven配置
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>
3、运行效果
时间: 2024-11-08 00:59:22