diy数据库(七)--线程控制块、消息、线程池

一、概述

1、diy数据库使用的是一个多进程and多线程的服务器模型。每个进程作为一个节点实例,监听一个端口;而每个用户连接在数据库节点实例中都会有一个代理线程与之对应。

2、除了主线程外每个线程都有一个EDU(进程调度单元,也可称为线程控制块),另外每种系统线程类型有且只有一个线程实体(这里的系统线程只有一种,即监听线程,主线程不在线程管理池里面)

3、代理线程是专门处理用户请求的 ,由监听线程创建

4、线程池对进行调度,通过传入不同的类型, 内部调用不同的函数执行相应请求

5、EDU回池后可以被分派给其他任务

执行队列:所有取得任务的线程的EDU(即线程控制块)都在执行队列中

等待队列:客户端退出后的代理线程在线程池不满的情况下会放在空闲线程队列中

二、线程类(即线程控制块(EDU))

#ifndef PMDEDU_HPP__
#define PMDEDU_HPP__

#include "core.hpp"
#include "pmdEDUEvent.hpp"
#include "ossQueue.hpp"
#include "ossSocket.hpp"

#define PMD_INVALID_EDUID 0//非法edu_id
#define PMD_IS_EDU_CREATING(x)    ( PMD_EDU_CREATING == x )//线程正在创建
#define PMD_IS_EDU_RUNNING(x)     ( PMD_EDU_RUNNING  == x )//运行状态
#define PMD_IS_EDU_WAITING(x)     ( PMD_EDU_WAITING  == x )//等待状态(对于代理线程,表示其正在等待所代理的客户的响应)
#define PMD_IS_EDU_IDLE(x)        ( PMD_EDU_IDLE     == x )//空闲状态(已经回池)
#define PMD_IS_EDU_DESTROY(x)     ( PMD_EDU_DESTROY  == x )//销毁状态

typedef unsigned long long EDUID ;

enum EDU_TYPES//线程的类型
{
   // System EDU Type监听线程
   EDU_TYPE_TCPLISTENER = 0,
   // Agent EDU Type处理线程
   EDU_TYPE_AGENT,

   EDU_TYPE_UNKNOWN,//用于以后扩展
   EDU_TYPE_MAXIMUM = EDU_TYPE_UNKNOWN
} ;

enum EDU_STATUS//线程的状态
{
   PMD_EDU_CREATING = 0,
   PMD_EDU_RUNNING,
   PMD_EDU_WAITING,
   PMD_EDU_IDLE,
   PMD_EDU_DESTROY,
   PMD_EDU_UNKNOWN,
   PMD_EDU_STATUS_MAXIMUM = PMD_EDU_UNKNOWN
} ;

class pmdEDUMgr ;//线程池声明

class pmdEDUCB//线程控制块
{
public :
   pmdEDUCB ( pmdEDUMgr *mgr, EDU_TYPES type ) ;//从指定线程池得到指定类型的线程
   inline EDUID getID()//线程id
   {
      return _id ;
   }
   inline void postEvent ( pmdEDUEvent const &data )//发送事件
   {
      _queue.push ( data ) ;//实际上是将消息压入消息队列,每一个edu都有一个消息队列
   }
   bool waitEvent ( pmdEDUEvent &data, long long millsec )//millsec<0表示无限等待
   {
      // millsec小于零则无线等待
      bool waitMsg = false ;
      if ( PMD_EDU_IDLE != _status )
      {
         _status = PMD_EDU_WAITING ;
      }
      if ( 0 > millsec )
      {
         _queue.wait_and_pop ( data ) ;//无限等待
         waitMsg = true ;
      }
      else
      {
         waitMsg = _queue.timed_wait_and_pop ( data, millsec ) ;//超时等待
      }

      if ( waitMsg )
      {
         if ( data._eventType == PMD_EDU_EVENT_TERM )//结束事件:想结束edu
         {
            _isDisconnected = true ;
         }
         else
         {
            _status = PMD_EDU_RUNNING ;//回去继续执行
         }
      }
      return waitMsg ;
   }
   inline void force ()//强行停止当前线程
   {
      _isForced = true ;
   }
   inline void disconnect ()//断开连接
   {
      _isDisconnected = true ;
   }
   inline EDU_TYPES getType ()//得到当前的线程类型
   {
      return _type ;
   }
   inline EDU_STATUS getStatus ()//得到当前的线程状态
   {
      return _status ;
   }
   inline void setType ( EDU_TYPES type )//设置线程类型
   {
      _type = type ;
   }
   inline void setID ( EDUID id )//设置当前的id
   {
      _id = id ;
   }
   inline void setStatus ( EDU_STATUS status )//设置当前线程的状态
   {
      _status = status ;
   }
   inline bool isForced ()
   {
      return _isForced ;
   }
   inline pmdEDUMgr *getEDUMgr ()//得到当前线程对应的线程池
   {
      return _mgr ;
   }
private :
   EDU_TYPES  _type ;//线程类型
   pmdEDUMgr *_mgr ;//归属于哪个线程池
   EDU_STATUS _status ;//线程状态
   EDUID      _id ;//线程的eduid
   bool       _isForced ;//是否关闭线程
   bool       _isDisconnected ;//是否断开连接
   ossQueue<pmdEDUEvent> _queue ;//消息队列,每个edu都有一个消息队列
} ;

