Redis的分布式锁


一、锁的作用   

  当多线程执行某一业务时(特别是对数据的更新、新增)等操作,可能就会出现多个线程对同一条数据进行修改。其最终的结果一
定与你期望的结果“不太一样”,这就与需要一把锁来控制线程排排队了 - java内部为我们提供了解决方案,可以使用synchronized
或Lock等方式来实现。
  但是在生产过程中,因为性能的关系,多数公司都会采用多台服务器来搭建”分布式”。一条请求过来之后,不一定会打到哪台服务
器上,这就保证不了多台服务器的某一”关键业务”同一时间只会有一条线程进行执行。这时就需要一个“媒介”来充当"锁"这个角色,这
个“媒介”需要满足一些特性才能胜任这个工作:
(1)各个服务器都可以对其进行获取和操作(基于数据库的实现方式);
(2)高性能的获取和释放锁;
(3)具备非阻塞锁特性,如果获取不到锁则立即返回失败;
(4)具备锁等待的特性,即没有获取到锁将继续等待获取锁;
(5)具有锁失效机制,防止死锁;
(6)具备可重入特性;

二、分布式锁的一种实现   

  高性能、失效时间、可重入、非阻塞等特性,Redis都能满足,所以基于Redis实现是解决分布式锁的一种方式。基于Redis的分布式
锁的实现思想:
  A、加锁
    1、计算开始时间,防止一直获取不到锁而死循环;
    2、根据系统名称:业务名称:主键ID来创建一个分布式锁的key;
    3、循环获取分布式锁(尝试时间=当前时间-开始时间<5000毫秒);
    4、使用setnx方法获取分布式锁(key,value - 当前系统时间+"$TRUE",失效时间 - 10毫秒);
    5、如果设置成功则获取分布式锁成功,返回true;
    6、如果获取失败则睡5毫秒,继续尝试,直到过了时间;
    7、如果时间之内没有获取到分布式所,则返回失败;
  B、解锁
    1、根据系统号,业务名称,主键拼接分布式锁的key;
    2、根据key获取到真实的key(使用keys方法);
    3、判断真实的key是否为存在,是否唯一,如果否则为空;
    4、如果key唯一则进行删除;
  栗子:

/**
 * Jedis的工具类.
 */
public class JedisUtil {
    private static final Logger LOG = LoggerFactory.getLogger(JedisUtil.class);
    private static Map<String, JedisUtil> uniqueInstance = new HashMap();
    private JedisPool jedisPool;
    private static JedisUtil jedisUtil = null;

    public JedisUtil() {
        super();
    }

    /**
     * 选择需要链接哪个库
     * @param serverName
     */
    public JedisUtil(String serverName) {
        if (StringUtils.isBlank(serverName)) {
            throw new RuntimeException("请指定Redis主或从库!");
        } else {
            //实际使用需从配置文件中获取
            String ip = "127.0.0.1";
            Integer port = 6379;
            String pwd = "test";
            //仅仅是测试用
            String redisPoolMaxActive = "30";
            if ("master".equals(serverName)) {
                //链接到主库
                this.initialPool(ip, port, pwd, redisPoolMaxActive);
            }else{
                //可以链接为别的Redis库
            }
        }
    }

    /**
     * 初始化jedisPool
     * @param ip
     * @param port
     * @param pwd
     * @param redisPoolMaxActive
     */
    private void initialPool(String ip, int port, String pwd, String redisPoolMaxActive) {
        JedisPoolConfig config = new JedisPoolConfig();
        //最大连接数,默认8个
        config.setMaxTotal(256);
        //最大空闲连接数,默认8个
        config.setMaxIdle(256);
        if (StringUtils.isNotBlank(redisPoolMaxActive)) {
            Integer redisPoolMaxActiveInt = Integer.valueOf(Integer.parseInt(redisPoolMaxActive));
            if (redisPoolMaxActiveInt.intValue() > 512) {
                redisPoolMaxActiveInt = Integer.valueOf(512);
            }

            config.setMaxTotal(redisPoolMaxActiveInt.intValue());
            config.setMaxIdle(redisPoolMaxActiveInt.intValue());
        }
        //获取连接时的最大等待毫秒数
        config.setMaxWaitMillis(1000L);
        //在获取连接的时候检查有效性,默认false
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        //在空闲时检查有效性,默认false
        config.setTestWhileIdle(true);
        //每次逐出检查时 逐出的最大数目。如果为负数就是: 1/abs(n), 默认3
        config.setNumTestsPerEvictionRun(-1);
        //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
        config.setTimeBetweenEvictionRunsMillis(30000L);
        //逐出连接的最小空闲时间,默认1800000毫秒(30分钟)
        config.setMinEvictableIdleTimeMillis(720000L);
        //对象空闲多久后逐出,当空闲时间>该值且空闲连接 > 最大空闲数时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
        config.setSoftMinEvictableIdleTimeMillis(360000L);
        this.jedisPool = new JedisPool(config, ip, port, 2000, pwd);
        LogUtils.info(LOG, "jedisPool init finish", new Object[]{"jedisPool", this.jedisPool});
    }

