MQTT的学习研究(七)基于HTTP POST MQTT 发布消息服务端使用

参阅官方文档

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21220_.htm

HTTP POST puts a message to a queue, or a publication to a topic. The HTTPPOST Java sample is an example an HTTP POST request of a message to a queue. Instead of using Java, you could create an HTTPPOST request using a browser form, or an AJAX toolkit instead.

Figure 1 shows an HTTP request to put a message on a queue called myQueue. This request contains the HTTP header x-msg-correlId to set the correlation ID of the WebSphere MQ message.

Figure 1. Example of an HTTP POST request to a queue

POST /msg/queue/myQueue/ HTTP/1.1
Host: www.example.org
Content-Type: text/plain
x-msg-correlID: 1234567890
Content-Length: 50

Here‘s my message body that will appear on the queue.

Figure 2 shows the response sent back to the client. There is no response content.

Figure 2. Example of an HTTP POST response

HTTP/1.1 200 OK
Date: Wed, 2 Jan 2007 22:38:34 GMT
Server: Apache-Coyote/1.1 WMQ-HTTP/1.1 JEE-Bridge/1.1
Content-Length: 0

请求的协议格式和请求的响应格式

The HTTP POST operation puts a message on a WebSphere® MQ queue, or publishes a message to a topic.

Syntax

Request

>>-POST-- --| Path |-- --HTTP version--CRLF--------------------->

   .-CRLF---------------.  .-CRLF---------------.   
   V                    |  V                    |   
>----+----------------+-+----+----------------+-+--------------->
     ‘-general-header-‘      ‘-request-header-‘     

   .-CRLF----------------------------.        .-CRLF----.   
   V                                 |        V         |   
>----+-----------------------------+-+--CRLF----Message-+------><
     ‘-| entity header (Request) |-‘                        

Path

|--/--contextRoot--/-------------------------------------------->

>--msg/--+-queue/--queueName--+-------------+-+--/--------------|
         |                    ‘[email protected]qMgrName-‘ |      
         ‘-topic/--topicName------------------‘      

entity-header (Request)

|--+----------------------------------------------+-------------|
   +-standard entity-header-- --entity-value------+   
   +-x-msg-class-- --message type-----------------+   
   +-x-msg-correlId-- --correlation ID------------+   
   +-x-msg-encoding-- --encoding type-------------+   
   +-x-msg-expiry-- --duration--------------------+   
   +-x-msg-format-- --message format--------------+   
   +-x-msg-msgId-- --message ID-------------------+   
   +-x-msg-persistence-- --persistence------------+   
   +-x-msg-priority-- --priority class------------+   
   +-x-msg-replyTo-- --reply-to queue-------------+   
   +-x-msg-require-headers-- --entity header name-+   
   ‘-x-msg-usr-- --user properties----------------‘   

Note:

  1. If a question mark (?) is used it must be substituted with %3f. For example, orange?topic should be specified as orange%3ftopic.
  2. @qMgrName is only valid on an HTTP POST
Response

>>-HTTP version-- --HTTP Status-Code-- --HTTP Reason-Phrase--CRLF-->

   .-CRLF---------------.  .-CRLF----------------.   
   V                    |  V                     |   
>----+----------------+-+----+-----------------+-+-------------->
     ‘-general-header-‘      ‘-response-header-‘     

   .-CRLF-----------------------------.   
   V                                  |   
>----+------------------------------+-+------------------------><
     ‘-| entity-header (Response) |-‘     

entity-header (Response)

|--+-----------------------------------------+------------------|
   +-standard entity-header-- --entity-value-+   
   +-x-msg-class-- --message type------------+   
   +-x-msg-correlId-- --correlation ID-------+   
   +-x-msg-encoding-- --encoding type--------+   
   +-x-msg-expiry-- --duration---------------+   
   +-x-msg-format-- --message format---------+   
   +-x-msg-msgId-- --message ID--------------+   
   +-x-msg-persistence-- --persistence-------+   
   +-x-msg-priority-- --priority class-------+   
   +-x-msg-replyTo-- --reply-to queue--------+   
   +-x-msg-timestamp-- --HTTP-date-----------+   
   ‘-x-msg-usr-- --user properties-----------‘   

HTTP POST方式实现如下:

