JMS (Java Message Service)。用于在Application之间进行message传递。
Enterprise Message
WebSphere MQ、SonicMQ、Microsoft Message Queue(MSMQ)、ActiveMQ、EJB中的Message Bean、SOA中的ESB(Enterprise Message Bus)等都是Enterprise Message。
使用Enterprise Message时,可以有下列方式:
集中式
分布式
混合式
在每个Client上都安装一个Daemon程序,Client与Daemon使用TCP通信,Daemon与其它Daemon之间使用MultiCast。
JMS消息模型
JMS支持两种消息模型:point-to-point、publish-and-subscribe。
point-to-point
在P2P模型中,一个JMS Client可以发送、接收消息到(从)一个queue。可以是同步方式,也可以是异步方式。消息以PULL(拉)的方式获取。发送方称为sender,接收方称为receiver。
publish-and-subscribe
消息以push方式发送。即使订阅者是离线状态,也会进行发送。发送方称为publisher,接收方称为subscriber。
JMS API 简述
JMS 只是定义了接口,并没有具体的实现,实现都是由相关的厂商来完成的。这一点类似于JDBC。
JMS的API可以分为3个主要部分:1)通用API, 2)p2p API,3)pub/sub API。
几个主要的接口是:
·ConnectionFactory
·Destination
·Connection
·Session
·Message
·MessageProducer
·MessageConsumer
在上述接口中,ConnectionFactory、Destination是必须通过JNDI来取得,其实的接口都是根据这两个来创建的。取得ConnectionFactory后,就可以取得Connection;有了Connection就可以创建Session;有了Session,就可以创建出Message、MessageProducer、MessageConsumer。
实例的创建关系如下:
在JMS中,Session对象会持有transactional,而不是Connection,这一点与JDBC是不一样的。也就是说,在使用JMS时,一个应用只会创建一个Connection对象,可以根据这个Connection对象创建出多个Session对象。
Point-to-point
这是为P2P设计的接口。与通用接口一一对应。
Publish/Subscribe
与此类似,也为pub/sub专门设计了接口:
JMS Pub/Sub示例
要运行JMS,需要有JMS代理服务器。下面这个例子中,会使用activemq作为代理服务器。
在安装完成activemq后,启动它。
示例程序是一个简易Chat Room。
package com.fjn.java.jms.topic; import java.io.IOException; import java.util.Properties; import java.util.Scanner; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.InitialContext; import javax.naming.NamingException; public class Chat implements MessageListener { private TopicSession pubSession; private TopicPublisher publisher; private TopicConnection connection; private String username; public Chat(String factoryJdni, String topicName, String username) throws NamingException, JMSException, IOException { Properties props=new Properties(); props.load(Chat.class.getResourceAsStream("jndi.properties")); InitialContext ctx = new InitialContext(props); TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup(factoryJdni); Topic topic = (Topic) ctx.lookup(topicName); TopicConnection conn = connFactory.createTopicConnection(); TopicSession pubSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession subSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = pubSession.createPublisher(topic); TopicSubscriber subscriber = subSession.createSubscriber(topic, null, true); subscriber.setMessageListener(this); this.connection = conn; this.publisher = publisher; this.pubSession = pubSession; this.username = username; conn.start(); } @Override public void onMessage(Message msg) { try { TextMessage text = (TextMessage) msg; System.out.println(text.getText()); } catch (JMSException e) { e.printStackTrace(); } } protected void writeMessage(String text) throws JMSException{ TextMessage msg=pubSession.createTextMessage(); msg.setText(username+": "+text); publisher.publish(msg); } public void close() throws JMSException{ connection.close(); } public static void main(String[] args) throws NamingException, JMSException, IOException { Chat chat=new Chat("TopicCF","topic1","ZhangSan"); Scanner scanner=new Scanner(System.in); while(true){ String line=scanner.nextLine(); if("exit".equals(line)){ chat.close(); System.exit(0); scanner.close(); }else{ chat.writeMessage(line); } } } } package com.fjn.java.jms.topic; import java.io.IOException; import java.util.Scanner; import javax.jms.JMSException; import javax.naming.NamingException; public class Chat2 { public static void main(String[] args) throws NamingException, JMSException, IOException { Chat chat=new Chat("TopicCF","topic1","LiSi"); Scanner scanner=new Scanner(System.in); while(true){ String line=scanner.nextLine(); if("exit".equals(line)){ chat.close(); System.exit(0); scanner.close(); }else{ chat.writeMessage(line); } } } }
其实,你可以将程序的main方法稍作改动,让创建Chat的三个参数从命名行取得。这样,就可以启动任意客户端了。
Jndi.properties文件内容:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url = tcp://localhost:61616 java.naming.security.principal=system java.naming.security.credentials=manager connectionFactoryNames = TopicCF topic.topic1 = jms.topic1
这样就可以进行多个客户端对话了,也就是群组对话了。