JAVA代码:
Java代码
- package com.sdb.payment.core.mq;
- import org.apache.log4j.Logger;
- import com.ibm.mq.MQC;
- import com.ibm.mq.MQEnvironment;
- import com.ibm.mq.MQException;
- import com.ibm.mq.MQGetMessageOptions;
- import com.ibm.mq.MQMessage;
- import com.ibm.mq.MQPutMessageOptions;
- import com.ibm.mq.MQQueue;
- import com.ibm.mq.MQQueueManager;
- public class MessageQueueService {
- private static Logger logger = Logger.getLogger(MessageQueueService.class);
- private String hostname = "192.168.0.117";
- private String channel = "CHL.SVRCONN";
- private String queueManager = "QM_SERVER";
- private String sendQueue = "OMP.QRMT";
- private String recvQueue = "OMP.QLCA";
- private int port = 24100;
- private int ccsid = 1381;
- private int failedCount = 5;
- private int intervalTime = 1000;
- public MessageQueueService() {
- MQEnvironment.hostname = hostname;
- MQEnvironment.channel = channel;
- MQEnvironment.CCSID = ccsid;
- MQEnvironment.port = port;
- }
- public String send(String sendMsg) throws Exception {
- MQQueueManager qManager = new MQQueueManager(queueManager);
- // send message
- int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
- MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);
- MQPutMessageOptions pmo = new MQPutMessageOptions();
- MQMessage send = new MQMessage();
- send.write(sendMsg.getBytes());
- System.out.println("send message : " + sendMsg);
- sQueue.put(send, pmo);
- sQueue.close();
- System.out.println("send message Id");
- for (int i = 0; i<send.messageId.length; i++) {
- System.out.print(send.messageId[i]);
- }
- System.out.println();
- System.out.println("send message Id");
- // fetch message
- openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING
- + MQC.MQOO_INPUT_SHARED;
- MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);
- MQGetMessageOptions getOptions = new MQGetMessageOptions();
- getOptions.options = MQC.MQGMO_WAIT;
- getOptions.waitInterval = intervalTime;
- MQMessage recvMsg = new MQMessage();
- recvMsg.messageId = send.messageId;//这里是关键,要保持接收的msgid跟发送的msgid值是一样的,
- //这样就会根据msgId来取队列的消息了,而不会取到别的消息
- send.clearMessage();
- boolean received = false;
- int fetchCount = 0;
- while (!received) {
- try {
- fetchCount++;
- rQueue.get(recvMsg, getOptions);
- //logger.debug("the " + fetchCount + " time fetch message!");
- System.out.println("fetch message !!!");
- received = true;
- } catch (MQException me) {
- if (me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
- if (fetchCount > failedCount) {
- recvMsg.clearMessage();
- rQueue.close();
- qManager.disconnect();
- //logger.error("can‘t fetch message for " + me.getMessage());
- return null;
- }
- }
- } catch (Exception ex) {
- recvMsg.clearMessage();
- rQueue.close();
- qManager.disconnect();
- //logger.error("can‘t fetch message for " + ex.getMessage());
- return null;
- }
- }
- byte[] bMsg = new byte[recvMsg.getMessageLength()];
- recvMsg.readFully(bMsg);
- System.out.println("rec correlationId Id");
- for (int i = 0; i<recvMsg.correlationId.length; i++) {
- System.out.print(recvMsg.correlationId[i]);
- }
- System.out.println();
- System.out.println("rec correlationId Id");
- String recv = new String(bMsg);
- recvMsg.clearMessage();
- rQueue.close();
- qManager.disconnect();
- return recv;
- }
- public void setChannel(String channel) {
- this.channel = channel;
- }
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
- public void setQueueManager(String queueManager) {
- this.queueManager = queueManager;
- }
- public void setPort(int port) {
- this.port = port;
- }
- public void setIntervalTime(int intervalTime) {
- this.intervalTime = intervalTime;
- }
- public void setFailedCount(int failedCount) {
- this.failedCount = failedCount;
- }
- public void setRecvQueue(String recvQueue) {
- this.recvQueue = recvQueue;
- }
- public void setSendQueue(String sendQueue) {
- this.sendQueue = sendQueue;
- }
- }
时间: 2024-10-10 09:53:37