【redis】基于redis实现分布式并发锁

基于redis实现分布式并发锁(注解实现)

说明

  前提, 应用服务是分布式或多服务, 而这些"多"有共同的"redis";
  GitHub: https://github.com/vergilyn/SpringBootDemo
  代码结构:
  

一、分布式并发锁的几种可行方案

  (具体实现思路参考: 分布式锁的实现如何用消息系统避免分布式事务?)

  1、基于数据库
    可以用数据库的行锁for update, 或专门新建一张锁控制表来实现.
    过于依赖数据库, 且健壮性也不是特别好, 完全可以把此种方案舍弃.
    (话说都涉及到分布式或多服务器,基本主要还是用redis、memcached或其他缓存服务实现并发锁)
  2、基于ZooKeeper实现分布式锁
    并未去研究, 参考上面的博客链接.
  3、基于redis实现
    redis实现的复杂度不算高, 只是需要注意一些实现细节. 健壮性貌似只比zookeeper差点, 但完全可接受.
    (ps: 感觉redis官方jar中完全可以提供实现, 我们直接用就可以, 是我没找到吗?)

二、redis实现分布式并发锁

2.1 实现思路

  1、主要的redis核心命令: 利用redis是单线程的特性, 用setnx、getset、time来实现.
  2、思路: redis的key-value就代表一个对象锁, 当此key存在说明锁已被获取, 其余相同对象操作则需要等待获取锁.
  3、需要注意的细节:
    1) 锁的释放, 要特别避免死锁出现, 主要是特殊情况下如何释放锁.
    2) 等待获取锁的线程, 最好有超时机制.
    3) 注意多服务器之间的时间是否同步.
    4) 注意获取锁操作别占用或创建太多的连接(即使及时关闭了连接), 很影响系统的性能.

2.2 redis并发锁的2种策略说明

2.2.1 key代表锁对象, value无意义
   /**
     * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a>
     * FIXME 此方式加锁策略存在一定缺陷: 在setIfAbsent()之后expire()执行之前程序异常 锁不会被释放. 虽然出现几率极低
     *
     * @param timeout timeout的时间范围内轮询锁, 单位: 秒
     * @param expire  设置锁超时时间
     * @return true, 获取锁成功; false, 获取锁失败.
     */
 public boolean lock(long timeout, long expire, final TimeUnit unit) {
        long beginTime = System.nanoTime();  // 用nanos、mills具体看需求.
        timeout = TimeUnit.SECONDS.toNanos(timeout);
        try {
            // 在timeout的时间范围内不断轮询锁
            while (System.nanoTime() - beginTime < timeout) {
                // 锁不存在的话,设置锁并设置锁过期时间,即加锁
                if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
                    this.redisClient.expire(key, expire, unit);//设置锁失效时间, 防止永久阻塞
                    this.lock = true;
                    return true;
                }
                // 短暂休眠后轮询,避免可能的活锁
                System.out.println("get lock waiting...");
                Thread.sleep(30, RANDOM.nextInt(30));
            }
        } catch (Exception e) {
            throw new RuntimeException("locking error", e);
        }
        return false;
    }

  以上锁策略已经很完美, 1) 指定了获取锁的超时时间; 2) 设置了锁的失效, 防止永久阻塞;
  但可能有极端情况, 即setIfAbsent()成功, expire()执行之前, 如果出现异常情况, 导致expire()没有执行, 所以此时会出现永久阻塞. (道理是很难遇到这情况)

