<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 连接服务配置 --> <rabbit:connection-factory id="mqConnectionFactory" host="localhost" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="mqConnectionFactory"/> <!-- queue 队列声明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- 配置线程池 --> <bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > <!-- 线程池维护线程的最少数量 --> <property name ="corePoolSize" value ="5" /> <!-- 线程池维护线程所允许的空闲时间 --> <property name ="keepAliveSeconds" value ="30000" /> <!-- 线程池维护线程的最大数量 --> <property name ="maxPoolSize" value ="1000" /> <!-- 线程池所使用的缓冲队列 --> <property name ="queueCapacity" value ="200" /> </bean> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息监听器 --> <bean id="consumerMessageListener" class="com.netease.mobileMq.task.deviceCacheFlushTask"/> <!-- 可以获取session的MessageListener --> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="queue_one" ref="consumerMessageListener"/> </rabbit:listener-container> </beans>
package com.netease.mobileMq.task; import java.util.Date; import java.util.List; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import com.alibaba.fastjson.JSON; import com.netease.commonBean.FlushDeviceCacheBean; import com.netease.device.constant.EquipmentConstants; import com.netease.device.dao.EquipmentMapper; import com.netease.device.dao.FingerUserMapper; import com.netease.device.entity.EquipmentInfo; import com.netease.device.entity.FingerUserInfo; import com.netease.mobile.common.RedisUtil; /** * @author 作者 E-mail:[email protected] * @version 创建时间:2015年8月4日 下午4:44:39 * 类说明 */ public class deviceCacheFlushTask implements MessageListener{ private static Logger logger = LoggerFactory.getLogger("equipmentErrorLog"); @Autowired FingerUserMapper fingerUserMapper; @Autowired EquipmentMapper equipmentMapper; @Override public void onMessage(Message message){ String receiveMsg=null; try { receiveMsg =new String(message.getBody(),"utf-8"); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); return ; } System.out.println("receiveMsg:"+receiveMsg); if (StringUtils.isBlank(receiveMsg)) { logger.error("deviceCacheFlushTask receiveMsg is null Time is " + new Date()); return ; } else { logger.info("deviceCacheFlushTask receiveMsg " + receiveMsg);//日志中记录每个刷新的数据 } FlushDeviceCacheBean flushBean = JSON.parseObject(receiveMsg, FlushDeviceCacheBean.class);//将传过来的刷新对象进行格式化。 String mainssn = flushBean.getMainssn(); String[] refIds = flushBean.getUserIds(); /*---------------先更新用户缓存----------*/ if (mainssn != null) {//有主账号就更新主账号信息 List<FingerUserInfo> fingerUserInfos = null; try { fingerUserInfos = fingerUserMapper.getAllEqUserInfoByName(mainssn); } catch (Exception e) { // TODO: handle exception logger.error("EquipmentServiceImpl flushCache error", e); return ; } if (fingerUserInfos == null || fingerUserInfos.size() == 0) {// 如果有一个都没有 RedisUtil.delete(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE + mainssn); } else { RedisUtil.set(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE + mainssn, JSON.toJSONString(fingerUserInfos));// 新数据直接替换到缓存中 } } /*---------------再更新设备缓存----------*/ List<EquipmentInfo> equipmentInfos = null; if (refIds!=null&&refIds.length!=0) {//有ID就刷新ID信息 for (String refId : refIds) { try { equipmentInfos = equipmentMapper.getAllEquipmentInfoById(refId); } catch (Exception e) { // TODO: handle exception logger.error("EquipmentServiceImpl flushCache error", e); return ; } if (equipmentInfos == null || equipmentInfos.size() == 0) {// 如果有一个没有,说明查询数据出错失败了 RedisUtil.delete(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId); } else { RedisUtil.set(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId, JSON.toJSONString(equipmentInfos));// 新数据直接替换到缓存中 } } } } }
时间: 2024-10-26 18:05:05