MQTT的学习研究(八)基于HTTP DELETE MQTT 订阅消息服务端使用

HTTP DELETE 订阅主题请求协议和响应协议
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21240_.htm

请求响应头各个字段的含义的讲解
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21250_.htm

响应错误处理
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21340_.htm

The HTTP DELETE operation gets a message from a WebSphere® MQ queue, or retrieves a publication from a topic. The message is removed from the queue. If the publication is retained, it is not removed. A response message is sent back to the client including information about the message.

Syntax


Request

>>-+-DELETE-+-- --| Path |-- --HTTP version--CRLF--------------->
   ‘-GET----‘                                       

   .-CRLF---------------.  .-CRLF---------------.   
   V                    |  V                    |   
>----+----------------+-+----+----------------+-+--------------->
     ‘-general-header-‘      ‘-request-header-‘     

   .-CRLF----------------------------.   
   V                                 |   
>----+-----------------------------+-+-------------------------><
     ‘-| Entity-header (Request) |-‘     

Path

|--/--contextRoot--/-------------------------------------------->

>--msg/--+-queue/--queueName--+-------------+-+--/--------------|
         |                    ‘[email protected]qMgrName-‘ |      
         ‘-topic/--topicName------------------‘      

entity-header (Request)

|--+----------------------------------------------+-------------|
   +-standard entity-header-- --entity-value------+   
   +-x-msg-correlId - --correlation ID------------+   
   +-x-msg-msgId - --message ID-------------------+   
   +-x-msg-range-- --range------------------------+   
   +-x-msg-require-headers-- --entity header name-+   
   ‘-x-msg-wait - --wait time---------------------‘   

Note:

  1. If a question mark (?) is used it must be substituted with %3f. For example, orange?topic should be specified as orange%3ftopic.
  2. @qMgrName is only valid on an HTTP POST


Response

>>-HTTP version-- --HTTP Status-Code-- --HTTP Reason-Phrase--CRLF-->

   .-CRLF---------------.  .-CRLF----------------.   
   V                    |  V                     |   
>----+----------------+-+----+-----------------+-+-------------->
     ‘-general-header-‘      ‘-response-header-‘     

   .-CRLF-----------------------------.                      
   V                                  |                      
>----+------------------------------+-+--+---------------+-----><
     ‘-| Entity-header (Response) |-‘    ‘-CRLF--Message-‘   

entity-header (Response)

|--+-----------------------------------------+------------------|
   +-standard entity-header-- --entity-value-+   
   +-x-msg-class-- --message type------------+   
   +-x-msg-correlId-- --correlation ID-------+   
   +-x-msg-encoding-- --encoding type--------+   
   +-x-msg-expiry-- --duration---------------+   
   +-x-msg-format-- --message format---------+   
   +-x-msg-msgId-- --message ID--------------+   
   +-x-msg-persistence-- --persistence-------+   
   +-x-msg-priority-- --priority class-------+   
   +-x-msg-replyTo-- --reply-to queue--------+   
   +-x-msg-timestamp-- --HTTP-date-----------+   
   ‘-x-msg-usr-- --user properties-----------‘   

Request parameters

PathSee URI Format.HTTP versionHTTP version; for example, HTTP/1.1general-headerSee HTTP/1.1 - 4.5 General Header Fields.request-headerSee HTTP/1.1 - 5.3 Request Header Fields. The Host field is mandatory on an HTTP/1.1 request. It is often automatically inserted by the tool you use to create a client request.entity-header (Request)See HTTP/1.1 - 7.1 Entity Header Fields. One of the entity headers listed in the Request syntax diagram.

Response parameters

PathSee URI Format.HTTP versionHTTP version; for example, HTTP/1.1general-headerSee HTTP/1.1 - 4.5 General Header Fields.response-headerSee HTTP/1.1 - 6.2 Response Header Fields.entity-header (Response)See HTTP/1.1 - 7.1 Entity Header Fields. One of the entity or response headers listed in the Response syntax diagram.The Content–Length is always present in a response. It is set to zero if there is no message body.MessageMessage body.

Description

If the HTTP DELETE request is successful, the response message contains the data retrieved from the WebSphere MQ queue. The number of bytes in the body of the message is returned in the HTTP Content-Length header. The status code for the HTTP response is set to 200 OK. If x-msg-range is specified as 0, or0-0, then the status code of the HTTP response is 204 No Content.

If the HTTP DELETE request is unsuccessful, the response includes a WebSphere MQ bridge for HTTP error message and an HTTP status code.

HTTP DELETE example

HTTP DELETE gets a message from a queue and deletes the message, or retrieves and deletes a publication. The HTTPDELETE Java sample is an example an HTTP DELETE request reading a message from a queue. Instead of using Java, you could create an HTTP DELETE request using a browser form, or an AJAX toolkit instead.

