00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00035 #include "OW_config.h"
00036 #include "OW_ThreadPool.hpp"
00037 #include "OW_Array.hpp"
00038 #include "OW_Thread.hpp"
00039 #include "OW_NonRecursiveMutex.hpp"
00040 #include "OW_NonRecursiveMutexLock.hpp"
00041 #include "OW_Condition.hpp"
00042 #include "OW_Format.hpp"
00043 #include "OW_Mutex.hpp"
00044 #include "OW_MutexLock.hpp"
00045 #include "OW_NullLogger.hpp"
00046 
00047 #include <deque>
00048 
00049 #ifdef OW_DEBUG      
00050 #include <iostream> 
00051 #endif
00052 
00053 namespace OW_NAMESPACE
00054 {
00055 
00056 OW_DEFINE_EXCEPTION(ThreadPool);
00057 
00058 
00059 #define OW_POOL_LOG_DEBUG(logger, arg) do { if ((logger)) OW_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
00060 #define OW_POOL_LOG_FATAL_ERROR(logger, arg) do { if ((logger)) OW_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
00061 
00063 class ThreadPoolImpl : public IntrusiveCountableBase
00064 {
00065 public:
00066    
00067    virtual bool addWork(const RunnableRef& work, bool blockWhenFull) = 0;
00068    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs) = 0;
00069    virtual void waitForEmptyQueue() = 0;
00070    virtual ~ThreadPoolImpl()
00071    {
00072    }
00073 };
00074 namespace {
00075 class FixedSizePoolImpl;
00077 class FixedSizePoolWorkerThread : public Thread
00078 {
00079 public:
00080    FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00081       : Thread()
00082       , m_thePool(thePool)
00083    {
00084    }
00085    virtual Int32 run();
00086 private:
00087    virtual void doCooperativeCancel()
00088    {
00089       MutexLock lock(m_guard);
00090       if (m_currentRunnable)
00091       {
00092          m_currentRunnable->doCooperativeCancel();
00093       }
00094    }
00095    virtual void doDefinitiveCancel()
00096    {
00097       MutexLock lock(m_guard);
00098       if (m_currentRunnable)
00099       {
00100          m_currentRunnable->doCooperativeCancel();
00101       }
00102    }
00103 
00104    FixedSizePoolImpl* m_thePool;
00105 
00106    Mutex m_guard;
00107    RunnableRef m_currentRunnable;
00108 
00109    
00110    FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00111    FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00112 };
00114 class CommonPoolImpl : public ThreadPoolImpl
00115 {
00116 protected:
00117    CommonPoolImpl(UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00118       : m_maxQueueSize(maxQueueSize)
00119       , m_queueClosed(false)
00120       , m_shutdown(false)
00121       , m_logger(logger)
00122       , m_poolName(poolName)
00123    {
00124    }
00125 
00126    virtual ~CommonPoolImpl()
00127    {
00128    }
00129    
00130    
00131    virtual bool queueIsFull() const
00132    {
00133       return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00134    }
00135    
00136    
00137    bool queueClosed() const
00138    {
00139       return m_shutdown || m_queueClosed;
00140    }
00141    
00142    bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00143    {
00144       NonRecursiveMutexLock l(m_queueLock);
00145       
00146       if (queueClosed())
00147       {
00148          OW_POOL_LOG_DEBUG(m_logger, "Queue is already closed.  Why are you trying to shutdown again?");
00149          return false;
00150       }
00151       m_queueClosed = true;
00152       OW_POOL_LOG_DEBUG(m_logger, "Queue closed");
00153       
00154       if (finishWorkInQueue)
00155       {
00156          while (m_queue.size() != 0)
00157          {
00158             if (shutdownSecs < 0)
00159             {
00160                OW_POOL_LOG_DEBUG(m_logger, "Waiting forever for queue to empty");
00161                m_queueEmpty.wait(l);
00162             }
00163             else
00164             {
00165                OW_POOL_LOG_DEBUG(m_logger, "Waiting w/timout for queue to empty");
00166                if (!m_queueEmpty.timedWait(l, shutdownSecs))
00167                {
00168                   OW_POOL_LOG_DEBUG(m_logger, "Wait timed out. Work in queue will be discarded.");
00169                   break; 
00170                }
00171             }
00172          }
00173       }
00174       m_shutdown = true;
00175       return true;
00176    }
00177 
00178    virtual void waitForEmptyQueue()
00179    {
00180       NonRecursiveMutexLock l(m_queueLock);
00181       while (m_queue.size() != 0)
00182       {
00183          OW_POOL_LOG_DEBUG(m_logger, "Waiting for empty queue");
00184          m_queueEmpty.wait(l);
00185       }
00186       OW_POOL_LOG_DEBUG(m_logger, "Queue empty: the wait is over");
00187    }
00188    
00189    void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00190    {
00191       if (!finishOffWorkInQueue(finishWorkInQueue, shutdownSecs))
00192       {
00193          return;
00194       }
00195 
00196       
00197       m_queueNotEmpty.notifyAll();
00198       m_queueNotFull.notifyAll();
00199 
00200       if (shutdownSecs >= 0)
00201       {
00202          
00203          for (UInt32 i = 0; i < m_threads.size(); ++i)
00204          {
00205             OW_POOL_LOG_DEBUG(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00206             m_threads[i]->cooperativeCancel();
00207          }
00208          
00209          for (UInt32 i = 0; i < m_threads.size(); ++i)
00210          {
00211             OW_POOL_LOG_DEBUG(m_logger, Format("Calling definitiveCancel on thread %1", i));
00212             if (!m_threads[i]->definitiveCancel(shutdownSecs))
00213             {
00214                OW_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00215             }
00216          }
00217       }
00218       
00219       for (UInt32 i = 0; i < m_threads.size(); ++i)
00220       {
00221          OW_POOL_LOG_DEBUG(m_logger, Format("calling join() on thread %1", i));
00222          m_threads[i]->join();
00223          OW_POOL_LOG_DEBUG(m_logger, Format("join() finished for thread %1", i));
00224       }
00225    }
00226 
00227    RunnableRef getWorkFromQueue(bool waitForWork)
00228    {
00229       NonRecursiveMutexLock l(m_queueLock);
00230       while ((m_queue.size() == 0) && (!m_shutdown))
00231       {
00232          if (waitForWork)
00233          {
00234             OW_POOL_LOG_DEBUG(m_logger, "Waiting for work");
00235             m_queueNotEmpty.wait(l);
00236          }
00237          else
00238          {
00239             
00240             
00241             if (!m_queueNotEmpty.timedWait(l,1))
00242             {
00243                OW_POOL_LOG_DEBUG(m_logger, "No work after 1 sec. I'm not waiting any longer");
00244                return RunnableRef();
00245             }
00246          }
00247       }
00248       
00249       if (m_shutdown)
00250       {
00251          OW_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00252          return RunnableRef();
00253       }
00254 
00255       RunnableRef work = m_queue.front();
00256       m_queue.pop_front();
00257 
00258       
00259       if (!queueIsFull())
00260       {
00261          m_queueNotFull.notifyAll();
00262       }
00263 
00264       
00265       if (m_queue.size() == 0)
00266       {
00267          m_queueEmpty.notifyAll();
00268       }
00269       OW_POOL_LOG_DEBUG(m_logger, "A thread got some work to do");
00270       return work;
00271    }
00272 
00273    
00274    UInt32 m_maxQueueSize;
00275    
00276    Array<ThreadRef> m_threads;
00277    std::deque<RunnableRef> m_queue;
00278    bool m_queueClosed;
00279    bool m_shutdown;
00280    
00281    NonRecursiveMutex m_queueLock;
00282    Condition m_queueNotFull;
00283    Condition m_queueEmpty;
00284    Condition m_queueNotEmpty;
00285    LoggerRef m_logger;
00286    String m_poolName;
00287 };
00288 class FixedSizePoolImpl : public CommonPoolImpl
00289 {
00290 public:
00291    FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00292       : CommonPoolImpl(maxQueueSize, logger, poolName)
00293    {
00294       
00295       m_threads.reserve(numThreads);
00296       for (UInt32 i = 0; i < numThreads; ++i)
00297       {
00298          m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00299       }
00300       for (UInt32 i = 0; i < numThreads; ++i)
00301       {
00302          m_threads[i]->start();
00303       }
00304       OW_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00305    }
00306    
00307    virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00308    {
00309       
00310       if (!work)
00311       {
00312          OW_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00313          return false;
00314       }
00315       NonRecursiveMutexLock l(m_queueLock);
00316       if (!blockWhenFull && queueIsFull())
00317       {
00318          OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00319          return false;
00320       }
00321       while ( queueIsFull() && !queueClosed() )
00322       {
00323          OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00324          m_queueNotFull.wait(l);
00325       }
00326       
00327       if (queueClosed())
00328       {
00329          OW_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00330          return false;
00331       }
00332       m_queue.push_back(work);
00333       
00334       
00335       if (m_queue.size() == 1)
00336       {
00337          OW_POOL_LOG_DEBUG(m_logger, "Waking up sleepy workers");
00338          m_queueNotEmpty.notifyAll();
00339       }
00340 
00341       OW_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00342       return true;
00343    }
00344 
00345    
00346    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00347    {
00348       shutdownThreads(finishWorkInQueue, shutdownSecs);
00349    }
00350    virtual ~FixedSizePoolImpl()
00351    {
00352       
00353       try
00354       {
00355          
00356          if (!queueClosed())
00357          {
00358             
00359             
00360             this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00361          }
00362       }
00363       catch (...)
00364       {
00365       }
00366    }
00367 private:
00368    friend class FixedSizePoolWorkerThread;
00369 };
00370 void runRunnable(const RunnableRef& work)
00371 {
00372    
00373    try
00374    {
00375       work->run();
00376    }
00377    catch (ThreadCancelledException&)
00378    {
00379       throw;
00380    }
00381    catch (Exception& ex)
00382    {
00383 #ifdef OW_DEBUG      
00384       std::cerr << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00385 #endif
00386    }
00387    catch(std::exception& ex)
00388    {
00389 #ifdef OW_DEBUG
00390       std::cerr << "!!! std::exception what = " << ex.what() << std::endl;
00391 #endif
00392    }
00393    catch (...)
00394    {
00395 #ifdef OW_DEBUG      
00396       std::cerr << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00397 #endif
00398    }
00399 }
00400 Int32 FixedSizePoolWorkerThread::run()
00401 {
00402    while (true)
00403    {
00404       
00405       RunnableRef work = m_thePool->getWorkFromQueue(true);
00406       if (!work)
00407       {
00408          return 0;
00409       }
00410       
00411       {
00412          MutexLock lock(m_guard);
00413          m_currentRunnable = work;
00414       }
00415       runRunnable(work);
00416       {
00417          MutexLock lock(m_guard);
00418          m_currentRunnable = 0;
00419       }
00420    }
00421    return 0;
00422 }
00423 class DynamicSizePoolImpl;
00425 class DynamicSizePoolWorkerThread : public Thread
00426 {
00427 public:
00428    DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00429       : Thread()
00430       , m_thePool(thePool)
00431    {
00432    }
00433    virtual Int32 run();
00434 private:
00435    virtual void doCooperativeCancel()
00436    {
00437       MutexLock lock(m_guard);
00438       if (m_currentRunnable)
00439       {
00440          m_currentRunnable->doCooperativeCancel();
00441       }
00442    }
00443    virtual void doDefinitiveCancel()
00444    {
00445       MutexLock lock(m_guard);
00446       if (m_currentRunnable)
00447       {
00448          m_currentRunnable->doCooperativeCancel();
00449       }
00450    }
00451 
00452    DynamicSizePoolImpl* m_thePool;
00453 
00454    Mutex m_guard;
00455    RunnableRef m_currentRunnable;
00456 
00457    
00458    DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00459    DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00460 };
00462 class DynamicSizePoolImpl : public CommonPoolImpl
00463 {
00464 public:
00465    DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00466       : CommonPoolImpl(maxQueueSize, logger, poolName)
00467       , m_maxThreads(maxThreads)
00468    {
00469    }
00470    
00471    virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00472    {
00473       
00474       if (!work)
00475       {
00476          OW_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00477          return false;
00478       }
00479       NonRecursiveMutexLock l(m_queueLock);
00480 
00481       
00482       if (queueClosed())
00483       {
00484          OW_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00485          return false;
00486       }
00487 
00488       
00489       
00490       
00491       size_t i = 0;
00492       while (i < m_threads.size())
00493       {
00494          if (!m_threads[i]->isRunning())
00495          {
00496             OW_POOL_LOG_DEBUG(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00497             m_threads[i]->join();
00498             m_threads.remove(i);
00499          }
00500          else
00501          {
00502             ++i;
00503          }
00504       }
00505 
00506       if (!blockWhenFull && queueIsFull())
00507       {
00508          OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00509          return false;
00510       }
00511       while ( queueIsFull() && !queueClosed() )
00512       {
00513          OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00514          m_queueNotFull.wait(l);
00515       }
00516       
00517       m_queue.push_back(work);
00518       
00519       OW_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00520 
00521       
00522       
00523       
00524       
00525       
00526       
00527       l.release();
00528       m_queueNotEmpty.notifyOne();
00529       Thread::yield(); 
00530       l.lock();
00531 
00532       
00533       if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00534       {
00535          ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00536          m_threads.push_back(theThread);
00537          OW_POOL_LOG_DEBUG(m_logger, "About to start a new thread");
00538          theThread->start();
00539          OW_POOL_LOG_DEBUG(m_logger, "New thread started");
00540       }
00541       return true;
00542    }
00543 
00544    
00545    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00546    {
00547       shutdownThreads(finishWorkInQueue, shutdownSecs);
00548    }
00549    virtual ~DynamicSizePoolImpl()
00550    {
00551       
00552       try
00553       {
00554          
00555          if (!queueClosed())
00556          {
00557             
00558             
00559             this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00560          }
00561       }
00562       catch (...)
00563       {
00564       }
00565    }
00566 protected:
00567    UInt32 getMaxThreads() const
00568    {
00569       return m_maxThreads;
00570    }
00571 
00572 private:
00573    
00574    UInt32 m_maxThreads;
00575    friend class DynamicSizePoolWorkerThread;
00576 };
00577 Int32 DynamicSizePoolWorkerThread::run()
00578 {
00579    while (true)
00580    {
00581       
00582       RunnableRef work = m_thePool->getWorkFromQueue(false);
00583       if (!work)
00584       {
00585          return 0;
00586       }
00587       
00588       {
00589          MutexLock lock(m_guard);
00590          m_currentRunnable = work;
00591       }
00592       runRunnable(work);
00593       {
00594          MutexLock lock(m_guard);
00595          m_currentRunnable = 0;
00596       }
00597    }
00598    return 0;
00599 }
00600 
00602 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00603 {
00604 public:
00605    DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const LoggerRef& logger, const String& poolName)
00606       : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) 
00607    {
00608    }
00609 
00610    virtual ~DynamicSizeNoQueuePoolImpl()
00611    {
00612    }
00613 
00614    
00615    virtual bool queueIsFull() const
00616    {
00617       
00618       
00619       size_t freeThreads = getMaxThreads() -  m_threads.size(); 
00620       return (freeThreads <= m_queue.size());
00621    }
00622 
00623 };
00624 
00625 } 
00627 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger_, const String& poolName)
00628 {
00629    LoggerRef logger(logger_);
00630    if (!logger)
00631    {
00632       logger = LoggerRef(new NullLogger());
00633    }
00634    switch (poolType)
00635    {
00636       case FIXED_SIZE:
00637          m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00638          break;
00639       case DYNAMIC_SIZE:
00640          m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00641          break;
00642       case DYNAMIC_SIZE_NO_QUEUE:
00643          m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00644          break;
00645    }
00646 }
00648 bool ThreadPool::addWork(const RunnableRef& work)
00649 {
00650    return m_impl->addWork(work, true);
00651 }
00653 bool ThreadPool::tryAddWork(const RunnableRef& work)
00654 {
00655    return m_impl->addWork(work, false);
00656 }
00658 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00659 {
00660    m_impl->shutdown(finishWorkInQueue, shutdownSecs);
00661 }
00663 void ThreadPool::waitForEmptyQueue()
00664 {
00665    m_impl->waitForEmptyQueue();
00666 }
00668 ThreadPool::~ThreadPool()
00669 {
00670 }
00672 ThreadPool::ThreadPool(const ThreadPool& x)
00673    : IntrusiveCountableBase(x)
00674    , m_impl(x.m_impl)
00675 {
00676 }
00678 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00679 {
00680    m_impl = x.m_impl;
00681    return *this;
00682 }
00683 
00684 } 
00685