#include <windows.h>
#include <list>

namespace TP

     * Logger - This is base class for the error logger class and it is polymorphic.
     *          The users of the ThreadPool create a class which derived from this
     *          and override LogError() and LogInfo() for their own error logging mechanism.
     *          The default error logging will be in output window.
    class Logger


        // Constructor
        // Destructor
        virtual ~Logger(){};
        // Log error description.
        void LogError( const long lActiveReq_i, const std::wstring& wstrError_i );
        // Log information.
        void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i );
        // Override this function to log errors. Default log will be in output window.
        virtual void LogError( const std::wstring& wstrError_i );
        // Override this function to log informations. Default log will be in output window.
        virtual void LogInfo( const std::wstring& wstrInfo_i );


        // Log thread ID, Active thread count and last error.
        void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io );

     * SyncObject - The class is a wrapper of Critical section object to provide
     *              synchronization for thread pool.
    class SyncObject

        // Constructor
            ::InitializeCriticalSection( &m_stCriticalSection );

        // Destructor
            ::DeleteCriticalSection( &m_stCriticalSection );

        // Lock critical section.
        bool Lock()
            ::EnterCriticalSection( &m_stCriticalSection );
            return true;

        // Unlock critical section.
        bool Unlock()
            ::LeaveCriticalSection( &m_stCriticalSection );
            return true;

        SyncObject( const SyncObject& );
        SyncObject& operator = ( const SyncObject& );


        // Critical section object.
        CRITICAL_SECTION m_stCriticalSection;

     * AutoLock - This class own synchronization object during construction and
     *            release the ownership during the destruction.
    class AutoLock


         * Parameterized constructor
         * @param       LockObj_i - Synchronization object.
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
        AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i )
            if( NULL != m_pSyncObject )

         * Destructor.
         * @param       Nil
         * @return      Nil
         * @exception   Nil
         * @see         Nil
         * @since       1.0
            if( NULL != m_pSyncObject )
                m_pSyncObject = NULL;

        SyncObject* m_pSyncObject;

     * AbstractRequest - This is abstract base class for the request to be processed in thread pool.
     *                   and it is polymorphic. The users of the ThreadPool must create a class
     *                   which derived from this and override Execute() function.
    class AbstractRequest

        // Constructor
        AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u ){}
        // Destructor
        virtual ~AbstractRequest(){}
        // Thread procedure to be override in derived class. This function should return if request aborted.
        // Abort request can check by calling IsAborted() function during time consuming operation.
        virtual long Execute() = 0;
        // Set request ID.
        void SetRequestID( unsigned short uRequestID_i )
            AutoLock LockRequest( m_LockWorkerThread );
            m_usRequestID = uRequestID_i;
        // Get request ID.
        unsigned short GetRequestID()
            AutoLock LockRequest( m_LockWorkerThread );
            return m_usRequestID;
        // Abort the processing of the request.
        void Abort()
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = true;
        // Clear abort flag for re-posting the same request.
        void ClearAbortFlag()
            AutoLock LockRequest( m_LockWorkerThread );
            m_bAborted = false;

        // Check for the abort request
        bool IsAborted()
            AutoLock LockRequest( m_LockWorkerThread );
            return m_bAborted;
        // Prepare error or information log.
        void PrepareLog( std::wstring& wstrLog_io );

        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;

        // Abort flag.
        bool m_bAborted;
        // Request Identifier.
        unsigned short m_usRequestID;


     * AutoCounter - Increment and decrement counter
    class AutoCounter

        // Constructor.
        AutoCounter( unsigned short& usCount_io,
                     SyncObject& Lock_io ) :
                     m_usCount( usCount_io ), m_LockThread( Lock_io )
            AutoLock Lock( m_LockThread );

        // Destructor.
            AutoLock Lock( m_LockThread );

        // Counter variable.
        unsigned short& m_usCount;
        // Synchronization object for resource locking.
        SyncObject& m_LockThread;

    typedef std::list<AbstractRequest*> REQUEST_QUEUE;

     * ThreadPool - This class create and destroy thread pool based on the request.
     *              The requested to be processed can be post to pool as derived object of
     *              AbstractRequest. Also a class can be derive from Logger to error and
     *              information logging.
    class ThreadPool

        // Constructor.
        // Destructor.

        // Create thread pool with specified number of threads.
        bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL );
        // Destroy the existing thread pool.
        bool Destroy();
        // Post request to thread pool for processing.
        bool PostRequest( AbstractRequest* pRequest_io );

        AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io );
        bool AddThreads();
        bool NotifyThread();
        bool ProcessRequests();
        bool WaitForRequest();
        bool DestroyPool();
        bool IsDestroyed();
        void SetDestroyFlag( const bool bFlag_i );
        void CancelRequests();
        void LogError( const std::wstring& wstrError_i );
        void LogInfo( const std::wstring& wstrInfo_i );
        static UINT WINAPI ThreadProc( LPVOID pParam_i );

        ThreadPool( const ThreadPool& );
        ThreadPool& operator = ( const ThreadPool& );

        // Used for thread pool destruction.
        bool m_bDestroyed;
        // Hold thread count in the pool.
        unsigned short m_usThreadCount;
        // Released semaphore count.
        unsigned short m_usSemaphoreCount;
        // Active thread count.
        unsigned short m_lActiveThread;
        // Active thread count.
        unsigned short m_usPendingReqCount;
        // Manage active thread count in pool.
        HANDLE m_hSemaphore;
        // Hold thread handles.
        HANDLE* m_phThreadList;
        // Request queue.
        REQUEST_QUEUE m_RequestQueue;
        // Synchronization object for resource locking.
        SyncObject m_LockWorkerThread;
        // User defined error and information logger class.
        Logger* m_pLogger;
        // Default error and information logger.
        Logger m_Logger;
} // namespace TP