Figure 1 is an HTTP request to delete the next message on queue called myQueue. In response, the message body is returned to the client. In WebSphere MQ terms, HTTP DELETE is a destructive get.

The request contains the HTTP request header x-msg-wait, which instructs WebSphere MQ bridge for HTTP how long to wait for a message to arrive on the queue. The request also contains the x-msg-require-headersrequest header, which specifies that the client is to receive the message correlation ID in the response.

Figure 1. Example of an HTTP DELETE request

DELETE /msg/queue/myQueue/ HTTP/1.1
Host: www.example.org
x-msg-wait: 10
x-msg-require-headers: correlID

Figure 2, is the response returned to the client. The correlation ID is returned to the client, as requested in x-msg-require-headers of the request.

Figure 2. Example of an HTTP DELETE response

HTTP/1.1 200 OK
Date: Wed, 2 Jan 2007 22:38:34 GMT
Server: Apache-Coyote/1.1 WMQ-HTTP/1.1 JEE-Bridge/1.1
Content-Length: 50
Content-Type: text/plain; charset=utf-8
x-msg-correlId: 1234567890

Here‘s my message body that will appear on the queue.

HTTP DELETE订阅主题信息

Java代码  

  1. package com.etrip.mqttv3.http;
  2. /**
  3. * This sample shows how to delete a message. It has slightly enhanced function
  4. * of the amqsget command in that it will print out the timestamp, expiry and
  5. * persistence of the messages. The program continues until the queue is empty
  6. * or a request fails. This program can potentially take in three parameters:
  7. * <queueName>
  8. * <host:port> <context-root (the MQ Bridge for HTTP‘s context-root)>
  9. * defaults are: SYSTEM.DEFAULT.LOCAL.QUEUE localhost:8080 mq
  10. *
  11. * If there are any exceptions thrown from this program or errors returned from the server then they are
  12. * printed to standard output as-is.
  13. *
  14. * No more messages
  15. * HTTP DELETE Sample end
  16. */
  17. import java.io.BufferedReader;
  18. import java.io.IOException;
  19. import java.io.InputStreamReader;
  20. import java.net.HttpURLConnection;
  21. import java.net.MalformedURLException;
  22. import java.net.URL;
  23. /**
  24. *
  25. * 采用HTTP DELETE方式的订阅相关的MQTT的主题的信息
  26. *
  27. * @author longgangbai
  28. *
  29. *
  30. */
  31. public class HTTPDELETE
  32. {
  33. private static final String DEFAULT_HOST = "localhost";
  34. private static final String DEFAULT_PORT = "8080";
  35. private static final String DEFAULT_QUEUE = "SYSTEM.DEFAULT.LOCAL.QUEUE";
  36. private static final String DEFAULT_CONTEXT_ROOT = "mq";
  37. public static String newline = System.getProperty("line.separator");
  38. private static final String MESSAGE_BOUNDARY = "_________________________________________________________________________________________";
  39. // the maximum length of the message that we want to print to the screen
  40. private static final int MAX_OUTPUT_MESSAGE_SIZE = 256;
  41. private static int OK_RC = 200;
  42. /**
  43. * 构建订阅主题队列路径
  44. *
  45. * @param host
  46. * @param port
  47. * @param context
  48. * @param queueName
  49. */
  50. private static String getPublishQueueURL(String host, String port,
  51. String context, String queueName) {
  52. StringBuffer urlString =new StringBuffer("http://");
  53. if(StringUtils.isEmtry(host)){
  54. host=DEFAULT_HOST;
  55. }
  56. if(StringUtils.isEmtry(port)){
  57. port=DEFAULT_PORT;
  58. }
  59. urlString.append(host).append(":").append(port);
  60. if(StringUtils.isEmtry(context)){
  61. context=DEFAULT_CONTEXT_ROOT;
  62. }
  63. urlString.append("/");
  64. urlString.append(context);
  65. urlString.append("/msg/queue/");
  66. if(StringUtils.isEmtry(queueName)){
  67. queueName=DEFAULT_QUEUE;
  68. }
  69. urlString.append(queueName);
  70. System.out.println("urlString="+urlString);
  71. return urlString.toString();
  72. }
  73. /**
  74. *  通过HTTP POST 订阅主题的具体实现
  75. * @param host
  76. * @param port
  77. * @param context
  78. * @param queueName
  79. * @return
  80. * @throws MalformedURLException
  81. */
  82. public static boolean subTopic(String host,String port,String context,String queueName ){
  83. String publishURL=getPublishQueueURL(host, port, context, queueName);
  84. URL url=null;
  85. HttpURLConnection connection=null;
  86. try {
  87. url = new URL(publishURL);
  88. connection= (HttpURLConnection) url.openConnection();
  89. /* Build the headers */
  90. // the verb first.
  91. connection.setRequestMethod("DELETE");
  92. // write out what headers we want back
  93. // the header names are case-sensitive
  94. connection.setRequestProperty("x-msg-require-headers",
  95. "timestamp, expiry, persistence");
  96. // Now actually send the request message. There is no content as this is a
  97. // DELETE
  98. connection.connect();
  99. String formattedMessage = null;
  100. // check the response for errors
  101. int responseCode = connection.getResponseCode();
  102. if (responseCode == OK_RC)
  103. {
  104. // Get the headers first
  105. String timestamp = connection.getHeaderField("x-msg-timestamp");
  106. String expiry = connection.getHeaderField("x-msg-expiry");
  107. String persistence = connection.getHeaderField("x-msg-persistence");
  108. // now get the message data
  109. BufferedReader reader = new BufferedReader(new InputStreamReader(
  110. connection.getInputStream()));
  111. String line = null;
  112. StringBuffer messageBuffer = new StringBuffer();
  113. while ((line = reader.readLine()) != null)
  114. {
  115. messageBuffer.append(line);
  116. }
  117. String messageBody = messageBuffer.toString();
  118. formattedMessage = MESSAGE_BOUNDARY + newline;
  119. // Which is greater the max output message size or the message length?
  120. int messageSizeToPrint = messageBody.length() > MAX_OUTPUT_MESSAGE_SIZE ? MAX_OUTPUT_MESSAGE_SIZE
  121. : messageBody.length();
  122. formattedMessage += messageBody.substring(0, messageSizeToPrint)
  123. + newline;
  124. formattedMessage += "timestamp   = " + timestamp + newline;
  125. formattedMessage += "expiry      = " + expiry + newline;
  126. formattedMessage += "persistence = " + persistence + newline;
  127. System.out.println("formattedMessage "+formattedMessage);
  128. }else{
  129. String responseMessage =connection.getResponseMessage();
  130. System.out.println("responsere sponseCode "+responseCode+" response request ="+responseMessage);
  131. }
  132. } catch (MalformedURLException e) {
  133. // TODO Auto-generated catch block
  134. e.printStackTrace();
  135. } catch (IOException e) {
  136. // TODO Auto-generated catch block
  137. e.printStackTrace();
  138. }finally{
  139. connection.disconnect();
  140. }
  141. return false;
  142. }
  143. public static void main(String[] args) {
  144. HTTPDELETE.subTopic("192.168.208.46", "8080", "mq", "java_lover");
  145. }
  146. }