2.2.2 key代表锁对象, value表示锁超时时间
   /**
     * 特别注意: 如果多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis().
     * 更好的是统一用redis-server的时间, 但只能获取到milliseconds.
     * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a>
     *
     * @param timeout 获取锁超时, 单位: 毫秒
     * @param expire 锁失效时常, 单位: 毫秒
     * @return true, 获取锁成功; false, 获取锁失败.
     */
    public boolean lockB(long timeout, long expire) {
        long bt = System.currentTimeMillis();
        long lockVal;
        String lockExpireTime;
        try {
            while (!this.lock) {
                if(System.currentTimeMillis() - bt > timeout){
                    throw new RedisLockException("get lock timeout!");
                }

                // 锁的键值: {当前时间} + {失效时常} = {锁失效时间}
                lockVal = getRedisTime() + expire;
                // 1. 尝试获取锁
                boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
                if (ifAbsent) { // 设置成功, 表示获得锁
                    // 这种策略下, 是否设置key失效不太重要. 因为, 正常流程中最后会释放锁(del-key); 如果是异常情况下未释放锁, 后面的代码也会判断锁是否失效.
                    // 设置的好处: 能减少redis的内存消耗, 及时清理无效的key(暂时只想到这)
                    // this.redisClient.expire(key, timeout, TimeUnit.SECONDS);
                    this.lock = true;
                    return true;
                }

                lockExpireTime = this.redisClient.opsForValue().get(this.key);
                long curTime = getRedisTime();
                // curTime > expireVal: 表示此锁已无效
                /* 在锁无效的前提下, 尝试获取锁: (一定要用)getAndSet()
                 *
                 * 假设锁已失效, 且未正常expire. 此时C1、C2同时执行到此, C2先执行getAndSet(key, time-02), C2获取到锁
                 * 此时C1.getAndSet(key, time-01)返回的是time-02, 显然curTime > time-02: false.
                 * 所以, C1并未获取到锁. 但C1修改了key的值为: time-01.
                 * 但因为C1、C2是同时执行到此, 所以time-01、time-02的值近视相等.
                 * (若多服务器存在时间差, 那这个差值有问题, 所以服务器时间如果不同步则不能用System.nanoTime()、System.currentTimeMillis(), 该用redis-server time.)
                 */
                if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                    // getset必须在{curTime > expireVal} 判断之后; 否则, 可能出现死循环
                    lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");
                    if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                        // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); // 是否设置失效不重要, 理由同上.
                        this.lock = true;
                        return true;
                    }
                }

                // 锁被占用, 短暂休眠等待轮询
                System.out.println(this + ": get lock waiting...");
                Thread.sleep(40);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RedisLockException("locking error", e);
        }
        System.out.println(this + ": get lock error.");
        return false;
    }

  此种锁策略特别要注意:
    1) 如果多服务器之间时间不同步, 那么可以用redis-server的时间.
    2) getset的调用必须在curTime > lockExpireTime的前提下, 否则会出现死循环.
    3) 并发时getset产生的误差, 完全可忽略.
    4) 特别要注意redis连接的释放, 否则很容易占用过多的redis连接数.

三、完整实现代码 (只是简单实现, 性能有问题)

1. 核心redis锁策略
public class RedisLock {
    private String key;
    private boolean lock = false;

    private final StringRedisTemplate redisClient;
    private final RedisConnection redisConnection;

    /**
     * @param purpose 锁前缀
     * @param key     锁定的ID等东西
     */
    public RedisLock(String purpose, String key, StringRedisTemplate redisClient) {
        if (redisClient == null) {
            throw new IllegalArgumentException("redisClient 不能为null!");
        }
        this.key = purpose + "_" + key + "_redis_lock";
        this.redisClient = redisClient;
        this.redisConnection = redisClient.getConnectionFactory().getConnection();
    }

