memcached 线程模型

memcached 线程可分为两种,一是负责事件处理(主线程)和内存管理等的线程, 二是负责负责连接请求处理的线程即 worker threads 。这里只对 worker threads 进行讨论。

main 函数开始分析:

int main(int argc, char* argv[]) { // in memcached.c
    // ...
    /* start up worker threads if MT mode */
    memcached_thread_init(settings.num_threads, main_base);
    // ...
}

settings.num_threadsworker threads 的数目, 默认值为 4, 可通过 -t 选项指定,
通常这个值不应超过机器处理器核的数目。

main_base 是一个指向 event_base 结构的指针, 用于网络事件处理(libevent)。

在继续分析之前,先看一下相关的数据结构和变量定义:

1. 连接队列(thread.c)

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;                // 一个连接对应的 socket fd
    enum conn_states   init_state;
    int               event_flags;        // 网络事件对应的 event flags, libevent概念
    int               read_buffer_size;
    enum network_transport     transport;    // 传输协议: local_transport(Unix sockets)、 tcp_transport 或 udp_transport
    CQ_ITEM          *next;                // 链表 下一项指针, 用于链接在连接队列中的下一项或空闲项列表中的下一项
};

/* A connection queue. */
typedef struct conn_queue CQ;                // 连接队列, 实际上是一个CQ_ITEM的单链表
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;                    // 访问锁
};

2. 线程信息(memcached.h)

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */            // 线程 ID
    struct event_base *base;    /* libevent handle this thread uses */        // event base
    struct event notify_event;  /* listen event for notify pipe */            // 用于监听管道事件
    int notify_receive_fd;      /* receiving end of notify pipe */            // 管道接受端 fd
    int notify_send_fd;         /* sending end of notify pipe */            // 管道发送端 fd
    struct thread_stats stats;  /* Stats generated by this thread */        // 线程统计数据
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */ // 连接队列
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD; // 用于描述 worker thread

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD; // 用于描述 diapatcher thread

3. 变量定义(thread.c)

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];                // 用于 lru cache 的锁

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;    // 用于接受新连接的锁

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; // 用于收集统计信息的锁

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;            // 用于线程同步的锁, 具体在下面会有交代 

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;                    // CQ_ITEM空闲项列表
static pthread_mutex_t cqi_freelist_lock;        // 相应的访问锁

static pthread_mutex_t *item_locks;                // 数组,存放用于访问 hash 表的锁
/* size of the item lock hash table */
static uint32_t item_lock_count;                // 上述数组大小
unsigned int item_lock_hashpower;                // 上述数组大小以2为底的对数, 即 item_lock_count = 2**item_lock_hashpower
#define hashsize(n) ((unsigned long int)1<<(n))    // 2 ** n
#define hashmask(n) (hashsize(n)-1)                // 2 ** n - 1

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; // dispatcher 描述

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they‘ve put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads; // worker threads 描述

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;                // 已启动或重启的线程的数目
static pthread_mutex_t init_lock;        // 用于启动或重启线程的锁和条件变量
static pthread_cond_t init_cond;

下面贴出 memcached_thread_init 的定义:

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int         i;            // 用于循环
    int         power;        // 指数表示的 item lock table 大小, 实际大小为 2**power。
                            // item lock table 是一个数组, 保存的是用来访问 hash table 的锁。

    // 初始化锁以及条件变量
    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL; // 初始化 cqi_freelist 

    /* Want a wide lock table, but don‘t waste memory */
    if (nthreads < 3) { // 0 1 2
        power = 10;
    } else if (nthreads < 4) { // 3
        power = 11;
    } else if (nthreads < 5) { // 4
        power = 12;
    } else {
        /* 8192 buckets, and central locks don‘t scale much past 5 threads */
        power = 13;
    }

    // 为了保证线程安全性, 在访问 hash 表时, 需要对当前访问的链(chain)加锁
    // 为了提升访问效率的同时又不浪费过多的存储空间, 要求锁的数目尽可能多但不允许其超过 hash 表的 slots 数
    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power); // 2 ** power
    item_lock_hashpower = power;

    // 初始化 item_locks
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can‘t allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    // 为 threads 数组分配空间
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can‘t allocate thread descriptors");
        exit(1);
    }

    // 初始化 dispatcher_thread
    dispatcher_thread.base = main_base; // dispatcher thread 即主线程
    dispatcher_thread.thread_id = pthread_self();

    // 初始化用于线程通信的管道 pipe
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can‘t create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]); // 初始化 thread[i] 的其他字段
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we‘ve done all the libevent setup. */
    for (i = 0; i < nthreads; i++) { // 创建线程 pthread_create
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads); // 等待所有线程创建完成,利用的是条件变量
    pthread_mutex_unlock(&init_lock);
}

