主要参考:
- http://blog.csdn.net/fjssharpsword/article/details/52250723#
- http://www.yiibai.com/redis/strings_getset.html
- https://redis.io/commands/expire
- https://www.baidu.com/link?url=kuDPE2uEXfjZO3qCggv2OaDGcd9_Mohb_S2Web1VHiY623cg4IJZRwqb0_tGyan7x0BJuV1I46dh-qGWCPEJsa&wd=&eqid=879de37e00008ea500000004583ef0f8
package com.deppon.spring; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import redis.clients.jedis.Jedis; /** * * @author ganxj * */ public class BuildDistributedLockWithRedisDB { // 锁名 private static final String lockName = "lock"; // 序列名 private static final String sequenceName = "sequence"; // 数据失效时间 private static final int expired = 1;// 1秒超时 // 当前线程处理当次缓存操作所需时间,(预估值) private static final int time = 10 * 1000; // setnx --- 当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX不做任何动作,并返回0。 // getSet -- 将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。 // 上锁 public static boolean acquireLock(Jedis jedis, String lock) { try { // 用于设置成功后的缓存处理时长 long cacheProcessTime = System.currentTimeMillis() + time; // key为空时,争抢锁 long isSetSuccessWhileKeyIsNil = jedis.setnx(lock, String.valueOf(cacheProcessTime)); // SETNX成功,则成功获取一个锁,并设置数据失效时间 if (isSetSuccessWhileKeyIsNil == 1) { jedis.expire(lock, expired); return true; } // key不为空,SETNX失败,说明锁被其他客户端保持,检查其是否已经超时 String lastLockTimeBySomeThread = jedis.get(lock); // 如果获取key为空,则重走空key时的加锁流程 if (lastLockTimeBySomeThread == null) { return false; } // 获取key不为空,则判断是否超时,若未超时,则循环重试 if (Long.valueOf(lastLockTimeBySomeThread) > System.currentTimeMillis()) { return false; } // 若超时,则进行争抢加锁 String getOldIfSet = jedis.getSet(lock, String.valueOf(cacheProcessTime)); // 判断加锁是否成功 if (getOldIfSet != null && getOldIfSet.equals(lastLockTimeBySomeThread)) { return true; } // 若加锁失败,重头再来 return false; } catch (Exception e) { e.printStackTrace(); } return false; } // 释放锁 public static void releaseLock(Jedis jedis, String lock) { try { String lastLockTimeBySomeThread = jedis.get(lock); if (lastLockTimeBySomeThread == null) { return; } // 避免删除非自己获取得到的锁 if (System.currentTimeMillis() < Long.valueOf(lastLockTimeBySomeThread)) { jedis.del(lock); } } catch (Exception e) { e.printStackTrace(); } } @SuppressWarnings("resource") public static void main(String[] args) { // 任务数 int produceTaskMaxNumber = 100; // 初始化redis中的id序列 Jedis jedis = new Jedis(); jedis.set(sequenceName, String.valueOf(Long.valueOf(1))); // 初始化线程池 int corePoolSize = 20; int maximumPoolSize=20; long keepAliveTime =20; TimeUnit unit =TimeUnit.SECONDS; BlockingQueue<Runnable> workQueu = new ArrayBlockingQueue<Runnable>(30); RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueu, handler); // 创建任务 for (int i = 1; i <= produceTaskMaxNumber; i++) { try { threadPool.execute(new BuildDistributedLockWithRedisDB().new RedisLockTestTask()); } catch (Exception e) { e.printStackTrace(); } } } class RedisLockTestTask implements Runnable { @Override public void run() { Jedis jedis = new Jedis(); Boolean lockFlag = true; // 循环等待拿锁 long startTime = System.currentTimeMillis(); while (lockFlag) { if (BuildDistributedLockWithRedisDB.acquireLock(jedis, lockName)) { // 获得锁了,开始执行业务逻辑 process(jedis, sequenceName); lockFlag = false; } } long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + "线程设置的值:" + jedis.get(sequenceName) + " " + "共消耗时长:" + (endTime - startTime) + "ms"); // 释放锁 BuildDistributedLockWithRedisDB.releaseLock(jedis, lockName); } } void process(Jedis jedis,String sequenceName){ jedis.set(sequenceName, String.valueOf(Long.valueOf(jedis.get(sequenceName)) + 1)); } }
时间: 2024-10-08 09:47:59