ActiveMQ 简单介绍以及安装

一直知道这个jms,却没有机会用到。今天玩玩!

为什么要学习,使用JMS

  在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。JMS同样适用于基于事件的应用程序,如聊天服务,它需要一种发布事件机制向所有与服务器连接的客户端发送消息。JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。

JMS有什么优势

1、异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。

2、可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。

JMS消息传送模型

在JMS API出现之前,大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。让我们更加详细的看下这两种消息传送模型;

下载ActiveMQ,动手玩玩

官方网站:http://activemq.apache.org/

运行ActiveMQ服务

  从它的目录来说,还是很简单的:

    • bin存放的是脚本文件
    • conf存放的是基本配置文件
    • data存放的是日志文件
    • docs存放的是说明文档
    • examples存放的是简单的实例
    • lib存放的是activemq所需jar包
    • webapps用于存放项目的目录

 启动ActiveMQ 
  我们了解activemq的基本目录,下面我们运行一下activemq服务,双击bin目录下的activemq.bat脚本文件或运行自己电脑版本下的activemq.bat。 注:ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find “61616”

输入:http://127.0.0.1:8161/admin/queues.jsp  (用户名和密码都是admin)

   至此,服务端启动完毕。停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可。

ActiveMQ简单的HelloWorld实例

JMS编程模型

(1) ConnectionFactory

  创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

(2) Destination

  Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

(3) Connection

  Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

(4) Session

  Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

(5) 消息的生产者

  消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

(6) 消息消费者

  消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

(7) MessageListener

  消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

1点对点模式 - 编写生产者

 1 package com.xiexy.project.test.activemq;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14
15 /**
16  * 消息的生产者(发送者)
17  * 点对点模式
18  * 一条消息只能被一个人接收
19  * 发布者发布消息时,订阅者不需要在线(异步)
20  * 后面才上线,也能收到消息
21  *
22  */
23 public class JMSProducer {
24
25     //默认连接用户名
26     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
27     //默认连接密码
28     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
29     //默认连接地址
30     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
31     //发送的消息数量
32     private static final int SENDNUM = 10;
33
34     public static void main(String[] args) {
35         //连接工厂
36         ConnectionFactory connectionFactory;
37         //连接
38         Connection connection = null;
39         //会话 接受或者发送消息的线程
40         Session session;
41         //消息的目的地
42         Destination destination;
43         //消息生产者
44         MessageProducer messageProducer;
45         //实例化连接工厂
46         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
47
48         try {
49             //通过连接工厂获取连接
50             connection = connectionFactory.createConnection();
51             //启动连接
52             connection.start();
53             //创建session
54             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
55             //创建一个名称为HelloWorld的消息队列
56             destination = session.createQueue("[email protected]");
57             //创建消息生产者
58             messageProducer = session.createProducer(destination);
59             //发送消息
60             sendMessage(session, messageProducer);
61
62             session.commit();
63
64         } catch (Exception e) {
65             e.printStackTrace();
66         }finally{
67             if(connection != null){
68                 try {
69                     connection.close();
70                 } catch (JMSException e) {
71                     e.printStackTrace();
72                 }
73             }
74         }
75
76     }
77     /**
78      * 发送消息
79      * @param session
80      * @param messageProducer  消息生产者
81      * @throws Exception
82      */
83     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
84         for (int i = 0; i < JMSProducer.SENDNUM; i++) {
85             //创建一条文本消息
86             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
87             System.out.println("发送消息:Activemq 发送消息" + i);
88             //通过消息生产者发出消息
89             messageProducer.send(message);
90         }
91
92     }
93 }

1点对点模式 - 编写消费者

 1 package com.xiexy.project.test.activemq;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14
15 /**
16  * 消息的消费者(接受者)
17  * 点对点模式
18  * 一条消息只能被一个人接收
19  * 发布者发布消息时,订阅者不需要在线(异步)
20  * 后面才上线,也能收到消息
21  *
22  */
23 public class JMSConsumer {
24
25     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
26     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
27     //private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
28     private static final String BROKEURL = "tcp://localhost:61616";
29
30     public static void main(String[] args) {
31         ConnectionFactory connectionFactory;//连接工厂
32         Connection connection = null;//连接
33
34         Session session;//会话 接受或者发送消息的线程
35         Destination destination;//消息的目的地
36
37         MessageConsumer messageConsumer;//消息的消费者
38
39         //实例化连接工厂
40         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
41
42         try {
43             //通过连接工厂获取连接
44             connection = connectionFactory.createConnection();
45             //启动连接
46             connection.start();
47             //创建session
48             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
49             //创建一个连接HelloWorld的消息队列
50             destination = session.createQueue("[email protected]");
51             //创建消息消费者
52             messageConsumer = session.createConsumer(destination);
53
54             while (true) {
55                 //receive(连接队列时间)
56                 TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
57                 if(textMessage != null){
58                     System.out.println("收到的消息:" + textMessage.getText());
59                 }else {
60                     System.out.println("暂时没有消息~");
61                     break;
62                 }
63             }
64
65         } catch (JMSException e) {
66             e.printStackTrace();
67         }finally{
68             try {
69                 if(connection != null) {
70                     connection.close();
71                 }
72             } catch (JMSException e) {
73                 e.printStackTrace();
74             }
75         }
76
77     }
78 }