setup_thread:

/*
 * Set up a thread‘s information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init(); // 初始化 libevent handle (或 event base)
    if (! me->base) {
        fprintf(stderr, "Can‘t allocate event base\n");
        exit(1);
    }

    // 管道读事件监听
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can‘t monitor libevent notify pipe\n");
        exit(1);
    }

    // 连接队列初始化
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue); 

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

create_worker:

static void create_worker(void *(*func)(void *), void *arg) { // 线程创建
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can‘t create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

wait_for_thread_registration:

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

在执行 wait_for_thread_registration 时, 我们已经持有锁 init_lock。 每初始化完成一个thread,init_count 都会自增1(这个操作需要保证其原子性, 利用init_cond加锁) 并且调用 pthread_cond_signal(&init_cond),这些都是在 register_thread_initialized (in thread.c) 中完成的。

调用栈: worker_libevent()->register_thread_initialized()->pthread_cond_signal(), 其中 worker_libevent 是 worker 线程的主方法, 其除了调用 register_thread_initialized() 外, 还启动了当前线程的事件处理循环。源代码如下:

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}

看一下 thread_libevent_process (管道读事件处理器 notify event handler):

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg; // 当前线程信息
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1) // 读取消息 c 或 p
        if (settings.verbose > 0)
            fprintf(stderr, "Can‘t read from libevent pipe\n");

    switch (buf[0]) {
    case ‘c‘: // 处理新连接
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can‘t listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can‘t listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to pause and report in */
    case ‘p‘: // 暂停当前线程
        register_thread_initialized(); // 这时 worker_hang_lock 已被其他线程持有, 调用后因在该锁上执行lock操作暂停
        break;
    }
}

conn_new (in memcached.c) 初始化连接状态信息。主线程接收到连接请求后就会调用 dispatch_conn_new (sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, false)dispatch_conn_new 选择一个线程处理连接的网络事件(其实只有读事件)。

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1; // 记录最近处理新连接的线程的索引 (index)

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new(); // 分配一个新的 CQ_ITEM
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let‘s try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads; // 计算处理这个新连接的线程的 index

    LIBEVENT_THREAD *thread = threads + tid; // 获取相应的线程描述信息

    last_thread = tid; // 更新 last_thread

    // 初始化 item
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item); // 将 item 添加到线程的连接队列

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = ‘c‘;
    if (write(thread->notify_send_fd, buf, 1) != 1) { // 通知相应的线程处理连接
        perror("Writing to thread notify pipe");
    }
}

write(thread->notify_send_fd, buf, 1) 成功执行后, 线程会监听到管道读事件, 从管道读取消息 "c", 对连接队列中的新连接进行处理, 源代码见 thread_libevent_process() (管道读事件处理器, 或通知处理器)。

这篇文章到此为止。 上述代码涉及的其他细节如 CQ_ITEM 的分配和释放等,这里不再作详细说明。至于 hash 表 和 网络通信 会分别在内存管理(TODO : link)和网络通信(TODO : link)这两节给出比较详尽的描述。

时间: 2024-10-08 13:08:41

memcached 线程模型的相关文章

Memcached线程模型分析

前面提到过Memcached的线程模型:Memcached使用Libevent,以此为基础来处理事件.其原理为:启动时的线程为main thread,它包含一个event_base,之后创建多个worker thread:每个work thread中也有一个event_base.main thread中的event_base负责监听网络,接收新连接:当建立连接后就把新连接交给worker thread来处理. Memcached的main函数在Memcached.c中.main函数中初始化了系统

Memcached源码分析之线程模型

作者:Calix 一)模型分析 memcached到底是如何处理我们的网络连接的? memcached通过epoll(使用libevent,下面具体再讲)实现异步的服务器,但仍然使用多线程,主要有两种线程,分别是“主线程”和“worker线程”,一个主线程,多个worker线程. 主线程负责监听网络连接,并且accept连接.当监听到连接时,accept后,连接成功,把相应的client fd丢给其中一个worker线程.worker线程接收主线程丢过来的client fd,加入到自己的epol

