ActiveMQ教程

Queue与Topic的比较

1、JMS Queue执行load balancer语义:

一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该 message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那 儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。

2、Topic实现publish和subscribe语义:

一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

3、分别对应两种消息模式:

Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者)

其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久订阅)和durable subscription (持久化订阅)2种消息处理方式。

ProducerTool.java用于发送消息:

import javax.jms.Connection;   
 import javax.jms.DeliveryMode;   
 import javax.jms.Destination;   
 import javax.jms.JMSException;   
 import javax.jms.MessageProducer;   
 import javax.jms.Session;   
 import javax.jms.TextMessage;   
   
 import org.apache.activemq.ActiveMQConnection;   
 import org.apache.activemq.ActiveMQConnectionFactory;   
   
 public class ProducerTool {   
   
     private String user = ActiveMQConnection.DEFAULT_USER;   
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
   
     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;   
   
     private String subject = "TOOL.DEFAULT";   
   
     private Destination destination = null;   
   
     private Connection connection = null;   
   
     private Session session = null;   
   
     private MessageProducer producer = null;   
   
     // 初始化   
     private void initialize() throws JMSException, Exception {   
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
                 user, password, url);   
         connection = connectionFactory.createConnection();   
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   
         destination = session.createQueue(subject);   
         producer = session.createProducer(destination);   
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
     }   
   
     // 发送消息   
     public void produceMessage(String message) throws JMSException, Exception {   
         initialize();   
         TextMessage msg = session.createTextMessage(message);   
         connection.start();   
         System.out.println("Producer:->Sending message: " + message);   
         producer.send(msg);   
         System.out.println("Producer:->Message sent complete!");   
     }   
   
     // 关闭连接   
     public void close() throws JMSException {   
         System.out.println("Producer:->Closing connection");   
         if (producer != null)   
             producer.close();   
         if (session != null)   
             session.close();   
         if (connection != null)   
             connection.close();   
     }   
 }

ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

import javax.jms.Connection;   
 import javax.jms.Destination;   
 import javax.jms.JMSException;   
 import javax.jms.MessageConsumer;   
 import javax.jms.Session;   
 import javax.jms.MessageListener;   
 import javax.jms.Message;   
 import javax.jms.TextMessage;   
   
 import org.apache.activemq.ActiveMQConnection;   
 import org.apache.activemq.ActiveMQConnectionFactory;   
   
 public class ConsumerTool implements MessageListener {   
   
     private String user = ActiveMQConnection.DEFAULT_USER;   
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
   
     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;   
   
     private String subject = "TOOL.DEFAULT";   
   
     private Destination destination = null;   
   
     private Connection connection = null;   
   
     private Session session = null;   
   
     private MessageConsumer consumer = null;   
   
     // 初始化   
     private void initialize() throws JMSException, Exception {
         //连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
                 user, password, url);
         //连接工厂创建一个jms connection
         connection = connectionFactory.createConnection();   
         //是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  //不支持事务
         //目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅
         destination = session.createQueue(subject);
         //会话创建消息的生产者将消息发送到目的地
         consumer = session.createConsumer(destination);   
            
     }   
   
     // 消费消息   
     public void consumeMessage() throws JMSException, Exception {   
         initialize();   
         connection.start();   
            
         System.out.println("Consumer:->Begin listening...");   
         // 开始监听   
         consumer.setMessageListener(this);   
         // Message message = consumer.receive();   
     }   
   
     // 关闭连接   
     public void close() throws JMSException {   
         System.out.println("Consumer:->Closing connection");   
         if (consumer != null)   
             consumer.close();   
         if (session != null)   
             session.close();   
         if (connection != null)   
             connection.close();   
     }   
   
     // 消息处理函数   
     public void onMessage(Message message) {   
         try {   
             if (message instanceof TextMessage) {   
                 TextMessage txtMsg = (TextMessage) message;   
                 String msg = txtMsg.getText();   
                 System.out.println("Consumer:->Received: " + msg);   
             } else {   
                 System.out.println("Consumer:->Received: " + message);   
             }   
         } catch (JMSException e) {   
             // TODO Auto-generated catch block   
             e.printStackTrace();   
         }   
     }   
 }

如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

下面是测试类Test.java:

import javax.jms.JMSException;   
import org.apache.activemq.ActiveMQConnection;
   
 public class Test {   
   
     /**  
      * @param args  
      */  
     public static void main(String[] args) throws JMSException, Exception {   
        
         // TODO Auto-generated method stub   
         ConsumerTool consumer = new ConsumerTool();   
         ProducerTool producer = new ProducerTool();  
         System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");
         // 开始监听   
         consumer.consumeMessage();   
            
         // 延时500毫秒之后发送消息   
         Thread.sleep(500);   
         producer.produceMessage("Hello, world!");   
         producer.close();   
            
         // 延时500毫秒之后停止接受消息   
         Thread.sleep(500);   
         consumer.close();   
     }   
 }

ActiveMQ教程

时间: 2024-10-28 21:42:10

ActiveMQ教程的相关文章

ActiveMQ教程(简介与安装)

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 一.ActiveMq的特性: ⒈ 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,

ActiveMQ学习教程(二)——简单示例

ActiveMQ学习教程(二)--简单示例 一.应用IDEA构建Maven项目 File->New->Module...->Maven->勾选->选择->Next -> GroupId:com.jd.myMaven   |    ArtifactId:activeMQ    |    version:默认   ->Finish 项目构建成功!项目结构如下所示: 二.创建生产者类,模拟生产者发消息 Step1:java/activemq/JMSProducer

ActiveMQ基础教程----简单介绍与基础使用

概述 ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线.ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能. 特性 遵循JMS规范:ActiveMQ的各种特性是JMS1.1规范的实现.它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等.依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特

Dubbo应用教程--ActiveMQ的安装与使用(单节点)

  IP:192.168.4.101    环境:CentOS 6.6.JDK7 1.  安装JDK并配置环境变量(略) JAVA_HOME=/usr/local/java/jdk1.7.0_72 2.  下载Linux版的ActiveMQ(当前最新版apache-activemq-5.11.1-bin.tar.gz) $ wgethttp://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz 3.  解压安装

ActiveMQ使用教程

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. ActiveMQ特性列表 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Stomp REST,WS Notification

ActiveMq系列教程(一) - 简介与环境搭建

1.什么是JMS? JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(Message Oriented MiddleWare)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持.(摘自百度百科) 2.什么是ActiveMq? ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线,是JMS的一个具体实现方

activemq安装使用教程

一.下载安装 下载地址:http://activemq.apache.org/activemq-5158-release.html 然后解压即可,apache的东西解压后就可以使用了. 二.启动 在安装目录的bin目录下: activemq start 就可以启动了. 访问localhost:8161就可以访问 三. 未完待续..... 原文地址:https://www.cnblogs.com/chenmz1995/p/10431013.html

ActiveMQ基础教程JMS概述

什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持.可以简单的理解为:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合. JMS的优势 异步:发送消息者可以在发送消息后进行其它的

SSM框架Spring+SpringMVC+MyBatis——详细整合教程

摘要: 包括SQL Maps和Data Access ObjectsDAOMyBatis 消除了几乎所有的JDBC代码和参数的手工设置以及结果集的... 摘要:   spring MVC属于SpringFrameWork的后续产品已经融合在Spring Web Flow里面.Spring MVC 分离了控制器.模型对... 1.基本概念 1.1.Spring Spring是一个开源框架Spring是于2003 年兴起的一个轻量级的Java 开发框架由Rod Johnson 在其著作Expert