分布式场景中确保线程安全的解决方案,redis实现分布式锁

实际工作中,经常会遇到多线程并发时的类似抢购的功能,本篇描述一个简单的redis分布式锁实现的多线程抢票功能。

直接上代码。首先按照慣例,給出一個错误的示范:

我们可以看看,当20个线程一起来抢10张票的时候,会发生什么事。

package com.tiger.utils;

public class TestMutilThread {

// 总票量

public static int count = 10;

public static void main(String[] args) {

statrtMulti();

}

public static void statrtMulti() {

for (int i = 1; i <= 20; i++) {

TicketRunnable tickrunner = new TicketRunnable();

Thread thread = new Thread(tickrunner, "Thread No: " + i);

thread.start();

}

}

public static class TicketRunnable implements Runnable {

@Override

public void run() {

System.out.println(Thread.currentThread().getName() + "  start "

+ count);

// TODO Auto-generated method stub

// logger.info(Thread.currentThread().getName()

// + "  really  start" + count);

if (count <= 0) {

System.out.println(Thread.currentThread().getName()

+ "  ticket sold out ! No tickets remained!" + count);

return;

} else {

count = count - 1;

System.out.println(Thread.currentThread().getName()

+ " bought a ticket,now remaining :" + (count));

}

}

}

}

测试结果,从结果可以看到,票数在不同的线程中已经出现混乱。

Thread No: 2  start 10

Thread No: 6  start 10

Thread No: 4  start 10

Thread No: 5  start 10

Thread No: 3  start 10

Thread No: 9  start 6

Thread No: 1  start 10

Thread No: 1 bought a ticket,now remaining :3

Thread No: 9 bought a ticket,now remaining :4

Thread No: 3 bought a ticket,now remaining :5

Thread No: 12  start 3

Thread No: 5 bought a ticket,now remaining :6

Thread No: 4 bought a ticket,now remaining :7

Thread No: 8  start 7

Thread No: 7  start 8

Thread No: 12 bought a ticket,now remaining :1

Thread No: 14  start 0

Thread No: 6 bought a ticket,now remaining :8

Thread No: 16  start 0

Thread No: 2 bought a ticket,now remaining :9

Thread No: 16  ticket sold out ! No tickets remained!0

Thread No: 14  ticket sold out ! No tickets remained!0

Thread No: 18  start 0

Thread No: 18  ticket sold out ! No tickets remained!0

Thread No: 7 bought a ticket,now remaining :0

Thread No: 15  start 0

Thread No: 8 bought a ticket,now remaining :1

Thread No: 13  start 2

Thread No: 19  start 0

Thread No: 11  start 3

Thread No: 11  ticket sold out ! No tickets remained!0

Thread No: 10  start 3

Thread No: 10  ticket sold out ! No tickets remained!0

Thread No: 19  ticket sold out ! No tickets remained!0

Thread No: 13  ticket sold out ! No tickets remained!0

Thread No: 20  start 0

Thread No: 20  ticket sold out ! No tickets remained!0

Thread No: 15  ticket sold out ! No tickets remained!0

Thread No: 17  start 0

Thread No: 17  ticket sold out ! No tickets remained!0

为了解决多线程时出现的混乱问题,这里給出真正的测试类!!!

真正的测试类,这里启动20个线程,来抢10张票。

RedisTemplate 是用来实现redis操作的,由spring进行集成。这里是使用到了RedisTemplate,所以我以构造器的形式在外部将RedisTemplate传入到测试类中。

MultiTestLock 是用来实现加锁的工具类。

总票数使用volatile关键字,实现多线程时变量在系统内存中的可见性,这点可以去了解下volatile关键字的作用。

TicketRunnable用于模拟抢票功能。

其中由于lock与unlock之间存在if判断,为保证线程安全,这里使用synchronized来保证。

测试类:

package com.tiger.utils;

import java.io.Serializable;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.data.redis.core.RedisTemplate;

public class MultiConsumer {

Logger logger=LoggerFactory.getLogger(MultiTestLock.class);

private RedisTemplate<Serializable, Serializable> redisTemplate;

public MultiTestLock lock;

//总票量

public volatile static int count = 10;

public void statrtMulti() {

lock = new MultiTestLock(redisTemplate);

for (int i = 1; i <= 20; i++) {

TicketRunnable tickrunner = new TicketRunnable();

Thread thread = new Thread(tickrunner, "Thread No: " + i);

thread.start();

}

}

public class TicketRunnable implements Runnable {

@Override

public void run() {

logger.info(Thread.currentThread().getName() + "  start "

+ count);

// TODO Auto-generated method stub

if (count > 0) {

// logger.info(Thread.currentThread().getName()

// + "  really  start" + count);

lock.lock();

synchronized (this) {

if(count<=0){

logger.info(Thread.currentThread().getName()

+ "  ticket sold out ! No tickets remained!" + count);

lock.unlock();

return;

}else{

count=count-1;

logger.info(Thread.currentThread().getName()

+ " bought a ticket,now remaining :" + (count));

}

}

lock.unlock();

}else{

logger.info(Thread.currentThread().getName()

+ "  ticket sold out !" + count);

}

}

}

public RedisTemplate<Serializable, Serializable> getRedisTemplate() {

return redisTemplate;

}

public void setRedisTemplate(

RedisTemplate<Serializable, Serializable> redisTemplate) {

this.redisTemplate = redisTemplate;

}

public MultiConsumer(RedisTemplate<Serializable, Serializable> redisTemplate) {

super();

this.redisTemplate = redisTemplate;

}

}