    /**
     * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a>
     * FIXME 此方式加锁策略存在一定缺陷: 在setIfAbsent()之后expire()执行之前程序异常 锁不会被释放. 虽然出现几率极低
     *
     * @param timeout timeout的时间范围内轮询锁, 单位: 秒
     * @param expire  设置锁超时时间
     * @return true, 获取锁成功; false, 获取锁失败.
     */
    public boolean lockA(long timeout, long expire, final TimeUnit unit) {
        long beginTime = System.nanoTime();  // 用nanos、mills具体看需求.
        timeout = unit.toNanos(timeout);
        try {
            // 在timeout的时间范围内不断轮询锁
            while (System.nanoTime() - beginTime < timeout) {
                // 锁不存在的话,设置锁并设置锁过期时间,即加锁
                if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
                    this.redisClient.expire(key, expire, unit);//设置锁失效时间, 防止永久阻塞
                    this.lock = true;
                    return true;
                }

                // 短暂休眠后轮询,避免可能的活锁
                System.out.println("get lock waiting...");
                Thread.sleep(30);
            }
        } catch (Exception e) {
            throw new RedisLockException("locking error", e);
        }
        return false;
    }

    /**
     * 特别注意: 如果多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis().
     * 更好的是统一用redis-server的时间, 但只能获取到milliseconds.
     * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a>
     *
     * @param timeout 获取锁超时, 单位: 毫秒
     * @param expire 锁失效时常, 单位: 毫秒
     * @return true, 获取锁成功; false, 获取锁失败.
     */
    public boolean lockB(long timeout, long expire) {
        long bt = System.currentTimeMillis();
        long lockVal;
        String lockExpireTime;
        try {
            while (!this.lock) {
                if(System.currentTimeMillis() - bt > timeout){
                    throw new RedisLockException("get lock timeout!");
                }

                // 锁的键值: {当前时间} + {失效时常} = {锁失效时间}
                lockVal = getRedisTime() + expire;

                // 1. 尝试获取锁
                boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
                if (ifAbsent) { // 设置成功, 表示获得锁
                    this.lock = true;
                    return true;
                }

                lockExpireTime = this.redisClient.opsForValue().get(this.key);
                long curTime = getRedisTime();

                if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                    lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");

                    if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                        this.lock = true;
                        return true;
                    }
                }

                // 锁被占用, 短暂休眠等待轮询
                System.out.println(this + ": get lock waiting...");
                Thread.sleep(40);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RedisLockException("locking error", e);
        }
        System.out.println(this + ": get lock error.");
        return false;
    }

    /**
     * @return current redis-server time in milliseconds.
     */
    private long getRedisTime() {
        return this.redisConnection.time();
    }

    private void closeConnection(){
        if(!this.redisConnection.isClosed()){
            this.redisConnection.close();
        }
    }

    /** 释放锁 */
    public void unlock() {
        if (this.lock) {
            redisClient.delete(key);
        }
    }

    public boolean isLock() {
        return lock;
    }
}
2. 注解部分
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLockedKey {
    /**
     * 复杂对象中需要加锁的成员变量
     */
    String field() default "";
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisDistributedLock {
    /** 锁key的前缀 */
    String lockedPrefix() default "";
    /** 轮询锁的时间超时时常, 单位: ms */
    long timeout() default 2000;
    /** redis-key失效时常, 单位: ms */
    int expireTime() default 1000;
}
@Component
@Aspect
public class RedisDistributedLockAop {
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 定义缓存逻辑
     */
    @Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)")
    public void cache(ProceedingJoinPoint pjp) {
        Method method = getMethod(pjp);

        RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class);
        String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs());

        RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate);

        //       boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime());
        boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS);
        if (isLock) {
            try {
                pjp.proceed();
                return;
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                redisLock.unlock();
            }
        }
        System.out.println("执行方法失败");
    }

    /**
     * 获取被拦截的方法对象
     */
    private Method getMethod(ProceedingJoinPoint pjp) {
        Object[] args = pjp.getArgs();
        Class[] argTypes = new Class[pjp.getArgs().length];
        for (int i = 0; i < args.length; i++) {
            argTypes[i] = args[i].getClass();
        }
        Method method = null;
        try {
            method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes);
        } catch (NoSuchMethodException | SecurityException e) {
            e.printStackTrace();
        }
        return method;

    }

    private String getRedisKey(Annotation[][] annotations, Object[] args){
        if (null == args || args.length == 0) {
            throw new RedisLockException("方法参数为空,没有被锁定的对象");
        }
        if (null == annotations || annotations.length == 0) {
            throw new RedisLockException("没有被注解的参数");
        }
        // 只支持第一个注解为RedisLockedKey的参数
        for (int i = 0; i < annotations.length; i++) {
            for (int j = 0; j < annotations[i].length; j++) {
                if (annotations[i][j] instanceof RedisLockedKey) { //注解为LockedComplexObject
                    RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j];
                    String field = redisLockedKey.field();
                    try {
                        // field存在, 表示取参数对象的相应field;
                        if(StringUtils.isBlank(field)){
                            return args[i].toString();
                        }else {
                            return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
                        }
                    } catch (NoSuchFieldException | SecurityException e) {
                        e.printStackTrace();
                        throw new RedisLockException("注解对象中不存在属性: " + redisLockedKey.field());
                    }
                }
            }
        }

        throw new RedisLockException("未找到注解对象!");
    }
}
public class RedisLockException extends RuntimeException{
    public RedisLockException(String msg, Throwable throwable) {
        super(msg, throwable);
    }
    public RedisLockException(String msg) {
        super(msg);
    }
}
3.测试部分
#### 视情况调整
# 部分redis配置
spring.redis.database=0
spring.redis.host=127.0.0.1
# spring.redis.password=
spring.redis.port=6379
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-active=1
spring.redis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.pool.max-idle=4
# 连接池中的最小空闲连接
spring.redis.pool.min-idle=0
spring.redis.timeout=2000
@SpringBootApplication
@EnableCaching
public class DistributedLockApplication implements CommandLineRunner{
    @Autowired
    StringRedisTemplate redisTemplate;
    @Autowired
    LockService lockService;
    @Autowired
    ThreadPoolTaskExecutor executor;

