JMS:java消息服务,JMS客户端可以通过JMS服务进行异步消息传输。可以支持两种模型:P2P和Pub/Sub
P2P
点对点消息传输模式,这种模式主要包括发送者,消息队列和接收者
特点:
1、每个消息只有一个消费者,一旦被接收(消费),此消息就不存在于消息队列中了。
2、发送者和接收者在时间上没有依赖性(当消息发送后,无论接收者有没有在接收,都不会影响消息进入消息队列)
3、接收者在成功接收消息之后,需要向队列应答成功。
Pub/Sub
发布/订阅的模式。主要包括:发布者,主题,订阅者三部分。
特点:
1、每个消息可以有多个消费者。(这是和P2P最大的不同)只要消费者订阅了某个主题,那这个主题的发布者所发布的消息,就会被订阅者全部接收。
2、发布者和订阅者必须有时间上的依赖(这种依赖是指,订阅者必须在发布之前先进行订阅,才能接收到发布的消息)
3、为了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅,这样,即使订阅者没有运行,也能接收到发布者的消息
实现思路
二者除了模式不同外,程序中的实现是非常类似的。都是通过一个连接工厂connectionFactory来获取connection来,然后再通过创建
Session来创建发送者(发布者),通过context来查找并获取队列或者主题,通过发送者(发布者)来发送或发布消息。
在这里需要知道的是:某个队列或者主题是由应用服务器来提供的,不由任何一个客户端来创建。我们在客户端只需要通过JNDI来进行查找就好了。
简单的了解了这两种消息服务的区别后,通过一个小例子来加深一下对他们的理解:
P2P code
创建两个消费者MyQueueMDBBean和MyQueueMDBBean1:
//配置的queue/MyQueue为我们要发送消息的消息队列
@MessageDriven(
activationConfig={ @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination",propertyValue="queue/MyQueue")
}
)
//只要实现了MessageListener接口,接收到消息会自动触发onMessage这个方法
//这里相当于一个消费者
public class MyQueueMDBBean implements MessageListener {
public void onMessage(Message arg) {
TextMessage textMessage=(TextMessage)arg;
try {
System.out.println("MyQueueMDBBean已接收到queue消息了,消息为:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
由于JBoss不能自己创建Queue对象,必须在server/default/deploy目录下找到一个xxx-service.xml文件中加入:
注意JNDIName的大小写和程序中要保持一致。
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" >
<attribute name="JNDIName" >queue/MyQueue</attribute>
<depends optional-attribute-name = "DestinationManager" >
jboss.mq:service=DestinationManager
</depends>
</mbean>
发送者代码:
public static void main(String[] args) throws NamingException, JMSException {
InitialContext context=new InitialContext();
//获取队列工厂
QueueConnectionFactory factory=(QueueConnectionFactory) context.lookup("ConnectionFactory");
//获取connection
QueueConnection connection=factory.createQueueConnection();
//创建QueueSession
QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
//获取的destination对象
Queue queue=(Queue)context.lookup("queue/MyQueue");
//创建消息
TextMessage textMessage=session.createTextMessage("hello,world!");
//创建发送者
QueueSender sender=session.createSender(queue);
//发送消息
sender.send(textMessage);
//关闭会话
session.close();
System.out.println("消息已成功发送!");
}
Pub/Sub Code
创建消费者TopicMDBBean,TopicMDBBean1和TopicMDBBean2
//这里相当于一个消费者
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
@ActivationConfigProperty(propertyName="destination",propertyValue="topic/myTopic")
}
)
public class TopicMDBBean implements MessageListener {
public void onMessage(Message arg) {
TextMessage textMessage=(TextMessage)arg;
try {
System.out.println("TopicMDBBean已接收到topic消息了,消息为:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
客户端代码和P2P类似,把Queue改成Topic就可以了,这里也必须配置"topic/myTopic"主题。
public static void main(String[] args) throws NamingException, JMSException {
InitialContext context=new InitialContext();
//获取队列工厂
TopicConnectionFactory factory=(TopicConnectionFactory) context.lookup("ConnectionFactory");
//获取connection
TopicConnection connection=factory.createTopicConnection();
//创建QueueSession
TopicSession session=connection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
//获取的destination对象
Topic topic=(Topic)context.lookup("topic/myTopic");
//创建消息
TextMessage textMessage=session.createTextMessage("hello,world!");
//创建发送者
TopicPublisher sender=session.createPublisher(topic);
//发送消息
sender.send(textMessage);
//关闭会话
session.close();
System.out.println("消息已成功发送!");
}
}
执行结果
P2P:
第一次执行:
第二次执行:
每次发消息,只有一个消费者能够接收到消息。
Pub/Sub:
第一次执行:
每发一次消息,只要进行监听了此主题的订阅者都可以收到这个消息!
总结
我认为通常在使用JDM的P2P发送消息时,至少应该是有一个消费者的,否则采用这个方式就没有任何意义。
所以如果我们想让我们发出去的指令只被执行一次,且每条消息都可以被成功处理可以选择P2P模式,如果想让自己发出去的消息可以被多个人接收,
或者一个或多个人处理,可以选择Pub/Sub模式
时间: 2024-10-13 11:57:06