Java代码  

  1. package com.etrip.mqttv3.http;
  2. /**
  3. * This sample shows how to post a message. It has the same behaviour as the
  4. * amqsput command in that it will read in lines from the command line and put
  5. * them to the queue. It will put non-persistent String messages on to the queue
  6. * with UNLIMITED expiry and LOW (0) priority. The program is terminated by
  7. * either EOF being put into the entry line (^Z on windows) or a blank line.
  8. * usage: java HTTPPOST <Queue (default=SYSTEM.DEFAULT.LOCAL.QUEUE)> <host:port
  9. * (default localhost:8080> <context-root (the MQ Bridge for HTTP‘s
  10. * context-root)>
  11. */
  12. import java.io.BufferedReader;
  13. import java.io.BufferedWriter;
  14. import java.io.IOException;
  15. import java.io.InputStreamReader;
  16. import java.io.OutputStream;
  17. import java.io.OutputStreamWriter;
  18. import java.net.HttpURLConnection;
  19. import java.net.MalformedURLException;
  20. import java.net.URL;
  21. /**
  22. *
  23. * 采用HTTP POST发布相关的消息
  24. *     The HTTP POST operation puts a message on a WebSphere® MQ queue, or publishes
  25. *  a message to a topic.
  26. *
  27. *  发布消息到主题或者队列的路径:
  28. *
  29. *
  30. *
  31. *
  32. *
  33. * @author longgangbai
  34. */
  35. public class HTTPPOST
  36. {
  37. private static final String DEFAULT_HOST = "localhost";
  38. private static final String DEFAULT_PORT = "8080";
  39. private static final String DEFAULT_QUEUE = "SYSTEM.DEFAULT.LOCAL.QUEUE";
  40. private static final String DEFAULT_CONTEXT_ROOT = "mq";
  41. private static final String CRLF = "\r\n";
  42. public static int MALFORMED_URL_EXCEPTION_RC = -1;
  43. public static int END_IOEXCEPTION_RC = -2;
  44. /**
  45. * 构建发布主题队列路径
  46. *
  47. * @param host
  48. * @param port
  49. * @param context
  50. * @param queueName
  51. */
  52. private static String getPublishQueueURL(String host, String port,
  53. String context, String queueName) {
  54. StringBuffer urlString =new StringBuffer("http://");
  55. if(StringUtils.isEmtry(host)){
  56. host=DEFAULT_HOST;
  57. }
  58. if(StringUtils.isEmtry(port)){
  59. port=DEFAULT_PORT;
  60. }
  61. urlString.append(host).append(":").append(port);
  62. if(StringUtils.isEmtry(context)){
  63. context=DEFAULT_CONTEXT_ROOT;
  64. }
  65. urlString.append("/");
  66. urlString.append(context);
  67. urlString.append("/msg/queue/");
  68. if(StringUtils.isEmtry(queueName)){
  69. }
  70. queueName=DEFAULT_QUEUE;
  71. urlString.append(queueName);
  72. System.out.println("urlString="+urlString);
  73. return urlString.toString();
  74. }
  75. /**
  76. *
  77. * @param host
  78. * @param port
  79. * @param context
  80. * @param queueName
  81. * @param message
  82. * @return
  83. * @throws MalformedURLException
  84. */
  85. public static boolean publishTopic(String host,String port,String context,String queueName,String message ){
  86. boolean response = true;
  87. HttpURLConnection connection=null;
  88. try {
  89. String publishURL=getPublishQueueURL(host, port, context, queueName);
  90. URL url=new URL(publishURL);
  91. connection = (HttpURLConnection) url.openConnection();
  92. /* Build the headers */
  93. // the verb first
  94. connection.setRequestMethod("POST");
  95. // Content type is a string message
  96. connection.setRequestProperty("content-type", "text/plain");
  97. // set the message priority to low
  98. connection.setRequestProperty("x-msg-priority", "LOW");
  99. // Ensure we can get the output stream from the connection
  100. connection.setDoOutput(true);
  101. OutputStream outputStream = connection.getOutputStream();
  102. // wrapper the outputstream in a writer
  103. BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
  104. outputStream));
  105. // Now write the actual content.
  106. // Make sure the CRLF is there in case some HTTP servers don‘t understand
  107. // that it‘s the end of the message
  108. writer.write(message + CRLF + CRLF);
  109. writer.flush();
  110. // now actually send the message
  111. connection.connect();
  112. // check the response for errors
  113. int responseCode = connection.getResponseCode();
  114. if (responseCode != 200)
  115. {
  116. String responseMessage =connection.getResponseMessage();
  117. System.out.println("responsere sponseCode "+responseCode+" response request ="+responseMessage);
  118. System.out.println("responsere context ");
  119. BufferedReader reader = new BufferedReader(new InputStreamReader(
  120. connection.getErrorStream()));
  121. String line = null;
  122. while ((line = reader.readLine()) != null)
  123. {
  124. System.out.println(line);
  125. }
  126. connection.disconnect();
  127. response = false;
  128. }else{
  129. //获取相应的消息头信息
  130. String responseQueueName=connection.getHeaderField("x-msg-replyTo");
  131. System.out.println("responseQueueName="+responseQueueName);
  132. System.out.println("response successful context :"+connection.getResponseMessage());
  133. }
  134. } catch (MalformedURLException e) {
  135. response = false;
  136. e.printStackTrace();
  137. // TODO: handle exception
  138. } catch (IOException e) {
  139. response = false;
  140. // TODO Auto-generated catch block
  141. e.printStackTrace();
  142. }finally{
  143. connection.disconnect();
  144. }
  145. return response;
  146. }
  147. public static void main(String[] args) {
  148. HTTPPOST.publishTopic("192.168.208.46", "8080", "mq", "java_lover", "this is a message ");
  149. }
  150. }

