一段同步接收和发送MQ消息的代码

JAVA代码:

Java代码  

  1. package com.sdb.payment.core.mq;
  2. import org.apache.log4j.Logger;
  3. import com.ibm.mq.MQC;
  4. import com.ibm.mq.MQEnvironment;
  5. import com.ibm.mq.MQException;
  6. import com.ibm.mq.MQGetMessageOptions;
  7. import com.ibm.mq.MQMessage;
  8. import com.ibm.mq.MQPutMessageOptions;
  9. import com.ibm.mq.MQQueue;
  10. import com.ibm.mq.MQQueueManager;
  11. public class MessageQueueService {
  12. private static Logger logger = Logger.getLogger(MessageQueueService.class);
  13. private String hostname = "192.168.0.117";
  14. private String channel = "CHL.SVRCONN";
  15. private String queueManager = "QM_SERVER";
  16. private String sendQueue = "OMP.QRMT";
  17. private String recvQueue = "OMP.QLCA";
  18. private int port = 24100;
  19. private int ccsid = 1381;
  20. private int failedCount = 5;
  21. private int intervalTime = 1000;
  22. public MessageQueueService() {
  23. MQEnvironment.hostname = hostname;
  24. MQEnvironment.channel = channel;
  25. MQEnvironment.CCSID = ccsid;
  26. MQEnvironment.port = port;
  27. }
  28. public String send(String sendMsg) throws Exception {
  29. MQQueueManager qManager = new MQQueueManager(queueManager);
  30. // send message
  31. int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
  32. MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);
  33. MQPutMessageOptions pmo = new MQPutMessageOptions();
  34. MQMessage send = new MQMessage();
  35. send.write(sendMsg.getBytes());
  36. System.out.println("send message : " + sendMsg);
  37. sQueue.put(send, pmo);
  38. sQueue.close();
  39. System.out.println("send message Id");
  40. for (int i = 0; i<send.messageId.length; i++) {
  41. System.out.print(send.messageId[i]);
  42. }
  43. System.out.println();
  44. System.out.println("send message Id");
  45. // fetch message
  46. openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING
  47. + MQC.MQOO_INPUT_SHARED;
  48. MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);
  49. MQGetMessageOptions getOptions = new MQGetMessageOptions();
  50. getOptions.options = MQC.MQGMO_WAIT;
  51. getOptions.waitInterval = intervalTime;
  52. MQMessage recvMsg = new MQMessage();
  53. recvMsg.messageId = send.messageId;//这里是关键,要保持接收的msgid跟发送的msgid值是一样的,
  54. //这样就会根据msgId来取队列的消息了,而不会取到别的消息
  55. send.clearMessage();
  56. boolean received = false;
  57. int fetchCount = 0;
  58. while (!received) {
  59. try {
  60. fetchCount++;
  61. rQueue.get(recvMsg, getOptions);
  62. //logger.debug("the " + fetchCount + " time fetch message!");
  63. System.out.println("fetch message !!!");
  64. received = true;
  65. } catch (MQException me) {
  66. if (me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
  67. if (fetchCount > failedCount) {
  68. recvMsg.clearMessage();
  69. rQueue.close();
  70. qManager.disconnect();
  71. //logger.error("can‘t fetch message for " + me.getMessage());
  72. return null;
  73. }
  74. }
  75. } catch (Exception ex) {
  76. recvMsg.clearMessage();
  77. rQueue.close();
  78. qManager.disconnect();
  79. //logger.error("can‘t fetch message for " + ex.getMessage());
  80. return null;
  81. }
  82. }
  83. byte[] bMsg = new byte[recvMsg.getMessageLength()];
  84. recvMsg.readFully(bMsg);
  85. System.out.println("rec correlationId Id");
  86. for (int i = 0; i<recvMsg.correlationId.length; i++) {
  87. System.out.print(recvMsg.correlationId[i]);
  88. }
  89. System.out.println();
  90. System.out.println("rec correlationId Id");
  91. String recv = new String(bMsg);
  92. recvMsg.clearMessage();
  93. rQueue.close();
  94. qManager.disconnect();
  95. return recv;
  96. }
  97. public void setChannel(String channel) {
  98. this.channel = channel;
  99. }
  100. public void setHostname(String hostname) {
  101. this.hostname = hostname;
  102. }
  103. public void setQueueManager(String queueManager) {
  104. this.queueManager = queueManager;
  105. }
  106. public void setPort(int port) {
  107. this.port = port;
  108. }
  109. public void setIntervalTime(int intervalTime) {
  110. this.intervalTime = intervalTime;
  111. }
  112. public void setFailedCount(int failedCount) {
  113. this.failedCount = failedCount;
  114. }
  115. public void setRecvQueue(String recvQueue) {
  116. this.recvQueue = recvQueue;
  117. }
  118. public void setSendQueue(String sendQueue) {
  119. this.sendQueue = sendQueue;
  120. }
  121. }
