00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00036 #include "OW_config.h"
00037 #include "OW_Thread.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Format.hpp"
00040 #include "OW_ThreadBarrier.hpp"
00041 #include "OW_NonRecursiveMutexLock.hpp"
00042 #include "OW_ExceptionIds.hpp"
00043 
00044 #include <cstring>
00045 #include <cstdio>
00046 #include <cerrno>
00047 #include <iostream> 
00048 #include <csignal>
00049 #include <cassert>
00050 
00051 #ifdef OW_HAVE_OPENSSL
00052 #include <openssl/err.h>
00053 #endif
00054 
00055 
00056 namespace OW_NAMESPACE
00057 {
00058 
00060 OW_DEFINE_EXCEPTION_WITH_ID(Thread);
00061 OW_DEFINE_EXCEPTION_WITH_ID(CancellationDenied);
00063 
00064 struct ThreadParam
00065 {
00066    ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b)
00067       : thread(t)
00068       , cb(c)
00069       , thread_barrier(b)
00070    {}
00071    Thread* thread;
00072    ThreadDoneCallbackRef cb;
00073    ThreadBarrier thread_barrier;
00074 };
00075 static Thread_t zeroThread();
00076 static Thread_t NULLTHREAD = zeroThread();
00078 static inline bool
00079 sameId(const Thread_t& t1, const Thread_t& t2)
00080 {
00081    return ThreadImpl::sameThreads(t1, t2);
00082 }
00084 
00085 Thread::Thread() 
00086    : m_id(NULLTHREAD)
00087    , m_isRunning(false)
00088    , m_isStarting(false)
00089    , m_joined(false)
00090    , m_cancelRequested(false)
00091    , m_cancelled(false)
00092 {
00093 }
00095 
00096 Thread::~Thread()
00097 {
00098    try
00099    {
00100       if (!m_joined)
00101       {
00102          join();
00103       }
00104       assert(m_isRunning == false);
00105       if (!sameId(m_id, NULLTHREAD))
00106       {
00107          ThreadImpl::destroyThread(m_id);
00108       }
00109    }
00110    catch (...)
00111    {
00112       
00113    }
00114 }
00116 
00117 void
00118 Thread::start(const ThreadDoneCallbackRef& cb)
00119 {
00120    if (isRunning())
00121    {
00122       OW_THROW(ThreadException,
00123          "Thread::start - thread is already running");
00124    }
00125    if (!sameId(m_id, NULLTHREAD))
00126    {
00127       OW_THROW(ThreadException,
00128          "Thread::start - cannot start previously run thread");
00129    }
00130    m_isStarting = true;
00131    UInt32 flgs = OW_THREAD_FLG_JOINABLE;
00132    ThreadBarrier thread_barrier(2);
00133    
00134    ThreadParam* p = new ThreadParam(this, cb, thread_barrier);
00135    if (ThreadImpl::createThread(m_id, threadRunner, p, flgs) != 0)
00136    {
00137       OW_THROW(ThreadException, "ThreadImpl::createThread failed");
00138    }
00139    m_isStarting = false;
00140    thread_barrier.wait();
00141 }
00143 
00144 Int32
00145 Thread::join()
00146 {
00147    OW_ASSERT(!sameId(m_id, NULLTHREAD));
00148    Int32 rval;
00149    if (ThreadImpl::joinThread(m_id, rval) != 0)
00150    {
00151       OW_THROW(ThreadException,
00152          Format("Thread::join - ThreadImpl::joinThread: %1(%2)", 
00153                errno, strerror(errno)).c_str());
00154    }
00155    
00156    m_isRunning = false;
00157    m_joined = true;
00158    return rval;
00159 }
00161 
00162 
00163 Int32
00164 Thread::threadRunner(void* paramPtr)
00165 {
00166    Thread_t theThreadID;
00167    Int32 rval = -1;
00168    try
00169    {
00170       
00171       OW_ASSERT(paramPtr != NULL);
00172       ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr);
00173       Thread* pTheThread = pParam->thread;
00174       ThreadImpl::saveThreadInTLS(pTheThread);
00175       theThreadID = pTheThread->m_id;
00176       ThreadDoneCallbackRef cb = pParam->cb;
00177       ThreadBarrier thread_barrier = pParam->thread_barrier;
00178       delete pParam;
00179       pTheThread->m_isRunning = true;
00180       thread_barrier.wait();
00181 
00182       try
00183       {
00184          rval = pTheThread->run();
00185       }
00186       
00187       catch (ThreadCancelledException&)
00188       {
00189       }
00190       catch (Exception& ex)
00191       {
00192 #ifdef OW_DEBUG      
00193          std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00194          std::cerr << ex << std::endl;
00195 #endif
00196          pTheThread->doneRunning(cb);
00197          
00198          
00199          throw;
00200       }
00201       catch (...)
00202       {
00203 #ifdef OW_DEBUG      
00204          std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00205 #endif
00206          pTheThread->doneRunning(cb);
00207          
00208          
00209          throw;
00210       }
00211 
00212       pTheThread->doneRunning(cb);
00213       
00214    }
00215    catch (Exception& ex)
00216    {
00217 #ifdef OW_DEBUG      
00218       std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00219       std::cerr << ex << std::endl;
00220 #endif
00221       
00222       ThreadImpl::exitThread(theThreadID, rval);
00223    }
00224    catch (...)
00225    {
00226 #ifdef OW_DEBUG      
00227       std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00228 #endif
00229       
00230       ThreadImpl::exitThread(theThreadID, rval);
00231    }
00232    
00233    ThreadImpl::exitThread(theThreadID, rval);
00234    return rval;
00235 }
00236 
00238 void
00239 Thread::doneRunning(const ThreadDoneCallbackRef& cb)
00240 {
00241    NonRecursiveMutexLock l(m_cancelLock);
00242    m_isRunning = m_isStarting = false;
00243    m_cancelled = true;
00244    m_cancelCond.notifyAll();
00245    if (cb)
00246    {
00247       cb->notifyThreadDone(this);
00248    }
00249 
00250 #ifdef OW_HAVE_OPENSSL
00251    
00252    ERR_remove_state(0);
00253 #endif
00254 }
00255 
00257 static Thread_t
00258 zeroThread()
00259 {
00260    Thread_t zthr;
00261    ::memset(&zthr, 0, sizeof(zthr));
00262    return zthr;
00263 }
00265 void
00266 Thread::cooperativeCancel()
00267 {
00268    if (!isRunning())
00269    {
00270       return;
00271    }
00272 
00273    
00274    doCooperativeCancel();
00275    NonRecursiveMutexLock l(m_cancelLock);
00276    m_cancelRequested = true;
00277 
00278 #if !defined(OW_WIN32)
00279    
00280    
00281    
00282    
00283    try
00284    {
00285       ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00286    }
00287    catch (ThreadException&) 
00288    {
00289    }
00290 #endif
00291 }
00293 bool
00294 Thread::definitiveCancel(UInt32 waitForCooperativeSecs)
00295 {
00296    if (!isRunning())
00297    {
00298       return true;
00299    }
00300 
00301    
00302    doCooperativeCancel();
00303    NonRecursiveMutexLock l(m_cancelLock);
00304    m_cancelRequested = true;
00305 
00306 #if !defined(OW_WIN32)
00307    
00308    
00309    
00310    
00311    try
00312    {
00313       ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00314    }
00315    catch (ThreadException&) 
00316    {
00317    }
00318 #endif
00319 
00320    while (!m_cancelled && isRunning())
00321    {
00322       if (!m_cancelCond.timedWait(l, waitForCooperativeSecs, 0))
00323       {
00324          
00325          doDefinitiveCancel();
00326          
00327          if (!m_cancelled && isRunning())
00328          {
00329             this->cancel();
00330          }
00331          return false;
00332       }
00333    }
00334    return true;
00335 }
00337 void
00338 Thread::cancel()
00339 {
00340    
00341    
00342    try
00343    {
00344       ThreadImpl::cancel(m_id);
00345    }
00346    catch (ThreadException&)
00347    {
00348    }
00349    m_cancelled = true;
00350 }
00352 void
00353 Thread::testCancel()
00354 {
00355    ThreadImpl::testCancel();
00356 }
00358 void 
00359 Thread::doCooperativeCancel()
00360 {
00361 }
00363 void 
00364 Thread::doDefinitiveCancel()
00365 {
00366 }
00367 
00368 } 
00369