Loki之ThreadPool

Loki中的ThreadPool目的主要是对创建出来的线程进行复用。

ThreadPool在Test而非Loki目录下,因此并非是标准Loki的组件之一,不过我们可以对其修改定制,

下面是对其源码的大致分析,ThreadPool顾名思义线程池,一般我们使用线程的时候CreateThread调用我们的回调函数,当回调函数结束之后我们的线程也随之终结销毁,
这样出现的问题是我们想执行一个多线程任务都要CreateThread,我们是否能够CreateThread调用我们的回调函数结束之后暂时将这个线程保存

起来而不是直接调回,当下次我们想要执行一个多线程任务的时候只需要把回调函数传递给该多线程对象,让它再次调用我们新的回调函数,这个

就类似与STL中的内存池对内存的复用,只是STL内存池会对整块大内存进行分片链接到链表处理而已。

想要真正了解ThreadPool最好的办法是浏览源代码

首先是线程对象Thread,它有以下几种状态:

enum Status
    {
        Dead = 0,
        Idle,
        Starting,
        Active
    };

在创建线程并启动的时候起内部调用回调函数是

#if defined( _MSC_VER )
    unsigned int ThreadPool::TopFunction( void * p )
#else
    void * ThreadPool::TopFunction( void * p )
#endif
{
    assert( nullptr != p );

volatile Thread * thread = reinterpret_cast< volatile Thread * >( p );
    Thread::SetCurrentThread( thread );
    while ( ( thread->m_status != Thread::Dead ) && ( !thread->m_stop ) )
    {
        // Call the thread‘s WaitPolicy here?
#if defined( _MSC_VER )
        ::SleepEx( 1, true );
#else
        ::sleep( 1 );
#endif
        if ( thread->m_status == Thread::Starting ) //如果线程状态变为Starting,说明要复用该线程,调用用户的回调函数
        {
            try
            {
                assert( nullptr != thread->m_func );
                thread->m_status = Thread::Active;
                thread->m_func( thread->m_parm );
            }
            catch ( ... )
            {
                // What to do in case of exception?
                // Call an exception policy?
            }

    //用户回调函数运行结束后将线程状态设置为空闲,以便下次复用
            thread->m_status = Thread::Idle;
            thread->m_func = nullptr;
            thread->m_parm = nullptr;
        }
    }

#if defined( _MSC_VER )
    return 0;
#else
    return nullptr;
#endif
}

//该函数判读的是用户的回调函数是否执行结束,而非ThreadPool传递给它的TopFunction是否结束,

//返回true仅仅说明用户的回调函数执行结束,实际该线程依然存在于内存池中

bool Thread::WaitForThread( void ) volatile
{
    assert( IsValid( m_owner ) );
    const volatile Thread * current = Thread::GetCurrentThread();
    if ( this == current )
        return false;
    if ( m_status == Thread::Dead )
        return false;
    while ( this->m_status == Thread::Active )
    {
        // Call the wait policy.
#if defined( _MSC_VER )
        ::SleepEx( 1, true );
#else
        ::sleep( 1 );
#endif
    }
    return true;
}

下面介绍ThreadPool,ThreadPool使用vector对线程进行保存,主要列举关键的几个函数进行说明

//创建threadCount个多线程于内存池中
unsigned int ThreadPool::Create( unsigned int threadCount ) volatile
{
    assert( IsValid() );
    LOKI_DEBUG_CODE( Checker checker( this ); (void)checker; )

ThreadPool * pThis = const_cast< ThreadPool * >( this );

//计算内存池中Idle状态的线程数目,如果不够客户要求则创建足够的线程放到线程池中
    const unsigned int countNow = GetCount( Thread::Idle );
    if ( threadCount <= countNow )
        return threadCount;

const unsigned int totalCount = pThis->m_threads.size();
    const unsigned int howManyToAdd = threadCount - countNow;
    if ( pThis->m_threads.capacity() <= howManyToAdd )
        pThis->m_threads.reserve( totalCount + howManyToAdd );
    for ( unsigned int ii = 0; ii < howManyToAdd; ++ii )
    {
#if defined( _MSC_VER )
        volatile Thread * thread = new Thread( this );
#else
        Thread * thread = new Thread( this );
#endif
        pThis->m_threads.push_back( thread );
        Thread * pThread = const_cast< Thread * >( thread );
        void * p = reinterpret_cast< void * >( pThread );
        // Call thread creation policy? 此处可以调用创建线程策略,留给读者进行改进
        LokiThreadCreate( &thread->m_thread, nullptr, TopFunction, p );
    }
    return howManyToAdd;
}