Lock工具类:

我们知道为保证线程安全,程序中执行的操作必须时原子的。redis后续的版本中可以使用set key同时设置expire超时时间。

想起上次去 电信翼支付 面试时,面试官问过一个问题:分布式锁如何防止死锁,问题关键在于我们在分布式中进行加锁操作时成功了,但是后续业务操作完毕执行解锁时出现失败。导致分布式锁无法释放。出现死锁,后续的加锁无法正常进行。所以这里设置expire超时时间的目的就是防止出现解锁失败的情况,这样,即使解锁失败了,分布式锁依然会在超时时间过了之后自动释放。

具体在代码中也有注释,也可以作为参考。

package com.tiger.utils;

import java.io.Serializable;

import java.util.Arrays;

import java.util.Collections;

import java.util.HashMap;

import java.util.Iterator;

import java.util.List;

import java.util.Random;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import javax.sound.midi.MidiDevice.Info;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.dao.DataAccessException;

import org.springframework.data.redis.core.RedisOperations;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.SessionCallback;

import org.springframework.data.redis.core.script.RedisScript;

public class MultiTestLock implements Lock {

Logger logger=LoggerFactory.getLogger(MultiTestLock.class);

private RedisTemplate<Serializable, Serializable> redisTemplate;

public MultiTestLock(RedisTemplate<Serializable, Serializable> redisTemplate) {

super();

this.redisTemplate = redisTemplate;

}

@Override

public void lock() {

//这里使用while循环强制线程进来之后先进行抢锁操作。只有抢到锁才能进行后续操作

while(true){

if(tryLock()){

try {

//这里让线程睡500毫秒的目的是为了模拟业务耗时,确保业务结束时之前设置的值正好打到超时时间,

//实际生产中可能有偏差,这里需要经验

Thread.sleep(500l);

// logger.info(Thread.currentThread().getName()+" time to awake");

return;

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}else{

try {

//这里设置一个随机毫秒的sleep目的时降低while循环的频率

Thread.sleep(new Random().nextInt(200)+100);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

@Override

public boolean tryLock() {

//这里也可以选用transactionSupport支持事务操作

SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {

@Override

public  Object execute(RedisOperations operations)

throws DataAccessException {

operations.multi();

operations.opsForValue().setIfAbsent("secret", "answer");

//设置超时时间要根据业务实际的可能处理时间来,是一个经验值

operations.expire("secret", 500l, TimeUnit.MILLISECONDS);

Object object=operations.exec();

return object;

}

};

//执行两部操作,这里会拿到一个数组值 [true,true],分别对应上述两部操作的结果,如果中途出现第一次为false则表明第一步set值出错

List<Boolean> result=(List) redisTemplate.execute(sessionCallback);

// logger.info(Thread.currentThread().getName()+" try lock "+ result);

if(true==result.get(0)||"true".equals(result.get(0)+"")){

logger.info(Thread.currentThread().getName()+" try lock success");

return true;

}else{

return false;

}

}

@Override

public boolean tryLock(long arg0, TimeUnit arg1)

throws InterruptedException {

// TODO Auto-generated method stub

return false;

}

@Override

public void unlock() {

//unlock操作直接删除锁,如果执行完还没有达到超时时间则直接删除,让后续的线程进行继续操作。起到补刀的作用,确保锁已经超时或被删除

SessionCallback<Object> sessionCallback=new SessionCallback<Object>() {

@Override

public  Object execute(RedisOperations operations)

throws DataAccessException {

operations.multi();

operations.delete("secret");

Object object=operations.exec();

return object;

}

};

Object result=redisTemplate.execute(sessionCallback);

}

@Override

public void lockInterruptibly() throws InterruptedException {

// TODO Auto-generated method stub

}

@Override

public Condition newCondition() {

// TODO Auto-generated method stub

return null;

}

public RedisTemplate<Serializable, Serializable> getRedisTemplate() {

return redisTemplate;

}

public void setRedisTemplate(

RedisTemplate<Serializable, Serializable> redisTemplate) {

this.redisTemplate = redisTemplate;

}

}

执行结果

可以看到,票数稳步减少,后续没有抢到锁的线程余票为0,无票可抢。

tips:

这其中也出现了一个问题,redis进行多部封装操作时,系统报错:ERR EXEC without MULTI

后经过查阅发现问题出在:

在spring中,多次执行MULTI命令不会报错,因为第一次执行时,会将其内部的一个isInMulti变量设为true,后续每次执行命令是都会检查这个变量,如果为true,则不执行命令。而多次执行EXEC命令则会报开头说的"ERR EXEC without MULTI"错误。

原文地址:https://www.cnblogs.com/java889/p/12158826.html

时间: 2024-11-07 17:42:27

分布式场景中确保线程安全的解决方案,redis实现分布式锁的相关文章

分布式事务中常见的三种解决方案

目录 一.分布式事务前奏 二.柔性事务解决方案架构 (一).基于可靠消息的最终一致性方案概述 (二).TCC事务补偿型方案 (三).最大努力通知型 三.基于可靠消息的最终一致性方案详解 (一).消息发送一致性 (二).保证消息一致的变通做法 (三).常规MQ消息处理流程和特点 (四).消息重复发送问题和业务接口幂等性设计 (五).本地消息服务方案 (六).独立消息服务方案 (七).消息服务子系统的设计实现 一.分布式事务前奏 事务:事务是由一组操作构成的可靠的独立的工作单元,事务具备ACID的特

还没弄懂分布式场景下数据一致性问题?一文教你轻松解决!

文章纲要 此次分享的缘由 目前分布式事务问题是怎么解决的 行业中有什么解决方案 这些解决方案分别有什么优缺点 别人是怎么做的 我们可以怎么来做 此次分享的缘由 支付重构 考虑支付重构的时候,自然想到原本属于一个本地事务中的处理,现在要跨应用了要怎么处理.拿充值订单举个栗子吧,假设:原本订单模块和账户模块是放在一起的,现在需要做服务拆分,拆分成订单服务,账户服务.原本收到充值回调后,可以将修改订单状态和增加金币放在一个mysql事务中完成的,但是呢,因为服务拆分了,就面临着需要协调2个服务才能完成

分布式场景下如何保证消息队列实现最终一致性

考虑一个分布式场景中一个常见的场景:服务A执行某个数据库操作成功后,会发送一条消息到消息队列,现在希望只有数据库操作执行成功才发送这条消息.下面是一些常见的作法: 1. 先执行数据库操作,再发送消息 public void purchaseOrder() { orderDao.save(order); messageQueue.send(message); } 有可能order新增成功,发送消息失败.最终形成不一致状态. 2. 先发送消息,再执行数据库操作 public void purchas

SpringBoot电商项目实战 — Redis实现分布式锁

最近有小伙伴发消息说,在Springboot系列文第二篇,zookeeper是不是漏掉了?关于这个问题,其实我在写第二篇的时候已经考虑过,但基于本次系列文章是实战练习,在项目里你能看到Zookeeper相关内容的也只有dubbo注册地址了.因为Zookeeper在项目中,我们不需要做任何配置和代码,只需要在服务器上安装一个Zookeeper应用即可. 包括对Zookeeper的依赖,我们在SpringBoot项目中只需要依赖Dubbo就ok了.在本次系列实战中,我是本着少说多动手的原则,如果有些

基于redis的分布式锁实现

关于分布式锁 很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock.在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量.而同步的本质是通过锁来实现的.为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记.

Java 实现基于Redis的分布式可重入锁

如何实现可重入? 首先锁信息(指redis中lockKey关联的value值) 必须得设计的能负载更多信息,之前non-reentrant时value直接就是一个超时时间,但是要实现可重入单超时时间是不够的,必须要标识锁是被谁持有的,也就是说要标识分布式环境中的线程,还要记录锁被入了多少次. 如何在分布式线程中标识唯一线程? MAC地址 +jvm进程 + 线程ID(或者线程地址都行),三者结合即可唯一分布式环境中的线程.下载 实现 锁的信息采用json存储,格式如下: 代码框架还是和之前实现的非

【分布式缓存系列】集群环境下Redis分布式锁的正确姿势

一.前言 在上一篇文章中,已经介绍了基于Redis实现分布式锁的正确姿势,但是上篇文章存在一定的缺陷——它加锁只作用在一个Redis节点上,如果通过sentinel保证高可用,如果master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况: 客户端1在Redis的master节点上拿到了锁 Master宕机了,存储锁的key还没有来得及同步到Slave上 master故障,发生故障转移,slave节点升级为master节点 客户端2从新的Master获取到了对应同一个资源的锁 于是,客

分布式数据库中全局唯一主键

[相关文章] <分布式数据库中全局唯一主键生成策略的设计与实现><activiti5.10解决分布式集群部署的主键问题><分布式环境下数据库主键方案><如何在高并发分布式系统中生成全局唯一Id><分布式环境下ID生成方法总结> <分布式环境下数据库主键方案> [ http://www.2cto.com/database/201309/243195.html ] 在只使用单数据库时,使用自增主键ID无疑是最适合的.但在集群.主从架构上时

C#中的线程(四)高级话题

C#中的线程(四)高级话题 Keywords:C# 线程Source:http://www.albahari.com/threading/Author: Joe AlbahariTranslator: Swanky WuPublished: http://www.cnblogs.com/txw1958/Download:http://www.albahari.info/threading/threading.pdf 第四部分:高级话题 非阻止同步 早些时候,我们讨论了非常简单的赋值和 更新一个字