Java消息中间件--初级篇

一、 为什么使用消息中间件?

假设用户登录系统   传统方式 用户登录  调用短息服务   积分服务  日志服务等各种服务  如果短息服务出现问题就无法发送短信而且用户登录成功必须所有调用全部完成返回给用户登录系统一条用户登录成功信息。从整体业务上讲  用户只是要登录系统  并不关心短信服务  日志服务怎么样就想登录成功就好  这种操作让用户等待了时间。

2)通过消息中间件解耦服务调用

用户登录系统会将登录消息发送给消息中间件  ---消息中间件会将用户登录消息异步一条一条推送给---短息服务  日志服务等其他相关服务   用户就不需要等待其他服务处理完成在给我返回结果。

二、 消息中间件的好处:

1)系统解耦

2)异步

3)横向扩展

4)安全可靠   消息中间件会把我们的消息进行保存  如果其他业务系统出现问题  或者业务系统没有对消息进行消费  业务系统可以下一次继续对消息进行消费

5)顺序保存

三、中间件是什么:

中间件作用在业务系统之间  不是操作系统软件  还不是业务软件,用户不能直接使用的软件同一叫法。

四、消息中间件:

用于数据接收和发送,利用高效可靠的异步消息传递机制集成分布式系统

五、JMS (java Message Service )

Java消息服务 java消息中间件的API,用于在两个应用程序之间或者分布式系统中发送消息,进行异步通信的规范。

六、AMQP

提供统一消息服务的应用层标准协议,遵循这个协议客户端与消息中间件可以传递消息,不会受到客户端和中间件不同产品,是不同开发语言影响  只要遵循这种协议就可以传递消息。

七、常见消息中间件

activeMQ 是一个完全支持JMS1.1和J2EE1.4规范的

rabbitMQ 是一个开源的AMQP实现,用于分布式系统中存储转发消息

kafka  是一个高吞吐量的分布式发布订阅消息系统,是一个分布式的,分区的,可靠的分布式日志存储服务。(不是一个严格消息中间件 )

1)高吞吐量:即使非常普通的硬件kafka也可以支持每秒数百万的消息

八、JMS规范

提供者:实现JMS规范的消息中间件服务器

客户端:接收或发送消息的应用程序

生产者/发布者:创建并发送消息的客户端

消费者/订阅者:接收并处理消息的客户端

消息:应用程序之间传递的数据内容

消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

九、JMS消息模式

1)队列模式

客户端包括生产者和消费者

队列中的消息只能被一个消费者消费

消费者可以随时消费队列的消息

举例:生产者  应用1  应用2  向JMS队列中发送消息  应用1发送 1 3 5   应用2 发送2   4   6  JMS消息队列中会存在  1 2 3 4 5 6 消息     时存在消费者  应用3  应用4   应用3与JMS 有两个链接   应用4有一个链接  在消费消息的时候  三个链接会平均分配6各消息

2)主题模式

客户端:包括发布者和订阅者

主题中的消息被所有订阅者消费

消费者不能消费订阅之前就发送到主题中的消息(消费者要消费队列中的消息要先订阅在消费   如果不提前订阅是接收不到消息的)

  举例:应用3 与应用4 向队列中订阅消息  应用3建立了两个链接  应用4建立了一个链接   发布者 应用1 应用2  向队列中发布消息 123456  当订阅者消费消息的时候三个链接都消费了6个消息

十、JMS编码接口

ConnectionFactory 用于创建链接到消息中间件的链接工厂

Connection 代表可应用程序和消息服务器之间的通信链路

Destination (目的地) 指消息发布和接收的地点,包括队列和主题

Session 表示一个单线程的上下文,用于发送和接收消息

MessageConsumer 由会话创建,用于接收发送到目标的消息

  MessageProducer 由会话创建,用于发送消息到目标

Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

  

十一:JMS代码演示

1)使用JMS接口规范链接activeMQ  队列模式

引入activemq依赖jar  注意:引入相关jar  必须与相应的jdk匹配否则会报异常

1 java.lang.UnsupportedClassVersionError: org/apache/lucene/store/Directory : Unsupported major.minor version 51.0

2 at java.lang.ClassLoader.defineClass1(Native Method)

3 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)

4 at java.lang.ClassLoader.defineClass(ClassLoader.java:615)

5 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

6 at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)

7 at java.net.URLClassLoader.access$000(URLClassLoader.java:58) 8 at java.net.URLClassLoader$1.run(URLClassLoader.java:197)

<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.0</version>
        </dependency>
    </dependencies>

创建消费提供方(主题模式消息发布方)

public class JmsProduce {