    @Bean
    public ThreadPoolTaskExecutor myExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(8);
        // 最大线程数
        executor.setMaxPoolSize(12);
        // 运行线程满时,等待队列的大小
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("vl-thread-");
        // 池和队列满的策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 空闲线程清除时间
        executor.setKeepAliveSeconds(60);
        // 是否允许释放核心线程
        executor.setAllowCoreThreadTimeOut(true);
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(DistributedLockApplication.class);
        application.setAdditionalProfiles("redis");
        application.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("run....");
        for (int i = 0; i < 2; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
//                  lockService.lockMethod(new LockBean(1L));
                    lockService.lockMethod("arg1", 1L);
                }
            });
        }
        System.out.println(executor.getThreadPoolExecutor().getTaskCount());
    }
}
public interface LockService {
    public void lockMethod(String arg1,Long arg2);
    public void lockMethod(LockBean lockBean);
}
@Service
public class LockServiceImpl implements LockService {
    public static Map<Long, Integer> goods;
    static{
        goods = new HashMap<>();
        goods.put(1L, 100);
        goods.put(2L, 200);
    }

    @Override
    @RedisDistributedLock(lockedPrefix="TEST_PREFIX")
    public void lockMethod(String arg1, @RedisLockedKey Long arg2) {
        //最简单的秒杀,这里仅作为demo示例
        System.out.println("lockMethod, goods: " + reduceInventory(arg2));

    }

    @Override
    @RedisDistributedLock(lockedPrefix="TEST_PREFIX")
    public void lockMethod(@RedisLockedKey(field = "idic")LockBean lockBean) {
        System.out.println("lockMethod bean, goods: " + reduceInventory(lockBean.getIdic()));

    }

    // 模拟秒杀操作,姑且认为一个秒杀就是将库存减一
    private Integer reduceInventory(Long commodityId){
        goods.put(commodityId, goods.get(commodityId) - 1);
        return goods.get(commodityId);
    }
}
public class LockBean {
    private Long idic;

    public LockBean(){}

    public LockBean(long idic) {
        this.idic = idic;
    }

