JMS开发步骤和持久化/非持久化Topic消息

------------------------------------------------

开发一个JMS的基本步骤如下:

  1.创建一个JMS connection factory

  2.通过connection factory来创建JMS connection

  3.启动JMS connection

  4.通过connection创建JMS session

  5.创建JMS destination

  6.创建JMS producer 或者创建JMS message,并设置destination

  7.创建JMS consumer 或者注册一个JMS message listener

  8.发送或者接受JMS message

  9.关闭所有的JMS资源(connection、session、producer、consumer等)

可以参考下图:

非持久的Topic消息示例

  对于非持久化的消息,当发送方发送消息的时候:

    如果接收方不在线,则接收方永远也收不到这些消息了

    如果接收方在线,则接收方会收到这些消息

1、消息发送程序

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.MessageProducer;
 5 import javax.jms.Session;
 6 import javax.jms.TextMessage;
 7
 8 import org.apache.activemq.ActiveMQConnectionFactory;
 9
10 /**
11  * 非持久化Topic消息发送者
12  * @author Administrator
13  *
14  */
15 public class NoPersistenceSender {
16     public static void main(String[] args) throws Exception {
17         //创建一个JMS connection factory
18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
19         //通过connection factory来创建JMS connection
20         Connection connection = connectionFactory.createConnection();
21         //启动JMS connection
22         connection.start();
23         //通过connection创建JMS session
24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
25         //创建JMS destination
26         Destination destination = session.createTopic("noPersistenceTopic");
27         //创建JMS producer
28         MessageProducer producer = session.createProducer(destination);
29
30         for(int i = 0;i < 10;i++){
31             TextMessage message = session.createTextMessage("message-"+i);
32             //发送message
33             producer.send(message);
34         }
35         //关闭所有的JMS资源
36         session.commit();
37         session.close();
38         connection.close();
39     }
40 }

运行完消息发送程序后,可以访问192.168.1.81:8161

2、消息接收程序

  对于非持久的Topic消息的接收需要注意以下几点:

    a.接收程序必须在线,然后消息发送方再发送消息,接收程序才能接收到消息

    b.由于不知道消息发送方要发送多少条消息,所以利用while循环的方式来接收消息

    c.如果接收程序不在线,此时发送程序发送了消息的话,则该消息将永远不会被接收方收到。

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.Message;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10
11 /**
12  * 非持久化Topic消息接收者
13  * @author Administrator
14  *
15  */
16 public class NoPersistenceReceiver {
17     public static void main(String[] args) throws Exception {
18         //创建一个JMS connection factory
19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
20         //通过connection factory来创建JMS connection
21         Connection connection = connectionFactory.createConnection();
22         //启动JMS connection
23         connection.start();
24         //通过connection创建JMS session
25         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
26         //创建JMS destination
27         Destination destination = session.createTopic("noPersistenceTopic");
28         //创建JMS consumer
29         MessageConsumer consumer = session.createConsumer(destination);
30
31         Message message = consumer.receive();
32         while(message != null){
33             TextMessage txtMsg = (TextMessage)message;
34             System.out.println("收到消息:"+txtMsg.getText());
35             message = consumer.receive();
36         }
37         //关闭所有的JMS资源
38         session.commit();
39         session.close();
40         connection.close();
41     }
42 }

运行结果:

持久的Topic消息示例

1.消息发送程序

  对于持久的Topic消息的发送方需要注意以下几点:

    a.要用持久化订阅,发送消息者要用DeliveryMode.PERSISTENT模式来发送

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.DeliveryMode;
 4 import javax.jms.Destination;
 5 import javax.jms.MessageProducer;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10
11 /**
12  * 持久化Topic消息发送者
13  * @author Administrator
14  */
15 public class PersistenceSender {
16     public static void main(String[] args) throws Exception {
17         //创建一个JMS connection factory
18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
19         //通过connection factory来创建JMS connection
20         Connection connection = connectionFactory.createConnection();
21         //通过connection创建JMS session
22         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
23         //创建JMS destination
24         Destination destination = session.createTopic("PersistenceTopic");
25         //创建JMS producer
26         MessageProducer producer = session.createProducer(destination);
27        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
28         //启动JMS connection
29         connection.start();
30         for(int i = 0;i < 10;i++){
31             TextMessage message = session.createTextMessage("message-"+i);
32             //发送message
33             producer.send(message);
34         }
35         //关闭所有的JMS资源
36         session.commit();
37         session.close();
38         connection.close();
39     }
40 }

2.消息接收程序

  对于持久的Topic消息的接收方需要注意以下几点:

    a.需要在连接上设置消费者id,用来识别消费者

    b.需要创建TopicSubscriber来订阅

    c.一定要先运行一次该消费者程序,等于向消费服务中间件注册这个消费者,然后再运行消息发送者来发送消息,这样的话,无论消费者是否在线都会收到消息,如果不在线的话,则下次连接的时候会把没有收过的消息都接收下来。

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Message;
 4 import javax.jms.Session;
 5 import javax.jms.TextMessage;
 6 import javax.jms.Topic;
 7 import javax.jms.TopicSubscriber;
 8
 9 import org.apache.activemq.ActiveMQConnectionFactory;