typedef int (*pmdEntryPoint) ( pmdEDUCB *, void * ) ;//线程入口函数的类型,函数指针
pmdEntryPoint getEntryFuncByType ( EDU_TYPES type ) ;//通过线程类型得到对应的入口函数

int pmdAgentEntryPoint ( pmdEDUCB *cb, void *arg ) ;//代理类型的函数指针
int pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *arg ) ;//监听类型的函数指针
int pmdEDUEntryPoint ( EDU_TYPES type, pmdEDUCB *cb, void *arg ) ;//线程的入口函数

int pmdRecv ( char *pBuffer, int recvSize,
              ossSocket *sock, pmdEDUCB *cb ) ;//tcp接收
int pmdSend ( const char *pBuffer, int sendSize,
              ossSocket *sock, pmdEDUCB *cb ) ;//tcp发送

#endif

1)diy数据库的线程类结构比较清晰,而且功能比较完整。我们首先从他的属性开始分析。

EDU_TYPES  _type;//线程类型

diydb中主要有两种线程类型,一种是监听线程,一种是代理线程。监听线程负责监听端口,并与客户端建立连接,然后从线程池取得一个代理线程去已连接的客户端的请求。一种是代理线程,所有与客户端的交互都是在代理线程中执行的。

pmdEDUMgr *_mgr;//线程所属的线程池

每一个数据库实例都有一个线程池,这个线程池是可以动态增长的。这个线程池管理者这个数据库实例中的所有线程。

EDUSTADUS _stadus;//线程状态

每一个线程都处于一个有限状态机中,线程一共有五种状态,当线程收到消息时,会在五种状态之间转换。

EDUID  id;//线程的edu id

bool       _isForced ;//是否关闭线程,一般由线程池来设置为true

bool       _isDisconnected ;//是否断开连接,在代理线程接收到客户端的disconnect命令后就会将本属性设置为true

ossQueue<pmdEDUEvent> _queue ;//消息队列,每个edu都有一个消息队列

2)线程的入口函数

typedef int (*pmdEntryPoint) ( pmdEDUCB *, void * ) ;//线程入口函数的类型,函数指针。

pmdEntryPoint getEntryFuncByType ( EDU_TYPES type ) ;//通过线程类型得到对应的入口函数

int pmdAgentEntryPoint ( pmdEDUCB *cb, void *arg ) ;//代理类型的函数指针

int pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *arg ) ;//监听类型的函数指针

int pmdEDUEntryPoint ( EDU_TYPES type, pmdEDUCB *cb, void *arg ) ;//线程的入口函数,实际上在函数体里面会根据type参数来调用对应的线程入口函数

三、线程间通信用的消息类

#ifndef PMDEDUEVENT_HPP__
#define PMDEDUEVENT_HPP__

#include "core.hpp"
enum pmdEDUEventTypes//消息类型
{
   PMD_EDU_EVENT_NONE = 0,//空的消息
   PMD_EDU_EVENT_TERM,     // 结束消息
   PMD_EDU_EVENT_MSG,
   PMD_EDU_EVENT_RESUME   // 唤醒消息
} ;

class pmdEDUEvent
{
public :
   pmdEDUEvent () :
   _eventType(PMD_EDU_EVENT_NONE),
   _release(false),
   _Data(NULL)
   {
   }

   pmdEDUEvent ( pmdEDUEventTypes type ) :
   _eventType(type),
   _release(false),
   _Data(NULL)
   {
   }

   pmdEDUEvent ( pmdEDUEventTypes type, bool release, void *data ) :
   _eventType(type),
   _release(release),
   _Data(data)
   {
   }

   void reset ()//清空
   {
      _eventType = PMD_EDU_EVENT_NONE ;
      _release = false ;
      _Data = NULL ;
   }