    public Long getIdic() {
        return idic;
    }

    public void setIdic(Long idic) {
        this.idic = idic;
    }
}


  以上只是简单实现代码, 如果用于实际项目中, 以上代码存在很多性能问题, 具体性能问题:
  1) 太频繁的获取redis连接、关闭连接.
    lockA: 每次while必定有一次setIfAbsent, 可能会有expire, 然后释放锁有delete. 所以一次正常的流程就需要3个连接. 如果是并发同时竞争等待获取锁, 那么性能影响也蛮大的.
    lockB: 这种策略要用到的连接更多, 并且如果是this.redisClient.getConnectionFactory().getConnection().time()还要注意要手动释放这个连接.
  针对此问题, (个人)想到的可能的代码改进方案, 每个RedisLock中用一个redisConnection, 把所有的StringRedisTemplate命令换成更底层的redisConnection命令:

public class RedisLock {
    private String key;
    private boolean lock = false;
    private final RedisConnection redisConnection;

    public RedisLock(String purpose, String key, RedisConnection redisConnection) {
        if (redisConnection == null) {
            throw new IllegalArgumentException("redisConnection 不能为null!");
        }
        this.key = purpose + "_" + key + "_redis_lock";
        this.redisConnection = redisConnection;
    }

    public boolean lockAc(long timeout, long expire, final TimeUnit unit) {
        long beginTime = System.nanoTime();
        timeout = unit.toNanos(timeout);
        try {
            while (System.nanoTime() - beginTime < timeout) {
                if (this.redisConnection.setNX(this.key.getBytes(), "1".getBytes())) {
                    this.redisConnection.expire(key.getBytes(), unit.toSeconds(expire));
                    this.lock = true;
                    return true;
                }

                System.out.println("lockAc get lock waiting...");
                Thread.sleep(30);
            }
        } catch (Exception e) {
            throw new RedisLockException("locking error", e);
        }
        return false;
    }

    private long getRedisTime() {
        return this.redisConnection.time();
    }

    private void closeConnection(){
        if(!this.redisConnection.isClosed()){
            this.redisConnection.close();
        }
    }

    public void unlock() {
        if (this.lock) {
            this.redisConnection.delete(key);
        }
        closeConnection(); // 用完一定要关闭, 这个位置不一定好, 可能在Aop调用unlock的finally处更好
    }

    public boolean isLock() {
        return lock;
    }

}

  以上改进代码依然可能存在的问题:
    1) 连接很可能没有正常关闭.
    2) 连接依然过多, 假设并发有1000个, 那一样会产生1000个连接, 且这些连接只会在竞争获取锁完后才会释放.(且产生了1000个RedisLock对象)
    3) 是否可以缓存注解对象?

  针对问题2), 主要想达到怎么尽可能减少redis连接?
  比如: 有1000个并发, 其中200个是兑换商品A, 其中300个是兑换商品B, 其中500个是兑换商品C.

  1、是否可以用单例模式来实现RedisLock?
    对单例多线程还是很混乱, 不好说. 但如果可行, 会否太影响获取锁的性能?
  比如兑换商品A的200个并发共用一个redisConnection, 感觉还是合理的, 毕竟互相之间是竞争关系.
  但商品A、商品B、商品C如果也共用一个redisConnection, 是不是完全不合理?
  他们之间根本是"并行"的, 相互之间没有一点联系.
  2、所以, 是否更进一步的实现是: 同一个锁竞争用相同的RedisLock对象和RedisConnection连接.
  即竞争商品A的200个并发用同一个"redisConnection_A"、"redisLock_A", 商品B的300个并发用同一个"redisConnection_B"、"redisLock_B"?

  针对问题3), 在代码RedisDistributedLockAop中, 每次都会:
    1) getMethod(pjp): 获取拦截方法.
    2) 通过拦截方法解析出getRedisKey.
  是不是可以这么实现, 相同的拦截方法只有第一次需要通过反射获取. 之后直接从缓存(如map)中获取到method, 且因为同一个方法, 所能取field也是一样的.
  比如, 有一下几个方法都需要用到分布式并发锁:

    @RedisDistributedLock(lockedPrefix="TEST_PREFIX")
    public void a(String arg1, @RedisLockedKey Long arg2) {
        // ...
    }
    @RedisDistributedLock(lockedPrefix="TEST_PREFIX")
    public void b(@RedisLockedKey(field = "idic")LockBean lockBean) {
        // ...
    }

    @RedisDistributedLock(lockedPrefix="TEST_PREFIX")
    public void c(@RedisLockedKey(field = "xx")LockBean lockBean) {
        // ...
    }
