消息中间件activemq的使用场景介绍(结合springboot的示例)

一、消息队列概述

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

二、消息队列应用场景

以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。本篇使用ActiveMQ+SpringBoot来模拟这四个场景。

2.1异步处理

场景说明:汽车触发围栏报警后,需要发送报警邮件和报警短信。传统的做法有两种1.串行的方式;2.并行方式。

(1)串行方式:将报警信息写入数据库成功后,发送报警邮件,再发送报警短信。以上三个任务全部完成后,该报警信息加入统计列表。

(2)并行方式:报警信息写入数据库成功后,同时发送报警邮件和短信。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

代码示例

①在pom文件中引入activemq依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>1.5.6.RELEASE</version>
    </dependency>

②在配置文件中加上activemq的配置

spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考虑结束之前等待的时间
#spring.activemq.close-timeout=15s
# 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
spring.activemq.in-memory=true
# 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 密码
spring.activemq.password=123456
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
spring.activemq.user=haha
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任所有包时)
#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
# 连接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 连接池最大连接数
#spring.activemq.pool.max-connections=1
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

③消息生产者

import java.util.Map;

import javax.jms.Destination;
import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 报警消息Producer
 * @author ko
 *
 */
@Component
//@EnableScheduling
public class AlarmProducer {

    // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    @Autowired
    private JmsTemplate jmsTemplate;
//    private JmsMessagingTemplate jmsTemplate;

//    @Autowired
//    private Queue queue;

//    @Scheduled(fixedDelay=5000) // 5s执行一次   只有无参的方法才能用该注解
    public void sendMessage(Destination destination, String message){
//        jmsTemplate.convertAndSend(destinationName, payload, messagePostProcessor);
        this.jmsTemplate.convertAndSend(destination, message);
    }

  // 双向队列

    // @JmsListener(destination="out.queue")
    //   public void consumerMessage(String text){
    //   System.out.println("从out.queue队列收到的回复报文为:"+text);
    // }

}

④消息消费者

import org.apache.commons.lang.StringUtils;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 围栏报警Consumer
 * @author ko
 *
 */
@Component
public class AlarmConsumer {

    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")     // @SendTo("out.queue") 为了实现双向队列
    public void receiveQueue(String text) {
        if(StringUtils.isNotBlank(text)){
            System.out.println("AlarmConsumer收到的报文为:"+text);
            System.out.println("把报警信息["+text+"]发送邮件给xxx");
            System.out.println("把报警信息["+text+"]发送短信给xxx");
            System.out.println("");
        }
    }
}

⑤controller里写上测试接口

@Autowired
    private AlarmProducer alarmProducer;

    @RequestMapping(value="/chufabaojing",method=RequestMethod.GET)
    @ApiOperation(value="触发报警", notes="触发报警")
    @ApiImplicitParams({
        @ApiImplicitParam(name = "devicename", value = "name",example = "xxxx", required = true, dataType = "string",paramType="query"),
    })
    public String chufabaojing(String devicename){

        List<String> alarmStrList = new ArrayList<>();
        alarmStrList.add(devicename+"out fence01");
        alarmStrList.add(devicename+"out fence02");
        alarmStrList.add(devicename+"in fence01");
        alarmStrList.add(devicename+"in fence02");

        System.out.println("设备"+devicename+"出围栏报警");
        // 报警信息写入数据库
        System.out.println("报警数据写入数据库。。。");

        // 写入消息队列
        Destination destination = new ActiveMQQueue("mytest.queue");
        for (String alarmStr : alarmStrList) {
            alarmProducer.sendMessage(destination, alarmStr);
        }

        // 消息写进消息队列里就不管了

        // 下面两步骤移到activemq消费者里
        // 发送邮件
        // 发送短信

        return "success";
    }

2.2 应用解耦

场景介绍,在spring cloud分布式微服务项目中,工单管理和设备管理分别是两个微服务,如果A工单被张三接单了,那么工单状态要设为已派单,检验员设为张三,设备状态要置为在检。

传统的做法是,先调用工单管理的工单更新接口,成功之后再调用设备管理的设备更新接口,成功之后再返回操作提示给用户。这样做的缺点是应用耦合,如果在派单操作的时候正好设备管理微服务挂了或者阻塞了,那么派单操作就会失败或者要等待很长时间无反馈。另外如果设备管理的接口有变动,那么工单管理里面的代码也要改动。

引入消息中间件,派单的时候,工单管理的工单更新接口处理好后把信息写入消息队列,然后直接返回操作反馈给用户。不管工单管理服务正不正常,正常就从消息队列里订阅消息处理,不正常就等待回复正常后再订阅消息处理。