redis和memcached有什么区别?redis的线程模型是什么?为什么单线程的redis比多线程的memcached效率要高得多(为什么redis是单线程的但是还可以支撑高并发)?

1.redis和memcached有什么区别? 这个事儿吧,你可以比较出N多个区别来,但是我还是采取redis作者给出的几个比较吧 1)Redis支持服务器端的数据操作:Redis相比Memcached来说,拥有更多的数据结构和并支持更丰富的数据操作,通常在Memcached里,你需要将数据拿到客户端来进行类似的修改再set回去.这大大增加了网络IO的次数和数据体积.在Redis中,这些复杂的操作通常和一般的GET/SET一样高效.所以,如果需要缓存能够支持更复杂的结构和操作,那么Redis会是

Redis 和 Memcached 有什么区别?Redis 的线程模型是什么?为什么单线程的 Redis 比多线程的 Memcached 效率要高得多?

面试题 redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发? 面试官心理分析 这个是问 redis 的时候,最基本的问题吧,redis 最基本的一个内部原理和特点,就是 redis 实际上是个单线程工作模型,你要是这个都不知道,那后面玩儿 redis 的时候,出了问题岂不是什么都不知道? 还有可能面试官会问问你 redis 和 memcached 的区别,但是 memcached 是早些年各大互联网公司常用的缓存方案,但是现在近

2.redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发?

作者:中华石杉 面试题 redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发? 面试官心理分析 这个是问 redis 的时候,最基本的问题吧,redis 最基本的一个内部原理和特点,就是 redis 实际上是个单线程工作模型,你要是这个都不知道,那后面玩儿 redis 的时候,出了问题岂不是什么都不知道? 还有可能面试官会问问你 redis 和 memcached 的区别,但是 memcached 是早些年各大互联网公司常用的缓存

Redis与Memcached区别?Redis线程模型?

一.redis 和 memcached 有啥区别? 1. redis 支持复杂的数据结构 redis 相比 memcached 来说,拥有更多的数据结构,能支持更丰富的数据操作.如果需要缓存能够支持更复杂的结构和操作, redis 会是不错的选择. 2. redis 原生支持集群模式 在 redis3.x 版本中,便能支持 cluster 模式,而 memcached 没有原生的集群模式,需要依靠客户端来实现往集群中分片写入数据. 3. 性能对比 由于 redis 只使用单核,而 memcach

转:Memcached 线程部分源码分析

目前网上关于memcached的分析主要是内存管理部分,下面对memcached的线程模型做下简单分析 有不对的地方还请大家指正,对memcahced和libevent不熟悉的请先google之 先看下memcahced启动时线程处理的流程 memcached的多线程主要是通过实例化多个libevent实现的,分别是一个主线程和n个workers线程 无论是主线程还是workers线程全部通过libevent管理网络事件,实际上每个线程都是一个单独的libevent实例 主线程负责监听客户端的建

多线程——线程模型

什么是程序? 安装在磁盘上的一段指令集合,它是静态的概念. 什么是进程? 它是运行中的程序,是动态的概念,每个进程都有独立的资源空间. 什么是线程? 线程,又称为轻量级进程,是程序执行流的最小单元,是程序中一个单一的顺序控制流程.线程是进程的一个实体,是被系统独立调度和分派的基本单位. 什么是多线程? 多线程则指的是在单个程序中可以同时运行多个不同的线程执行不同的任务. 多线程的特点 ①   一个进程可以包含一个或多个线程. ②   一个程序实现多个代码同时交替运行就需要产生多个线程. ③  

JS线程模型&amp;Web Worker

js线程模型 客户端javascript是单线程,浏览器无法同时运行两个事件处理程序 设计为单线程的理论是,客户端的javascript函数必须不能运行太长时间,否则会导致web浏览器无法对用户输入做出响应.这也是为什么Ajax的API都是异步的,以及为什么客户端Javascript不能使用一个简单的异步load()或者require()函数来加载javascript库 如果应用程序不得不执行太多的计算而导致明显的延迟,应该允许文档在执行这个计算之前完全载入,并且确保告诉用户正在进行计算并且浏览