Mysql高手系列 - 第26篇:聊聊如何使用mysql实现分布式锁

Mysql系列的目标是:通过这个系列从入门到全面掌握一个高级开发所需要的全部技能。

欢迎大家加我微信itsoku一起交流java、算法、数据库相关技术。

这是Mysql系列第26篇。

本篇我们使用mysql实现一个分布式锁。

分布式锁的功能

  1. 分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作
  2. 锁具有重入的功能:即一个使用者可以多次获取某个锁
  3. 获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败
  4. 能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁

预备技能:乐观锁

通常我们修改表中一条数据过程如下:

t1:select获取记录R1
t2:对R1进行编辑
t3:update R1

我们来看一下上面的过程存在的问题:

如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。

我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:

t1:打开事务start transaction
t2:select获取记录R1,声明变量v=R1.version
t3:对R1进行编辑
t4:执行更新操作
    update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
    if(count==1){
        //提交事务
        commit;
    }else{
        //回滚事务
        rollback;
    }

上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。

上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。

使用mysql实现分布式锁

建表

我们创建一个分布式锁表,如下

DROP DATABASE IF EXISTS javacode2018;
CREATE DATABASE javacode2018;
USE javacode2018;
DROP TABLE IF EXISTS t_lock;
create table t_lock(
  lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
  request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
  lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
  timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
  version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
)COMMENT '锁信息表';

分布式锁工具类:

package com.itsoku.sql;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.sql.*;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
 * 喜欢的请关注公众号:路人甲Java
 */
@Slf4j
public class LockUtils {

    //将requestid保存在该变量中
    static ThreadLocal<String> requestIdTL = new ThreadLocal<>();

    /**
     * 获取当前线程requestid
     *
     * @return
     */
    public static String getRequestId() {
        String requestId = requestIdTL.get();
        if (requestId == null || "".equals(requestId)) {
            requestId = UUID.randomUUID().toString();
            requestIdTL.set(requestId);
        }
        log.info("requestId:{}", requestId);
        return requestId;
    }