10
11 /**
12  * 持久化Topic消息接收者
13  * @author Administrator
14  *
15  */
16 public class PersistenceReceiver {
17     public static void main(String[] args) throws Exception {
18         //创建一个JMS connection factory
19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://120.76.123.81:61616");
20         //通过connection factory来创建JMS connection
21         Connection connection = connectionFactory.createConnection();
22         connection.setClientID("con1");
23         //通过connection创建JMS session
24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
25         //创建JMS destination
26         Topic destination = session.createTopic("PersistenceTopic");
27         //创建JMS consumer
28         TopicSubscriber ts = session.createDurableSubscriber(destination, "TT");
29         //启动JMS connection
30         connection.start();
31         Message message = ts.receive();
32         while(message != null){
33             TextMessage txtMsg = (TextMessage)message;
34             session.commit();
35             System.out.println("收到消息:"+txtMsg.getText());
36             message = ts.receive(1000L);
37         }
38         //关闭所有的JMS资源
39         session.close();
40         connection.close();
41     }
42 }

关于持久化和非持久化消息

有两种方式指定传送模式:

  1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

  2.使用send方法为每条消息设置传送模式

持久化消息

  这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

  这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但是却增加了可靠性。

非持久化消息

  保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。

  

时间: 2024-12-27 03:52:29

JMS开发步骤和持久化/非持久化Topic消息的相关文章

ActiveMQ(05):JMS的API结构、开发步骤与Topic

一.JMS的API结构 二.一个JMS应用的基本步骤 1:创建一个JMS connection factory 2:通过connection factory来创建JMS connection 3:启动JMS connection 4:通过connection创建JMS session 5:创建JMS destination 6:创建JMS producer,或者创建JMS message,并设置destination 7:创建JMS consumer,或者是注册一个JMS message lis

JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery),默认情况下使用的是持久传输. 可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; producer.setDeliveryMode(DeliveryMode.PERSISTENT); 持

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

hibernate开发步骤

Hibernate框架开发步骤 项目导入需要的jar包: http://pan.baidu.com/s/1eRQ19C2 编写hibernate.cfg.xml文件 <?xml version='1.0'encoding='UTF-8'?> <!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD3.0//EN" "http://hibernate.s

iOS应用内付费(IAP)开发步骤列表

iOS应用内付费(IAP)开发步骤列表 前两天和服务端同事一起,完成了应用内付费(以下简称IAP, In app purchase)的开发工作.步骤繁多,在此把开发步骤列表整理如下.因为只是步骤列表,所以并不含详细的说明教程,需要看教程的新手,可以看我附在最后的一些参考链接. 配置Developer.apple.com 登录到Developer.apple.com,然后进行以下步骤: 为应用建立建立一个不带通配符的App ID 用该App ID生成和安装相应的Provisioning Profi

以DDD为开发模式的设计开发步骤可以是

以DDD为开发模式的设计开发步骤可以是:1)分析需求:2)画出用例图,系统中各个角色如何使用系统,也包括外部系统如何使用系统,也包括系统中到某个时间点自动启动的某些功能(此时角色就是时间):3)针对各个用例图,就知道了系统使用的各种业务场景,同时也明确了系统的边界,从而就明确了领域模型的边界:4)在领域模型的边界内划分聚合,找出每个聚合的边界,找出边界内的聚合根,实体,值对象:这步是难点.这里一定不能混淆的一个概念是,领域建模不是以用户为中心的建模,而是以用户的需求为中心的建模.所以要努力寻找各

Android_app项目开发步骤总结

做了几个android企业应用项目后,总结了项目的基本开发步骤.希望可以交流. 一 应用规划: ※确定功能. ※必须的界面及界面跳转的流程. ※须要的数据及数据的来源及格式. ※是否须要服务端支持. ※是否须要本地数据库支持. ※是否须要特殊权限. ※是否须要后台服务. 二 架构设计: ※分层. ※网络连接. ※数据处理-xml.domain. ※封装Activity. 三 界面设计: ※主界面确定. ※模块界面.列表.查看.编辑界面. ※菜单.button.对话框.提示信息. ※界面整体颜色.

ActiveMQ持久化到mysql实现消息永不丢失

ActiveMQ持久化到mysql实现消息永不丢失 配置 1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml 2.打开activemq-jdbc-performance.xml,在persistenceAdapter节点后面添加dataSource="#mysql-ds" 并配置你的数据库 其实可以直接更改apache-activemq-5.15.2/conf/activemq.xml的per

一个网站的开发步骤

github 经常有朋友说"我想做个网站".“网站上放个什么什么”."对你来说应该很容易,帮个忙吧"等等.怎么说呢,大部分非互联网行业的人是不了解网站的开发流程的,他们可能以为就是个简单的页面,顺手拈来,分分钟搞定的,其实不然. 今天就来简单聊一聊一个网站的开发步骤,尽量做到简单易懂,希望能让非相关行业的人也能弄懂. 准备服务器 服务器是运行网站程序的电脑. 公网IP 每台电脑都有一个ip地址. 例如运行网站程序后,用户可以在浏览器里输入:http://192.16