// key: 完整方法名, 要唯一正确找到; value: 缓存的method
Map<String, Method> methodCache = new HashMap<>;
methodCache.put("com.service.aa.a()", method);
methodCache.put("com.service.aa.b()", method);
methodCache.put("com.service.bb.b()", method);
// 然后, 同一个方法的注解内容相同, 所以完全可以直接调用, 省略RedisLockedKey的逻辑判断
if(StringUtils.isBlank(field)){
    return args[i].toString();
}else {
    return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
}

  以上只是自己的构想, 这些构想的可行性, 代码的具体实现还很难说...
  但现在觉得, 基于redis的分布式并发锁完全可以有位大神写个开源的实现啊, 我们这些小白直接拿到系统中用就好了. 我也不知道怎么去找, 不知道github有没有.

时间: 2024-10-19 15:38:16

【redis】基于redis实现分布式并发锁的相关文章

基于(Redis | Memcache)实现分布式互斥锁

设计一个缓存系统,不得不要考虑的问题就是:缓存穿透.缓存击穿与失效时的雪崩效应. 缓存击穿 缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义.在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞. 解决方案 有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap

[Redis] 基于redis的分布式锁

前言分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁. 可靠性首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁.不会发生死锁.即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁.具有容错性.只要大部分的Redis节点正常运行,客户端就可以加锁和解锁.解铃还须系铃人.加锁和解锁必须

ServiceStack.Redis常用操作 - 事务、并发锁

一.事务 使用IRedisClient执行事务示例: using (IRedisClient RClient = prcm.GetClient()) { RClient.Add("key",1); using (IRedisTransaction IRT = RClient.CreateTransaction()) { IRT.QueueCommand(r => r.Set("key", 20)); IRT.QueueCommand(r => r.Inc

ServiceStack.Redis常用操作 - 事务、并发锁_转

一.事务 使用IRedisClient执行事务示例: using (IRedisClient RClient = prcm.GetClient()) { RClient.Add("key",1); using (IRedisTransaction IRT = RClient.CreateTransaction()) { IRT.QueueCommand(r => r.Set("key", 20)); IRT.QueueCommand(r => r.Inc

分布式并发锁处理

1.采用数据库悲观锁来实现同步 采用悲观锁来实现同步 在sql语句后加 for update就加上了锁,在查询的时候进行加锁,在加锁后不能进行查询.提交时候后其他人才能查询. public static int generate(String tableName){ //使用数据库的悲观锁for update String sql = "select value from t_table_id where table_name=? for update"; Connection con

golang基于etcd做分布式任务锁

参考:https://www.jianshu.com/p/d7434d916856 etcd服务搭建请看上一篇,下面直接上代码: # go get github.com/etcd-io/etcd main.go 内容如下: package main import ( "context" "fmt" "time" //"github.com/etcd-io/etcd/clientv3" 这个引用会导致启动失败 "go.

基于redis的分布式锁实现

关于分布式锁 很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock.在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量.而同步的本质是通过锁来实现的.为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记.

转载:基于Redis实现分布式锁

转载:基于Redis实现分布式锁  ,出处: http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分

基于Redis实现分布式锁

http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分布式锁,有两个重要函数需要介绍 SETNX命令