   pmdEDUEventTypes _eventType ;//事件类型
   bool             _release ;//释放
   void            *_Data ;//数据
} ;

#endif

我们可以看到一共有四类消息,_release表示收到该消息的线程是否马上释放这个消息中数据指针所指的数据,_Data是指向数据地址的指针,表示消息中所带的数据,这些消息主要是供线程间通信用的。

四、线程池

#ifndef PMDEDUMGR_HPP__
#define PMDEDUMGR_HPP__

#include "core.hpp"
#include "pmdEDU.hpp"
#include "ossLatch.hpp"
#include "ossUtil.hpp"

#define EDU_SYSTEM     0x01//系统级edu
#define EDU_USER       0x02//用户级edu
#define EDU_ALL        ( EDU_SYSTEM | EDU_USER )

class pmdEDUMgr//线程池
{
private :
   std::map<EDUID, pmdEDUCB*> _runQueue ;//运行的线程队列
   std::map<EDUID, pmdEDUCB*> _idleQueue ;//空闲的线程队列
   std::map<unsigned int, EDUID> _tid_eduid_map ;//线程池中所有线程id和其edu的映射

   ossSLatch _mutex ;//对线程队列的的访问时必须用到的读写锁
   EDUID _EDUID ;//new下一个edu时分配的id
   std::map<unsigned int, EDUID> _mapSystemEDUS ;//系统edu的类型和eduid之间的映射
   bool _isQuiesced ;//标识数据库是否被挂起,当数据库正在挂起时线程池不接受请求,挂起数据库是为了使DBA可以对数据库进行一些特殊的操作
   bool _isDestroyed ;//线程池是否被销毁
public :
   pmdEDUMgr () :
   _EDUID(1),
   _isQuiesced(false),
   _isDestroyed(false)
   {
   }

   ~pmdEDUMgr ()
   {
      reset () ;
   }
   void reset ()
   {
      _destroyAll () ;//删除线程池中的所有线程
   }

   unsigned int size ()//得到所有线程的数量
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;//共享锁,因为是读
      num = ( unsigned int ) _runQueue.size() +
            ( unsigned int ) _idleQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeRun ()//得到运行队列的长度
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = ( unsigned int ) _runQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeIdle ()
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = ( unsigned int ) _idleQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeSystem ()//系统edu的数量
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = _mapSystemEDUS.size() ;
      _mutex.release_shared () ;
      return num ;
   }

   EDUID getSystemEDU ( EDU_TYPES edu )//由系统edu类型得到其对应的eduid
   {
      EDUID eduID = PMD_INVALID_EDUID;
      _mutex.get_shared () ;
      std::map<unsigned int, EDUID>::iterator it = _mapSystemEDUS.find( edu ) ;
      if ( it != _mapSystemEDUS.end() )
      {
         eduID = it->second  ;
      }
      _mutex.release_shared () ;
      return eduID ;
   }

   bool isSystemEDU ( EDUID eduID )
   {
      bool isSys = false ;
      _mutex.get_shared () ;
      isSys = _isSystemEDU ( eduID ) ;
      _mutex.release_shared () ;
      return isSys ;
   }

   void regSystemEDU ( EDU_TYPES edu, EDUID eduid )//注册一个系统edu
   {
      _mutex.get() ;
      _mapSystemEDUS[ edu ] = eduid ;
      _mutex.release () ;
   }

   bool isQuiesced ()
   {
      return _isQuiesced ;
   }
   void setQuiesced ( bool b )
   {
      _isQuiesced = b ;
   }
   bool isDestroyed ()
   {
      return _isDestroyed ;
   }
   static bool isPoolable ( EDU_TYPES type )//线程edu的类型对应的线程是否能回收到线程池,目前只有agent线程的edu可以被回收,因为正常情况下监听线程是不会退出的
   {
      return ( EDU_TYPE_AGENT == type ) ;
   }

private :
   int _createNewEDU ( EDU_TYPES type, void *arg, EDUID *eduid ) ;//没有空闲edu时,创建新的edu
   int _destroyAll () ;//销毁所有的线程
   int _forceEDUs ( int property = EDU_ALL ) ;//干掉某种类型的edu
   unsigned int _getEDUCount ( int property = EDU_ALL ) ;
   void _setDestroyed ( bool b )
   {
      _isDestroyed = b ;
   }
   bool _isSystemEDU ( EDUID eduID )
   {
      std::map<unsigned int, EDUID>::iterator it = _mapSystemEDUS.begin() ;
      while ( it != _mapSystemEDUS.end() )
      {
         if ( eduID == it->second )
         {
            return true ;
         }
         ++it ;
      }
      return false ;
   }

   int _destroyEDU ( EDUID eduID ) ;//销毁一个特定的edu

   int _deactivateEDU ( EDUID eduID ) ;//把一个edu放回线程池
