学习笔记--Java消息中间件

#### 消息中间件

消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统

JMS:Java消息服务,Java平台中关于面向消息中间件的API

AMQP:提供统一消息服务的应用层标准协议

常见消息中间件

ActiveMQ

RabbitMQ

Kafka

JMS规范

提供者:实现JMS规范的消息中间件服务器

客户端:发送或接受消息的应用程序

生产者/发布者:创建并发送消息的客户端

消费者/订阅者:接收并处理消息的客户端

消息:应用程序之间传递的数据内容

消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

JMS消息模式

队列模型:

  • 客户端包括生产者和消费者
  • 消息只能被一个消费者消费
  • 随时消费

主题模型:

  • 客户端包括发布者和订阅者
  • 消息能被所有订阅者消费
  • 消费者不能消费订阅之前就发送到主题中的消息

JMS编码接口:

  • ConnectionFactory:用于创建连接到消息中间件的连接工厂
  • Connection:代表了应用程序和消息服务器之间的通信链路
  • Destination:消息发布和接收的地点,包括队列和主题
  • Session:表示一个单线程的上下文,用于发送和接收消息
  • MessageConsumer:由会话创建,用于接收发送到目标的消息
  • MessageProducer:由会话创建,用于发送消息到目标
  • Message:在消费者和生产者之间传送的对象,包括消息头,一组消息属性,一个消息体

使用ActiveMQ

队列模型

producer

        //1. 创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 创建Connection
        Connection connection = factory.createConnection();

        //3. 启动Connection
        connection.start();

        //4. 创建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建Destination
        Destination destination = session.createQueue(queueName);

        //6. 创建MessageProducer
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            //7. 创建消息
            TextMessage message = session.createTextMessage("test" + i);

            //8. 发布消息
            producer.send(message);

            System.out.println("发送消息: " + message.getText());
        }

        //9. 关闭连接
        connection.close();

consumer

        //1. 创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 创建Connection
        Connection connection = factory.createConnection();

        //3. 启动Connection
        connection.start();

        //4. 创建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建Destination
        Destination destination = session.createQueue(queueName);

        //6. 创建MessageConsumer
        MessageConsumer consumer = session.createConsumer(destination);

        //7. 创建消息监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息: " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //9. 关闭连接(消息监听异步执行,需程序全部运行结束才能关闭连接)
//        connection.close();

主题模型

producer

        //1. 创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 创建Connection
        Connection connection = factory.createConnection();

        //3. 启动Connection
        connection.start();

        //4. 创建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建Destination
        Destination destination = session.createTopic(topicName);

        //6. 创建MessageProducer
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {
            //7. 创建消息
            TextMessage message = session.createTextMessage("test" + i);

            //8. 发布消息
            producer.send(message);

            System.out.println("发送消息: " + message.getText());
        }

        //9. 关闭连接
        connection.close();

consumer

        //1. 创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(url);

        //2. 创建Connection
        Connection connection = factory.createConnection();

        //3. 启动Connection
        connection.start();

        //4. 创建Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建Destination
        Destination destination = session.createTopic(topicName);

        //6. 创建MessageConsumer
        MessageConsumer consumer = session.createConsumer(destination);

        //7. 创建消息监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息: " + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //9. 关闭连接(消息监听异步执行,需程序全部运行结束才能关闭连接)
//        connection.close();

spring jms

  • ConnectionFactory 用于管理连接的连接工厂
  • 由spring提供
  • SingleConnectionFactory和CachingConnectionFactory
  • JmsTemplate 用于发送和接收消息的模板类
  • 由spring提供,在容器中注册就可以使用
  • 线程安全
  • MessageListener 消息监听器
  • 实现一个onMessage方法,只接收一个Message参数

spring使用jms示例

common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:annotation-config />

    <!-- ActiveMQ为我们提供的ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    <!-- spring jms为我们提供连接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 一个队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue" />
    </bean>
    <!-- 一个主题目的地,发布订阅消息 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>
</beans>

producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="common.xml" />

    <!-- 配置JmsTemplate,用于发送消息-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <bean class="com.qyluo.jms.spring.producer.ProducerServiceImpl" />
</beans>

cosumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 导入公共配置 -->
    <import resource="common.xml" />

    <!-- 配置消息监听器 -->
    <bean id="consumerMessageListener" class="com.qyluo.jms.spring.consumer.ConsumerMessageListener" />

    <!-- 配置消息监听容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

</beans>

ProducerServiceImpl

