Whether Thread Pool is Needed for You?

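Do you actually need a thread pool?

Ask yourself two questions first:

Do you have many requests that must be processed repeatedly, each independent of the others?

Do you need to wait for IO operations, such as file operations?

If you answered YES, a thread pool can help you.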

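Why do we need a thread pool?

IO operations usually take a long time to complete. In a single-threaded application, system resources therefore sit idle while an IO operation is in progress. Using multiple threads improves efficiency considerably, and a thread pool lets us carry out multithreaded work more efficiently.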
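To make the point about waiting on IO concrete, here is a minimal sketch in portable C++11. It is not part of the article's Win32 code: `SlowRead`, `ReadAllSerial`, and `ReadAllThreaded` are hypothetical names, and the "IO" is only simulated with a short sleep.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a blocking IO call: wait, then return a value.
int SlowRead( int id )
{
    std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
    return id * 2;
}

// One thread: each wait has to finish before the next one starts.
int ReadAllSerial( int count )
{
    int sum = 0;
    for( int i = 0; i < count; ++i )
    {
        sum += SlowRead( i );
    }
    return sum;
}

// One thread per read: the waits overlap instead of adding up.
int ReadAllThreaded( int count )
{
    std::vector<int> results( static_cast<std::size_t>( count ), 0 );
    std::vector<std::thread> threads;
    for( int i = 0; i < count; ++i )
    {
        // Each thread writes only to its own slot, so no lock is needed.
        threads.emplace_back( [&results, i] { results[i] = SlowRead( i ); } );
    }
    int sum = 0;
    for( int i = 0; i < count; ++i )
    {
        threads[i].join();
        sum += results[i];
    }
    return sum;
}
```

Both functions return the same result, but the threaded version spends roughly one wait in total rather than `count` waits. Note that it still creates and destroys one thread per request, which is exactly the overhead a pool is meant to remove.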

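What does a thread pool do?

An application should avoid creating and destroying threads frequently.

A thread pool manages the threads for you, so they are not repeatedly created and destroyed, which is more efficient.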

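Thread pool design

The thread pool creates a specified number of threads, which then wait for requests. As soon as a request reaches the pool, one thread is activated and starts executing it. When the request completes, the thread returns to the waiting state. When the user requests destroy, all threads in the pool exit.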
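The life cycle described above can be sketched in portable C++11 as follows. This is an illustration under assumptions, not the article's implementation: `SimplePool` and `RunCounterDemo` are hypothetical names, a condition variable replaces the Win32 semaphore, and, unlike the article's `Destroy()`, which cancels queued requests, this sketch drains the queue before exiting so that the example is deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable pool; not the TP::ThreadPool class from this article.
class SimplePool
{
public:
    explicit SimplePool( unsigned threadCount )
    {
        for( unsigned i = 0; i < threadCount; ++i )
        {
            m_workers.emplace_back( [this] { WorkerLoop(); } );
        }
    }

    ~SimplePool() { Destroy(); }

    // Queue a request and wake one waiting worker.
    void Post( std::function<void()> request )
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_queue.push( std::move( request ) );
        }
        m_wake.notify_one();
    }

    // Tell the workers to exit once the queue is empty, then join them.
    void Destroy()
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_destroyed = true;
        }
        m_wake.notify_all();
        for( std::thread& worker : m_workers )
        {
            if( worker.joinable() ) worker.join();
        }
    }

private:
    void WorkerLoop()
    {
        for( ;; )
        {
            std::function<void()> request;
            {
                std::unique_lock<std::mutex> lock( m_mutex );
                // Waiting state: sleep until a request arrives or destroy is requested.
                m_wake.wait( lock, [this] { return m_destroyed || !m_queue.empty(); } );
                if( m_queue.empty() ) return; // destroy requested and nothing left to run
                request = std::move( m_queue.front() );
                m_queue.pop();
            }
            request(); // active state: run outside the lock
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::queue<std::function<void()>> m_queue;
    std::vector<std::thread> m_workers;
    bool m_destroyed = false;
};

// Post requestCount increments to the pool and return the final counter value.
int RunCounterDemo( unsigned threadCount, int requestCount )
{
    std::atomic<int> counter( 0 );
    SimplePool pool( threadCount );
    for( int i = 0; i < requestCount; ++i )
    {
        pool.Post( [&counter] { ++counter; } );
    }
    pool.Destroy(); // blocks until every queued request has run
    return counter.load();
}
```

The same few worker threads serve every request, so `RunCounterDemo( 4, 100 )` runs 100 requests with only four thread creations.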

The design consists of the following classes:

ThreadPool

This class creates, manages, and destroys the thread pool, so your application needs to create an object of the ThreadPool class.

AbstractRequest

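This represents a request to the thread pool. The client application derives from this class and implements the function to be executed in the thread according to its own needs.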
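As a rough illustration of that pattern, the sketch below mirrors the shape of `AbstractRequest` (a pure virtual `Execute()` plus an abort flag) in portable C++11. `Request` and `SumRequest` are hypothetical names, and `std::atomic<bool>` stands in for the critical-section lock used in the article's class.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical portable analogue of TP::AbstractRequest.
class Request
{
public:
    Request() : m_aborted( false ) {}
    virtual ~Request() {}

    // Work to run on a pool thread. Returns 0 on success.
    virtual long Execute() = 0;

    // May be called from another thread to cancel the work.
    void Abort() { m_aborted = true; }

protected:
    // Derived classes poll this during time-consuming work.
    bool IsAborted() const { return m_aborted; }

private:
    std::atomic<bool> m_aborted;
};

// Example client request: sums 1..upTo, checking for abort on every step.
class SumRequest : public Request
{
public:
    explicit SumRequest( long upTo ) : m_upTo( upTo ), m_sum( 0 ) {}

    long Execute() override
    {
        m_sum = 0;
        for( long i = 1; i <= m_upTo; ++i )
        {
            if( IsAborted() ) return 1; // stop early, report failure
            m_sum += i;
        }
        return 0;
    }

    long Sum() const { return m_sum; }

private:
    long m_upTo;
    long m_sum;
};
```

Polling the flag inside the loop is what lets a long-running request stop promptly; a request that never checks it can only be abandoned, not interrupted.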

Logger

Records log output such as errors and informational messages.
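A custom logger might look like the sketch below. It is a portable analogue under assumptions, not the article's class: `BaseLogger` and `MemoryLogger` are hypothetical names, and the prefix format is simplified. It also shows one C++ detail worth knowing when a base class mixes a non-virtual overload with a virtual one of the same name: overriding one overload in the derived class hides the others unless they are re-exposed with a `using` declaration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical portable analogue of TP::Logger.
class BaseLogger
{
public:
    virtual ~BaseLogger() {}

    // Non-virtual: decorates the message, then forwards to the virtual overload.
    void LogError( long activeRequests, const std::wstring& message )
    {
        LogError( L"[ACTIVE REQUEST=" + std::to_wstring( activeRequests ) + L"] " + message );
    }

    // Override this to choose where messages go. This default discards them
    // (the article's default writes to the debugger output window instead).
    virtual void LogError( const std::wstring& message )
    {
        (void)message;
    }
};

// Example client logger that keeps messages in memory.
class MemoryLogger : public BaseLogger
{
public:
    using BaseLogger::LogError; // keep the two-argument overload visible

    void LogError( const std::wstring& message ) override
    {
        m_lines.push_back( message );
    }

    const std::vector<std::wstring>& Lines() const { return m_lines; }

private:
    std::vector<std::wstring> m_lines;
};
```

Without the `using` line, calling `LogError( 3, L"..." )` directly on a `MemoryLogger` object would not compile, although calls through a `BaseLogger*` would still work.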

Source code:

.h

#ifndef _THREAD_POOL_MGR_H_
#define _THREAD_POOL_MGR_H_

#include <windows.h>
#include <list>
#include <string>   // std::wstring is used throughout the declarations below

namespace TP
{

    /**
     * Logger - Polymorphic base class for error logging.
     *          Users of the ThreadPool can derive a class from this one and
     *          override LogError() and LogInfo() to plug in their own logging mechanism.
     *          By default, messages are written to the debugger output window.
     */
    class Logger
    {

    public:

        // Constructor
        Logger(){};
        // Destructor
        virtual ~Logger(){};
        // Log error description.
        void LogError( const long lActiveReq_i, const std::wstring& wstrError_i );
        // Log information.
        void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i );
        // Override this function to log errors. Default log will be in output window.
        virtual void LogError( const std::wstring& wstrError_i );
        // Override this function to log informations. Default log will be in output window.
        virtual void LogInfo( const std::wstring& wstrInfo_i );

    private:

        // Log thread ID, Active thread count and last error.
        void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io );
    };

    /**
     * SyncObject - The class is a wrapper of Critical section object to provide
     *              synchronization for thread pool.
     */
    class SyncObject
    {

    public:
        // Constructor
        SyncObject()
        {
            ::InitializeCriticalSection( &m_stCriticalSection );
        }

        // Destructor
        ~SyncObject()
        {
            ::DeleteCriticalSection( &m_stCriticalSection );
        }

        // Lock critical section.
        bool Lock()
        {
            ::EnterCriticalSection( &m_stCriticalSection );
            return true;
        }

        // Unlock critical section.
        bool Unlock()
        {
            ::LeaveCriticalSection( &m_stCriticalSection );
            return true;
        }

    private:
        SyncObject( const SyncObject& );
        SyncObject& operator = ( const SyncObject& );

    private:

        // Critical section object.
        CRITICAL_SECTION m_stCriticalSection;
    };

    /**
     * AutoLock - Acquires the synchronization object on construction and
     *            releases it on destruction.
     */
    class AutoLock
    {

    public:

        /**
         * Parameterized constructor
         *
         * @param       LockObj_i - Synchronization object.
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
         */
        AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i )
        {
            if( NULL != m_pSyncObject )
            {
                m_pSyncObject->Lock();
            }
        }

        /**
         * Destructor.
         *
         * @param       Nil
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
         */
        ~AutoLock()
        {
            if( NULL != m_pSyncObject )
            {
                m_pSyncObject->Unlock();
                m_pSyncObject = NULL;
            }
        }

    private:
        SyncObject* m_pSyncObject;
    };

    /**
     * AbstractRequest - Abstract, polymorphic base class for requests processed by the
     *                   thread pool. Users of the ThreadPool must derive a class from
     *                   this one and override the Execute() function.
     */
    class AbstractRequest
    {

    public:
        // Constructor
        AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u ){}
        // Destructor
        virtual ~AbstractRequest(){}
        // Thread procedure to be overridden in the derived class. It should return promptly once the request is aborted;
        // during time-consuming work, check for an abort by calling IsAborted().
        virtual long Execute() = 0;
        // Set request ID.
        void SetRequestID( unsigned short uRequestID_i )
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_usRequestID = uRequestID_i;
        }
        // Get request ID.
        unsigned short GetRequestID()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            return m_usRequestID;
        }
        // Abort the processing of the request.
        void Abort()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = true;
        }
        // Clear abort flag for re-posting the same request.
        void ClearAbortFlag()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = false;
        }

    protected:
        // Check for the abort request
        bool IsAborted()
        {
            AutoLock LockRequest( m_LockWorkerThread );
            return m_bAborted;
        }
        // Prepare error or information log.
        void PrepareLog( std::wstring& wstrLog_io );

    protected:
        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;

    private:
        // Abort flag.
        bool m_bAborted;
        // Request Identifier.
        unsigned short m_usRequestID;

    };

    /**
     * AutoCounter - Increment and decrement counter
     */
    class AutoCounter
    {

    public:
        // Constructor.
        AutoCounter( unsigned short& usCount_io,
                     SyncObject& Lock_io ) :
                     m_usCount( usCount_io ), m_LockThread( Lock_io )
        {
            AutoLock Lock( m_LockThread );
            m_usCount++;
        }

        // Destructor.
        ~AutoCounter()
        {
            AutoLock Lock( m_LockThread );
            m_usCount--;
        }

    private:
        // Counter variable.
        unsigned short& m_usCount;
        // Synchronization object for resource locking.
        SyncObject& m_LockThread;
    };

    typedef std::list<AbstractRequest*> REQUEST_QUEUE;

    /**
     * ThreadPool - Creates and destroys the thread pool on request.
     *              Requests to be processed are posted to the pool as objects derived
     *              from AbstractRequest. A class can also be derived from Logger for
     *              custom error and information logging.
     */
    class ThreadPool
    {

    public:
        // Constructor.
        ThreadPool();
        // Destructor.
        ~ThreadPool();

        // Create thread pool with specified number of threads.
        bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
        // Destroy the existing thread pool.
        bool Destroy();
        // Post request to thread pool for processing.
        bool PostRequest( AbstractRequest* pRequest_io );

    private:
        AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io );
        bool AddThreads();
        bool NotifyThread();
        bool ProcessRequests();
        bool WaitForRequest();
        bool DestroyPool();
        bool IsDestroyed();
        void SetDestroyFlag( const bool bFlag_i );
        void CancelRequests();
        void LogError( const std::wstring& wstrError_i );
        void LogInfo( const std::wstring& wstrInfo_i );
        static UINT WINAPI ThreadProc( LPVOID pParam_i );

    private:
        ThreadPool( const ThreadPool& );
        ThreadPool& operator = ( const ThreadPool& );

    private:
        // Used for thread pool destruction.
        bool m_bDestroyed;
        // Hold thread count in the pool.
        unsigned short m_usThreadCount;
        // Released semaphore count.
        unsigned short m_usSemaphoreCount;
        // Active thread count.
        unsigned short m_lActiveThread;
        // Pending request count.
        unsigned short m_usPendingReqCount;
        // Manage active thread count in pool.
        HANDLE m_hSemaphore;
        // Hold thread handles.
        HANDLE* m_phThreadList;
        // Request queue.
        REQUEST_QUEUE m_RequestQueue;
        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;
        // User defined error and information logger class.
        Logger* m_pLogger;
        // Default error and information logger.
        Logger m_Logger;
    };
} // namespace TP

#endif // #ifndef _THREAD_POOL_MGR_H_

.cpp

/**
 * @author :    Suresh
 */

#include "ThreadPool.h"
#include <sstream>
#include <iomanip>

namespace TP
{

    /**
     * Log error description.
     *
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrError_i  - Error message.
     */
    void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
    {
        std::wstring wstrLog( wstrError_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogError( wstrLog );
    }

    /**
     * Log information.
     *
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrInfo_i   - Information message.
     */
    void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
    {
        std::wstring wstrLog( wstrInfo_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogInfo( wstrLog );
    }

    /**
     * Override this function to log errors. Default log will be in output window.
     *
     * @param       wstrError_i  - Error description
     */
    void Logger::LogError( const std::wstring& wstrError_i )
    {
        // Call the wide-character variant explicitly so this also compiles in non-UNICODE builds.
        OutputDebugStringW( wstrError_i.c_str());
    }

    /**
     * Override this function to log informations. Default log will be in output window.
     *
     * @param       wstrInfo_i   - Information description.
     */
    void Logger::LogInfo( const std::wstring& wstrInfo_i )
    {
        // Call the wide-character variant explicitly so this also compiles in non-UNICODE builds.
        OutputDebugStringW( wstrInfo_i.c_str());
    }

    /**
     * Log thread ID, Active thread count and last error.
     *
     * @param       lActiveReq_i - Active thread count.
     * @param       wstrLog_io   - Error or information description
     */
    void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
    {
        std::wstringstream wstrmLog;
        wstrmLog << L"##TP## [TID=" << std::setfill( L'0' ) << std::setw(8) << ::GetCurrentThreadId()
                 << L"] [ACTIVE REQUEST=" << std::setw(4) << lActiveReq_i
                 << L"] [LAST ERROR=" << std::setw(4) << ::GetLastError()
                 << L"] " << wstrLog_io.c_str() << L"]";
        wstrLog_io = wstrmLog.str();
    }

    /**
     * Prepare error or information log.
     *
     * @param       wstrLog_io - Log information
     */
    void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
    {
        std::wstringstream wstrmLog;
        wstrmLog << std::setfill( L'0' );
        wstrmLog << L"##RQ## [RID=" << std::setw(8) << GetRequestID()
                 << L"] [Desc=" << wstrLog_io.c_str() << L"]";
        wstrLog_io = wstrmLog.str();
    }

    /**
     * Constructor
     */
    ThreadPool::ThreadPool() : m_bDestroyed( false ),
                               m_usThreadCount( 0u ),
                               m_usSemaphoreCount( 0u ),
                               m_lActiveThread( 0u ),
                               m_usPendingReqCount( 0u ),
                               m_hSemaphore( NULL ),
                               m_phThreadList( NULL ),
                               m_pLogger( &m_Logger )
    {
    }

    /**
     * Destructor
     */
    ThreadPool::~ThreadPool()
    {
        if( NULL != m_phThreadList )
        {
            if( !Destroy())
            {
                LogError( L"Destroy() failed" );
            }
        }
    }

    /**
     * Create thread pool with specified number of threads.
     *
     * @param       usThreadCount_i - Thread count.
     * @param       pLogger_i       - Logger instance to log errors and informations
     */
    bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
    {
        try
        {
            // Assign logger object. If user not provided then use existing and
            // error will be logged in output window.
            m_pLogger = ( NULL != pLogger_i ) ? pLogger_i : &m_Logger;
            // Check thread pool is initialized already.
            if( NULL != m_phThreadList )
            {
                LogError( L"ThreadPool already created" );
                return false;
            }
            // Validate thread count.
            if( 0 == usThreadCount_i )
            {
                LogError( L"Minimum allowed thread count is one" );
                return false;
            }
            if( usThreadCount_i > 64 )
            {
                LogError( L"Maximum allowed thread count is 64" );
                return false;
            }
            LogInfo( L"Thread pool creation requested" );

            // Initialize values.
            m_lActiveThread = 0u;
            m_usSemaphoreCount = 0u;
            m_usPendingReqCount = 0u;
            m_usThreadCount = usThreadCount_i;
            // Create semaphore for thread count management.
            m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
            if( NULL == m_hSemaphore )
            {
                LogError( L"Semaphore creation failed" );
                m_usThreadCount = 0u;
                return false;
            }
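            // Reset the destroy flag before any worker starts, so that a pool
            // re-created after Destroy() never observes a stale flag.
            SetDestroyFlag( false );
            // Create worker threads and make pool active
            if( !AddThreads())
            {
                LogError( L"Threads creation failed" );
                Destroy();
                return false;
            }
            LogInfo( L"Thread pool created successfully" );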
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Create()" );
            return false;
        }
    }

    /**
     * Destroy thread pool.
     */
    bool ThreadPool::Destroy()
    {
        try
        {
            // Check whether thread pool already destroyed.
            if( NULL == m_phThreadList )
            {
                LogError( L"ThreadPool is already destroyed or not created yet" );
                return false;
            }
            // Cancel all requests.
            CancelRequests();
            // Set destroyed flag to true for exiting threads.
            SetDestroyFlag( true );
            // Release remaining semaphores to exit thread.
            {
                AutoLock LockThread( m_LockWorkerThread );
                if( m_lActiveThread < m_usThreadCount )
                {
                    // ReleaseSemaphore returns a BOOL, so compare against FALSE rather than NULL.
                    if( FALSE == ReleaseSemaphore( m_hSemaphore, m_usThreadCount - m_lActiveThread, NULL ))
                    {
                        LogError( L"Failed to release Semaphore" );
                        return false;
                    }
                }
            }
            // Wait for destroy completion and clean the thread pool.
            if( !DestroyPool())
            {
                LogError( L"Thread pool destruction failed" );
                return false;
            }
            LogInfo( L"Thread Pool destroyed successfully" );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Destroy()" );
            return false;
        }
    }

    /**
     * Post request to thread pool for processing
     *
     * @param       pRequest_io - Request to be processed.
     */
    bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
    {
        try
        {
            AutoLock LockThread( m_LockWorkerThread );
            if( NULL == m_phThreadList )
            {
                LogError( L"ThreadPool is destroyed or not created yet" );
                return false;
            }
            m_RequestQueue.push_back( pRequest_io );
            if( m_usSemaphoreCount < m_usThreadCount )
            {
                // Thread available to process, so notify thread.
                if( !NotifyThread())
                {
                    LogError( L"NotifyThread failed" );
                    // Request notification failed. Try after some time.
                    m_usPendingReqCount++;
                    return false;
                }
            }
            else
            {
                // Thread not available to process.
                m_usPendingReqCount++;
            }
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in PostRequest()" );
            return false;
        }
    }

    /**
     * Pop request from queue for processing.
     *
     * @param       RequestQueue_io  - Request queue.
     * @return      AbstractRequest* - Request pointer.
     */
    AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
    {
        AutoLock LockThread( m_LockWorkerThread );
        if( !RequestQueue_io.empty())
        {
            AbstractRequest* pRequest = RequestQueue_io.front();
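            // pop_front() removes only the head element. list::remove() would erase
            // every queued copy of this pointer if the same request was posted twice.
            RequestQueue_io.pop_front();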
            return pRequest;
        }
        return 0;
    }

    /**
     * Create specified number of threads. Initial status of threads will be waiting.
     */
    bool ThreadPool::AddThreads()
    {
        try
        {
            // Allocate zero-initialized storage for all thread handles.
            // new[] throws std::bad_alloc on failure (handled by the catch
            // block below) rather than returning NULL.
            m_phThreadList = new HANDLE[m_usThreadCount]();
            // Create worker threads.
            DWORD dwThreadID = 0;
            for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
            {
                // Create worker thread
                m_phThreadList[usIdx] = CreateThread( 0, 0,
                                                      reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
                                                      this, 0, &dwThreadID );
                if( NULL == m_phThreadList[usIdx] )
                {
                    LogError( L"CreateThread failed" );
                    return false;
                }
            }
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in AddThreads()" );
            return false;
        }
    }

    /**
     * Release the semaphore by one to wake a waiting worker thread.
     */
    bool ThreadPool::NotifyThread()
    {
        try
        {
            AutoLock LockThread( m_LockWorkerThread );
            // Release semaphore by one to process this request.
            // ReleaseSemaphore returns a BOOL, so compare against FALSE rather than NULL.
            if( FALSE == ReleaseSemaphore( m_hSemaphore, 1, NULL ))
            {
                LogError( L"ReleaseSemaphore failed" );
                return false;
            }
            m_usSemaphoreCount++;
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in NotifyThread()" );
            m_RequestQueue.pop_back();
            return false;
        }
    }

    /**
     * Process request in queue.
     */
    bool ThreadPool::ProcessRequests()
    {
        bool bContinue( true );
        do
        {
            try
            {
                LogInfo( L"Thread WAITING" );
                // Wait for request.
                if( !WaitForRequest())
                {
                    LogError( L"WaitForRequest() failed" );
                    continue;
                }
                // Thread counter.
                AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
                LogInfo( L"Thread ACTIVE" );
                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }
                // Get request from request queue.
                AbstractRequest* pRequest = PopRequest( m_RequestQueue );
                if( NULL == pRequest )
                {
                    LogError( L"PopRequest failed" );
                    continue;
                }
                // Execute the request.
                long lReturn = pRequest->Execute();
                if( 0 != lReturn )
                {
                    // Log the failure but fall through, so that a pending
                    // request still gets a thread notified below.
                    LogError( L"Request execution failed" );
                }
                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }
                AutoLock LockThread( m_LockWorkerThread );
                // Inform thread if any pending request.
                if( m_usPendingReqCount > 0 )
                {
                    if( m_usSemaphoreCount < m_usThreadCount )
                    {
                        // Thread available to process, so notify thread.
                        if( !NotifyThread())
                        {
                            LogError( L"NotifyThread failed" );
                            continue;
                        }
                        m_usPendingReqCount--;
                    }
                }
            }
            catch( ... )
            {
                LogError( L"Exception occurred in ProcessRequests()" );
                continue;
            }
        }
        while( bContinue );
        return true;
    }

    /**
     * Wait for request queuing to thread pool.
     */
    bool ThreadPool::WaitForRequest()
    {
        try
        {
            // Wait released when requested queued.
            DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
            {
                LogError( L"WaitForSingleObject failed" );
                return false;
            }
            AutoLock LockThread( m_LockWorkerThread );
            m_usSemaphoreCount--;
            // Clear previous error.
            ::SetLastError( 0 );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in WaitForRequest()" );
            return false;
        }
    }

    /**
     * Destroy and clean up thread pool.
     */
    bool ThreadPool::DestroyPool()
    {
        try
        {
            // Wait for all threads to exit.
            DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount, m_phThreadList, TRUE, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
            {
                LogError( L"WaitForMultipleObjects failed" );
                return false;
            }
            // Close all threads.
            for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
            {
                if( TRUE != CloseHandle( m_phThreadList[uIdx] ))
                {
                    LogError( L"CloseHandle failed for threads" );
                    return false;
                }
            }
            // Clear memory allocated for threads.
            delete[] m_phThreadList;
            m_phThreadList = 0;
            // Close the semaphore
            if( TRUE != CloseHandle( m_hSemaphore ))
            {
                LogError( L"CloseHandle failed for semaphore" );
                return false;
            }
            // Clear request queue.
            m_RequestQueue.clear();
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in DestroyPool()" );
            return false;
        }
    }

    /**
     * Check for destroy request.
     */
    inline bool ThreadPool::IsDestroyed()
    {
        // Avoid synchronization issues if destroy requested after validation.
        AutoLock LockThread( m_LockWorkerThread );
        // During thread pool destruction all semaphores are released
        // to exit all threads.
        return m_bDestroyed;
    }

    /**
     * Set destroy flag
     */
    inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
    {
        AutoLock LockThread( m_LockWorkerThread );
        m_bDestroyed = bFlag_i;
    }

    /**
     * Cancel all processing request in pool.
     */
    void ThreadPool::CancelRequests()
    {
        try
        {
            // Avoid synchronization issues if destroy requested after validation.
            AutoLock LockThread( m_LockWorkerThread );
            LogInfo( L"Thread pool destroy requested" );
            // Clear main queue.
            m_RequestQueue.clear();
        }
        catch( ... )
        {
            LogError( L"Exception occurred in CancelRequests()" );
        }
    }

    /**
     * Log error in thread pool.
     *
     * @param       wstrError_i - Error description.
     */
    void ThreadPool::LogError( const std::wstring& wstrError_i )
    {
        if( NULL != m_pLogger )
        {
            m_pLogger->LogError( m_lActiveThread, wstrError_i );
        }
    }

    /**
     * Log information in thread pool.
     *
     * @param       wstrInfo_i - Information description.
     */
    void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
    {
        if( NULL != m_pLogger )
        {
            m_pLogger->LogInfo( m_lActiveThread, wstrInfo_i );
        }
    }

    /**
     * worker thread procedure.
     *
     * @param       pParam_i - ThreadPool instance.
     * @return      UINT      - Return 0 on success.
     */
    UINT ThreadPool::ThreadProc( LPVOID pParam_i )
    {
        ThreadPool* pThreadPool = NULL;
        try
        {
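            // Assign to the outer variable instead of declaring a new one; a shadowing
            // declaration here would leave the pointer seen by the catch block NULL.
            pThreadPool = static_cast<ThreadPool*>( pParam_i );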
            if( NULL == pThreadPool )
            {
                return 1;
            }
            if( !pThreadPool->ProcessRequests())
            {
                pThreadPool->LogError( L"ProcessRequests() failed" );
                return 1;
            }
            return 0;
        }
        catch( ... )
        {
            if( NULL !=  pThreadPool )
            {
                pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
            }
            return 1;
        }
    }
} // namespace TP
Time: 2024-12-30 23:19:34