public :
   /*
    * EDU Status Transition Table
    * C: CREATING
    * R: RUNNING
    * W: WAITING
    * I: IDLE
    * D: DESTROY
    * 函数
    * c: createNewEDU
    * a: activateEDU
    * d: destroyEDU
    * w: waitEDU
    * t: deactivateEDU
    *   C   R   W   I   D  <--- from
    * C c
    * R -   -   a   a   -  <--- Creating/Idle/Waiting status can move to Running status
    * W w   w   -   -   -  <--- Running status move to Waiting
    * I -   -   t   -   -  <--- Creating/Waiting status move to Idle
    * D d   -   d   d   -  <--- Creating / Waiting / Idle can be destroyed
    * ^ To
    */

   int activateEDU ( EDUID eduID ) ;//将waiting和idle线程激活成running线程
   int waitEDU ( EDUID eduID ) ;//使edu从执行状态转换成等待状态
   int startEDU ( EDU_TYPES type, void* arg, EDUID *eduid ) ;//可以创建一个edu,并创建相应的线程。如果创建的是代理,而且线程池中有空闲线程则从线程池里面去拿,并把相应的EDU从空闲队列放到运行队列
   int postEDUPost ( EDUID eduID, pmdEDUEventTypes type,
                     bool release = false, void *pData = NULL ) ;//向一个指定线程的队列发送一个消息,将激活因为等待消息队列中的消息而阻塞的线程
   int waitEDUPost ( EDUID eduID, pmdEDUEvent& event,
                     long long millsecond ) ;//等待指定edu的事件
   int returnEDU ( EDUID eduID, bool force, bool* destroyed ) ;//把edu销毁或者回池
   int forceUserEDU ( EDUID eduID ) ;
   pmdEDUCB *getEDU ( unsigned int tid ) ;//通过线程id得到edu
   pmdEDUCB *getEDU () ;//得到当前的edu
   pmdEDUCB *getEDUByID ( EDUID eduID ) ;
   void setEDU ( unsigned int tid, EDUID eduid ) ;

} ;

#endif

分析

1、EDU 的状态迁移图

* C: CREATING

* R: RUNNING

* W: WAITING

* I: IDLE

* D: DESTROY

* 函数

* c: createNewEDU

* a: activateEDU

* d: destroyEDU

* w: waitEDU

* t: deactivateEDU

*   C   R   W   I   D  <--- from

* C c

* R -   -   a   a   -  <---Idle/Waiting status can move to Running status

* W w   w   -   -   -  <--- Creating/Running status move to Waiting

* I t   -   t   -   -  <--- Waiting status move to Idle

* D d   -   d   d   -  <--- Creating / Waiting / Idle can be destroyed

2、CREATING、 RUNNING、 WAITING这三种状态的线程的EDU存放在运行线程队列中,IDLE状态的线程放在空闲线程队列中,DESTROY线程的EDU不在任何线程队列中(因为当一个线程的状态为销毁状态时,他的线程管理块已经被线程池从两个线程队列中删除了),当线程实体检测到自己的状态为DESTROY时,该线程实体就会从线程执行函数返回,从而真正意义上结束线程

五、总结:

1、本文主要分析了Diydb的线程模块,实际上就是一个比较完善的线程池模型

2、线程池主要由以下几个类型实现:

线程控制块:主要是对线程实体的封装。每个线程控制块实体与一个线程实体对应,其包含了这个线程的状态、类型、对应的线程池以及该线程的消息队列等。

消             息:消息实体用来在线程间传递信息,通过这些消息可以传递普通的数据,也可以控制接收到消息的线程的状态

线程管理 类:整个Diydb中只有一个线程管理类的实例,她是线程池的核心,主要管理着两个队列,一个是运行线程队列,一个是空闲线程队列(空闲线程队列中只可能是代理线程)。她控制着所有线程的状态,从而来维护线程池。

3、本线程池中的线程分为系统线程(这里只有监听线程)和用户线程(这里只有代理线程),每个系统线程类型对应的只有一个线程实体,而每个用户线程类型的线程对应的是多个线程实体。

4、只有代理线程的状态可能是空闲线程,所以从某种意义上来讲这个线程池实际上只对代理线程的效率有益,尽管系统线程的EDU也在运行线程队列中,而且其状态也受线程管理类的控制。

