MQ java 基础编程(一)

本文转自:http://www.blogjava.net/i369/articles/88035.html

编写人:邬文俊

编写时间 : 2006-2-16

联系邮件 : [email protected]

前言

通过 2 个多星期对 MQ 学习,在 partner 丁 & partner 武 的帮助下完成了该文档。该文档提供一个简单的例子,通过对该例子的讲解,你将知道:

1.         用 java 写客户端从 MQ Server 收发消息。

2.         MQ 作为 Websphere Application Server 的 JMS 资源提供者。

3.         JMS message 映射转化为  MQ message

文档中的知识全部从参考资料和 IBM 提供的文档中获得。 I recommend u to read the documents if u want to know more about the MQ.

参考资料

1.         《 Using java 》( some place name it 《 base java 》) -----the very important document offered by IBM, every java programmer should read it!

2.         《让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序》

3.         Websphere MQ 入门教程 (a document written by IBM engineer)

4.         mqseries_class_for_java

5.         《 IBM - JMS 应用和使用 WebSphere MQ MQI 接口的应用如何进行信息头的交换(二)数据映射》 ------- 《 using java 》 mapping message 部分的翻译。

6.         MQ--IBM MQSeries 使用指南

7.         WebSphere Application Server V5 and WebSphere MQ Family Integration. PDF

8.         WebSphere MQ Application Programming Guide. PDF

9.         IBM MQSeries 的触发机制

10.     让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序

例子说明

例子包括 3 个部分:发送客户端、 MDB 、接收客户端。

客户端 A 把文件发送到服务器,服务器将该文件转发给客户端 B 。客户端A通过向 MQ 客户机直接发送消息。 MQ 队列会把客户端A发送的 MQ Message 转换为 JMS Message 。 MDB 接收到转换后的 JMS 消息后,保存文件在服务器,然后把文件转发到发送队列( JMS 队列)。发送队列由 MQ Server 提供,所以即为发送到了 MQ 队列。 MQ 队列把 JMS Message 转换为 MQ Message 。客户端 B 从 MQ 队列中接收转换后的消息,从消息中读取文件,保存在硬盘。

MQMESSAGE             JMS MESSAGE

Client A------------------------->mq queue  ------------------->MDB

Client B<------------------------ mq queue  <-------------------MDB

1.       配置 MQ Server

这里使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安装在同一台机器上( 2 者可以使用绑定方式通信)。

要配置的项目:

1.         队列管理器 QMGR

2.         侦听端口 4001

3.         本地队列 EXAMPLE.QUEUE

4.         本地队列 EXAMPLE.SENDQUEUE

5.         通道 EXAMPLE.CHANNEL

打开 WebSphere MQ 资源管理器。展开队列管理器节点,右键,新建队列管理器。取名字为 QMGR ,设置侦听端口 4001 。

在建好的队列管理器 QMGR 下面新建 2 个本地队列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。

展开高级节点,新建服务器连接通道 EXAMPLE.CHANNEL 。

Note :不要搞错队列和通道的类型。

2.       验证 MQ 配置

打开 WebSphere MQ 服务。可以查看服务是否启动、服务监听端口。

3.       配置 WAS JMS

具体操作参考《让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序》该文章可以在 IBM WebSphere 开发者技术期刊 中找到。

要配置的项目:

1.         WebSphere MQ 连接工厂

JNDI name : jms/JMSExampleConnectionFactory

2.         WebSphere MQ 队列目标

JNDI name : jms/JMSExampleQueue ;基本队列名: EXAMPLE.QUEUE ;目标客户机: JMS 。目标客户机决定了 MQ 队列接收方的消息格式。因为是用 MDB 接收消息,所以设置为 JMS 。另一个队列是由 MQ 客户端接收消息,所以另一个队列的目标客户机是 MQ 。如果配置错误, MQ 队列转换消息的格式将不是你所想要的。具体参考《 IBM - JMS 应用和使用 WebSphere MQ MQI 接口的应用如何进行信息头的交换(二)数据映射》

3.         WebSphere MQ 队列目标

JNDI name : jms/JMSExampleSendQueue ;

基本队列名: EXAMPLE.SENDQUEUE ;目标客户机: MQ 。