    /**
     * 单例获取jedisPool
     * @param serverName Redis库的名称
     * @return
     */
    public static JedisUtil getInstance(String serverName) {
        jedisUtil = (JedisUtil)uniqueInstance.get(serverName);
        if(jedisUtil == null) {
            Class var1 = JedisUtil.class;
            synchronized(JedisUtil.class) {
                jedisUtil = (JedisUtil)uniqueInstance.get(serverName);
                if(jedisUtil == null) {
                    jedisUtil = new JedisUtil(serverName);
                    uniqueInstance.put(serverName, jedisUtil);
                }
            }
        }
        return jedisUtil;
    }

    public Long setnx(String key, String value, int expireTime, int dbIndex) {
        Jedis jedis = null;

        Long var1;
        try {
            jedis = this.jedisPool.getResource();
            jedis.select(dbIndex);
            Long ret = jedis.setnx(key, value);
            if(ret.longValue() == 1L && expireTime > 0) {
                jedis.expire(key, expireTime);
            }

            var1 = ret;
        } catch (Exception var11) {
            LogUtils.error(LOG, "redis setnx error. ", var11, new Object[0]);
            throw var11;
        } finally {
            if(jedis != null) {
                jedis.close();
            }

        }

        return var1;
    }

    /**
     * 查找key
     * @param sKey
     * @param index
     * @return
     */
    public Set<String> key(String sKey, int index) {
        Jedis jedis = null;

        Set var2;
        try {
            jedis = this.jedisPool.getResource();
            jedis.select(index);
            var2 = jedis.keys(sKey);
        } catch (Exception var8) {
            LogUtils.error(LOG, "redis key error. ", var8, new Object[0]);
            throw var8;
        } finally {
            if(jedis != null) {
                jedis.close();
            }

        }

        return var2;
    }

    /**
     * 删除key
     * @param sKey
     * @param index
     * @return
     */
    public Long del(String sKey, int index) {
        Jedis jedis = null;

        Long var3;
        try {
            jedis = this.jedisPool.getResource();
            jedis.select(index);
            var3 = jedis.del(sKey);
        } catch (Exception var8) {
            LogUtils.error(LOG, "redis del error. ", var8, new Object[0]);
            throw var8;
        } finally {
            if(jedis != null) {
                jedis.close();
            }

        }

        return var3;
    }
}

分布式锁的工具类:

public class LockRedisService {
    private static final Logger LOG = LoggerFactory.getLogger(LockRedisService.class);    private static final Integer EXPIRE_SECOND = Integer.valueOf(10);    public LockRedisService() {}

    /**
     * 尝试获取分布式锁
     * @param systemName 系统名称
     * @param business   业务名称
     * @param keyfix     主键
     * @return
     */
    public static Boolean tryLock(String systemName, String business, String keyfix) {
        long start = System.currentTimeMillis();
        String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix});
        LogUtils.info(LOG, "尝试获取分布式锁", new Object[]{"key", key});

        do {
            try {
                long ret = JedisUtil.getInstance("master").setnx(key, System.currentTimeMillis() + "$TRUE", EXPIRE_SECOND, 1).longValue();
                if(ret == 1L) {
                    LogUtils.info(LOG, "成功获得分布式锁", new Object[]{"key", key});
                    return Boolean.TRUE;
                }

                Thread.sleep(5L);
            } catch (Exception var8) {
                LogUtils.error(LOG, "获取锁失败", var8, new Object[0]);
            }
        } while(System.currentTimeMillis() - start < 5000L);

        LogUtils.warn(LOG, "获取分布式锁失败", new Object[]{"key", key});
        return Boolean.FALSE;
    }

    /**
     * 解锁
     * @param systemName 系统名称
     * @param business   业务名称
     * @param keyfix     主键
     * @return
     */
    public static Boolean unlock(String systemName, String business, String keyfix) {
        String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix});

        try {
            //1为使用哪个Redis库,正常应该写为枚举类
            Set<String> realKeySet = JedisUtil.getInstance("master").key(key,1);
            if(CollectionUtils.isEmpty(realKeySet)) {
                return null;
            } else if(realKeySet.size() > 1) {
                LogUtils.warn(LOG, "lock key 通配符存在多个", new Object[]{"key", key});
                return null;
            } else {
                String realKey = (String)realKeySet.iterator().next();
                JedisUtil.getInstance("master").del(realKey, 1);
            }
        } catch (Exception var5) {
            LogUtils.error(LOG, "解锁失败", var5, new Object[0]);
        }

        LogUtils.info(LOG, "解锁成功", new Object[]{"key", key});
        return Boolean.valueOf(true);
    }
}

测试类:

public class Test {
    public static void main(String[] args) {
        //获取cpu的核心数
        int count = Runtime.getRuntime().availableProcessors();
        //创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(count, count * 10,
                60L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(40960), new ThreadPoolExecutor.AbortPolicy());
        //多线程执行业务
        threadPool.execute(()->{
            String bussiness = "测试分布式锁";
            String key = "TestLock:" + 123;
            //尝试获取分布式锁
            if (LockRedisService.tryLock("测试系统", bussiness, key)) {
                System.out.println("---获取分布式锁成功---");
                try {
                    //可能会抛出异常
                    System.out.println("执行逻辑");
                }catch (Exception e){
                    System.out.println("执行逻辑错误: "+e);
                }finally {
                    //执行完毕后解锁
                    LockRedisService.unlock("测试系统", bussiness, key);
                }
            }else {
                System.out.println("获取分布式锁失败");
            }
        });
    }
}

原文地址:https://www.cnblogs.com/0813lichenyu/p/10376631.html

时间: 2024-10-26 05:10:15

Redis的分布式锁的相关文章

基于Redis的分布式锁到底安全吗(上)?

网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合乎逻辑,但当我们着手去实现它们的时候,却发现如果你越是仔细推敲,疑虑也就越来越多. 实际上,大概在一年以前,关于Redis分布式锁的安全性问题,在分布式系统专家Martin Kleppmann和Redis的作者antirez之间就发生过一场争论.由于对这个问题一直以来比较关注,所以我前些日子仔细阅读了与这场争

基于Redis实现分布式锁(转载)

原文地址:http://blog.csdn.net/ugg/article/details/41894947 Redis命令介绍使用Redis实现分布式锁,有两个重要函数需要介绍 SETNX命令(SET if Not eXists)语法:SETNX key value功能:当且仅当 key 不存在,将 key 的值设为 value ,并返回1:若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0. GETSET命令语法:GETSET key value功能:将给定 key 的值设为

利用多写Redis实现分布式锁原理与实现分析

在我写这篇文章的时候,其实我还是挺纠结的,因为我这个方案本身也是雕虫小技拿出来显眼肯定会被贻笑大方,但是我最终还是拿出来与大家分享,我本着学习的态度和精神,希望大家能够给与我指导和改进方案. 一.关于分布式锁 关于分布式锁,可能绝大部分人都会或多或少涉及到. 我举二个例子: 场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有二笔一样的单子同时到达系统后台. 场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次.在这种情况下,如果无法保证该接口的幂

Redis实现分布式锁

http://redis.io/topics/distlock 在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段. 有很多三方库和文章描述如何用Redis实现一个分布式锁管理器,但是这些库实现的方式差别很大,而且很多简单的实现其实只需采用稍微增加一点复杂的设计就可以获得更好的可靠性. 这篇文章的目的就是尝试提出一种官方权威的用Redis实现分布式锁管理器的算法,我们把这个算法称为RedLock,我们相信这个算法会比一般的普通方法更加安全可靠.我们也希望社区能一起分析这个算法,

基于redis的分布式锁

<?php /** * 基于redis的分布式锁 * * 参考开源代码: * http://nleach.com/post/31299575840/redis-mutex-in-php * * https://gist.github.com/nickyleach/3694555 */ pc_base::load_sys_class('cache_redis', '', 0); class dist_key_redis { //锁的超时时间 const TIMEOUT = 20; const SL

转载:基于Redis实现分布式锁

转载:基于Redis实现分布式锁  ,出处: http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分

基于Redis实现分布式锁

http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分布式锁,有两个重要函数需要介绍 SETNX命令

基于Redis实现分布式锁-Redisson使用及源码分析

在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 本文主要探讨另外一种实现分布式最终一致性的解决方案--采用分布式锁.基于分布式锁的解决

用redis实现分布式锁

分布式部署中不可避免用到分布式锁,目前比较常用的实现方式一般有基于数据库的乐观锁.基于redis的分布式锁和基于zookeeper的分布式锁.本文只说redis的实现方式,使用jedis作为连接器. 比较简单,直接上代码吧. public class PaasLock { private static final String KEY_NNXX = "NX"; private static final String KEY_EXPX = "PX"; private

Redis实现分布式锁原理与实现分析

一.关于分布式锁 关于分布式锁,可能绝大部分人都会或多或少涉及到. 我举二个例子: 场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有二笔一样的单子同时到达系统后台. 场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次.在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题. 在接收消息的时候,消息推送重复.如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大. 类似这种场景,我们有很多种方法,可以使用幂等操作,也