5、线程池中的空闲线程不是系统启动时一次性创建完毕的,是当创建的代理线程处理完用户请求后(即用户断开连接),返回给空闲线程队列的(当然前提条件是当前空闲线程的个数小于maxpool,即空闲线程上限)。

6、线程池的大小是由一个参数(maxpool)来控制的,当线程池中空闲线程的个数等于maxpool时,运行完毕的线程就不允许回池。

7、当线程池中的空闲线程都用完以后,如果有新的用户请求过来,就会创建一个新的代理线程去处理这个用户的任务。所以,这个线程池中的线程数是可以动态变化的。

时间: 2025-01-01 09:32:12

diy数据库(七)--线程控制块、消息、线程池的相关文章

【转】Spring线程及线程池的使用

最近公司项目正逐渐从dubbo向springCloud转型,在本次新开发的需求中,全部使用springcloud进行,在使用时线程池,考虑使用spring封装的线程池,现将本次使用心得及内容记录下来 一.线程池常规使用方式 之前使用线程池的方式,都是自己定义线程池,然后写多线程类,用线程池去调用,如下: package cn.leadeon.message.client; import cn.leadeon.comm.log.Log; import cn.leadeon.message.req.

System、应用程序进程的Binder线程池和Handler消息循环

首先看一张Android系统启动流程图: 一个进程最重要的两项指标一个是启动了Binder线程池,也就是能够进程Binder进程间通信了.还有一个是启动了Handler消息循环,能够使用了消息循环机制. 1.那么systemserver进程是什么时候实现上面两个机制的呢?见代码: 启动了Binder线程池.是子线程池. public static final void zygoteInit(String[] argv) throws ZygoteInit.MethodAndArgsCaller

8天玩转并行开发——第七天 简要分析任务与线程池

原文:8天玩转并行开发--第七天 简要分析任务与线程池 其实说到上一篇,我们要说的task的知识也说的差不多了,这一篇我们开始站在理论上了解下“线程池”和“任务”之间的关系,不管是 说线程还是任务,我们都不可避免的要讨论下线程池,然而在.net 4.0以后,线程池引擎考虑了未来的扩展性,已经充分利用多核微处理器 架构,只要在可能的情况下,我们应该尽量使用task,而不是线程池. 首先看一下task的结构 从图中我们可以看出Task.Factory.StartNew()貌似等同于用ThreadPo

基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言 并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕.异步和削峰的目的. Kafka.ActiveMQ.RabbitMQ

【面试题2020-03-24】Java线程池七个参数详解

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the

多线程篇七:通过Callable和Future获取线程池中单个务完成后的结果

使用场景:如果需要拿到线程的结果,或者在线程完成后做其他操作,可以使用Callable 和 Futrue 1.定义一个线程池,向线程池中提交单个callable任务 ExecutorService threadPools=Executors.newSingleThreadExecutor(); Future<String> future=threadPools.submit(new Callable<String>() { @Override public String call(

JAVA 并发编程-线程池(七)

线程池的作用: 线程池作用就是限制系统中运行线程的数量. 依据系统的环境情况.能够自己主动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用线程池控制线程数量,其它线程排队等候.一个任务运行完毕,再从队列的中取最前面的任务開始运行. 为什么要用线程池: 1.降低了创建和销毁线程的次数,每一个工作线程都能够被反复利用.可运行多个任务. 2.能够依据系统的承受能力,调整线程池中工作线线程的数目,防止由于消耗过多的内存,而把server累趴下 Java里面线程池的

Java 多线程(七)——线程组与线程池

1 线程组 1.1 概述 Java中使用ThreadGroup来表示线程组,它可以对一批线程进行分类管理.对线程组的控管理,即同时控制线程组里面的这一批线程. 用户创建的所有线程都属于指定线程组,如果没有显示指定属于哪个线程组,那么该线程就属于默认线程组(即main线程组).默认情况下,子线程和父线程处于同一个线程组. 只有在创建线程时才能指定其所在的线程组,线程运行中途不能改变它所属的线程组,也就是说线程一旦指定所在的线程组,就直到该线程结束. 线程组与线程之间结构类似于树形的结构: 1.2

Linux组件封装(七)——线程池的简单封装

线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务.执行任务,让用户调用相应的接口来添加任务. 在线程池的封装中,我们同样需要用到的是MutexLock.Condition.Thread这些基本的封装. 基础封装如下: MutexLock: 1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3 4 #include "NonCopyable.h" 5 #include <pthread.h> 6 #i