4.       配置 MDB

在 WAS 上配置 侦听器端口

名称: JMSExampleQueuePort ;

连接工厂 JNDI 名      jms/JMSExampleConnectionFactory ;

目标 JNDI 名: jms/JMSExampleQueue 。

Message Driven Beans 用于侦听消息的侦听器端口。每个端口指定 MDB 将侦听的(依据该端口部署的) JMS 连接工厂和 JMS 目标。

MDB 部署描述符中配置

连接工厂 JNDI 名      jms/JMSExampleConnectionFactory ;

目标 JNDI 名: jms/JMSExampleQueue ;

监听端口名称: JMSExampleQueuePort (监听端口名称也可以在管理控制台中修改)

5.       代码

客户端 A (发送方)

MqPut.java


package cn.edu.itec.mqclient;

import java.io.File;

import com.ibm.mq.MQC;

import com.ibm.mq.MQEnvironment;

import com.ibm.mq.MQException;

import com.ibm.mq.MQMessage;

import com.ibm.mq.MQPutMessageOptions;

import com.ibm.mq.MQQueueManager;

public class MQPut {

private String HOST_URL = "192.168.1.116";

private String MQ_CHANNEL = "EXAMPLE.CHANNEL";

private String MQ_MANAGER = "QMGR";

private String MQ_QUEUE = "EXAMPLE.QUEUE";

private int MQ_PORT = 4001;

public static void main(String args[]) {

new MQPut().SendFile("f:/JMSExampleEJB.jar");

}

public void SendFile(String sFilePath) {

try {

/* 设置 MQEnvironment 属性以便客户机连接 */

MQEnvironment.hostname = HOST_URL;

MQEnvironment.channel = MQ_CHANNEL;

MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

MQC.TRANSPORT_MQSERIES);

MQEnvironment.CCSID = 1381;

MQEnvironment.port = MQ_PORT;

/* 连接到队列管理器 */

MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);

System.out.println("queue manager is connected!");

/* 设置打开选项以便打开用于输出的队列,如果队列管理器正在停止,我们也已设置了选项去应对不成功情况。 */

int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;

/* 打开队列 */

com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);

/* 设置放置消息选项我们将使用默认设置 */

MQPutMessageOptions pmo = new MQPutMessageOptions();

/* 创建消息, MQMessage 类包含实际消息数据的数据缓冲区,和描述消息的所有 MQMD 参数 */

/* 创建消息缓冲区 */

MQMessage outMsg = new MQMessage();

/* set the properties of the message fot the selector */

outMsg.correlationId = "clinet_B_receive".getBytes();

outMsg.messageId = "1Aa".getBytes();

/* write msg */

MsgWriter.readFile(outMsg, new File(sFilePath));

/* put message with default options */

queue.put(outMsg, new MQPutMessageOptions());

System.out.println("send file is success!");

/* release resource */

queue.close();

qMgr.disconnect();

} catch (MQException ex) {

//System.out.println("fft!");

System.out.println("An MQ Error Occurred: Completion Code is :\t"

+ ex.completionCode + "\n\n The Reason Code is :\t"

+ ex.reasonCode);

ex.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}

}

private void readFileToMessage(String FilePath) {

}

}

JMS message 和 MQ message 有几个字段是相同的,这些字段的值将会在转换中保留。比较方便的是使用 CorrelationID 这个字段。通过设置这个字段,达到选择性的接收特定消息的功能。其它字段没有完全搞清楚,有的数据类型需要转换,例如 MessageID (对应于 JMSMessageID )。 MQ 消息选择和 JMS 不同,后者采用 selector ,前者通过设置接收消息的属性完成。例如设置 CorrelationID 为特定值。

客户端 B

MQGet.java


package cn.edu.itec.mqclient;

import java.io.FileOutputStream;

import java.io.IOException;

import java.util.Hashtable;

import com.ibm.mq.MQC;

import com.ibm.mq.MQEnvironment;

import com.ibm.mq.MQException;

import com.ibm.mq.MQGetMessageOptions;

import com.ibm.mq.MQMessage;

import com.ibm.mq.MQQueueManager;

/**

* @author Administrator

*

* TODO To change the template for this generated type comment go to Window -

* Preferences - Java - Code Style - Code Templates

*/

