spring-amqp整合rabbitmq消费者配置和代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
     
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="mqConnectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
         
   <rabbit:admin connection-factory="mqConnectionFactory"/>
    
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    
    <!-- 配置线程池 -->  
<bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >  
    <!-- 线程池维护线程的最少数量 -->  
<property name ="corePoolSize" value ="5" />  
    <!-- 线程池维护线程所允许的空闲时间 -->  
<property name ="keepAliveSeconds" value ="30000" />  
    <!-- 线程池维护线程的最大数量 -->  
<property name ="maxPoolSize" value ="1000" />  
    <!-- 线程池所使用的缓冲队列 -->  
<property name ="queueCapacity" value ="200" />  
</bean>  
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
       <!-- 消息监听器 -->  
    <bean id="consumerMessageListener" class="com.netease.mobileMq.task.deviceCacheFlushTask"/>  
    <!-- 可以获取session的MessageListener -->  
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="consumerMessageListener"/>
    </rabbit:listener-container>
</beans>
package com.netease.mobileMq.task;

import java.util.Date;
import java.util.List;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;

import com.alibaba.fastjson.JSON;
import com.netease.commonBean.FlushDeviceCacheBean;
import com.netease.device.constant.EquipmentConstants;
import com.netease.device.dao.EquipmentMapper;
import com.netease.device.dao.FingerUserMapper;
import com.netease.device.entity.EquipmentInfo;
import com.netease.device.entity.FingerUserInfo;
import com.netease.mobile.common.RedisUtil;

/**
 * @author 作者 E-mail:[email protected]
 * @version 创建时间:2015年8月4日 下午4:44:39
 * 类说明
 */
public class deviceCacheFlushTask implements  
MessageListener{
	private static Logger logger = LoggerFactory.getLogger("equipmentErrorLog");
	@Autowired
	FingerUserMapper fingerUserMapper;
	@Autowired
	EquipmentMapper equipmentMapper;
	@Override
	public void onMessage(Message  message){
		 
		 String receiveMsg=null;
		try {
			receiveMsg =new String(message.getBody(),"utf-8");
		} catch (Exception e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
			return ;
		}
		System.out.println("receiveMsg:"+receiveMsg);
		if (StringUtils.isBlank(receiveMsg)) {
			logger.error("deviceCacheFlushTask   receiveMsg is null  Time is " + new Date());
			return ;
		}
		else {
			logger.info("deviceCacheFlushTask   receiveMsg " + receiveMsg);//日志中记录每个刷新的数据

		}

		FlushDeviceCacheBean flushBean = JSON.parseObject(receiveMsg, FlushDeviceCacheBean.class);//将传过来的刷新对象进行格式化。
		String mainssn = flushBean.getMainssn();
		String[] refIds = flushBean.getUserIds();

		/*---------------先更新用户缓存----------*/
		if (mainssn != null) {//有主账号就更新主账号信息
		List<FingerUserInfo> fingerUserInfos = null;
		try {
			fingerUserInfos = fingerUserMapper.getAllEqUserInfoByName(mainssn);
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("EquipmentServiceImpl flushCache error", e);
			return ;
		}

		if (fingerUserInfos == null || fingerUserInfos.size() == 0) {// 如果有一个都没有
			RedisUtil.delete(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE
					+ mainssn);
		} else {
			RedisUtil.set(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE
					+ mainssn, JSON.toJSONString(fingerUserInfos));// 新数据直接替换到缓存中
		}
		}
		/*---------------再更新设备缓存----------*/

		List<EquipmentInfo> equipmentInfos = null;

		if (refIds!=null&&refIds.length!=0) {//有ID就刷新ID信息
		for (String refId : refIds) {
			try {
				equipmentInfos = equipmentMapper.getAllEquipmentInfoById(refId);
			} catch (Exception e) {
				// TODO: handle exception
				logger.error("EquipmentServiceImpl flushCache error", e);
				return ;
			}
			if (equipmentInfos == null || equipmentInfos.size() == 0) {// 如果有一个没有,说明查询数据出错失败了
				RedisUtil.delete(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId);
			} else {
				RedisUtil.set(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId,
						JSON.toJSONString(equipmentInfos));// 新数据直接替换到缓存中
			}
		}
		}
	}

}
时间: 2024-10-26 18:05:05

spring-amqp整合rabbitmq消费者配置和代码的相关文章

Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言 RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷.消息分发的作用. 消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的.在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ . 当然,我们本篇文章的主角还是 Ra

spring boot整合RabbitMQ(Fanout模式)

1.Fanout Exchange介绍Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).Q

spring整合RabbitMQ

今天就来康康spring怎么整合RabbitMQ 注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效 生产端还可以配置其他属性,比如发送重试,超时时间.次数.间隔等 消费端核心配置 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列.根据业务记录日志等处理 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况 @RabbitListener注解的使用 消费端监听@RabbitL

Spring AMQP 源码分析 08 - XML 配置

### 准备 ## 目标 通过 XML 配置文件使用 Spring AMQP ## 前置知识 <Spring AMQP 源码分析 07 - MessageListenerAdapter> ## 相关资源 Sample code:<https://github.com/gordonklg/study>,rabbitmq module 源码版本:Spring AMQP 1.7.3.RELEASE ## 测试代码 gordon.study.rabbitmq.springamqp.XmlC

译: 1. RabbitMQ Spring AMQP 之 Hello World

本文是译文,原文请访问:http://www.rabbitmq.com/tutorials/tutorial-one-spring-amqp.html RabbitMQ 是一个Brocker (消息队列服务器),它接受和转发消息 . 你可以将它当做邮局: 当你将要发布的邮件放在邮箱中时,您可以确定邮件先生或Mailperson女士最终会将邮件发送给您的收件人.在这个比喻中,RabbitMQ是邮箱,邮局和邮递员. RabbitMQ和邮局之间的主要区别在于它不处理信纸,而是接受,存储和转发二进制大对

简单介绍下怎么在spring中使用RabbitMQ

这篇文章主要介绍了简单了解如何在spring中使用RabbitMQ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. (2)RabbitMQ AMQP协议的领导实现,支持多种场景.淘宝的MySQL集群内部有使用它进行通讯,Open

Spring AMQP 源码分析 04 - MessageListener

### 准备 ## 目标 了解 Spring AMQP 如何实现异步消息投递(推模式) ## 前置知识 <RabbitMQ入门_05_多线程消费同一队列> ## 相关资源 Quick Tour for the impatient:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#async-consumer> Sample code:<https://git

SpringBoot整合RabbitMQ之发送接收消息实战

实战前言 前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的.特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent.ApplicationListener.ApplicationEventPublisher跟RabbitMQ的Message.Listener.R

Spring AMQP 源码分析 05 - 异常处理

### 准备 ## 目标 了解 Spring AMQP Message Listener 如何处理异常 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#exception-handling> Sample code:<ht