public class ProducerServiceImpl implements ProducerService {
    @Autowired
    JmsTemplate jmsTemplate;

    @Resource(name = "topicDestination")
    Destination destination;

    @Override
    public void sendMessage(final String message) {
        //使用JmsTemplate发送消息
        jmsTemplate.send(destination, new MessageCreator() {
            //创建一个消息
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("发送消息: " + message);
    }
}

AppProducer

public class AppProducer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService service = context.getBean(ProducerService.class);
        for (int i = 0; i < 100; i++) {
            service.sendMessage("text" + i);
        }
        context.close();
    }
}

ConsumerMessageListener

public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }

AppConsumer

public class AppConsumer {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}

ActiveMQ集群

集群方式

  • 客户端集群:多个消费者消费同一个队列
  • Broker clusters:多个Broker之间同步消息
  • Master Slave:实现高可用

ActiveMQ失效转移(failover)

允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器

语法:failover:(uri1,...,uriN)?transportOptions

transportOptions参数说明

  • randomize 默认为true,表示在URI列表中选择URI连接时是否采用随机策略
  • initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重新连接之间等待的时间
  • maxReconnectDelay 默认为30000,单位毫秒,最长重连的时间间隔

Broker Cluster集群配置

NetworkConnector(网络连接器):ActiveMQ服务器之间的网络通讯方式

分为静态连接器和动态连接器

静态连接器:

<networkConnectors>
    <networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>

动态连接器:

<networkConnectors>
    <networkConnector uri="multicast://default"/>
</networkConnectors>

<transportConnectors>
    <transportConnector uri="tcp://localhost:0" discoverUri="multicast://default"/>
</transportConnectors>

Master/Slave集群配置

ActiveMQ Master Slave集群方案

  • Share nothing storage master/slave (已过时,5.8+后移除)
  • Shared storage master/slave 共享存储
  • Replicated LevelDB Store 基于复制的LevelDB Store

两种集群方式对比

方式 | 高可用 | 负载均衡 |

--|----------|--------------|

Master/Slave | 是 | 否 |

Broker Cluster | 否 | 是 |

三台服务器的完美集群方案

Node A和Node B做消息同步,Node A和Node C做消息同步,Node B和Node C做Master / Slave对资源进行持久化

服务器 服务端口 管理端口 存储 网络连接器 用途
Node-A 61616 8161 - Node-B、Node-C 消费者
Node-B 61617 8162 /share_file/kahadb Node-A 生产者,消费者
Node-C 61618 8163 /share_file/kahadb Node-A 生产者,消费者

企业系统中的最佳实践

实际业务场景特点

  • 子业务系统都有集群的可能性
  • 同一个消息会广播给关注该类消息的所有子业务系统
  • 同一类消息在集群中被负载消费
  • 业务的发生和消息的发布最终一致性

使用ActiveMQ的虚拟主题解决方案

  • 发布者:将消息发布到一个主题中,主题名以VirtualTopic开头,如VirtualTopic.TEST
  • 消费者:从队列中获取消息,在队列名中表明自己身份,如Consumer.A.VirtualTopic.TEST

使用JMS中XA系列接口保证强一致性

  • 引入分布式事务
  • 要求业务操作必须支持XA协议

使用消息表的本地事务解决方案

使用内存日志的解决方案

基于消息机制的事件总线

事件驱动架构

RabbitMQ

RabbitMQ:使用交换器绑定到队列

  • 创建ConnectionFactory
  • 创建Connection
  • 创建Channel
  • 定义Exchange,类型I必须为fanout
  • 定义Queue并且绑定队列

Kafka

Kafka使用group.id分组消费者

  • 配置消息者参数group.id相同时对消息进行负载处理
  • 配置服务器partitions参数,控制同一个group.id下的consumer数量小于partitions
  • kafka只保证同一个group.id下的消息是有序的

原文地址:https://www.cnblogs.com/kioluo/p/8824804.html

时间: 2024-10-12 01:02:16

学习笔记--Java消息中间件的相关文章

嵌入式开发学习笔记 ( java - c/c++ :从入门到入门 )

发现放到Blog之后排版全乱套了.. 已经把PDF上传到资源页了  http://download.csdn.net/detail/lyy289065406/8934637 那边排版好看一点...看官们随意吧 >...< · 目 录 导 航 1. 引言 1.1. 编写目的 1.2. 阅读范围 1.3. 声明 1.4. 缩写词/名词解释 1.5. 参考资料 2. 嵌入式开发学习笔记 2.1. 开发环境/测试环境 2.2. 开坑:提要 2.3. 入坑:JNI 2.3.1. navicate 接口定

