分布式缓存系统 Memcached 哈希表操作

memcached 中有两张hash 表,一个是“主hash 表”(primary_hashtable),另外一个是“原hash 表”(old_hashtable)。一般情况下都在主表中接受操作,在插入新item时判断是否需要进行扩;每次操作的时候,先会检测表是否正处于扩展(expanding)状态,如果是,则原表中进行操作,当扩容完成在转移到主表中进行操作。 在扩容时,采取逐步迁移策略:即每次只从原表中迁移一个bucket节点的item到新主表中,进行逐步迁移。

总的来看,这与Redis中的hash操作几乎一致。因此不再做详细讲解,具体分析见代码注释。

//hash表的初始化,参数hashtable_init为所设置的hashpower大小(阶数),默认大小为16
void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
 //创建主表(hashsize(hashpower):计算bucket节点数目=2的hashpower次方)
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
 //emcached内部有很多全局的统计信息,用于实时获取各个资源的使用情况,
 //对统计信息的更新都需要加锁
    STATS_LOCK();//对全局统计信息加锁,已更新信息
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();//解锁
}

//在哈希表中查找给定key的item:找到对应的哈希表,再找对应的桶节点,最后遍历链表找到目标key的item
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;//桶节点
    unsigned int oldbucket;//在原表中的桶节点索引

//正在扩容,且当前节点在愿表中,还未迁移到主表
 //注意:i&(2^n-1)结果即为i除以2^n的余数
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket];
    } else {//没有扩容,或者已经迁移到主表中
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

item *ret = NULL;
    int depth = 0;//目标节点在桶中的深度
    while (it) {//遍历桶节点链表
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

/* returns the address of the item pointer before the key.  if *item == 0,
  the item wasn‘t found */
//内部函数:返回目标key item的前一个item的指针,这样在删除目标item时只需要将该返回item指针的next指针指向目标item的next item即可。
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket];
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}

/* grows the hashtable to the next power of 2. */
//哈希表扩容为原来的2倍(将原来的主表拷贝到久表中,对主表扩容)
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++;
        expanding = true;
        expand_bucket = 0;
        STATS_LOCK();
        stats.hash_power_level = hashpower;
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats.hash_is_expanding = 1;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}

static void assoc_start_expand(void) {
    if (started_expanding)
        return;
    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}

/* Note: this isn‘t an assoc_update.  The key must not already exist to call this */
//将给定item插入到哈希表的桶的头部中  注意:该item不能已经存在于hash表中(hv:哈希值)
int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn‘t have duplicately named things defined */

//正在扩容,还未完成,则将该item放到原hashtable的对应bucket的单链表的头部
    if (expanding &&  
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)//注意hashpower已经加倍,因此是hashpower-1
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {//没有正在扩容则放到主hashtable中
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

hash_items++;
 //是否需要开始扩容
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}

//删除对应item(只是将item从桶链表中移除)
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    item **before = _hashitem_before(key, nkey, hv);//查找该item的前一个item

if (*before) {
        item *nxt;
        hash_items--;//hash表中的item总数
        /* The DTrace probe cannot be triggered as the last instruction
        * due to possible tail-optimization by the compiler
        */
        MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;  /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don‘t delete things
      they can‘t find. */
    assert(*before != 0);
}

//迁移函数start_assoc_maintenance_thread(),创建迁移线程,调用函数assoc_maintenance_thread进行迁移
//线程函数:迁移bucket节点,默认一次迁移一个bucket
static void *assoc_maintenance_thread(void *arg) {

while (do_run_maintenance_thread) {
        int ii = 0;

/* Lock the cache, and bulk move multiple buckets to the new
        * hash table. */
        item_lock_global();
        mutex_lock(&cache_lock);

for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            int bucket;

for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                next = it->h_next;

//计算哈希值,并计算得桶节点索引值
                bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                it->h_next = primary_hashtable[bucket];
                primary_hashtable[bucket] = it;
            }

//每迁移完一个bucket,就在久表中移除该bucket
            old_hashtable[expand_bucket] = NULL;

expand_bucket++;
   //扩容结束
            if (expand_bucket == hashsize(hashpower - 1)) {
                expanding = false;
                free(old_hashtable);
                STATS_LOCK();
                stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                stats.hash_is_expanding = 0;
                STATS_UNLOCK();
                if (settings.verbose > 1)
                    fprintf(stderr, "Hash table expansion done\n");
            }
        }

mutex_unlock(&cache_lock);
        item_unlock_global();

