IBM MQ 学习

  1 import java.io.IOException;
  2 import java.util.HashMap;
  3 import java.util.Map;
  4
  5 import com.ibm.mq.MQC;
  6 import com.ibm.mq.MQEnvironment;
  7 import com.ibm.mq.MQException;
  8 import com.ibm.mq.MQGetMessageOptions;
  9 import com.ibm.mq.MQMessage;
 10 import com.ibm.mq.MQPutMessageOptions;
 11 import com.ibm.mq.MQQueue;
 12 import com.ibm.mq.MQQueueManager;
 13
 14 public class CLIENT_MQ{
 15      //定义队列管理器和队列的名称
 16      private static final String qmName = "MQ_SERVICE";//MQ的队列管理器名称 ;
 17      //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称
 18      private static MQQueueManager qMgr;//队列管理器
 19      public  static void init(){
 20          //设置环境:
 21          //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,
 22          //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.
 23          MQEnvironment.hostname="10.172.12.156";          //MQ服务器的IP地址
 24          MQEnvironment.channel="SERVICE_JAVA";           //通道类型:服务器连接
 25          MQEnvironment.CCSID=1381;//437                    //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)
 26          MQEnvironment.port=1456;                       //MQ端口
 27          try {
 28              //定义并初始化队列管理器对象并连接
 29              //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。
 30             qMgr = new MQQueueManager(qmName);
 31         } catch (MQException e) {
 32             // TODO Auto-generated catch block
 33             System.out.println("初使化MQ出错");
 34             e.printStackTrace();
 35         }
 36      }
 37      /**
 38       * 往MQ发送消息
 39       * @param message
 40       * @return
 41       */
 42      public static Map<String,Object> sendMessage(Object message,String qName){
 43          Map<String,Object> map=new HashMap<String,Object>();
 44          try{
 45              //设置将要连接的队列属性
 46              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
 47              //(except for completion code constants and error code constants).
 48              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
 49              //MQOO_OUTPUT:Open the queue to put messages.
 50              /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
 51              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
 52              /*以下选项可适合远程队列与本地队列*/
 53              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
 54              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
 55              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
 56              //连接队列
 57              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
 58              //The inquire and set capabilities are inherited from MQManagedObject.
 59              /*关闭了就重新打开*/
 60             if(qMgr==null || !qMgr.isConnected()){
 61                 qMgr = new MQQueueManager(qmName);
 62             }
 63              MQQueue queue = qMgr.accessQueue(qName, openOptions);
 64              //定义一个简单的消息
 65              MQMessage putMessage = new MQMessage();
 66              map.put("messageId",putMessage);
 67              //String uuid=java.util.UUID.randomUUID().toString();
 68              //将数据放入消息缓冲区
 69              putMessage.writeObject(message);
 70              //设置写入消息的属性(默认属性)
 71              MQPutMessageOptions pmo = new MQPutMessageOptions();
 72
 73              //将消息写入队列
 74              queue.put(putMessage,pmo);
 75              map.put("message",message.toString());
 76              queue.close();
 77          }catch (MQException ex) {
 78              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);
 79              ex.printStackTrace();
 80          }catch (IOException ex) {
 81              System.out.println("An error occurred whilst writing to the message buffer: " + ex);
 82          }catch(Exception ex){
 83              ex.printStackTrace();
 84          }finally{
 85              try {
 86                 qMgr.disconnect();
 87             } catch (MQException e) {
 88                 e.printStackTrace();
 89             }
 90           }
 91          return map;
 92      }
 93
 94
 95
 96
 97
 98      /**
 99       * 处理完消息回放到MQ队列
100       * @param message
101       * @return
102       */
103      public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){
104          Map<String,Object> map=new HashMap<String,Object>();
105          try{
106              //设置将要连接的队列属性
107              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
108              //(except for completion code constants and error code constants).
109              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
110              //MQOO_OUTPUT:Open the queue to put messages.
111              /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
112              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
113              /*以下选项可适合远程队列与本地队列*/
114              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
115              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
116              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
117              //连接队列
118              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
119              //The inquire and set capabilities are inherited from MQManagedObject.
120              /*关闭了就重新打开*/
121             if(qMgr==null || !qMgr.isConnected()){
122                 qMgr = new MQQueueManager(qmName);
123             }
124              MQQueue queue = qMgr.accessQueue(qName, openOptions);
125              //定义一个简单的消息
126              MQMessage putMessage = new MQMessage();
127              putMessage.messageId=mqMessage.messageId;
128              map.put("messageId",putMessage);
129              //String uuid=java.util.UUID.randomUUID().toString();
130              //将数据放入消息缓冲区
131              putMessage.writeObject(message);
132              //设置写入消息的属性(默认属性)
133              MQPutMessageOptions pmo = new MQPutMessageOptions();
134
135              //将消息写入队列
136              queue.put(putMessage,pmo);
137              map.put("message",message.toString());
138              queue.close();
139          }catch (MQException ex) {
140              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);
141              ex.printStackTrace();
142          }catch (IOException ex) {
143              System.out.println("An error occurred whilst writing to the message buffer: " + ex);
144          }catch(Exception ex){
145              ex.printStackTrace();
146          }finally{
147              try {
148                 qMgr.disconnect();
149             } catch (MQException e) {
150                 e.printStackTrace();
151             }
152           }
153          return map;
154      }
155
156
157
158
159      /**
160       * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息
161       * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。
162       * @return
163       */
164      public static String getMessage(String qName,MQMessage mqMessage){
165          String message="";
166          try{
167              //设置将要连接的队列属性
168              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
169              //(except for completion code constants and error code constants).
170              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
171              //MQOO_OUTPUT:Open the queue to put messages.
172              //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用
173              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
174
175              int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
176              MQMessage retrieve = new MQMessage();
177              //设置取出消息的属性(默认属性)
178              //Set the put message options.(设置放置消息选项)
179              MQGetMessageOptions gmo = new MQGetMessageOptions();
180
181              gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)
182              gmo.options = gmo.options + MQC.MQGMO_WAIT;  // Wait if no messages on the Queue(如果在队列上没有消息则等待)
183              gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)
184              gmo.waitInterval = 3000 ;  // Sets the time limit for the wait.(设置等待的毫秒时间限制)
185              /*关闭了就重新打开*/
186             if(qMgr==null || !qMgr.isConnected()){
187                 qMgr = new MQQueueManager(qmName);
188             }
189              MQQueue queue = qMgr.accessQueue(qName, openOptions);
190
191              MQMessage retrievedMessage = new MQMessage();
192              //从队列中取出对应messageId的消息
193              retrieve.messageId = mqMessage.messageId;
194              // 从队列中取出消息
195              queue.get(retrieve, gmo);
196
197
198              Object obj = retrieve.readObject();
199              message=obj.toString();//解决中文乱码问题
200              /*
201
202              //int size = rcvMessage.getMessageLength();
203              //byte[] p = new byte[size];
204              //rcvMessage.readFully(p);
205
206              int len=retrieve.getDataLength();
207              byte[] str = new byte[len];
208               retrieve.readFully(str,0,len);
209               message = new String(str);//readUTF();
210              */
211
212              queue.close();
213          }catch (MQException ex) {
214              int reason=ex.reasonCode;
215              if(reason==2033)//no messages
216              {
217                 message="nomessage";
218              }else{
219                 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);
220              }
221          }catch (IOException ex) {
222              System.out.println("An error occurred whilst writing to the message buffer: " + ex);
223          }catch(Exception ex){
224              ex.printStackTrace();
225          }finally{
226             try {
227                 qMgr.disconnect();
228             } catch (MQException e) {
229                 e.printStackTrace();
230             }
231          }
232          return message;
233      }
234
235
236
237      public static void main(String args[]) {
238          init();
239          Map<String,Object> map = new HashMap<String,Object>();
240          map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE");
241          MQMessage mqMessage = (MQMessage)map.get("messageId");
242          outSys("传输消息:",mqMessage.messageId.toString());
243
244          outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage));
245          Map<String,Object> reply_map = new HashMap<String,Object>();
246          reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage);
247          outSys("放入正常队列:",reply_map.get("message").toString());
248
249          outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage));
250
251
252      }
253
254
255
256      public static void outSys(String display,String val){
257          System.out.println(display+val);
258      }
259
260 }   
时间: 2024-10-06 03:05:17

