【连载】redis库存操作,分布式锁的四种实现方式[三]--基于Redis watch机制实现分布式锁

一、redis的事务介绍

1、 Redis保证一个事务中的所有命令要么都执行,要么都不执行。如果在发送EXEC命令前客户端断线了,则Redis会清空事务队列,事务中的所有命令都不会执行。而一旦客户端发送了EXEC命令,所有的命令就都会被执行,即使此后客户端断线也没关系,因为Redis中已经记录了所有要执行的命令。

2、 除此之外,Redis的事务还能保证一个事务内的命令依次执行而不被其他命令插入。试想客户端A需要执行几条命令,同时客户端B发送了一条命令,如果不使用事务,则客户端B的命令可能会插入到客户端A的几条命令中执行。如果不希望发生这种情况,也可以使用事务。

3、 若一个事务中有多条命令,若有一条命令错误,事务中的所有命令都不会执行。若在执行阶段有命令执行错误,其他的命令也会正确的执行,需要注意。

4、与mysql的事务不同,redis的事务执行中时不会回滚的,哪怕出现错误,之前已经执行的命令结果也不会回滚。

二、Redis watch介绍

1、 WATCH命令可以监控一个或多个键,一旦其中有一个键被修改(或删除),之后的事务就不会执行。监控一直持续到EXEC命令(事务中的命令是在EXEC之后才执行的,所以在MULTI命令后可以修改WATCH监控的键值)

2、watch一般配合事务使用

例:启动一个线程,连接redis,监控key watchKeyTest,sleep10s模拟业务逻辑处理,此时再启动另一个进程去修改该key的值,那么当前线程就会返回null

/**
 * @author LiJunJun
 * @date 2018/12/10
 */
public class Test {

    private static Jedis jedis;

    static {
        jedis = new Jedis("192.168.10.109", 6379);
        jedis.auth("[email protected]");
        jedis.sadd("watchKeyTest", "290");
    }

    public static void main(String[] args) {

        jedis.watch("watchKeyTest");

        System.out.println("开始监控key: watchKeyTest");

        Transaction transaction = jedis.multi();

        try {
            // sleep 10秒,模拟业务逻辑处理
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("开始获取key: watchKeyTest");
        transaction.sismember("watchKeyTest", "290");

        List<Object> result = transaction.exec();
        System.out.println("执行结果:" + result);
        jedis.disconnect();
    }
}

启动另一个进程,修改同一个key

public class Test2 {

    public static void main(String[] args) {

        Jedis jedis = new Jedis("192.168.10.109", 6379);
        jedis.auth("[email protected]");
        long result = jedis.sadd("watchKeyTest", "358");
        System.out.println(result);
        jedis.disconnect();
    }
}

此时,进程1就会返回null

若在进程1执行期间,该key没有被其他进程修改,则返回正确的值。

三、实现思路

基于以上介绍的redis的事务以及watch机制,我们可以做分布式锁处理,即在分布式系统中,高并发情况下,一个线程watch相应的key后,其他进程若修改了key,则该进程所在的事务就不执行,返回null,我们可以增加重试机制,来做库存操作

四、业务代码实现

采用watch机制,做乐观锁处理,重试三次,三次返回均未成功,则接口返回失败
    /**
     * 减库存(基于redis watch机制实现)
     *
     * @param trace 请求流水
     * @param stockManageReq(stockId、decrNum)
     * @return -1为失败,大于-1的正整数为减后的库存量,-2为库存不足无法减库存
     */
    @Override
    @ApiOperation(value = "减库存", notes = "减库存")
    @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {

        long startTime = System.currentTimeMillis();

        LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));

        int res = 0;
        String stockId = stockManageReq.getStockId();
        Integer decrNum = stockManageReq.getDecrNum();

        boolean decrByStock = false;

        try {
            if (null != stockId && null != decrNum) {

                stockId = PREFIX + stockId;

                // 采用watch机制,做乐观锁处理,重试三次,三次返回均未成功,则接口返回失败
                for (int i = 0; i < TRY_COUNT; i++) {
                    Integer decrByStockRes = decrByStock(stockId, decrNum, trace);

                    // 更新库存时key对应的value发生变更,重试
                    if (decrByStockRes != -1) {
                        res = decrByStockRes;
                        decrByStock = true;
                        break;
                    }
                }

                if (!decrByStock) {
                    res = -2;
                    LOGGER.info("本次请求减库存失败!decrByStockFailure=1");
                }
            }
        } catch (Exception e) {
            LOGGER.error(trace, "decr sku stock failure.", e);
            res = -1;
        } finally {
            LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
        }
        return res;
    }

    /**
     * 减库存逻辑
     *
     * @param stockId 库存id
     * @param decrNum 减少的量
     * @return 减库存结果(-1:表示更新库存时key对应的value发生变更,即提示调用方重试;-2: 库存不够减,售罄;其它值表示减库存后的值)
     */
    private Integer decrByStock(String stockId, int decrNum, String trace) {

        Response<Long> v = null;
        List<Object> result = null;

        try (Jedis jedis = jedisPool.getWriteResource()) {

            if (!jedis.select(0).equals("OK")) {
                LOGGER.error(trace, "减库存,本次请求未获取到jedis连接!");
                return -1;
            }

            jedis.watch(stockId);

            // redis 减库存逻辑
            String vStock = jedis.get(stockId);

            long realV = 0L;

            if (StringUtils.isNotEmpty(vStock)) {
                realV = Long.parseLong(vStock);
            }
            //库存数  大于等于 要减的数目,则执行减库存
            if (realV < decrNum) {

                return -2;
            }

            Transaction transaction = jedis.multi();

            v = transaction.decrBy(stockId, decrNum);

            result = transaction.exec();
        }

        return (result == null || result.isEmpty()) ? -1 : v.get().intValue();
    }