2订阅模式 - 编写生产者

 1 package com.xiexy.project.test.activemq;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11
12 import org.apache.activemq.ActiveMQConnection;
13 import org.apache.activemq.ActiveMQConnectionFactory;
14
15
16 /**
17  * 消息的生产者(发送者)
18  * 订阅模式
19  * 发布者发布消息时,订阅者必须在线才能收到消息(同步)
20  * 后面才上线,也收不到消息
21  */
22 public class JMSTopicProducer {
23
24     //默认连接用户名
25     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
26     //默认连接密码
27     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
28     //默认连接地址
29     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
30     //发送的消息数量
31     private static final int SENDNUM = 10;
32
33     public static void main(String[] args) {
34         //连接工厂
35         ConnectionFactory connectionFactory;
36         //连接
37         Connection connection = null;
38         //会话 接受或者发送消息的线程
39         Session session;
40         //消息的目的地
41         Destination destination;
42         //消息生产者
43         MessageProducer messageProducer;
44         //实例化连接工厂
45         System.out.println("MQ-user:  " + JMSTopicProducer.USERNAME);
46         System.out.println("MQ-pwds:  " + JMSTopicProducer.PASSWORD);
47         System.out.println("MQ-burl:  " + JMSTopicProducer.BROKEURL);
48         connectionFactory = new ActiveMQConnectionFactory(JMSTopicProducer.USERNAME, JMSTopicProducer.PASSWORD, JMSTopicProducer.BROKEURL);
49
50         try {
51             //通过连接工厂获取连接
52             connection = connectionFactory.createConnection();
53             //启动连接
54             connection.start();
55             //创建session
56             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
57             //创建一个名称为HelloTopic的主题订阅
58             destination = session.createTopic("HelloTopic");
59             //创建消息生产者
60             messageProducer = session.createProducer(destination);
61
62             System.out.println("DeliveryMode.NON_PERSISTENT: " + DeliveryMode.NON_PERSISTENT);
63             //发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
64             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
65
66             //发送消息
67             sendMessage(session, messageProducer);
68             session.commit();
69
70         } catch (Exception e) {
71             e.printStackTrace();
72         }finally{
73             if(connection != null){
74                 try {
75                     connection.close();
76                 } catch (JMSException e) {
77                     e.printStackTrace();
78                 }
79             }
80         }
81
82     }
83     /**
84      * 发送消息
85      * @param session
86      * @param messageProducer  消息生产者
87      * @throws Exception
88      */
89     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
90         for (int i = 0; i < JMSTopicProducer.SENDNUM; i++) {
91             //创建一条文本消息
92             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
93             System.out.println("发送消息:Activemq 发送消息" + i);
94             //通过消息生产者发出消息
95             messageProducer.send(message);
96         }
97
98     }
99 }

2订阅模式 - 编写消费者

 1 package com.xiexy.project.test.activemq;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14
15 /**
16  * 消息的消费者(接受者)
17  * 订阅模式
18  * 发布者发布消息时,订阅者必须在线才能收到消息(同步)
19  * 后面才上线,也收不到消息
20  */
21 public class JMSTopicConsumer {
22
23     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
24     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
25     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
26     //private static final String BROKEURL = "tcp://localhost:61616";
27
28     public static void main(String[] args) {
29         ConnectionFactory connectionFactory;//连接工厂
30         Connection connection = null;//连接
31
32         Session session;//会话 接受或者发送消息的线程
33         Destination destination;//消息的目的地
34
35         MessageConsumer messageConsumer;//消息的消费者
36
37         System.out.println("MQ-user   " + JMSTopicConsumer.USERNAME);
38         System.out.println("MQ-pwds   " + JMSTopicConsumer.PASSWORD);
39         System.out.println("MQ-burl   " + JMSTopicConsumer.BROKEURL);
40         //实例化连接工厂
41         connectionFactory = new ActiveMQConnectionFactory(JMSTopicConsumer.USERNAME, JMSTopicConsumer.PASSWORD, JMSTopicConsumer.BROKEURL);
42
43         try {
44             //通过连接工厂获取连接
45             connection = connectionFactory.createConnection();
46             //启动连接
47             connection.start();
48             //创建session
49             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
50             //创建一个名称为HelloTopic的主题订阅
51             destination = session.createTopic("HelloTopic");
52             //创建消息消费者
53             messageConsumer = session.createConsumer(destination);
54
55             while (true) {
56                 TextMessage textMessage = (TextMessage) messageConsumer.receive();
57                 if(textMessage != null){
58                     System.out.println("收到的消息:" + textMessage.getText());
59                 }else {
60                     System.out.println("暂时没有消息~");
61                     break;
62                 }
63             }
64
65         } catch (JMSException e) {
66             e.printStackTrace();
67         }finally{
68             try {
69                 if(connection != null) {
70                     connection.close();
71                 }
72             } catch (JMSException e) {
73                 e.printStackTrace();
74             }
75         }
76
77     }
78 }

