spring+activemq中多个consumer同时处理消息时遇到的性能问题

最近在做数据对接的工作,用到了activemq,我需要从activemq中接收消息并处理,但是我处理数据的步骤稍微复杂,渐渐的消息队列中堆的数据越来越多,就想到了我这边多开几个线程来处理消息。

可是会发现,服务器占用的网络带宽变的异常的高,仔细分析发现,mq入队时并没有异常高的网络流量,仅仅在出队时会产生很高的网络流量。最终发现是spring的jmsTemplate与activemq的prefetch机制配合导致的问题。研究源码发现jmsTemplate实现机制是:每次调用receive()时都会创建一个新的consumer对象,用完即销毁。正常情况下仅仅会浪费重复创建consumer的资源代价,并不至于产生正常情况十倍百倍的网络流量。但是activeMQ有一个提高性能的机制prefetch,此时就会有严重的问题。

ActiveMq的prefetch机制:
每次consumer连接至MQ时,MQ预先存放许多message到消费者(前提是MQ中存在大量消息),预先存放message的数量取决于prefetchSize(默认为1000)。此机制的目的很显然,是想让客户端代码用一个consumer反复进行receive操作,这样能够大量提高出队性能。

此机制与jmsTemplate配合时就会产生严重的问题,每次jmsTemplate.receive(),都会产生1000个消息的网络流量,但是因为jmsTemplae并不会重用consumer,导致后面999个消息都被废弃。反复jmsTemplate.receive()时,表面上看不出任何问题,其实网络带宽会造成大量的浪费。

解决方案

1、若坚持使用jmsTemplate,需要设置prefetch值为1,相当于禁用了activeMQ的prefetch机制,此时感觉最健壮,就算多线程,反复调用jmsTemplate.receive()也不会有任何问题。但是会有资源浪费,因为要反复创建consumer并频繁与服务器进行数据通信,但在性能要求不高的应用中也不算什么问题。

2、不使用jmsTemplate,手工创建一个consumer,此时可以充分利用prefetch机制。配合多线程的方式每个线程拥有自己的一个consumer,此时能够充分发挥MQ在大吞吐量时的速度优势。

 

ps:如何看到activeMQ的prefetch机制?

prefetchsize值默认是1000,那么只要队列里的消息小于1000,消费端无论开几个线程,我们会发现真正处理消息的线程永远只有一个,其它的线程其实都在做无用功,帮不上忙。

时间: 2024-11-05 22:45:24

spring+activemq中多个consumer同时处理消息时遇到的性能问题的相关文章

Spring MVC 中的 forward 和 redirect

Spring MVC 中的 forward 和 redirect Spring MVC 中,我们在返回逻辑视图时,框架会通过 viewResolver 来解析得到具体的 View,然后向浏览器渲染.假设逻辑视图名为 hello,通过配置,我们配置某个 ViewResolver 如下: <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <description

ActiveMQ中消息的重发与持久化保存

消息中间件解决方案续 上一篇中我们讲到了在Spring工程中基本的使用消息中间件,这里就不在继续赘述. 针对消息中间件,这篇讲解两个我们常遇到的问题. 问题1:如果我们的消息的接收过程中发生异常,怎么解决? 问题2:发布订阅模式(Topic)下如果消费端宕机引起的消息的丢失,怎么解决? 问题解决方案: 问题1暂时有两种解决方案:第一种是开启消息确认机制,第二种开启事务.下面会在点对点模式下进行演示. 问题2的解决方案:实现发布订阅消息的持久化保存. 根据上面的解决方案搭建工程如下:测试消息的重发

dubbo是如何“插入”到spring框架中的

作为一个分布式服务治理框架,dubbo做的非常好,在业界使用很广,所以最近研究了下这个框架.任何框架要研究其原理最好的办法之一就是沿着其运行流程进行追踪,这样就能从上到下,从粗到细对一个系统进行了解.今天,我们要说明的问题就是dubbo如何启动的. dubbo是基于Spring进行开发的,而且扩展了Spring的XML schema和注解标签,其实这里也就是整个dubbo的切入点.dubbo除去依赖其他的第三方框架外,整个框架只有一个jar包,可谓是精致.在这个jar包的NET-INF目录下有两

spring boot中使用@Async实现异步调用

什么是“异步调用”? “异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行:异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序. 同步调用 下面通过一个简单示例来直观的理解什么是同步调用: 定义Task类,创建三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10秒内) package com.dxz.demo1; import java.util.Random; import org.sprin

Spring Boot中通过CORS解决跨域问题

今天和小伙伴们来聊一聊通过CORS解决跨域问题. 同源策略 很多人对跨域有一种误解,以为这是前端的事,和后端没关系,其实不是这样的,说到跨域,就不得不说说浏览器的同源策略. 同源策略是由Netscape提出的一个著名的安全策略,它是浏览器最核心也最基本的安全功能,现在所有支持JavaScript的浏览器都会使用这个策略.所谓同源是指协议.域名以及端口要相同.同源策略是基于安全方面的考虑提出来的,这个策略本身没问题,但是我们在实际开发中,由于各种原因又经常有跨域的需求,传统的跨域方案是JSONP,

Spring Cloud中五大神兽总结(Eureka/Ribbon/Feign/Hystrix/zuul)

Spring Cloud中五大神兽总结(Eureka/Ribbon/Feign/Hystrix/zuul) 1.Eureka Eureka是Netflix的一个子模块,也是核心模块之一.Eureka是一个基于REST的服务,用于定位服务,以实现云端中间层服务发现和故障转移.服务注册与发现对于微服务架构来说是非常重要的,有了服务发现与注册,只需要使用服务的标识符,就可以访问到服务,而不需要修改服务调用的配置文件了.功能类似于dubbo的注册中心,比如Zookeeper. Eureka包含两个组件:

spring框架中使用junt单元测试

前言: 该代码适用于与spring框架整合的项目 代码: dao层的junit测试父类,需要在spring配置中注入两个bean <bean class="org.springframework.web.servlet.mvc.annotation.DefaultAnnotationHandlerMapping"/> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationM

3.Spring Boot中使用Swagger2构建强大的RESTful API文档

原文:http://www.jianshu.com/p/8033ef83a8ed 由于Spring Boot能够快速开发.便捷部署等特性,相信有很大一部分Spring Boot的用户会用来构建RESTful API.而我们构建RESTful API的目的通常都是由于多终端的原因,这些终端会共用很多底层业务逻辑,因此我们会抽象出这样一层来同时服务于多个移动端或者Web前端. 这样一来,我们的RESTful API就有可能要面对多个开发人员或多个开发团队:IOS开发.Android开发或是Web开发

细说shiro之五:在spring框架中集成shiro

官网:https://shiro.apache.org/ 1. 下载在Maven项目中的依赖配置如下: <!-- shiro配置 --> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-core</artifactId> <version>${version.shiro}</version> </dependency&g