#endif // #ifndef _THREAD_POOL_MGR_H_


 * @author :    Suresh

#include "ThreadPool.h"
#include <sstream>
#include <iomanip>

namespace TP

     * Log error description.
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrError_i  - Error message.
    void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
        std::wstring wstrLog( wstrError_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogError( wstrLog );

     * Log information.
     * @param       lActiveReq_i - Count of active requests.
     * @param       wstrInfo_i   - Information message.
    void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
        std::wstring wstrLog( wstrInfo_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogInfo( wstrLog );

     * Override this function to log errors. Default log will be in output window.
     * @param       wstrError_i  - Error description
    void Logger::LogError( const std::wstring& wstrError_i )
        OutputDebugString( wstrError_i.c_str());

     * Override this function to log informations. Default log will be in output window.
     * @param       wstrInfo_i   - Information description.
    void Logger::LogInfo( const std::wstring& wstrInfo_i )
        OutputDebugString( wstrInfo_i.c_str());

     * Log thread ID, Active thread count and last error.
     * @param       lActiveReq_i - Active thread count.
     * @param       wstrLog_io   - Error or information description
    void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
        std::wstringstream wstrmLog;
        wstrmLog << L"##TP## [TID=" << std::setfill( L‘0‘ ) << std::setw(8) << ::GetCurrentThreadId()
                 << L"] [ACTIVE REQUEST=" << std::setw(4) << lActiveReq_i
                 << L"] [LAST ERROR=" << std::setw(4) << ::GetLastError()
                 << L"] " << wstrLog_io.c_str() << + L"]";
        wstrLog_io = wstrmLog.str();

     * Prepare error or information log.
     * @param       wstrLog_io - Log information
    void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
        std::wstringstream wstrmLog;
        wstrmLog << std::setfill( L‘0‘ );
        wstrmLog << L"##RQ## [RID=" << std::setw(8) << GetRequestID()
                 << L"] [Desc=" << wstrLog_io.c_str() << + L"]";
        wstrLog_io = wstrmLog.str();

     * Constructor
    ThreadPool::ThreadPool() : m_bDestroyed( false ),
                               m_usThreadCount( 0u ),
                               m_usSemaphoreCount( 0u ),
                               m_lActiveThread( 0u ),
                               m_usPendingReqCount( 0u ),
                               m_hSemaphore( NULL ),
                               m_phThreadList( NULL ),
                               m_pLogger( &m_Logger )

     * Destructor
        if( NULL != m_phThreadList )
            if( !Destroy())
                LogError( L"Destroy() failed" );

     * Create thread pool with specified number of threads.
     * @param       usThreadCount_i - Thread count.
     * @param       pLogger_i       - Logger instance to log errors and informations
    bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
            // Assign logger object. If user not provided then use existing and
            // error will be logged in output window.
            m_pLogger = ( NULL != pLogger_i ) ? pLogger_i : &m_Logger;
            // Check thread pool is initialized already.
            if( NULL != m_phThreadList )
                LogError( L"ThreadPool already created" );
                return false;
            // Validate thread count.
            if( 0 == usThreadCount_i )
                LogError( L"Minimum allowed thread count is one" );
                return false;
            if( usThreadCount_i > 64 )
                LogError( L"Maximum allowed thread count is 64" );
                return false;
            LogInfo( L"Thread pool creation requested" );

            // Initialize values.
            m_lActiveThread = 0u;
            m_usSemaphoreCount = 0u;
            m_usPendingReqCount = 0u;
            m_usThreadCount = usThreadCount_i;
            // Create semaphore for thread count management.
            m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
            if( NULL == m_hSemaphore )
                LogError( L"Semaphore creation failed" );
                m_usThreadCount = 0u;
                return false;
            // Create worker threads and make pool active
            if( !AddThreads())
                LogError( L"Threads creation failed" );
                return false;
            SetDestroyFlag( false );
            LogInfo( L"Thread pool created successfully" );
            return true;
        catch( ... )
            LogError( L"Exception occurred in Create()" );
            return false;

     * Destroy thread pool.
    bool ThreadPool::Destroy()
            // Check whether thread pool already destroyed.
            if( NULL == m_phThreadList )
                LogError( L"ThreadPool is already destroyed or not created yet" );
                return false;
            // Cancel all requests.
            // Set destroyed flag to true for exiting threads.
            SetDestroyFlag( true );
            // Release remaining semaphores to exit thread.
                AutoLock LockThread( m_LockWorkerThread );
                if( m_lActiveThread < m_usThreadCount )
                    if( NULL == ReleaseSemaphore( m_hSemaphore, m_usThreadCount - m_lActiveThread, NULL ))
                        LogError( L"Failed to release Semaphore" );
                        return false;
            // Wait for destroy completion and clean the thread pool.
            if( !DestroyPool())
                LogError( L"Thread pool destruction failed" );
                return false;
            LogInfo( L"Thread Pool destroyed successfully" );
            return true;
        catch( ... )
            LogError( L"Exception occurred in Destroy()" );
            return false;

     * Post request to thread pool for processing
     * @param       pRequest_io - Request to be processed.
    bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
            AutoLock LockThread( m_LockWorkerThread );
            if( NULL == m_phThreadList )
                LogError( L"ThreadPool is destroyed or not created yet" );
                return false;
            m_RequestQueue.push_back( pRequest_io );
            if( m_usSemaphoreCount < m_usThreadCount )
                // Thread available to process, so notify thread.
                if( !NotifyThread())
                    LogError( L"NotifyThread failed" );
                    // Request notification failed. Try after some time.
                    return false;
                // Thread not available to process.
            return true;
        catch( ... )
            LogError( L"Exception occurred in PostRequest()" );
            return false;

     * Pop request from queue for processing.
     * @param       RequestQueue_io  - Request queue.
     * @return      AbstractRequest* - Request pointer.
    AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
        AutoLock LockThread( m_LockWorkerThread );
        if( !RequestQueue_io.empty())
            AbstractRequest* pRequest = RequestQueue_io.front();
            RequestQueue_io.remove( pRequest );
            return pRequest;
        return 0;

     * Create specified number of threads. Initial status of threads will be waiting.
    bool ThreadPool::AddThreads()
            // Allocate memory for all threads.
            m_phThreadList = new HANDLE[m_usThreadCount];
            if( NULL == m_phThreadList )
                LogError( L"Memory allocation for thread handle failed" );
                return false;
            // Create worker threads.
            DWORD dwThreadID = 0;
            for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
                // Create worker thread
                m_phThreadList[usIdx] = CreateThread( 0, 0,
                                                      reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
                                                      this, 0, &dwThreadID );
                if( NULL == m_phThreadList[usIdx] )
                    LogError( L"CreateThread failed" );
                    return false;
            return true;
        catch( ... )
            LogError( L"Exception occurred in AddThreads()" );
            return false;

     * Add request to queue and release semaphore by one.
    bool ThreadPool::NotifyThread()
            AutoLock LockThread( m_LockWorkerThread );
            // Release semaphore by one to process this request.
            if( NULL == ReleaseSemaphore( m_hSemaphore, 1, NULL ))
                LogError( L"ReleaseSemaphore failed" );
                return false;
            return true;
        catch( ... )
            LogError( L"Exception occurred in NotifyThread()" );
            return false;

     * Process request in queue.
    bool ThreadPool::ProcessRequests()
        bool bContinue( true );
                LogInfo( L"Thread WAITING" );
                // Wait for request.
                if( !WaitForRequest())
                    LogError( L"WaitForRequest() failed" );
                // Thread counter.
                AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
                LogInfo( L"Thread ACTIVE" );
                // Check thread pool destroy request.
                if( IsDestroyed())
                    LogInfo( L"Thread EXITING" );
                // Get request from request queue.
                AbstractRequest* pRequest = PopRequest( m_RequestQueue );
                if( NULL == pRequest )
                    LogError( L"PopRequest failed" );
                // Execute the request.
                long lReturn = pRequest->Execute();
                if( NULL != lReturn )
                    LogError( L"Request execution failed" );
                // Check thread pool destroy request.
                if( IsDestroyed())
                    LogInfo( L"Thread EXITING" );
                AutoLock LockThread( m_LockWorkerThread );
                // Inform thread if any pending request.
                if( m_usPendingReqCount > 0 )
                    if( m_usSemaphoreCount < m_usThreadCount )
                        // Thread available to process, so notify thread.
                        if( !NotifyThread())
                            LogError( L"NotifyThread failed" );
            catch( ... )
                LogError( L"Exception occurred in ProcessRequests()" );
        while( bContinue );
        return true;

     * Wait for request queuing to thread pool.
    bool ThreadPool::WaitForRequest()
            // Wait released when requested queued.
            DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
                LogError( L"WaitForSingleObject failed" );
                return false;
            AutoLock LockThread( m_LockWorkerThread );
            // Clear previous error.
            ::SetLastError( 0 );
            return true;
        catch( ... )
            LogError( L"Exception occurred in WaitForRequest()" );
            return false;

     * Destroy and clean up thread pool.
    bool ThreadPool::DestroyPool()
            // Wait for the exist of threads.
            DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount, m_phThreadList, TRUE, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
                LogError( L"WaitForMultipleObjects failed" );
                return false;
            // Close all threads.
            for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
                if( TRUE != CloseHandle( m_phThreadList[uIdx] ))
                    LogError( L"CloseHandle failed for threads" );
                    return false;
            // Clear memory allocated for threads.
            delete[] m_phThreadList;
            m_phThreadList = 0;
            // Close the semaphore
            if( TRUE != CloseHandle( m_hSemaphore ))
                LogError( L"CloseHandle failed for semaphore" );
                return false;
            // Clear request queue.
            return true;
        catch( ... )
            LogError( L"Exception occurred in DestroyPool()" );
            return false;

     * Check for destroy request.
    inline bool ThreadPool::IsDestroyed()
        // Avoid synchronization issues if destroy requested after validation.
        AutoLock LockThread( m_LockWorkerThread );
        // During thread pool destruction all semaphores are released
        // to exit all threads.
        return m_bDestroyed;

     * Set destroy flag
    inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
        AutoLock LockThread( m_LockWorkerThread );
        m_bDestroyed = bFlag_i;

     * Cancel all processing request in pool.
    void ThreadPool::CancelRequests()
            // Avoid synchronization issues if destroy requested after validation.
            AutoLock LockThread( m_LockWorkerThread );
            LogInfo( L"Thread pool destroy requested" );
            // Clear main queue.
        catch( ... )
            LogError( L"Exception occurred in CancelRequests()" );

     * Log error in thread pool.
     * @param       wstrError_i - Error description.
    void ThreadPool::LogError( const std::wstring& wstrError_i )
        if( NULL != m_pLogger )
            m_pLogger->LogError( m_lActiveThread, wstrError_i );

     * Log information in thread pool.
     * @param       wstrInfo_i - Information description.
    void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
        if( NULL != m_pLogger )
            m_pLogger->LogInfo( m_lActiveThread, wstrInfo_i );

     * worker thread procedure.
     * @param       pParam_i - ThreadPool instance.
     * @return      UINT      - Return 0 on success.
    UINT ThreadPool::ThreadProc( LPVOID pParam_i )
        ThreadPool* pThreadPool = NULL;
            ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>( pParam_i );
            if( NULL == pThreadPool )
                return 1;
            if( !pThreadPool->ProcessRequests())
                pThreadPool->LogError( L"ProcessRequests() failed" );
                return 1;
            return 0;
        catch( ... )
            if( NULL !=  pThreadPool )
                pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
            return 1;
} // namespace TP