//开始执行用户的回调函数,function为用户的回调函数,parm为传递给function的void指针
volatile Thread * ThreadPool::Start( CallFunction function, void * parm ) volatile
{
    assert( IsValid() );
    LOKI_DEBUG_CODE( Checker checker( this ); (void)checker; )
    ThreadPool * pThis = const_cast< ThreadPool * >( this );

if ( nullptr == function )
        return nullptr;
#if defined( _MSC_VER )
    volatile Thread * thread = nullptr;
#else
    Thread * thread = nullptr;
#endif
    bool foundOne = false;
  //查找内存池中是否有idle线程
    for ( size_t ii = 0; ii < pThis->m_threads.size(); ii++ )
    {
#if defined( _MSC_VER )
        thread = pThis->m_threads.at( ii );
#else
        thread = const_cast< Thread * >( pThis->m_threads.at( ii ) );
#endif
        assert( nullptr != thread );
        if ( Thread::Idle == thread->m_status )
        {
            foundOne = true;
            break;
        }
    }
  //存在空闲线程的话进行复用,否则创建一个线程并放入内存池vector中,

  //对相应参数赋值,设置线程状态为Starting之后在内存池回调函数TopFunction中会自动调用用户回调函数function,线程状态设置为active,

  //function结束之后在将线程状态设置为idle,这样就可以达到线程复用的效果
    if ( foundOne )
    {
        thread->m_func = function;
        thread->m_parm = parm;
        thread->m_status = Thread::Starting;
    }
    else
    {
        // Did not find an idle thread, so start a new one.
        thread = new Thread( this, function, parm );
        pThis->m_threads.push_back( thread );
        Thread * pThread = const_cast< Thread * >( thread );
        void * p = reinterpret_cast< void * >( pThread );
        // Call to thread creation policy?
        LokiThreadCreate( &thread->m_thread, nullptr, TopFunction, p );
    }

return thread;
}

该线程池比较容易理解与使用,但是觉得不太好的地方就是TopFunction中循环的SleepEx部分,以及线程创建之后必须要再ThreadPool析构之后才会销毁,在Start的时候如果找不到idle线程

就会创建一个新的线程push_back到vector中,不能够保证控制线程在一个数量下。

设计改进:

用户希望内存池最多用n个线程,Start函数根据当前的线程数量可以在找不到idle线程的时候创建线程,但是达到n之后不能够在创建,这样我们可以把用户的function和parm保存

到一个队列当中,当线程池中的线程有idle线程的时候我们从任务队列中取出元素执行之

时间: 2024-11-04 07:40:59

Loki之ThreadPool的相关文章

c#:ThreadPool实现并行分析,并实现线程同步结束

背景: 一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题:那么,怎么解决这些问题呢? 据我个人经验来说有以下两种方式: 1.并行.多线程(Parallel.Task.ThreadPool) 2.多进程MutilpleProcess 恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束. 测试代码: 1 static void Main(string[] args) 2 {

ThreadPool.QueueUserWorkItem引发的血案,线程池异步非正确姿势导致程序闪退的问题

ThreadPool是.net System.Threading命名空间下的线程池对象.使用QueueUserWorkItem实现对异步委托的先进先出有序的回调.如果在回调的方法里面发生异常则应用程序会出现闪退.当然是指不处理那个异常的情况下.这不公司的CMS在生产环境频频出现闪退的情况.该死的是,原来用老机器配置不高的情况下没有出现过.换了更好的新机器后出现的. // // 摘要: // 将方法排入队列以便执行,并指定包含该方法所用数据的对象.此方法在有线程池线程变得可用时执行. // //

boost之ThreadPool

boost之ThreadPool 版权声明:本文为博主原创文章,未经博主允许不得转载. threadpool是基于boost库实现的一个线程池子库,但线程池实现起来不是很复杂.我们从threadpool中又能学到什么东西呢? 它是基于boost库实现的,如果大家对boost库有兴趣,看看一个简单的实现还是可以学到点东西的. threadpool基本功能 1.任务封装,包括普通任务(task_func)和优先级任务(prio_task_func). 2.调度策略,包括fifo_scheduler(

C#多线程--线程池(ThreadPool)

先引入一下线程池的概念: 百度百科:线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程

ThreadPool.QueueUserWorkItem

ThreadPool.QueueUserWorkItem(_ => { HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://www.baidu/tag/模特?page=" + pageNum); request.Credentials = System.Net.CredentialCache.DefaultCredentials; HttpWebResponse response = (HttpWe

线程(Thread)、线程池(ThreadPool)技术

线程:是Windows任务调度的最小单位.线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针.程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数,在一个应用程序中,常常需要使用多个线程来处理不同的事情,这样可以提高程序的运行效率,也不会使主界面出现无响应的情况.在这里主要介绍线程(Thread).线程池(ThreadPool)两种不同创建线程的区别 在通常的情况下,当我们需要开启一个新的线程时,我们直接通过Thread(继承自 System.Threading;)去创建

[原创]loki库之内存池SmallObj

loki库之内存池SmallObj 介绍 loki库的内存池实现主要在文件smallobj中,顾名思义它的优势主要在小对象的分配与释放上,loki库是基于策略的方法实现的,简单的说就是把某个类通过模板参数传递给主类,比如某个对象的创建可以通过不同的创建策略进行创建,本文主要讲loki的大致实现. smallobj层次 loki.smallobj主要分四层: 应用层smallobject,重载了operator new 和operator delete,内存通过底层获取 内存分配smallobjA

C# 多线程处理相关说明: WaitHandle,waitCallback, ThreadPool.QueueUserWorkItem

class TestThread { static void Main() { //使用WaitHandle静态方法阻止一个线程,直到一个或多个同步对象接收到信号 WaitHandle[] waitHandles = new WaitHandle[] { new ManualResetEvent(false), new ManualResetEvent(false) }; WaitCallback waitCallback = new WaitCallback(MyThreadWork); Wa

ThreadPool线程池

1.GetMaxThreads,GetMinThreads class Program { static void Main(string[] args) { int workerThreads; int completePortsThreads; ThreadPool.GetMaxThreads(out workerThreads, out completePortsThreads); Console.WriteLine("线程池中最大的线程数{0},线程池中异步IO线程的最大数目{1}&qu