    //声明服务器地址
    private static final String url = "tcp://127.0.0.1:61616";
    //声明队列名称
    //private static final String queue = "queue_test";
    private static final string topic = "topic_test";
    public static void main(String []args)throws Exception{

        //创建连接工厂  由消息服务商提供
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //根据消息工厂创建连接
        Connection connection = factory.createConnection();
        //开启连接
        connection.start();
        //根据连接创建会话   参数一  是否使用事务  参数二 应答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目标  也就是队列
        // Destination destination =  session.createQueue(JmsProduce.queue);        //创建主题目标          Destination destination = session.createTopic(topic);
        //创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        //
        for (int i=0;i<100;i++){
            //创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            //生产者将消息发送给队列
            producer.send(textMessage);
            System.out.println("生产者"+textMessage);
        }
        connection.close();

    }
}

消息消费方(主题模式订阅者)

public class JmsConsumer {

    private static final String url="tcp://127.0.0.1:61616";
    //private static final String queue = "queue_test";    private static final String topic = "topicName";
    public static void main(String [] args) throws JMSException {

        //创建连接工厂  由消息服务商提供
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //根据消息工厂创建连接
        Connection connection = factory.createConnection();
        //开启连接
        connection.start();
        //根据连接创建会话   参数一  是否使用事务  参数二 应答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标  也就是队列
        //Destination destination =  session.createQueue(queue);         //创建主题目标           Destination destination = session.createTopic(topic);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                System.out.println("接收消息"+message1);
            }
        });
    }
}

队列模式是点对点形式

主题模式  消费者需要先对主题进行订阅  然后发布者在发布过程中消费者才能消费消息

Spring 整合JMS ActiveMq

创建一个maven项目

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.ac.bcc</groupId>
  <artifactId>Jms-Activemq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>war</packaging>
  <properties>
        <spring.version>4.1.3.RELEASE</spring.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
            <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

消息提供方实现

1)定义消息服务方接口

package cn.ac.bcc.jms.service;

public interface ProducerService {
    //定义发送消息的方法
    public void sendMessage(String message);
}

2)配置公共common.xml文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">

        <!--开启注解扫描  -->
        <context:annotation-config/>
        <!--配置activemq连接工厂   在spring提供的连接工厂中需要提供activemq提供的工厂  -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- 配置activemq服务器地址  通过地址创建连接 -->
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
        <!-- 配置spring jms 提供的连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!-- 配置activeMq目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- 指定队列名称  通过构造方式 -->
            <constructor-arg value="queue-test"/>
        </bean>

</beans>

3)配置spring配置文件 producer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd" >

    <!--引入公共配置文件 -->
    <import resource="common.xml" />
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--注入连接工厂 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <bean class="cn.ac.bcc.jms.service.impl.ProducerServiceImpl"></bean>
</beans>

4)实现消息发送接口

package cn.ac.bcc.jms.service.impl;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import cn.ac.bcc.jms.service.ProducerService;

@Service
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "queueDestination")
    private Destination destination;

    @Override
    public void sendMessage(final String message) {

        //通过jmsTemplate 模板发送消息  传递两个参数  消息的目的地  也就是activemq服务    参数2  创建一个消息体 封装消息信息
        jmsTemplate.send(destination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {

                TextMessage textMessage = session.createTextMessage(message);
                System.out.println("发送消息" + textMessage.getText());
                return textMessage;
            }
        });

    }

}

5)测试类

package cn.ac.bcc.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import cn.ac.bcc.jms.service.ProducerService;

public class JmsProducer {

    public static void main(String[] args) {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        // 获取提供者接口实例
        ProducerService producerService = context.getBean(ProducerService.class);
        for (int i = 0; i < 100; i++) {
            // 调用发送消息方法
            producerService.sendMessage("消息发送来了" + i);
        }
        //关闭连接
        context.close();
    }

}

消息消费方实现

1)自定义消息消费方监听实现spring提供的MessageListener监听

package cn.ac.bcc.jms.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage)message;
        try {
            System.out.println("消息消费"+textMessage.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

2)配置消费方spring 配置文件 consumer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">
        <!--引入公共配置文件  -->
        <import resource="common.xml"/>
        <!--创建自定义监听  -->
        <bean id = "consumerListener" class="cn.ac.bcc.jms.listener.ConsumerListener"></bean>
        <!--配置jms监听器  -->
        <bean id="jmsLisener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
         <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="queueDestination"></property>
        <property name="messageListener" ref="consumerListener"></property>
        </bean>
</beans>

3)消费方测试实现

package cn.ac.bcc.test;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerText {

    public static void main(String[] args) {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");

    }

}

以上为spring整合JMS 实现消息接收发送队列模式实现   在消息接收与发送过程中要启动activemq

原文地址:https://www.cnblogs.com/lwdmaib/p/9117889.html

时间: 2024-10-29 03:41:59

Java消息中间件--初级篇的相关文章

【Java】在Eclipse中使用JUnit4进行单元测试(初级篇)

本文绝大部分内容引自这篇文章: http://www.devx.com/Java/Article/31983/0/page/1 我们在编写大型程序的时候,需要写成千上万个方法或函数,这些函数的功能可能很强大,但我们在程序中只用到该函数的一小部分功能,并且经过调试可以确定,这一小部分功能是正确的.但是,我们同时应该确保每一个函数都完全正确,因为如果我们今后如果对程序进行扩展,用到了某个函数的其他功能,而这个功能有bug的话,那绝对是一件非常郁闷的事情.所以说,每编写完一个函数之后,都应该对这个函数

