Redis阻塞队列原理学习

1.redis介绍

Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

2.阻塞队列

使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

3.redis的阻塞队列

redis提供远程访问模式,本文以redis远程阻塞从队列中取数据为例,介绍redis阻塞实现原理。

/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exists or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we‘ll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for this keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */

  在调用阻塞队列取操作时,队列中无数据时才会真正的触发代码的阻塞分支。在客户端,redis对新来的读请求删除了标记(通知),这样知道阻塞的请求在获得服务前,新来的读请求都不能够被正常的服务。

blpop

void blpopCommand(redisClient *c) {
    blockingPopGenericCommand(c,REDIS_HEAD);
}

  队列的左右出队对应队列(list)的表头和表尾。

blockingPopGenericCommand

/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(redisClient *c, int where) {
    robj *o;
    mstime_t timeout;
    int j;

    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
        != REDIS_OK) return;

    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (o->type != REDIS_LIST) {
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                if (listTypeLength(o) != 0) {
                    /* Non empty list, this is like a non normal [LR]POP. */
                    char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
                    robj *value = listTypePop(o,where);
                    redisAssert(value != NULL);

                    addReplyMultiBulkLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
                    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
                                        c->argv[j],c->db->id);
                    if (listTypeLength(o) == 0) {
                        dbDelete(c->db,c->argv[j]);
                        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                                            c->argv[j],c->db->id);
                    }
                    signalModifiedKey(c->db,c->argv[j]);
                    server.dirty++;

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
                        (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
                        c->argv[j]);
                    return;
                }
            }
        }
    }

    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & REDIS_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* If the list is empty or the key does not exists we must block */
    //这里才是阻塞的关键
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

  

阻塞

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    dictEntry *de;
    list *l;
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            redisAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        listAddNodeTail(l,c);
    }
    blockClient(c,REDIS_BLOCKED_LIST);
}

  

时间: 2024-10-14 21:53:21

Redis阻塞队列原理学习的相关文章

可阻塞队列-原理及源码解析

阻塞原理:比如,一个队列中有8个格子,代表可放入8条数据,当一条信息到来就放入一个格子中,然后就进行处理.但是这个时候一次性来了8条数据,格子满了,数据还没有处理完,就来个一条数据.这个时候就把这条数据进行阻塞. 示例:假定有一个绑定的缓冲区,它支持 put 和 take 方法.如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞:如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞.我们喜欢在单独的等待 set 中保存 put 线程和

Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法. 这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性. 2.如何实现? 主要是实现Blo

RHCA学习笔记:RH442-Unit5 队列原理

NIT 5 Queuing Theory 队列原理 目标: 1.明白性能调优的关键术语 2. 应用队列技术解决性能问题 3.明白性能调优的复杂性 5.1    Introduction to queuing theory 队列原理简介 A.      Little’s Law 给出了队列原理的基础 John Little于1961年用数学证明了这个原理. B.      带来的好处: a.       可以用工程学方法来进行性能管理. b.       量化系统未来的性能 c.       说明

Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue

Java中的阻塞队列接口BlockingQueue继承自Queue接口. BlockingQueue接口提供了3个添加元素方法. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常 offer:添加元素到队列里,添加成功返回true,添加失败返回false put:添加元素到队列里,如果容量满了会阻塞直到容量不满 3个删除方法. poll:删除队列头部元素,如果队列为空,返回null.否则返回元素. remove:基于对象找到

【并发】8、借助redis 实现多线程生产消费阻塞队列

顾名思义这个就是再消费的时候,不是之前的那哥用yield进行线程切换的操作,而是用线程等待阻塞的方式去执行,说实话我感觉效率不一定有之前那个好, 因为我对这种阻塞队列使用的时候,之前有发现阻塞队列,塞着塞着线程就会进入假死状态,这个很奇怪,但是有的时候又是好的,这个也不清楚到底是为什么 但是毕竟也是一种实现,我就写出来了看看吧 生产者 package queue.redisQueue; import queue.fqueue.vo.TempVo; import redis.clients.jed

多线程编程学习六(Java 中的阻塞队列).

介绍 阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满:当队列空时,队列会阻塞获得元素的线程,直到队列变非空.阻塞队列就是生产者用来存放元素.消费者用来获取元素的容器. 当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待... 方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e, timeout, unit

java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口 BlockingQueue实现类常见的有以下几种. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里.有界也就意味着,它不能够存储无限多数量的元素.它有一个同一时间能够存储元素数量的上限.你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改). D

Java多线程 阻塞队列和并发集合

转载:大关的博客 Java多线程 阻塞队列和并发集合 本章主要探讨在多线程程序中与集合相关的内容.在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成程序崩溃.Java为多线程专门提供了特有的线程安全的集合类,通过下面的学习,您需要掌握这些集合的特点是什么,底层实现如何.在何时使用等问题. 3.1 BlockingQueue接口 java阻塞队列应用于生产者消费者模式.消息传递.并行任务执行和相关并发设计的大多数常见使用上下文. BlockingQueue在Queue接口基础上提供了额外

从使用到原理学习Java线程池

来源:SilenceDut http://www.codeceo.com/article/java-threadpool-learn.html 线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这