2.3 流量削峰

场景介绍,XX公司的系统原来是针对A地区的客户开发的,现在为了抢占市场,拿下了B和C两个地区的客户,那么新系统上线,就存在如何把B和C的基础数据导入XX公司的系统中来的问题,短时间内要把庞大的旧数据改造适合新系统再导入进来,这很容易使系统挂掉,另外每天还有增量数据产生。

这时可以引入消息中间件,B和C的客户只要负责把数据规则放到消息队列里就好了,XX公司可以有条不紊的从消息队列里订阅数据,可以有效缓解短时间内的高流量压力,但是这也对消息中间件的可靠性提出了要求。

如果遇到activemq的瓶颈,可以看看activemq集群方案,这篇文章 http://blog.csdn.net/shuangzh115/article/details/50989182

2.4 点对点通讯

类似聊天室的功能。

时间: 2024-07-30 20:03:59

消息中间件activemq的使用场景介绍(结合springboot的示例)的相关文章

Redis 中 5 种数据结构的使用场景介绍

这篇文章主要介绍了Redis中5种数据结构的使用场景介绍,本文对Redis中的5种数据类型String.Hash.List.Set.Sorted Set做了讲解,需要的朋友可以参考下 一.redis 数据结构使用场景 原来看过 redisbook 这本书,对 redis 的基本功能都已经熟悉了,从上周开始看 redis 的源码.目前目标是吃透 redis 的数据结构.我们都知道,在 redis 中一共有5种数据结构,那每种数据结构的使用场景都是什么呢? String——字符串 Hash——字典

从Client应用场景介绍IdentityServer4(三)

原文:从Client应用场景介绍IdentityServer4(三) 在学习其他应用场景前,需要了解几个客户端的授权模式.首先了解下本节使用的几个名词 Resource Owner:资源拥有者,文中称"user": Client为第三方客户端: Authorization server为授权服务器: redirection URI:简单理解为取数据的地址: User Agent:用户代理,本文中就是指浏览器: 这里把访问资源服务器简单理解成取数据. Resource Owner Pas

消息中间件ActiveMQ及Spring整合JMS的介绍

一 .消息中间件的基本介绍 1.1 消息中间件 1.1.1 什么是消息中间件 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信.对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者) 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.

消息队列常用应用场景介绍

消息队列作为分布式系统中重要的组件,可以解决应用耦合,异步消息,流量削锋等系列问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景. 1 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式:2.并行方式 (1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信.以上三个任

ZooKeeper应用场景介绍

ZooKeeper是一个高可用的分布式数据管理与系统协调框架.维护着一个树形层次结构,书中的节点被称为znode.znode可以用来存储数据,并且有一个与之相关联的ACL(权限),znode不能大于1M.基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题.网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍. 值得注意的是,ZK并非天生就是为这些应用场景设计的

消息中间件--ActiveMQ&amp;JMS消息服务

### 消息中间件 ### ---------- **消息中间件** 1. 消息中间件的概述 2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景) * 异步处理 * 应用解耦 * 流量削峰 * 消息通信 ---------- ### JMS消息服务 ### ---------- **JMS的概述** 1. JMS消息服务的概述 2. JMS消息模型 * P2P模式 * Pub/Sub模式 3. 消息消费的方式 * 同步的方式---手动 * 异步的方式---listener监听 4.

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

Memcache应用场景介绍

面临的问题 对于高并发高访问的Web应用程序来说,数据库存取瓶颈一直是个令人头疼的问题.特别当你的程序架构还是建立在单数据库模式,而一个数据池连接数峰 值已经达到500的时候,那你的程序运行离崩溃的边缘也不远了.很多小网站的开发人员一开始都将注意力放在了产品需求设计上,缺忽视了程序整体性能,可扩 展性等方面的考虑,结果眼看着访问量一天天网上爬,可突然发现有一天网站因为访问量过大而崩溃了,到时候哭都来不及.所以我们一定要未雨绸缪,在数据库还 没罢工前,想方设法给它减负,这也是这篇文章的主要议题.

学习dubbo(8):消息中间件activemq -简介

消息中间定义 消息中间件是在分布式系统中完成消息的发送和接收的基础软件 消息中间件的作用 消息中间件可利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成 .通过提供消息传递 和消息排队模型,可以在分布式环境下扩展进程间的通信. 通过 消息中间件,应用程序或组件之间可以进行可靠的异步通讯,从而降低系统之间的耦合度,提高系统的可扩展性和可用性 应用场景 通过使用消息中间件对Dubbo服务间的调用进行解耦 JMS消息模型 1.点对点或队列模型 JMS 点对点队列模型