python 面向对象初级篇

Python 面向对象(初级篇) 概述 面向过程:根据业务逻辑从上到下写垒代码 函数式:将某功能代码封装到函数中,日后便无需重复编写,仅调用函数即可 面向对象:对函数进行分类和封装,让开发"更快更好更强-" 面向过程编程最易被初学者接受,其往往用一长段代码来实现指定功能,开发过程中最常见的操作就是粘贴复制,即:将之前实现的代码块复制到现需功能处. Python while True: if cpu利用率 > 90%: #发送邮件提醒 连接邮箱服务器 发送邮件 关闭连接 if 硬盘

在Eclipse中使用JUnit4进行单元测试(初级篇)【转】

来自[http://blog.csdn.net/andycpp/article/details/1327147] 本文绝大部分内容引自这篇文章: http://www.devx.com/Java/Article/31983/0/page/1 我们在编写大型程序的时候,需要写成千上万个方法或函数,这些函数的功能可能很强大,但我们在程序中只用到该函数的一小部分功能,并且经过调试可以确定,这一小部分功能是正确的.但是,我们同时应该确保每一个函数都完全正确,因为如果我们今后如果对程序进行扩展,用到了某个

零基础学习hadoop到上手工作线路指导初级篇:hive及mapreduce

此篇是在零基础学习hadoop到上手工作线路指导(初级篇)的基础,一个继续总结.五一假期:在写点内容,也算是总结.上面我们会了基本的编程,我们需要对hadoop有一个更深的理解:hadoop分为hadoop1.X.hadoop2.X,并且还有hadoop生态系统.这里只能慢慢介绍了.一口也吃不成胖子. hadoop 1.x分为mapreduce与hdfs 其中mapreduce是很多人都需要迈过去的槛,它比较难以理解,我们有时候即使写出了mapreduce程序,但是还是摸不着头脑.我们不知道ke

java web进阶篇(四) Tomcat数据源

动态web开发的最大特点是可以进行数据库的操作,传统的jdbc操作由于步骤重复性造成程序性能下降. 先来回顾JDBC的操作原理 1.加载数据库驱动程序,数据库驱动程序通过classpath配置. 2.通过DirverManager类取得数据库连接对象. 3.通过Connection实例化PreparedStatement对象,编写sql语句命令操作数据库. 4.数据库属于资源操作,操作完成后要关闭数据库以释放资源. 其实以上操作,1.2.4步骤是重复的,保留3,实际上就是数据源产生的原因. 数据

内存泄露之常见问题解决--初级篇

身为一个段子猿,我决定来写写最近的学习心得. 1.简介 在整个Android开发过程中,内存泄露是导致OOM的一个重点因素.大概意思就是:GC无法回收原本应该被回收的对象,这个对象就引发了内存泄露.那有什么危害呢?手机的内存大小是有限的,如果不能释放的话,你就无法创建新的对象,你的新界面等等就无法正常运行,然后程序就OOM了(OutOfMemory). 2.OOM以及内存泄露 OOM通俗点讲就是,你家里有2个厕所,本来你和你老婆用的话,都是够用的,有一天你不小心造人了,从此家里有了1+1=3个人

javascript调试之chrome初级篇

请原谅我的喜新厌旧! 以前我是firebug和firefox的忠实粉丝,虽然现在依然对ff心存好感,但是chrome在我心中的地位与日俱增.以前实习时导师看到我在用firebug调试时善意地告诉我以后要习惯用chrome,我还不以为意,直到现在深深地爱上了chrome的简洁方便.我基本不会html和css,js也只会那么一点,所以现在为止基本上不用用到高级一点的调试,so此文为调试之初级篇.

python_way ,day7 面向对象 (初级篇)

面向对象 初级篇 python支持 函数 与 面向对象 什么时候实用面向对象? 面向对象与函数对比 类和对象 创建类 class 类名 def 方法名(self,xxxx) 类里面的方法,只能对象去调用 对象 = 类名() 通过对象执行方法 对象.方法名(xxxx) 设计,增删改查功能 函数式: def fetch(self,host,port name passwd,sql) pass def create(self,host,port name passwd,sql) pass def re

(转)25个增强iOS应用程序性能的提示和技巧--初级篇

(转)25个增强iOS应用程序性能的提示和技巧--初级篇 本文转自:http://www.cocoachina.com/newbie/tutorial/2013/0408/5952.html 在开发iOS应用程序时,让程序具有良好的性能是非常关键的.这也是用户所期望的,如果你的程序运行迟钝或缓慢,会招致用户的差评.然而由于iOS设备的局限性,有时候要想获得良好的性能,是很困难的.在开发过程中,有许多事项需要记住,并且关于性能影响很容易就忘记. 本文收集了25个关于可以提升程序性能的提示和技巧,把