HTTP POST 发布主题请求协议和响应协议
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21220_.htm

请求响应头各个字段的含义的讲解
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21250_.htm

响应错误处理
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21340_.htm

时间: 2024-07-29 21:08:16

MQTT的学习研究(七)基于HTTP POST MQTT 发布消息服务端使用的相关文章

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

MQTT的学习研究(十六) MQTT的Mosquitto的window安装部署

在mqtt的官方网站,有许多mqtt,其中:MosquittoAn Open Source MQTT server with C, C++, Python and Javascript clients. A public, hosted test server is also available (more information) MoquetteA Java MQTT broker based on an eventing model with Apache Mina. Mosquitto的

MQTT的学习研究(十五) MQTT 和android整合文章

详细参考:  How to Implement Push Notifications for Android http://tokudu.com/2010/how-to-implement-push-notifications-for-android/ Push notifications for mobile apps http://dalelane.co.uk/blog/?p=938 源代码地址: https://github.com/tokudu/AndroidPushNotificati

基于java工程开发RMI服务端

ServiceRegist.java import java.rmi.Remote; import java.rmi.RemoteException; public interface ServiceRegist extends Remote{ public user login(String userName, String password) throws RemoteException; } ServiceRegisterImpl.java public class ServiceRegi

如何搭建一个基于 Egg + Webpack + Vue 的服务端渲染工程项目呢?

如何搭建一个基于 Egg + Webpack + Vue 的服务端渲染工程项目呢? 项目你可以通过 easywebpack-cli 直接初始化即可完成或者clone egg-vue-webpack-boilerplate.下面说明一下从零如何搭建一个Egg + Webpack + Vue 的服务端渲染工程项目. 通过 egg-init 初始化 egg 项目 egg-init egg-vue-ssr // choose Simple egg app 安装 easywebpack-vue 和 egg

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式.具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker.使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动. 在MQTT moquette 中采用MINA作为底层消息的传递方式  本类的目的启动MQTT moquette Broker 的方式,本文的源代码来自  moquette-broker-0.1-jar-with-de

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

接着上一篇的moquette-mqtt 的使用之broker启动之后,我们需要启动moquette-mqtt 的服务端发布消息. 在moquette-mqtt 的mqtt-client中三种方式实现发布消息的方式: 1.采用阻塞式的连接的(BlockingConnection) 2.采用回调式的连接 (CallbackConnection) 3.采用Future样式的连接(FutureConnection) 本文采用阻塞式作为实验对象. MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt Blocking API客户端订阅并接收主题信息

在上面两篇关于mqtt的broker的启动和mqtt的服务端发布主题信息之后,我们客户端需要订阅相关的信息并接收相关的主题信息. Java代码   package com.etrip.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.