学习笔记——Java类和对象

今天学习了Java的类和对象的相关知识,由于Java面向对象的编程的思想和C++几乎一样,所以需要更多的关注Java的一些不同之处. 1.类 1.1 在类这一块,除了基本的成员变量,成员方法,构造函数等外,需要掌握三种权限修饰符的区别,并会合理使用(private限制了权限只在本类,protected限定了权限只在包内). 1.2 静态常量.变量.方法:static修饰.我们可以使用“类名.静态类成员”来调用,如: public class StaticTest{ static double P

[Java学习笔记]-Java对象和类

Java是完全面向对象的高级语言,其基本的操作基本都是针对相应的对象和类.面向对象的程序是由对象组成的,每个对象包含对用户公开的特定功能部分和隐藏的实现部分.对应面向对象的语言,还有一种面向过程的语言,如C语言.面向对象的语言是在面向过程语言的基础上发展而来的.面向对象(OOP,全称为Object-Oriented-Programer,下文简称为OOP)相对于面向过程的语言而言,其优势在于很多问题的解决方法被封装在对象里,有时只需要创建这样的对象就可以解决我们的问题,而不必关心其具体实现细节,这

Java泛型学习笔记--Java泛型和C#泛型比较学习(一)

总结Java的泛型前,先简单的介绍下C#的泛型,通过对比,比较学习Java泛型的目的和设计意图.C#泛型是C#语言2.0和通用语言运行时(CLR)同时支持的一个特性(这一点是导致C#泛型和Java泛型区别的最大原因,后面会介绍).C#泛型在.NET CLR支持为.NET框架引入参数化变量支持.C#泛型更类似C++模板,可以理解,C#泛型实际上可以理解为类的模板类.我们通过代码实例来看C# 2.0泛型解决的问题,首先,我们通过一个没有泛型的迭代器的代码示例说起,代码实现如下: interface

学习笔记-java 多线程

背景说明: 多线程并发与管理,是java基础知识里的重点,本文根据<java核心技术第八版>中的多线程技术的学习,对知识点进行整理:这里只对基础知识点进行简单罗列,以达到对知识点有网状关联的效果,能起到提纲挈领的作用,在于其它知识点融合时,有更好的一览效果. 线程概念 1.明确进程与线程的区别 2.不要调用Thread或Runnable对象的run方法,直接调用run方法,只会执行同一个线程中的任务,而不会启动新线程.应该调用Thread.start方法.这个方法将创建一个执行run方法地新线

Java基础学习笔记-Java概述与环境配置

第一篇 Java 概述与环境配置 一.基础常识:   1.软件开发: <1>什么是软件? 软件:一系列按照特定顺序组织的计算机数据和指令的集合. 常见的软件: 系统软件:如DOS,windows, Linux等. 应用软件:如扫雷,迅雷,QQ等. <2>什么是开发? 制作软件 <3>人机交互 软件的出现实现了人与计算机之间的更好的交互.   交互方式: 图形化界面:这种方式简单直观,使用者易于接受,容易上手操作. 命令行方式:需要有一个控制台,输入特定的指令,让计算机完

学习笔记-java 集合

背景: 看的是<java核心技术 第8版>,覆盖jdk1.6.主要是对集合全局和细节进行全面掌握,较深入的理解集合.本人对java比较熟悉,但是对于细节的理解不深,知识点还不全,这是知识的查缺不漏. 一.集合接口 接口和实现分离 当程序中使用集合时,一旦构建了集合就不需要知道究竟使用的哪种实现,因此,只有构建集合对象时,使用具体的类才有意义.可以使用接口类型存放集合的引用. 2.  集合接口和迭代接口 java迭代器应该认为是位于两个元素中间,当调用next时,迭代器就越过下一个元素. Ite

Kafka学习笔记-Java简单操作

Maven依赖包: [plain] view plain copy <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency> <dependency> <groupId>org.apache

[java学习笔记]java语言基础概述之数组的定义&amp;常见操作(遍历、排序、查找)&amp;二维数组

1.数组基础 1.什么是数组:           同一类型数据的集合,就是一个容器. 2.数组的好处:           可以自动为数组中的元素从零开始编号,方便操作这些数据. 3.格式:  (一旦创建,必须明确长度)          格式1:              元素类型   [ ]  数组名  =  new  元素类型  [元素个数即数组的长度]:              示例:int[] array = new int[5];          格式2: