今天抽空主要看了一下mysql线程池(cached threads)的实现原理,总体并不那么复杂,也学到了一些设计原理,值得记录一下。为了简化代码,让思路更清晰,我删去了不少错误处理,线程同步锁的代码,mysql中大量使用全局变量,这些都需要锁了控制访问。
先大致说一下几个关键的东西:
1、List结构:这个看名字就知道,是一个list,可以理解为队列,这个数据结构是用来放thd的,就是线程数据的,这里不去深究list如何实现了(暂时)。
2、threads:一个thd的list,这些thd都是活跃线程(wake thread)。
3、cached_threads:也是一个thd的list,但并不活跃,实际上是一个等待thd,即如果当前有数据在里面,且有活跃线程结束,那么这个走进结束流程的线程会取出thd,然后运行这个thd,这样就不用新起线程了。
首先看看创建线程的代码:
void create_new_thread(thd){ // 检查连接数是否过大 mysql_mutext_lock(&lock_connection_count); if (connetion_count >= max_connections + 1 || abort_loop){ mysql_mutext_unlock(&lock_connection_count); delete thd; // will close the connection return; } ++connection_count; if (connection_count > max_used_connections){ max_used_connections = connection_count; } mysql_mutext_unlock(&lock_connection_count); mysql_mutex_lock(&lock_thread_count); // 我在想如果连接很多很多,溢出了怎么办?? thd->thread_id = thd->variables.pseudo_thread_id = thread_id++; thread_count++; // add_connection当然是个函数指针,thread_scheduler是一个放了这些函数指针的结构体 // 为了跨平台,却没有使用C++的多态,而是用了函数指针实现了多态 thread_scheduler->add_connection(thd); }
// 这个就是add_connection的实现 void create_thread_to_handle_connection(thd){ // 注意这个判断,cache_thread_count大于wake_thread的时候才会进来, // 当线程结束的时候,线程cachd_thread_count++ // 当发现有空闲,先wake_thread_count++,cachd_thread_count-- // 成功启动线程后,wake_thread_count--,这时wake_thread_count和cachd_thread_count又归于平衡 // 简单来说,cache_thread_count是用来记录可用线程数量的,而wake_thread是用来记录就绪线程数量的 // 多数时候,wake_thread_count基本是0,而cache_thread_count则大于等于0 if (cache_thread_count > wake_thread){ thread_cache.push_back(thd); wake_thread++; mysql_cond_signal(&cond_thread_cache); } else { // 如果没有cached thread则直接创建新的线程 thread_created++; // threads这个list放着所有正在运行时的thd threads.append(thd); thd->prior_thr_create_utime = thd->start_utime = my_micro_time(); // 真正创建一个线程出来 if (error = mysql_thread_create(key_thread_one_connection, &thd->real_id, &connection_attrib handle_one_connection)) { // do some clear up // ignore the lock operations // 创建失败清理一下 thread_count--; connection_count--; delete thd; } } }
那么什么时候会进入if (connetion_count >= max_connections + 1 || abort_loop)这个if语句当中呢?就是有线程结束,并等待新的thd来的时候,看看线程结束的时候做了些什么:
// 线程结束时的回调 bool one_thread_per_connection_end(thd, bool put_in_cache){ unlink_thd(thd); // delete thd if (put_in_cache) { // 关键函数cache_thread put_in_cache = cache_thread(); } // 如果已经获得新thd,就不结束了 if (put_in_cache){ return 0; } // 否则结束线程 my_thread_end(); mysql_cond_broadcast(&cond_thread_count); pthread_exit(0); return 0; }
其中的cache_thread是关键函数:
// 这个函数就是cache thread的关键,他的思路就是线程结束的时候,wait for cache_thread队列 // 取出新的thd,运行之 static bool cache_thread(){ // 判断是否到达cache 线程的阈值 if (cache_thread_count < thread_cache_size && !abort_loop && !kill_cache_threads){ cache_thread_count++; // 进入cache状态 // wait for the signal to relive the thread // 开始wait for新的thd装进cache_thread list // 注意这里用的是while而不是if,因为 // 1、cond_wait可能虚假唤醒,可能因为竞争,并没有真正获得新的thd // 2、若获取失败则再次等新的thd,总之就是尽可能的获得新的thd while (!abort_loop && !wake_thread && !kill_cached_threads){ mysql_cond_wait(&cond_thread_cache, &lock_thread_count); cached_thread_count--; // 此处没仔细看是什么逻辑,暂时跳过 if (kill_cached_threads){ mysql_cond_signal(&cond_flush_thread_cache); } // 发现有就绪的thd,获取之,然后执行 if (wake_thread){ THD* thd; wake_thread--; thd = thread_cache.get(); thd->thread_stack = (char*)&thd; thd->store_globals(); thd->mysys_var->abort = 0; thd->prior_thr_create_utime = thd->start_utime = my_micro_time(); // 返回1使得线程不会结束,跑新的thd了 return 1; } } } // 结束线程 return 0; }
分析cache thread思路完毕。
时间: 2024-10-12 03:01:27