    /**
     * 获取锁
     *
     * @param lock_key        锁key
     * @param locktimeout(毫秒) 持有锁的有效时间,防止死锁
     * @param gettimeout(毫秒)  获取锁的超时时间,这个时间内获取不到将重试
     * @return
     */
    public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
        log.info("start");
        boolean lockResult = false;
        String request_id = getRequestId();
        long starttime = System.currentTimeMillis();
        while (true) {
            LockModel lockModel = LockUtils.get(lock_key);
            if (Objects.isNull(lockModel)) {
                //插入一条记录,重新尝试获取锁
                LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
            } else {
                String reqid = lockModel.getRequest_id();
                //如果reqid为空字符,表示锁未被占用
                if ("".equals(reqid)) {
                    lockModel.setRequest_id(request_id);
                    lockModel.setLock_count(1);
                    lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
                    if (LockUtils.update(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }
                } else if (request_id.equals(reqid)) {
                    //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
                    lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
                    lockModel.setLock_count(lockModel.getLock_count() + 1);
                    if (LockUtils.update(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }
                } else {
                    //锁不是自己的,并且已经超时了,则重置锁,继续重试
                    if (lockModel.getTimeout() < System.currentTimeMillis()) {
                        LockUtils.resetLock(lockModel);
                    } else {
                        //如果未超时,休眠100毫秒,继续重试
                        if (starttime + gettimeout > System.currentTimeMillis()) {
                            TimeUnit.MILLISECONDS.sleep(100);
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        log.info("end");
        return lockResult;
    }

    /**
     * 释放锁
     *
     * @param lock_key
     * @throws Exception
     */
    public static void unlock(String lock_key) throws Exception {
        //获取当前线程requestId
        String requestId = getRequestId();
        LockModel lockModel = LockUtils.get(lock_key);
        //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
        if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
            if (lockModel.getLock_count() == 1) {
                //重置锁
                resetLock(lockModel);
            } else {
                lockModel.setLock_count(lockModel.getLock_count() - 1);
                LockUtils.update(lockModel);
            }
        }
    }

    /**
     * 重置锁
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    public static int resetLock(LockModel lockModel) throws Exception {
        lockModel.setRequest_id("");
        lockModel.setLock_count(0);
        lockModel.setTimeout(0L);
        return LockUtils.update(lockModel);
    }

    /**
     * 更新lockModel信息,内部采用乐观锁来更新
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    public static int update(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    public static LockModel get(String lock_key) throws Exception {
        return exec(conn -> {
            String sql = "select * from t_lock t WHERE t.lock_key=?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lock_key);
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
                return LockModel.builder().
                        lock_key(lock_key).
                        request_id(rs.getString("request_id")).
                        lock_count(rs.getInt("lock_count")).
                        timeout(rs.getLong("timeout")).
                        version(rs.getInt("version")).build();
            }
            return null;
        });
    }

    public static int insert(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
        Connection conn = getConn();
        try {
            return sqlExec.exec(conn);
        } finally {
            closeConn(conn);
        }
    }

    @FunctionalInterface
    public interface SqlExec<T> {
        T exec(Connection conn) throws Exception;
    }

    @Getter
    @Setter
    @Builder
    public static class LockModel {
        private String lock_key;
        private String request_id;
        private Integer lock_count;
        private Long timeout;
        private Integer version;
    }

    private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false";        //数据库地址
    private static final String username = "root";        //数据库用户名
    private static final String password = "root123";        //数据库密码
    private static final String driver = "com.mysql.jdbc.Driver";        //mysql驱动

    /**
     * 连接数据库
     *
     * @return
     */
    public static Connection getConn() {
        Connection conn = null;
        try {
            Class.forName(driver);  //加载数据库驱动
            try {
                conn = DriverManager.getConnection(url, username, password);  //连接数据库
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * 关闭数据库链接
     *
     * @return
     */
    public static void closeConn(Connection conn) {
        if (conn != null) {
            try {
                conn.close();  //关闭数据库链接
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock

测试用例

package com.itsoku.sql;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import static com.itsoku.sql.LockUtils.lock;
import static com.itsoku.sql.LockUtils.unlock;

/**
 * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
 * 喜欢的请关注公众号:路人甲Java
 */
@Slf4j
public class LockUtilsTest {

    //测试重复获取和重复释放
    @Test
    public void test1() throws Exception {
        String lock_key = "key1";
        for (int i = 0; i < 10; i++) {
            lock(lock_key, 10000L, 1000);
        }
        for (int i = 0; i < 9; i++) {
            unlock(lock_key);
        }
    }

    //获取之后不释放,超时之后被thread1获取
    @Test
    public void test2() throws Exception {
        String lock_key = "key2";
        lock(lock_key, 5000L, 1000);
        Thread thread1 = new Thread(() -> {
            try {
                try {
                    lock(lock_key, 5000L, 7000);
                } finally {
                    unlock(lock_key);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread1.setName("thread1");
        thread1.start();
        thread1.join();
    }
}

test1方法测试了重入锁的效果。

test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了。

留给大家一个问题

上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。

更多优质文章

  1. java高并发系列全集(34篇)
  2. mysql高手系列(20多篇,高手必备)
  3. 聊聊db和缓存一致性常见的实现方式

mysql系列大概有20多篇,喜欢的请关注一下,欢迎大家加我微信itsoku或者留言交流mysql相关技术!

原文地址:https://www.cnblogs.com/itsoku123/p/11750739.html

时间: 2024-10-11 12:49:36

Mysql高手系列 - 第26篇:聊聊如何使用mysql实现分布式锁的相关文章

Mysql高手系列 - 第27篇:mysql如何确保数据不丢失的?我们借鉴这种设计思想实现热点账户高并发设计及跨库转账问题

Mysql系列的目标是:通过这个系列从入门到全面掌握一个高级开发所需要的全部技能. 欢迎大家加我微信itsoku一起交流java.算法.数据库相关技术. 这是Mysql系列第27篇. 本篇文章我们先来看一下mysql是如何确保数据不丢失的,通过本文我们可以了解mysql内部确保数据不丢失的原理,学习里面优秀的设计要点,然后我们再借鉴这些优秀的设计要点进行实践应用,加深理解. 预备知识 mysql内部是使用b+树的结构将数据存储在磁盘中,b+树中节点对应mysql中的页,mysql和磁盘交互的最小

Mysql高手系列 - 第12篇:子查询详解

这是Mysql系列第12篇. 环境:mysql5.7.25,cmd命令中进行演示. 本章节非常重要. 子查询 出现在select语句中的select语句,称为子查询或内查询. 外部的select查询语句,称为主查询或外查询. 子查询分类 按结果集的行列数不同分为4种 标量子查询(结果集只有一行一列) 列子查询(结果集只有一列多行) 行子查询(结果集有一行多列) 表子查询(结果集一般为多行多列) 按子查询出现在主查询中的不同位置分 select后面:仅仅支持标量子查询. from后面:支持表子查询

Mysql高手系列 - 第14篇:详解事务

这是Mysql系列第14篇. 环境:mysql5.7.25,cmd命令中进行演示. 开发过程中,会经常用到数据库事务,所以本章非常重要. 本篇内容 什么是事务,它有什么用? 事务的几个特性 事务常见操作指令详解 事务的隔离级别详解 脏读.不可重复读.可重复读.幻读详解 演示各种隔离级别产生的现象 关于隔离级别的选择 什么是事务? 数据库中的事务是指对数据库执行一批操作,这些操作最终要么全部执行成功,要么全部失败,不会存在部分成功的情况. 举个例子 比如A用户给B用户转账100操作,过程如下: 1

Elasticsearch顶尖高手系列-快速入门篇

01.课程介绍02.用大白话告诉你什么是Elasticsearch03.Elasticsearch的功能.适用场景以及特点介绍04.手工画图剖析Elasticsearch核心概念:NRT.索引.分片.副本等05.在windows上安装和启动Elasticseach06.快速入门案例实战之电商网站商品管理:集群健康检查,文档CRUD07.快速入门案例实战之电商网站商品管理:多种搜索方式08.快速入门案例实战之电商网站商品管理:嵌套聚合,下钻分析,聚合分析09.手工画图剖析Elasticsearch

Mysql优化系列(1)--Innodb引擎下mysql自身配置优化

摘自 http://www.cnblogs.com/kevingrace/p/6133818.html 1.简单介绍InnoDB给MySQL提供了具有提交,回滚和崩溃恢复能力的事务安全(ACID兼容)存储引擎.InnoDB锁定在行级并且也在SELECT语句提供一个Oracle风格一致的非锁定读.这些特色增加了多用户部署和性能.没有在InnoDB中扩大锁定的需要,因为在InnoDB中行级锁定适合非常小的空间.InnoDB也支持FOREIGN KEY强制.在SQL查询中,你可以自由地将InnoDB类

Java分布式锁,搞懂分布式锁实现看这篇文章就对了

随着微处理机技术的发展,人们只需花几百美元就能买到一个CPU芯片,这个芯片每秒钟执行的指令比80年代最大的大型机的处理机每秒钟所执行的指令还多.如果你愿意付出两倍的价钱,将得到同样的CPU,但它却以更高的时钟速率运行.因此,最节约成本的办法通常是在一个系统中使用集中在一起的大量的廉价CPU.所以,倾向于分布式系统的主要原因是它可以潜在地得到比单个的大型集中式系统好得多的性价比.实际上,分布式系统是通过较低廉的价格来实现相似的性能的. 随着互联网的兴起,越来越多的人使用者互联网产品.一般互联网系统

搞懂Java分布式锁实现看这篇文章就对了

前言: 随着微处理机技术的发展,人们只需花几百美元就能买到一个CPU芯片,这个芯片每秒钟执行的指令比80年代最大的大型机的处理机每秒钟所执行的指令还多.如果你愿意付出两倍的价钱,将得到同样的CPU,但它却以更高的时钟速率运行.因此,最节约成本的办法通常是在一个系统中使用集中在一起的大量的廉价CPU.所以,倾向于分布式系统的主要原因是它可以潜在地得到比单个的大型集中式系统好得多的性价比.实际上,分布式系统是通过较低廉的价格来实现相似的性能的. 随着互联网的兴起,越来越多的人使用者互联网产品.一般互

Elasticsearch顶尖高手系列-高手进阶篇视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是

redis 延时任务 看一篇成高手系列2

引言 在开发中,往往会遇到一些关于延时任务的需求.例如 生成订单30分钟未支付,则自动取消 生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务.那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别 定时任务有明确的触发时间,延时任务没有 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务 下面,我们以判断订单是否超时为例,进行方案分析