public class MQGet {

private static String HOST_URL = "192.168.1.116";

private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

private static String MQ_MANAGER = "QMGR";

private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

private static int MQ_PORT = 4001;

public static void main(String[] args) {

MQGet.getMessage();

}

public static void getMessage() {

try {

/* 设置 MQEnvironment 属性以便客户机连接 */

MQEnvironment.hostname = HOST_URL;

MQEnvironment.channel = MQ_CHANNEL;

MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

MQC.TRANSPORT_MQSERIES);

MQEnvironment.CCSID = 1381;

MQEnvironment.port = MQ_PORT;

/* 连接到队列管理器 */

MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);

System.out.println("queue manager is connected!");

/*

* 设置打开选项以便打开用于输出的队列,如果队列管理器停止,我们也 已设置了选项去应对不成功情况

*/

int openOptions = MQC.MQOO_INPUT_SHARED

| MQC.MQOO_FAIL_IF_QUIESCING;

/* 打开队列 */

com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);

System.out.println(" 队列连接成功 ");

/* 设置放置消息选项 */

MQGetMessageOptions gmo = new MQGetMessageOptions();

/* 在同步点控制下获取消息 */

gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;

/* 如果在队列上没有消息则等待 */

gmo.options = gmo.options + MQC.MQGMO_WAIT;

/* 如果队列管理器停顿则失败 */

gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;

/* 设置等待的时间限制 */

gmo.waitInterval = MQC.MQWI_UNLIMITED;

/* create the message buffer store */

MQMessage inMessage = new MQMessage();

/* set the selector */

inMessage.correlationId = "clinet_B_receive".getBytes();

/* get the message */

queue.get(inMessage, gmo);

System.out.println("get message success");

/* 读出消息对象 */

Hashtable messageObject = (Hashtable) inMessage.readObject();

System.out.println(messageObject);

/* 读出消息内容 */

byte[] content = (byte[]) messageObject.get("content");

/* save file */

FileOutputStream output = new FileOutputStream(

"f:/exampleReceive.jar");

output.write(content);

output.close();

System.out.println(messageObject.get("FileName"));

/* 提交事务 , 相当于确认消息已经接收,服务器会删除该消息 */