if (!expanding) {
            /* finished expanding. tell all threads to use fine-grained locks */
            switch_item_lock_type(ITEM_LOCK_GRANULAR);
            slabs_rebalancer_resume();
            /* We are done expanding.. just wait for next invocation */
            mutex_lock(&cache_lock);
            started_expanding = false;
            pthread_cond_wait(&maintenance_cond, &cache_lock);
            /* Before doing anything, tell threads to use a global lock */
            mutex_unlock(&cache_lock);
            slabs_rebalancer_pause();
            switch_item_lock_type(ITEM_LOCK_GLOBAL);
            mutex_lock(&cache_lock);
            assoc_expand();
            mutex_unlock(&cache_lock);
        }
    }
    return NULL;
}

时间: 2024-10-26 09:34:26

分布式缓存系统 Memcached 哈希表操作的相关文章

memcached哈希表操作主要逻辑笔记

以下注释的源代码都在memcached项目的assoc.c文件中 1 /* how many powers of 2's worth of buckets we use */ 2 unsigned int hashpower = HASHPOWER_DEFAULT; /* 哈希表bucket的级别,(1<<hashpower) == bucket的个数 */ 3 4 /* Main hash table. This is where we look except during expansio

分布式缓存系统 Memcached 整体架构

分布式缓存系统 Memcached整体架构 Memcached经验分享[架构方向] Memcached 及 Redis 架构分析和比较

分布式缓存系统 Memcached slab和item的主要操作

上节在分析slab内存管理机制时分析Memcached整个Item存储系统的初始化过程slabs_init()函数:分配slabclass数组空间,到最后将各slab划分为各种级别大小的空闲item并挂载到对应大小slab的空闲链表slots上.本节将继续分析对slab和item的主要操作过程. slab机制中所采用的LRU算法: 在memcached运行过程中,要把一个item调入内存,但内存已无空闲空间时,为了保证程序能正常运行,系统必须从内存中调出一部分数据,送磁盘的对换区中.但应将哪些数

分布式缓存系统Memcached简介与实践

缘起: 在数据驱动的web开发中,经常要重复从数据库中取出相同的数据,这种重复极大的增加了数据库负载.缓存是解决这个问题的好办法.但是ASP.NET中的虽然已经可以实现对页面局部进行缓存,但还是不够灵活.此时Memcached或许是你想要的. Memcached是什么?Memcached是由Danga Interactive开发的,高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度. Memcached能缓存什么?通过在内存里维护一个统一的巨大的hash表,Memc

分布式缓存系统Memcached

Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提高访问速度. 通过在内存中维护一个巨大的统一的hash表,Memcached能够用来存储各种格式的数据,包括图像.视频.文件以及数据库检索的结果等. Memcached使用了libevent(如果可以的话,在linux下使用epoll) 来均衡任何数量的打开链接,使用非阻塞的网络I/O,对内部对象实现引用计数(因此,针对多样的客户端,对象可以处在多样的状态), 使用自己的页块分配器和哈希表, 因此虚拟内存不

分布式缓存系统 Memcached 数据存储slab与hashtable

缓存数据以item为基本单元,以双链表形式存放在对应级别大小的slabclass结构的chunk中.同时该item还存放在链式hashtable中bucket中,用于提供快速查找的索引. 首先是理解缓存的基本数据单元item结构: typedef struct _stritem {    struct _stritem *next;    //在slab中的双链表后向指针    struct _stritem *prev;    //在slab中的双向链表的前向指针    struct _str

分布式缓存系统Memcached在Asp.net下的应用

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. 站下的session性能并不高,所以造成人们一种印象,大型WEB项目使用Java的错觉,致使很多人吐槽微软不给力,其实这好比拉不出怪地球引力,本

分布式缓存系统 Memcached 基本配置与命令

为了方便测试,给出一个C客户端libmemcached链接:https://launchpad.net/libmemcached/ 以及memcacheclient-2.0 : http://code.jellycan.com/files/memcacheclient-2.0.zip(已生成 sln,在windows下直接用VS打开,编译成功) 在Memcached启动时,有很多配置参数可以选择,以下参数对应memcached1.4.15,现给出这些参数的具体含义: "a:" //un

分布式缓存系统 Memcached 工作线程初始化

Memcached采用典型的Master-Worker模式,其核心思想是:有Master和Worker两类进程(线程)协同工作,Master进程负责接收和分配任务,Worker进程负责处理子任务.当各Worker进程将各个子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总. 工作示意图如下所示: 其中每个工作线程维护一个连接队列,以接收由主线程分配的客户连接:同时每个工作线程维护一个Libevent实例,以处理和主线程间的管道通信以及和客户连接间的socket通信事件