Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用(转)

转自:http://www.cnblogs.com/luochengqiuse/p/4678020.html?utm_source=tuicool&utm_medium=referral

最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。

一. 认识JMS

1.概述

对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。

JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。

好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。

2. JMS体系结构

描述如下:

  • JMS提供者(JMS的实现者,比如activemq jbossmq等)
  • JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
  • JMS消息(在JMS客户之间传递数据的对象)
  • JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
  • JMS主题(一种支持发送消息给多个订阅者的机制)

3. JMS对象模型

  • 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  • JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
  • JMS目的 Destinatio 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型,分为队列类型(优先先进先出)以及订阅类型

二. ActiveMQ

1. ActiveMQ的安装

  1. 从官网下载安装包,http://activemq.apache.org/download.html
  2. 赋予运行权限 chmod +x,windows可以忽略此步
  3. 运行 ./active start | stop

启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。
这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

2. 用Java访问ActiveMQ

先附上Bean代码:

public class MqBean implements Serializable{
    private Integer age;
    private String name;
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

2.1 队列消息的发送:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        //第一个参数是是否是事务型消息,设置为true,第二个参数无效
        //第二个参数是
        //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
        //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
        //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
        //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
        //待测试
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue("test-queue");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            bean.setName("小黄"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

  1. 消息从生成方客户端传送到消息服务器。
  2. 消息服务器读取消息。
  3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
  4. 消息服务器确认收到消息(出于可靠性的考虑)。
  5. 消息服务器确定消息的路由。
  6. 消息服务器写出消息。
  7. 消息从消息服务器传送到使用方客户端。
  8. 使用方客户端确认收到消息(出于可靠性的考虑)。
  9. 消息服务器处理客户端确认(出于可靠性的考虑)。
  10. 消息服务器确定已经处理客户端确认。

这些步骤是连续的,所以任何步骤都可能成为消息从生成方客户端到使用方客户端的传送过程的瓶颈。这些步骤中的大多数都取决于消息传送系统的物理特征:网络带宽、计算机处理速度和消息服务器体系结构等等。但是,有一些步骤还取决于消息传送应用程序的特征和该应用程序要求的可靠性级别。
其实就是基于可靠性还是性能的选择.

2.2 队列消息的接收:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接
    Connection connection = null;
    // Session: 一个发送或接收消息的线程
    Session session;
    // Destination :消息的目的地;消息发送给谁.
    Destination destination;
    // 消费者,消息接收者
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 构造从工厂得到连接对象
        connection = connectionFactory.createConnection();
        // 启动
        connection.start();
        // 获取操作连接
        //这个最好还是有事务
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
        destination = session.createQueue("test-queue");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

注:对于队列来说,比较简单的优化策略,应该就是队列分载了。由于每个消费者都是单线程的,所以可以设置多个消费者来提高速度。
大家可以复制个消费者自己测试下,在消费者中添加sleep测试下效果。

2.3 订阅消息的发送

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createTopic("test-topic");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            Thread.sleep(1000);
            bean.setName("小黄"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

2.4 订阅消息的接收

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接
    Connection connection = null;
    // Session: 一个发送或接收消息的线程
    Session session;
    // Destination :消息的目的地;消息发送给谁.
    Destination destination;
    // 消费者,消息接收者
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 构造从工厂得到连接对象
        connection = connectionFactory.createConnection();
        // 启动
        connection.start();
        // 获取操作连接
        //这个最好还是有事务
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
        destination = session.createQueue("test-queue");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面:http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

时间: 2024-10-11 07:16:16

Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用(转)的相关文章

Spring 实现远程访问详解——jms和activemq

前几章我们分别利用spring rmi.httpinvoker.httpclient.webservice技术实现不同服务器间的远程访问.本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问. 一.  简介 1.      什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器.同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀

JMS and ActiveMQ first lesson(转)

JMS and ActiveMQ first lesson -- jms基础概念和应用场景 2011-6-18 PM 9:30 主讲:kimmking <[email protected]> 整理:林木森 ppt下载地址: http://code.google.com/p/activemq-store-mongodb/downloads/list 下面开始: kimmking:介绍下jms和ActiveMQ.在讲JMS之前,我们聊聊相关的背景.谁知道JMS是什么意思? kimmking:对,是

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中 一.JMS的理解JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式:消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换:消息分为:消息生产者,消息服务器和消息消费者.生产者与消费者之间

JMS中间件--ActiveMQ

java系统之间的消息通讯使用最多的是基于RMI的RPC和基于JMS的RPC,这两种的消息传输方式虽然都能够起到通讯的作用,但是在笔者看来,二者之间的差别还是非常大的.首先RMI是同步传输,而JMS是异步传输,另外二者的使用场景也是大不相同.在系统集成平台这个项目中让我能够有机会更加深入的认识这两种消息通讯机制.基础系统与考试系统之间的数据传输我们采用的是将ejb发布成webservice,然后再通过esb将客户端和webservice进行连接,这种方式在本质上是ejb之间的相互调用,属于RMI

Spring jms 与 ActiveMq初识

Spring JMS 与 ActiveMQ初识 1.1 Spring jms 与 ActiveMQ简介 jms 的全称是 Java Message Service,其主要作用是在生产者与消费者之间进行消息的传递:实际业务场景下,当A系统完成某项业务操作后,需要通知B系统或者其他任意系统 A系统操作完成的状态,以及操作中涉及到的相关信息,比如 当会员卡发放系统完成给用户绑定一张会员卡的操作之后,可以发出一条消息,消息内容是 uid或phone为XXX的用户,绑定了一张XX类型(普通卡.贵宾卡等)的

深入浅出JMS(三)--ActiveMQ简单的HelloWorld实例

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点. 第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点. 这篇博文,我们使用ActiveMQ为大家实现一种点对点的消息模型.如果你对点对点模型的认识较浅,可以看一下第一篇博文的介绍. JMS其实并没有想象的那么高大上,看完这篇博文之后,你就知

开源jms服务ActiveMQ的负载均衡+高可用部署方案探索

一个文件(或目录)拥有若干个属性,包括(r/w/x)等基本属性,以及是否为目录(d)与文件(-)或连接文件(l)等属性.此外,Linux还可以设置其他系统安全属性,使用chattr来设置,以lsattr来查看,最重要的是可以设置其不可修改的特性,即便是文件的拥有者都不能进行修改.这个属性相当重要,尤其是在安全机制方面(security). 文件默认权限:umask 当建立一个新的文件或目录时,它的默认属性是与umask有关的.通常,umask就是指定当前用户在建立文件或目录时的属性默认值.那么,

Spring ActiveMQ Caused By: javax.jms.IllegalStateException: Connection closed

根据 http://www.cnblogs.com/yshyee/p/7448808.html 进行JMS操作时,发送跟监听放到不同的项目中进行时,出现以下异常信息: org.springframework.jms.IllegalStateException: Connection closed; nested exception is javax.jms.IllegalStateException: Connection closed at org.springframework.jms.su

JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合

[TOC] 缘由: 最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道. 下面主要介绍以下几点 - JMS简介 - 消息传递模型 - ActiveMQ介绍 - 安装使用 - spring整合JMS - 代码相关 JMS简介 J Ja