qMgr.commit();

} catch (MQException e) {

e.printStackTrace();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (ClassNotFoundException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

MDB

MQMDBBeanBean.java MDB 文件


package ejbs;

import javax.jms.ObjectMessage;

import javax.jms.BytesMessage;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

import javax.jms.JMSException;

import ehub.ihub.exchangeManager.*;

import java.util.Hashtable;

import java.io.ByteArrayInputStream;

import java.io.FileOutputStream;

import java.io.File;

import java.io.ObjectInputStream;

/**

* Bean implementation class for Enterprise Bean: MQMDBBean

*/

public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,

javax.jms.MessageListener {

private javax.ejb.MessageDrivenContext fMessageDrivenCtx;

/**

* getMessageDrivenContext

*/

public javax.ejb.MessageDrivenContext getMessageDrivenContext() {

return fMessageDrivenCtx;

}

/**

* setMessageDrivenContext

*/

public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {

fMessageDrivenCtx = ctx;

}

/**

* ejbCreate

*/

public void ejbCreate() {

}

/**

* onMessage

*/

public void onMessage(javax.jms.Message msg) {

try {

System.out.println(msg.toString());

if (msg instanceof TextMessage) {

System.out.println("TextMessage");

} else if (msg instanceof ObjectMessage) {

System.out.println("ObjectMessage");

} else if (msg instanceof StreamMessage) {

System.out.println("StreamMessage");

} else if (msg instanceof BytesMessage) {

System.out.println("BytesMessage");

BytesMessage bytesMessage = (BytesMessage) msg;

String sCorrelationID = new String(bytesMessage

.getJMSCorrelationIDAsBytes());

String sMessageID = bytesMessage.getJMSMessageID();

long size = bytesMessage.getBodyLength();

System.out.println("size=" + size + "/n CorrelationID="

+ sCorrelationID + "/n MessageID=" + sMessageID);

/*read the message and save the file*/

ReadMessage.readMessage(bytesMessage);

System.out.println("read message success");

/*send the message to the client */

SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar"));

System.out.println("send file success");

} else {

System.out.println("no message");

}

} catch (Exception e) {

System.out.println("onmessage 执行错误,回滚! ");

e.printStackTrace(System.err);

fMessageDrivenCtx.setRollbackOnly();

}

}

private void getProperties(byte[] p) {

}

/**

* ejbRemove

*/

public void ejbRemove() {

}

}

ReadMessage.java


/*

* Created on 2006-2-15

*

* TODO To change the template for this generated file go to

* Window - Preferences - Java - Code Style - Code Templates

*/

package ehub.ihub.exchangeManager;

import java.io.ByteArrayInputStream;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.util.Hashtable;

import javax.jms.BytesMessage;

import javax.jms.JMSException;

/**

* @author Administrator

*

*

*/

public class ReadMessage {

/**

* read message including property and body

*

* @param Message

* @throws JMSException

* @throws IOException

* @throws ClassNotFoundException

*/

public static void readMessage(BytesMessage Message) {

try {

long bodySize = Message.getBodyLength();

byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))];

/* 消息内容读到字节数组中 */

Message.readBytes(buf);

ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(

buf);

/* 从字节流读出消息对象 */

ObjectInputStream objectInputStream = new ObjectInputStream(

byteArrayInputStream);

Hashtable messageObject = (Hashtable) objectInputStream

.readObject();

/* 解析消息 */

byte[] contentBuf = (byte[]) messageObject.get("content");

/* 把文件保存 */

FileOutputStream fileWriter = new FileOutputStream(

"c:/receivedExample.jar");

fileWriter.write(contentBuf);

fileWriter.close();

} catch (JMSException e) {

e.printStackTrace();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (ClassNotFoundException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

SendMessage.java


/*

* Created on 2006-2-16

*

* TODO To change the template for this generated file go to

* Window - Preferences - Java - Code Style - Code Templates

*/

package ehub.ihub.exchangeManager;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.util.Hashtable;

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.ObjectMessage;

import javax.jms.Queue;

import javax.jms.Session;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

/**

* @author Administrator

*

* TODO To change the template for this generated type comment go to Window -

* Preferences - Java - Code Style - Code Templates

*/

public class SendMessage {

private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

private static String MQ_MANAGER = "QMGR";

private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

private static int MQ_PORT = 4001;

private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory";

private static String QUEUE_NAME="jms/JMSExampleSendQueue";

public static void sendFileToReceiveQueue(File file) {

try {

Context initContext = new InitialContext();

ConnectionFactory qconFactory = (ConnectionFactory) initContext

.lookup(JMS_CONNECTIONFACTORY);

Connection qcon = qconFactory.createConnection();

Session session = qcon.createSession(false,

Session.AUTO_ACKNOWLEDGE);

Queue queue = (Queue) initContext.lookup(QUEUE_NAME);

MessageProducer producer = session.createProducer(queue);

ObjectMessage outMessage=session.createObjectMessage();

/* write the file information into the message */

Hashtable fileInfo = new Hashtable();

fileInfo.put("FileName", file.getName());

fileInfo.put("FileSize", Long.toString(file.length()));

/* write the file content into the message */

FileInputStream fos = new FileInputStream(file);

byte[] buf;

int size = (int) file.length();

buf = new byte[size];

int num = fos.read(buf);

fos.close();

/*add the file byte stream to the object*/

fileInfo.put("content", buf);

outMessage.setObject(fileInfo);

outMessage.getObject();

outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());

//                   qcon.start();

producer.send(outMessage);

producer.close();

session.close();

qcon.close();

} catch (NamingException e) {

System.out.println(" 获得连接失败 ,jndi 查找失败 ");

e.printStackTrace();

} catch (JMSException e) {

System.out.println(" 发送文件异常 ");

// TODO Auto-generated catch block

e.printStackTrace();

} catch (IOException e) {

// TODO Auto-generated catch block

System.out.println(" 发送文件过程中 io 操作失败 ");

e.printStackTrace();

}

}

}

Trackback: http://tb.blog.csdn.net/TrackBack.aspx?PostId=600470

MQ java 基础编程(一)

时间: 2024-08-13 15:43:50

MQ java 基础编程(一)的相关文章

6、50道JAVA基础编程练习题跟答案

1 50道JAVA基础编程练习题 2 [程序1] 3 题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第三个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少? 4 程序分析: 兔子的规律为数列1,1,2,3,5,8,13,21.... 5 public class Prog1{ 6 public static void main(String[] args){ 7 int n = 10; 8 System.out.println("第"+n+

JAVA基础编程练习题

50道JAVA基础编程练习题 [程序1] 题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第三个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子对数为多少? 程序分析: 兔子的规律为数列1,1,2,3,5,8,13,21.... public class Prog1{ public static void main(String[] args){ int n = 10; System.out.println("第"+n+"个月兔子总数为&qu

50道JAVA基础编程练习题

50道JAVA基础编程练习题[程序1]题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第三个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少?程序分析: 兔子的规律为数列1,1,2,3,5,8,13,21....public class Prog1{public static void main(String[] args){ int n = 10; System.out.println("第"+n+"个月兔子总数为"+f

java基础编程题

java基础编程题 1.打印出如下图案 1 public class Prog19{ 2 public static void main(String[] args){ 3 int n = 5; 4 printStar(n); 5 } 6 7 //打印星星 8 private static void printStar(int n){ 9 //打印上半部分 10 for(int i=0;i<n;i++){ 11 for(int j=0;j<2*n;j++){ 12 if(j<n-i) 1

JAVA基础编程50题(7-9题)详解

一.描述 1.输入一行字符,分别统计出其中英文字母.空格.数字和其它字符的总个数和每个字符出现的频率. 程序分析:使用String类的matchs()分别统计符合正则表达式的每类字符的总个数,然后分别使用List和Map集合类统计每个字符出现的频率. 2.求s=a+aa+aaa+aaaa+aa...a的值,其中a是一个数字.例如2+22+222+2222+22222(此时共有5个数相加),几个数相加由键盘控制. 3.题目:一个数如果恰好等于它的因子之和,这个数就称为"完数",即除了本身

Java 基础编程练习题

1.编写程序实现对给定的 4 个整数从大到小的顺序排列. package HomeWork01; import java.util.Scanner; public class HomeWork01 { static int number=4; //输入4个数存放在数组中 static int[] t1 = new int[number]; public static void main(String[] args) { HomeWork01 jiejie=new HomeWork01(); ji

JAVA基础编程50题(13-15题)详解

一.描述 1.一个整数,它加上100后是一个完全平方数,再加上168又是一个完全平方数,请问该数是多少? 程序分析:在10万以内判断,先将该数加上100后再开方,再将该数加上168后再开方,如果开方后再平方等于原数则符合结果. 2.输入某年某月某日,判断这一天是这一年的第几天? 程序分析:以3月5日为例,应该先把前两个月的加起来,然后再加上5天即本年的第几天,特殊情况,闰年且输入月份大于3时需考虑多加一天. 3.输入三个整数x,y,z,请把这三个数由小到大输出. 程序分析:将最小的数放到x上,先

JAVA基础编程50题(22-24题)详解

一.描述 题目1:统计输入的一段字符串,分别统计这个字符串中大小写字母的个数,以及数字出现的次数. 第一种方法使用Character封装类的方法:isLowerCase(),isUpperCase(),isDigit()判断是否是该类字符, 第二种方法是直接使用char字符范围比较来统计. 题目2:用户输入一串待统计的字符串,然后输入用户想要统计的某个单词或者字符的次数. 比如我输入如下字符串:fdhiaojavajidaoijdjava 我要统计其中的java字符串的个数. 解题思路:传入待统

JAVA基础编程50题(25-27题)详解

一.描述 题目1:判断一个数字是否是2的阶次方数,例如8,16,64,256都是2的阶次方数. 题目解析:如果一个数是2的阶次方数,那么这个数字的二进制数的首位为1,后面跟着若干个0,例如8用二进制表示为1000,64为1000000, 如果让这个数减1,然后和这个数做按位&运算即得0,即(number-1)&number==0,8&7=1000&0111=0000. 题目2:列出一个数组中所有元素的组合,比如1.2.3列出来为1.12.123.13.132.2.21.21