原文地址:https://www.cnblogs.com/xiexy/p/8444593.html

时间: 2024-10-08 19:35:13

ActiveMQ 简单介绍以及安装的相关文章

【转】深入浅出JMS(二)--ActiveMQ简单介绍以及安装

现实的企业中,对于消息通信的应用一直都非常的火热,而且在J2EE的企业应用中扮演着特殊的角色,所以对于它研究是非常有必要的. 这篇博文介绍一款开源的JMS具体实现——ActiveMQ.ActiveMQ是一个易于使用的消息中间件. 消息中间件 我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件(MOM:Message Orient middleware). 消息中间件有很多的用途和优点: 1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块: 2.

ActiveMQ简单介绍以及安装

概述 首先简单的介绍一下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息队列,干嘛用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送. ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范.JMS是一组Java应用程序接口,它提供消息的创建.发送.读取等一系列服务.JMS提供了一组公共应用程 序接口和响应的语法,类似于Java数据库

ActiveMQ简单介绍及安装

消息中间件 我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件(MOM:Message Orient middleware). 消息中间件有很多的用途和优点: 1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块: 2. 负责建立网络通信的通道,进行数据的可靠传送. 3. 保证数据不重发,不丢失 4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务 MQ 首先简单的介绍一下MQ,MQ英文名MessageQueue,中文名也就是大

深入浅出JMS(二)--ActiveMQ简单介绍以及安装

现实的企业中,对于消息通信的应用一直都非常的火热,而且在J2EE的企业应用中扮演着特殊的角色,所以对于它研究是非常有必要的. 上篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了消息通信的规范JMS,我们这篇博文介绍一款开源的JMS具体实现--ActiveMQ.ActiveMQ是一个易于使用的消息中间件. 消息中间件 我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件(MOM:Message Orient middleware). 消息中间件有很多的用途和优点: 1. 将数据从

运维神器Chef简单介绍和安装笔记

首先大概解释一下Chef Chef有三个重要的概念:(如上图所示) 它们的合作关系大致是这样的, Workstation把资源或者说是一些要被运行的命令上传到Chef-Server上, Nodes自动通过Chef-Server拿到属于自己的执行任务到本地执行,这样可达到一个将军指挥千军万马的效果:smirk:. Chef Server 存放所有通过Workstation上传的资源,和用户等公共数据(用PostgreSQL). 可以干脆叫它为资源服务器,大家都可以与它通讯(用RabbitMQ ),

RabbitMQ简单介绍及安装使用

一.RabbitMQ简单介绍 二.安装配置1.安装环境 CentOS7 server1 190.168.3.250安装包依赖[[email protected] ~]# yum -y install gcc gcc-c++ m4 ncurses-devel openssl-devel2.安装RabbitMQ 按顺序安装:3.配置[[email protected] ~]# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.14/ebin/rabbit.a

MongoDB(1)--简单介绍以及安装

前段时间接触了NoSql类型的数据库redis,当时是作为缓存server使用的.那么从这篇博客開始学习还有一个非常出名的NoSql数据库:MongoDb.只是眼下还没有在开发其中使用.一步一步来吧. 简单介绍 MongoDB是一个开源的,基于分布式的,面向文档存储的非关系型数据库. 是非关系型数据库其中功能最丰富.最像关系数据库的. MongoDB由C++编写,其名字来源于"humongous"这个单词,其宗旨在于处理大量数据. MongoDB能够执行在Windows.unix.OS

Mahout学习之Mahout简单介绍、安装、配置、入门程序測试

一.Mahout简单介绍 查了Mahout的中文意思--驭象的人,再看看Mahout的logo,好吧,想和小黄象happy地玩耍,得顺便陪陪这位驭象人耍耍了... 附logo: (就是他,骑在象头上的那个Mahout) 步入正文啦: Mahout 是一个非常强大的数据挖掘工具,是一个分布式机器学习算法的集合,包含:被称为Taste的分布式协同过滤的实现.分类.聚类等.Mahout最大的长处就是基于hadoop实现,把非常多曾经执行于单机上的算法,转化为了MapReduce模式,这样大大提升了算法

Cloudera impala简单介绍及安装具体解释

一.Impala简单介绍 Cloudera Impala对你存储在Apache Hadoop在HDFS,HBase的数据提供直接查询互动的SQL.除了像Hive使用同样的统一存储平台,Impala也使用同样的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue Beeswax).Impala还提供了一个熟悉的面向批量或实时查询和统一平台. 二.Impala安装 1.安装要求 (1)软件要求 Red Hat Enterprise Linux (RHEL)/CentOS 6.2