IBM MQ 学习的相关文章

Java连接IBM MQ

package com.hometest.IBMMqTest; import java.io.IOException;import java.io.UnsupportedEncodingException; import com.ibm.mq.MQEnvironment;import com.ibm.mq.MQException;import com.ibm.mq.MQGetMessageOptions;import com.ibm.mq.MQMessage;import com.ibm.mq.

IBM MQ学习过程问题汇总

IBM MQ使用过程问题汇总----------------------------------------------1. 客户端发送消息时出现2035问题的解决过程####环境:win7系统administrator用户,WebSphereMQ8.0####测试:执行命令"amqsputc.exe Q1"a---按照教程添加"服务器连接"通道时为MCA指定用户名.无效:b---执行runmqsc命令,输入alter qmgr chlauth(disabled)禁

IBM MQ介绍

转自:http://hi.baidu.com/lubezhang/blog/item/bd308b3b7ecce3ec14cecb4f.html IBM MQ(IBM Message Queue)是IBM的一款商业消息中间产品,适用于分布式计算环境或异构系统之中.消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走.通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置.或在继续执行前不需要等待接收程序接收此消息. MQ基本概念

IBM MQ消息中间件jms消息中RHF2消息头的处理

公司的技术平台在和某券商对接IBM MQ消息中间件时,发送到MQ中的消息多出了消息头信息:RHF2,造成消息的接收处理不正常.在此记录此问题的处理方式. 在IBM MQ中提供了一个参数 targetClient,可以通过此参数来控制jms消息中是否启用该消息头.当采用jms 作为IBM MQ的client时,在消息目标(队列名或主题名)后添加该参数即可,格式为: topic:///{目标名}?targetClient=1 targetClient=1时:消息中不会包括RHF2消息头:target