时间: 2024-10-07 18:12:33

MQTT的学习研究(八)基于HTTP DELETE MQTT 订阅消息服务端使用的相关文章

.net平台 基于 XMPP协议的即时消息服务端简单实现

.net平台 基于 XMPP协议的即时消息服务端简单实现 昨天抽空学习了一下XMPP,在网上找了好久,中文的资料太少了所以做这个简单的例子,今天才完成.公司也正在准备开发基于XMPP协议的即时通讯工具所以也算是打一个基础吧!如果你还没有了解过XMPP请先阅读附录中链接的文章,本实例是基agsXMPP上开发的,agsXMPP是C#写的支持开源XMPP协议软件,我们可以在agsXMPP上快速构建自已的即时通讯平台,我的这个例子只是修改了服务器端,因为agsXMPP本身自带的服务器端没有实现聊天功能.

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

MQTT的学习研究(十六) MQTT的Mosquitto的window安装部署

在mqtt的官方网站,有许多mqtt,其中:MosquittoAn Open Source MQTT server with C, C++, Python and Javascript clients. A public, hosted test server is also available (more information) MoquetteA Java MQTT broker based on an eventing model with Apache Mina. Mosquitto的

MQTT的学习研究(十五) MQTT 和android整合文章

详细参考:  How to Implement Push Notifications for Android http://tokudu.com/2010/how-to-implement-push-notifications-for-android/ Push notifications for mobile apps http://dalelane.co.uk/blog/?p=938 源代码地址: https://github.com/tokudu/AndroidPushNotificati

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式.具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker.使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动. 在MQTT moquette 中采用MINA作为底层消息的传递方式  本类的目的启动MQTT moquette Broker 的方式,本文的源代码来自  moquette-broker-0.1-jar-with-de

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

接着上一篇的moquette-mqtt 的使用之broker启动之后,我们需要启动moquette-mqtt 的服务端发布消息. 在moquette-mqtt 的mqtt-client中三种方式实现发布消息的方式: 1.采用阻塞式的连接的(BlockingConnection) 2.采用回调式的连接 (CallbackConnection) 3.采用Future样式的连接(FutureConnection) 本文采用阻塞式作为实验对象. MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt Blocking API客户端订阅并接收主题信息

在上面两篇关于mqtt的broker的启动和mqtt的服务端发布主题信息之后,我们客户端需要订阅相关的信息并接收相关的主题信息. Java代码   package com.etrip.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.