46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)

一、背景介绍

  CORBA\DCOM\RMI等RPC中间件技术已经广泛应用于各个领域,但是面对规模和复杂度都越来越高的分布式系统,这些技术慢慢显现出局限性:

  • 同步通信:客户发出调用后,必须等待服务完成处理并返回结果后才能继续执行;
  • 客户端和服务端的生命周期密切耦合;客户进程和服务对象进行必须都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常。
  • 点对点通信:客户的一次调用只发送给一个单独的目标对象。

  面向消息的中间件较好地解决了上面的问题(Message Oriented Middleware,MOM)。发送者再发送消息给消息服务器消息服务器将消息存放到若干队列中,在合适的时候再将消息转发给接受者。这种模式下,发送和结束是异步的,发送者无需等待;二者的生命周期未必相同;发送消息的时候接受者不一定运行,接收消息的时候发送者也不一定运行;一对多通信:对一个消息可以有多个接受者。

  java消息服务(JMS)定义了java中访问消息中间件的接口。JMS只是接口,并没有实现。实现JMS的接口的消息中间件成文JMS Provider,已有的MOM系统包括Apache的ActiveMQ、以及阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和BEA的MessageQ、RabbitMQ等待,他们基本都遵循JMS规范。

二、JMS术语

JMS 实现JMS接口的消息中间件

Provider(MessageProvider):生产者

Consumer(MessageConsumer):消费者

PTP:Point to Point ,即点对点的消息模型;

pub/sub : publish/subscribe,发布/订阅的消息模型;

Queue:队列目标;

Topic:主题目标;

ConnectionFactory:连接工厂,JMS用它创建连接;

Connection:JMS客户端到JMS Provider的连接;

Destination:消息的目的地;

Session:会话,一个法师或者接收消息的线程;

Message 接口(消息):

  是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须要有):包含用于识别和为消息寻找路由的设置。
  • 一组消息属性:包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过路桥(消息选择器)。
  • 一个消息体:允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)

Session 接口(会话):

  表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个地接收。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文保存一组消息,知道事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话运行用户创建消息生产者来发送消息,创建消费者来接收消息。

三、ActiveMQ简介

  ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。

  ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演着重要角色,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断地提升版本,80%以上的业务我们用ActiveMq都能满足,当然后续如天猫淘宝这种大型的电商网站,尤其是双十一这种特殊事件,ActiveMQ需要进行很复杂的优化源码以及架构设计才能完成,我们只会会学习一个更强大的分布式消息中间件,RocketMQ,可以说ActiveMQ是和谐,是基础,所以也必须要掌握好。

四、ActiveMQ的使用

4.1 环境搭建

  去官网下载:http://activemq.apache.org,解压放到一个地方,我放在D:\Programe_Files下,然后直接启动bin下的wrapper.exe即可启动监控台,在浏览器中访问localhost:8186,用户名admin/admin 即可访问。

4.2 ActiveMQ Hello World

  首先写一个简单的Hello World实例,感受一下ActiveMQ,需要实现接受者和发送者两部分代码。

  1. 建立COnnectionFactory工厂对象,需要填入用户名密码以及要连接的地址,默认端口为“tcp://localhost:61616”
  2. 通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的Start方法开启连接。
  3. 通过Connection对象创建Session会话,用于接收消息,参数配置1为是否启动事务,参数配置2为签收哦模式,一般我们设置自动签收。
  4. 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue;在Pub/Sub模式中,Destination被称作Topic(即主题)。可以使用多个Queue和Topic
  5. 需要通过Session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer
  6. 使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
  7. 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法接收数据,最后莫忘关闭Connection

发送:

 1 package com;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14 public class Sender {
15
16     public static void main(String[]args) throws JMSException{
17         ConnectionFactory factory = new ActiveMQConnectionFactory(
18                 ActiveMQConnectionFactory.DEFAULT_USER,
19                 ActiveMQConnectionFactory.DEFAULT_PASSWORD,
20                 "tcp://localhost:61616"
21                 );
22
23         Connection conn = factory.createConnection();
24         conn.start();
25
26         Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
27
28         Destination dest = session.createQueue("queue1");
29
30         MessageProducer messProducer = session.createProducer(dest);
31
32         messProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);;
33
34         TextMessage mess = session.createTextMessage();
35         for(int i=0;i<5;i++){
36
37             mess.setText("消息内容"+i);
38
39             messProducer.send(mess);
40         }
41
42         if(conn != null)conn.close();
43     }
44
45 }

接收:

package com;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[]args) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
                );

        Connection conn = factory.createConnection();
        conn.start();

        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        Destination dest = session.createQueue("queue1");

        MessageConsumer messConsumer = session.createConsumer(dest);

        while(true){
            TextMessage mess = (TextMessage) messConsumer.receive();
            System.out.println(mess.getText());
        }

//        if(conn != null)conn.close();
    }
}

4.3 启动安全认证

在acitvemq.xml中的 broker 下加上如下代码:

        <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="sgm" password="sgm" groups="users,admins" />
                </users>
            </simpleAuthenticationPlugin>
        </plugins>

重启ActiveMQ,再运行4.2中的 sender 发生异常:

可见在第24行尝试连接的时候就发生错误了。所以ActiveMQConnectionFactory.DEFAULT_USER要改成"sgm",密码也同样修改。receiver做同样修改。

4.4 Connection的使用

  在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,他是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开发的连接”。以及表示“客户端与服务例程之间的开放TCP/IP套接字”。该文档还指出Connection应该是进行客户端身份校验的地方。

  当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或者多个Session。

  当一个程序执行完成后,必须关闭Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了Session、MessageProducer和MessageConsumer。

Connection createConnection();

Connection createConnection(String userName,String password);

大多数开源框架不需要自己优化了,能使用好API其实就是最好的优化了。

4.5 Session的使用

  一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或者多个Session,才能进一步执行其他操作。Session是一个发送或者接收消息的线程,可以使用Session创建MessageProducer,MessageConsumer和Message。

  Session可以被事务化,也可以不用事务,通常,可以通过向Connection上的创建方法传一个布尔值进行设置。

  Session createSession(boolean transacted,int acknowledgeMode);

  其中 transacted为是否使用事务的标识,acknowledgeMode为签收方式。

  结束事务有两种方式:提交或者回滚,当一个事务提交,消息被处理。事务中有一个步骤失败,事务就回滚,这个事务中的已经执行的动作将被撤销。在发送消息最后也必须要使用session.commit()方法提交事务。

  签收模式有三种:

  1. session.AUTO_ACKNOWLEDGE当消费端从receive或者onMessage成功返回时,Session自动签收客户端的这条消息。
  2. Session.CLIENT_ACNOWLEDGE消费端通过调用消息(Message)的acknoledge方法签收消息(mess.acknowledge())。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有消费已消费消息的收条。
  3. Session.DUPS_OK_ACKNOWLEDGE此选项指示Session不必确保对传送消息的签收,它可能引起消息的重复,但是降低了Session的开销,所以只有消费端能容忍重复消息才能使用。

  工作中一般使用手动签收的方式

4.6 MessageProducer

---

4.7 MessageConsumer

---

时间: 2024-08-23 16:30:39

46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)的相关文章

Spring ActiveMQ Caused By: javax.jms.IllegalStateException: Connection closed

根据 http://www.cnblogs.com/yshyee/p/7448808.html 进行JMS操作时,发送跟监听放到不同的项目中进行时,出现以下异常信息: org.springframework.jms.IllegalStateException: Connection closed; nested exception is javax.jms.IllegalStateException: Connection closed at org.springframework.jms.su

restful架构风格设计准则(五)用户认证和session管理

读书笔记,原文链接:http://www.cnblogs.com/loveis715/p/4669091.html,感谢作者! Authentication 其实在上一节中,我们已经提出了无状态约束给REST实现带来的麻烦:用户的状态是需要全部保存在客户端的.当用户需要执行某个操作的时候,其需要将所有的执行该请求所需要的信息添加到请求中.该请求将可能被REST服务集群中的任意服务器处理,而不需要担心该服务器中是否存有用户相关的状态. 但是在现有的各种基于HTTP的Web服务中,我们常常使用会话来

ActiveMQ简单入门

一.创建一个简单的Hello World案例 首先需要导入activemq-all-5.14.5.jar包,写生产端: package com.ietree.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProduc

ActiveMq整理之java应用

一.JMS 更多介绍参考 "http://baike.baidu.com/link?url=LNCEOGgqEX-uSKuRJooyG1RSfS7CTWDKYT8OOouhxLk_yWNN-0wNSWq7KjNQ259a9pfL95janJi8v8-drvdHqa" 1.1背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度越来越高的分布式系统,这些技术也显示出其局限性: 1.同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果才

ActiveMQ(06):ActiveMQ结合Spring开发

一.pom.xml与mq.properties Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下: <dependency>     <groupId>org.springframework</groupId>     <artifactId>spring-jms</artifactId>     <version>4.1.7.RELEASE</version> </dependency&g

ActiveMQ的入门demo

步骤: 1 :下载ActiveMQ 官网:http://activemq.apache.org/ 2 :解压AcitveMQ, 根据自己的操作系统选择运行win64或者win32下的activemq.bat,双击运行.注意:千万不能选择bin目录下的activemq.bat,这样会闪退. 我的解压目录为: 3:创建Java项目: 引入jar包.注意:不能少了hawtbuf-1.11.jar这个包,这个包是不能缺少的. 目录结构如图: 4:登录本地的MQ服务: 登录地址:http://localh

ActiveMQ实例1--简单的发送和接收消息

一.环境准备 1,官网http://activemq.apache.org/下载最新版本的ActiveMQ,并解压 2,打开对应的目录,在Mac环境下,一般可以运行命令: cd /Users/***/Downloads/apache-activemq-***/bin/macosx ./activemq start 3,启动成功后,登录http://localhose:8161/admin/,登陆账号和密码都为admin,创建一个queue名为jyQueue. 二.创建Eclipse项目 1,新建

ActiveMq C#客户端 消息队列的使用(存和取)

1.准备工具 VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip 2.开始项目 VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug

ActiveMQ(一)初步接触-编写Demo

声明 转载请注明出处! Reprint please indicate the source! http://www.hiknowledge.top/?p=86&preview=true 什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. 引用自百