一、场景
项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。
二、解决思路
将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。
三、主要代码
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author: hujunzheng * @create: 17/9/29 下午2:49 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** * redis的key * @return */ String value(); /** * 持锁时间,单位毫秒,默认一分钟 */ long keepMills() default 60000; /** * 当获取失败时候动作 */ LockFailAction action() default LockFailAction.GIVEUP; public enum LockFailAction{ /** * 放弃 */ GIVEUP, /** * 继续 */ CONTINUE; } /** * 睡眠时间,设置GIVEUP忽略此项 * @return */ long sleepMills() default 500; }
import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author: hujunzheng * @create: 17/9/29 下午2:49 */ @Component @Aspect public class RedisLockAspect { private static final Log log = LogFactory.getLog(RedisLockAspect.class); @Autowired private RedisCacheTemplate.RedisLockOperation redisLockOperation; @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" + "&& @annotation(me.ele.api.portal.service.redis.RedisLock)") private void lockPoint(){} @Around("lockPoint()") public Object arround(ProceedingJoinPoint pjp) throws Throwable{ MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); RedisLock lockInfo = method.getAnnotation(RedisLock.class); boolean lock = false; Object obj = null; while(!lock){ long timestamp = System.currentTimeMillis()+lockInfo.keepMills(); lock = setNX(lockInfo.value(), timestamp); //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争) long now = System.currentTimeMillis(); if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){ log.info("得到redis分布式锁..."); obj = pjp.proceed(); if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){ releaseLock(lockInfo.value()); } }else{ if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){ log.info("稍后重新请求redis分布式锁..."); Thread.currentThread().sleep(lockInfo.sleepMills()); }else{ log.info("放弃redis分布式锁..."); break; } } } return obj; } private boolean setNX(String key,Long value){ return redisLockOperation.setNX(key, value); } private long getLock(String key){ return redisLockOperation.get(key); } private Long getSet(String key,Long value){ return redisLockOperation.getSet(key, value); } private void releaseLock(String key){ redisLockOperation.delete(key); } }
四、遇到的问题
开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。
在一篇文章中找到了原因:SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效.
只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。
参考: 关于proxy模式下,@Transactional标签在创建代理对象时的应用
五、参考
其他工具类,请参考这里。