redis分布式锁&队列应用

分布式锁

  1. setnx(set if not exists)

如果设值成功则证明上锁成功,然后再调用del指令释放。

// 这里的冒号:就是一个普通的字符,没特别含义,它可以是任意其它字符,不要误解
> setnx lock:codehole true
OK
... do something critical ...
> del lock:codehole
(integer) 1

但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

  1. setnx(set if not exists) 加上过期时间
> setnx lock:codehole true
OK
> expire lock:codehole 5
... do something critical ...
> del lock:codehole
(integer) 1

如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。

  1. 使用ex nx命令一起执行
> set lock:codehole true ex 5 nx
OK
... do something critical ...
> del lock:codehole
  1. 删除锁的线程必须是上锁的线程

为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过期了被服务器自动释放的。
但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于delifequals这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。

上锁
tag = random.nextint()  # 随机数
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令

# delifequals 解锁
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

延时队列

消息队列

注意:
Redis 的消息队列不是专业的消息队列,它没有非常多的高级特性,没有 ack 保证,如果对消息的可靠性有着极致的追求,那么它就不适合使用。

Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队列,使用lpop 和 rpop来出队列。

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

阻塞队列

如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。
通常我们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就可以了。但是有个小问题,那就是睡眠会导致消息的延迟增大。
我们可以使用 blpop/brpop,阻塞读。
阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

锁冲突处理

上面我们讲了分布式锁的问题,但是加锁失败没有讲。一般我们有3种策略来处理加锁失败:

  1. 直接抛出异常,通知用户稍后重试
    这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内容,再点击重试,这样就可以起到人工延时的效果。
  2. sleep 一会再重试
    sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比较频繁或者队列里消息比较多,sleep 可能并不合适。
  3. 将请求转移至延时队列,过一会再试
    这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。

延时队列

延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一条
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇会继续
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 抢到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}

原文地址:https://www.cnblogs.com/luozhiyun/p/10880754.html

时间: 2024-10-06 04:23:37

redis分布式锁&队列应用的相关文章

redis分布式锁和消息队列

最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了.redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String lock

基于redis分布式锁实现“秒杀”

最近在项目中遇到了类似"秒杀"的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓"秒杀"的基本思路. 业务场景 所谓秒杀,从业务角度看,是短时间内多个用户"争抢"资源,这里的资源在大部分秒杀场景里是商品:将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确. 一些可能的实现 刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可

Redis分布式锁实现简单秒杀功能

这版秒杀只是解决瞬间访问过高服务器压力过大,请求速度变慢,大大消耗服务器性能的问题. 主要就是在高并发秒杀的场景下,很多人访问时并没有拿到锁,所以直接跳过了.这样就处理了多线程并发问题的同时也保证了服务器的性能的稳定. 接下来我们使用redis的分布式锁来进行枷锁处理: 我们可以在进入下单的方法后将核心的方法加锁,然后离开后进行解锁 主要三步: 加锁 核心方法 解锁 首页分布式加锁解锁工具类: @Component public class RedisLock { private static

死磕 java同步系列之redis分布式锁进化史

问题 (1)redis如何实现分布式锁? (2)redis分布式锁有哪些优点? (3)redis分布式锁有哪些缺点? (4)redis实现分布式锁有没有现成的轮子可以使用? 简介 Redis(全称:Remote Dictionary Server 远程字典服务)是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API. 本章我们将介绍如何基于redis实现分布式锁,并把其实现的进化史从头到尾讲明白,以便大家在面试的时候能讲清楚

redis分布式锁解决超卖问题

1.1 redis事物 1.redis事物介绍 1. redis事物是可以一次执行多个命令,本质是一组命令的集合. 2. 一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入 作用:一个队列中,一次性.顺序性.排他性的执行一系列命令 2.multi 指令基本使用 1. 下面指令演示了一个完整的事物过程,所有指令在exec前不执行,而是缓存在服务器的一个事物队列中 2. 服务器一旦收到exec指令才开始执行事物队列,执行完毕后一次性返回所有结果 3. 因为redis是单线程的,所

Redis分布式锁实现

直接上代码: 1 package cn.wywk.yac.comm.redis; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 6 import redis.clients.jedis.Jedis; 7 8 /** 9 * ClassName: redis分布式锁实现 <br/> 10 * date: 2017年2月17日 上午10:23:24 <br/> 11 * 12 * @author 134

RedLock.Net - 基于Redis分布式锁的开源实现

工作中,经常会遇到分布式环境中资源访问冲突问题,比如商城的库存数量处理,或者某个事件的原子性操作,都需要确保某个时间段内只有一个线程在访问或处理资源. 因此现在网上也有很多的分布式锁的解决方案,有数据库.MemCache.ZoopKeeper等等的方式. 这次,我们要学习的是一个基于Redis分布式锁的插件,RedLock.Net. 首先必须要有一个Redis服务来支持此分布式锁,其次就当然是要获取此插件了. 可以从Nuget中获取,也可以直接去Github下载   https://github

redis分布式锁小试

一.场景 项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本]).那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于

Memcached 和 Redis 分布式锁方案

分布式缓存,能解决单台服务器内存不能无限扩张的瓶颈.在分布式缓存的应用中,会遇到多个客户端同时争用的问题.这个时候,需要用到分布式锁,得到锁的客户端才有操作权限. Memcached 和 Redis 是常用的分布式缓存构建方案,下面列举下基于Memcached 和 Redis 分布式锁的实现方法. Memcached 分布式锁 Memcached 可以使用 add 命令,该命令只有KEY不存在时,才进行添加,或者不会处理.Memcached 所有命令都是原子性的,并发下add 同一个KEY ,只