时间: 2024-10-10 09:53:37

一段同步接收和发送MQ消息的代码的相关文章

如何解决群聊(MUC)聊天室重复存储、接收自己发送的消息的问题

CHENYILONG Blog 如何#解决方案#群聊(MUC)聊天室重复存储.接收自己发送的消息 编号 项目 描述 1 问题描述 单聊没问题,群聊会出现自动回复的问题 数据库中存储的数据出现的问题 界面上出现的问题:类似自动回复.回音壁一样一模一样地回答.  2 问题产生的原因 3 群聊基本的原理示意图 聊天内容的显示是经由从数据库进行的读取排序, 4 #解决方案# 拦截阻挡红色区域的执行  5 失败的尝试:尝试但是没有效果的方法 // AppDelegate.m中#pragma 接收消息代理监

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

微信发送模板消息代码示例

最近一个微信的项目里需要发送微信模板消息给卖家或者供应商等,微信开发其实也就按照微信的官方接口要求组装起来即可,下面简单介绍一下我的微信模板发送代码. 1.获取access token,至于access token是什么,大家可以自行微信接口文档看一下,这边不多说 获取access token我这边主要是用定时器没大概2分钟获取一次,每天获取的次数是100000次,用法如下: 1 #region 2 3 using System; 4 using System.Timers; 5 6 #endr

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

C#串口通信—向串口发送数据,同步接收返回数据

最近写C#串口通信程序,系统是B/S架构.SerialPort类有一个DataReceived事件,用来接收串口返回的数据,但这种方式在C/S架构下很好用,但B/S就不好处理了.所以写了一个同步模式接收返回数据的方法,不使用DataReceived事件.经过测试,可以正常使用(不支持多线程调用). 一.Machine类 1.Machine类有一个静态变量,定义如下: private static SerialPort serialPort = null; 2.向串口发送数据,同步接收返回数据的方

微信公众平台-接收消息与发送被动消息

接收消息代码如下(包含回复消息调用): /// <summary> /// 接收用户消息 /// iftrue /// 2014-07-08 /// </summary> public class Receive { public delegate Models.Send_Msg delegate_SendMsg(string msgType); public delegate void delegate_RececiveHandler(Models.Receive_Msg mod

Java与微信不得不说的故事——消息的接收与发送

Java与微信的知识也是自学阶段,代码都是参照柳峰老师的.具体可以查看此博:http://blog.csdn.net/lyq8479/article/details/8949088. 下面说一下消息的接收和发送吧. 消息的推送:当普通用户向公众账号发送消息是,微信服务器将POST消息到填写的URL上.消息是一个xml包. 消息的回复:对于每一个POST请求,开发者在响应包中返回特定的xml包,对消息进行响应. 所以,需要有解析xml包和包装xml包的方法.于是,引进了dom4j.jar和xstr

个人微信公众号搭建Python实现 -接收和发送消息-基本说明与实现(14.2.1)

目录 1.原理 2.接收普通消息 3.接收代码普通消息代码实现 @(接收和发送消息-基本说明与实现) 1.原理 2.接收普通消息 其他消息类似参考官方文档 3.接收代码普通消息代码实现 from flask import Flask,request,abort import xmltodict import time app = Flask(__name__) #常量 微信的token令牌 WECHAT_TOKEN = "xxxx" @app.route("/wx"

在服务端处理同步发送小消息的性能上Kafka&gt;RocketMQ&gt;RabbitMQ

在发送小消息的场景中,三个消息中间件的表现区分明显: Kafka的吞吐量高达17.3w/s,远超其他两个产品.这主要取决于它的队列模式保证了写磁盘的过程是线性IO.此时broker磁盘IO已达瓶颈. RocketMQ也表现不俗,吞吐量在11.6w/s,磁盘IO %util已接近100%.RocketMQ的消息写入内存后即返回ack,由单独的线程专门做刷盘的操作,所有的消息均是顺序写文件. RabbitMQ的吞吐量5.95w/s,CPU资源消耗较高.它支持AMQP协议,实现非常重量级,为了保证消息