连接IBM MQ原因码报2035的错误解决办法

我们的系统使用了ibm mq,用户用来向国家局上报文件和接收文件,前几天用户说上报一直不成功.由于 开发这块程序的人已经辞职了,我觉定在我的机器部署一套,研究一下.我的思路: 在我的机器上安装mq,建立两个队列,一个用来接收,一个用来发送;在另外一台机器上,发布应用程序,通 过java client向我的机器的队列发送消息和接收消息. mq安装成功,队列管理器.队列.通道也都建起来了,用mq自带的api检测程序检测也成功了. 接下来,在另外一台机器上通过程序发送消息,结果就是不行,总是报原因码是

使用Loadrunner对IBM MQ进行性能测试

一.概述 使用Loadrunner对IBM MQ进行性能测试,需要用到java vuser以及java编码知识.此次先介绍什么是IBM MQ,然后java vuser的使用与配置细节,最后介绍IBM MQ的测试脚本. 二.IBM MQ介绍 IBM MQ(IBM Message Queue)是IBM的一款商业消息中间产品,适用于分布式计算环境或异构系统之中.消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走. 通过消息队列应用程序可独立地

IBM MQ 集成CXF 发送JMS 消息

1.修改wsdl 协议类型为 jms 替换 <soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/> 为 <soap:binding style="document" transport="http://cxf.apache.org/transports/jms"/> 2.根据wsdl生成服务端代码

spring监听与IBM MQ JMS整合

spring xml 的配置: 文件名:applicationContext-biz-mq.xml [html] view plain copy print? <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001

IBM Mq Spring JMS 的xml配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/