五、ab压测及分析
同样的,我们以5000的请求量100的并发量来压、tps在640左右,比zk做分布式锁来看,提升了20倍的性能,比redisson分布式锁提升了2倍,性能提升不大

同时我们发现,5000个请求,有4561个请求失败,我们看下日志统计,有多少请求没有成功执行事务

也是4561,说明有4561个事务没有成功执行,并不是运行错误。

六、总结

watch可以用来控制对redis的操作同步执行,但失败的几率较大,用该机制做抢购的业务还行,但对redis操作结果依赖较强的业务来说,不太适用,下一篇我们讲下终极解决方案,适用redis的lua脚本编程,变相的实现分布式锁。

原文地址:https://www.cnblogs.com/ft535535/p/10150577.html

时间: 2024-07-28 16:43:55

【连载】redis库存操作,分布式锁的四种实现方式[三]--基于Redis watch机制实现分布式锁的相关文章

关于分布式Session 的几种实现方式

分布式Session的几种实现方式 1.基于数据库的Session共享 2.基于NFS共享文件系统 3.基于memcached 的session,如何保证 memcached 本身的高可用性? 4. 基于resin/tomcat web容器本身的session复制机制 5. 基于TT/Redis 或 jbosscache 进行 session 共享. 6. 基于cookie 进行session共享 介绍下常用的分布式Session 实现 1. Session Replication 方式管理 (

Java并发问题--乐观锁与悲观锁以及乐观锁的一种实现方式-CAS

首先介绍一些乐观锁与悲观锁: 悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁.传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁.再比如Java里面的同步原语synchronized关键字的实现也是悲观锁. 乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版

同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式

1. 概念理解        在进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式:   同步/异步主要针对C端: 同步:      所谓同步,就是在c端发出一个功能调用时,在没有得到结果之前,该调用就不返回.也就是必须一件一件事做,等前一件做完了才能做下一件事.   例如普通B/S模式(同步):提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能干任何事 异步:      异步的概念和同步相对.当c端一个

[转][Android]Android数据的四种存储方式

android.database.sqlite类 SQLiteQueryBuilder java.lang.Object android.database.sqlite.SQLiteQueryBuilder public class SQLiteQueryBuilderextends Object This is a convience class that helps build SQL queries to be sent to SQLiteDatabase objects. 构造方法摘要

Android数据的四种存储方式SharedPreferences、SQLite、Content Provider和File (一) —— 总览

Android数据的四种存储方式SharedPreferences.SQLite.Content Provider和File (一) —— 总览 作为一个完成的应用程序,数据存储操作是必不可少的.因此,Android系统一共提供了四种数据存储方式.分别是:SharePreference.SQLite.Content Provider和File.由于Android系统中,数据基本都是私有的的,都是存放于“data/data/程序包名”目录下,所以要实现数据共享,正确方式是使用Content Pro

msyql四种启动方式

1 mysql默认启动配置文件my.cnf顺序 第一步:/etc/my.cnf 第二步:/etc/mysql/my.cnf 第三步:/usr/local/mysql/etc/my.cnf 第四步:~/.my.cnf 可以通过命令查看加载顺序: [[email protected] ~]# which mysqld /usr/local/mysql/bin/mysqld [[email protected] ~]# /usr/local/mysql/bin/mysqld --verbose --h

Android数据的四种存储方式

很清晰的思路,转自Android数据的四种存储方式 作为一个完成的应用程序,数据存储操作是必不可少的.因此,Android系统一共提供了四种数据存储方式.分别是:SharePreference.SQLite.Content Provider和File.由于Android系统中,数据基本都是私有的的,都是存放于“data/data/程序包名”目录下,所以要实现数据共享,正确方式是使用Content Provider. SQLite: SQLite是一个轻量级的数据库,支持基本SQL语法,是常被采用

Android开发之基本控件和详解四种布局方式

Android中的控件的使用方式和iOS中控件的使用方式基本相同,都是事件驱动.给控件添加事件也有接口回调和委托代理的方式.今天这篇博客就总结一下Android中常用的基本控件以及布局方式.说到布局方式Android和iOS还是区别挺大的,在iOS中有Frame绝对布局和AutoLayout相对布局.而在Android中的布局方式就比较丰富了,今天博客中会介绍四种常用的布局方式.先总结一下控件,然后再搞一搞基本方式,开发环境还是用的Mac下的Android Studio.开始今天的正题, 虽然A

Android数据的四种存储方式之SQLite数据库

Test.java: /** * 本例解决的问题: * 核心问题:通过SQLiteOpenHelper类创建数据库对象 * 通过数据库对象对数据库的数据的操作 * 1.sql语句方式操作SQLite数据库 * 2.谷歌提供的api对SQLite数据库的操作 * 3.SQLite对事务的操作 */ import com.ghsy.createsqlitedb.db.MyOpenHelper; import android.content.ContentValues; import android.