00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00035 #include "OW_config.h"
00036 #include "OW_ThreadImpl.hpp"
00037 #include "OW_Mutex.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Thread.hpp"
00040 #include "OW_NonRecursiveMutexLock.hpp"
00041 #include "OW_Format.hpp"
00042 #if defined(OW_WIN32)
00043 #include "OW_Map.hpp"
00044 #include "OW_MutexLock.hpp"
00045 #endif
00046 #include <cassert>
00047 #include <cstring>
00048 #include <cstddef>
00049 
00050 extern "C"
00051 {
00052 #ifdef OW_HAVE_SYS_TIME_H
00053 #include <sys/time.h>
00054 #endif
00055 
00056 #include <sys/types.h>
00057 
00058 #ifdef OW_HAVE_UNISTD_H
00059 #include <unistd.h>
00060 #endif
00061 
00062 #include <errno.h>
00063 #include <signal.h>
00064 
00065 #ifdef OW_USE_PTHREAD
00066 #include <pthread.h>
00067 #endif
00068 
00069 #ifdef OW_WIN32
00070 #include <process.h>
00071 #endif
00072 }
00073 
00074 namespace OW_NAMESPACE
00075 {
00076 
00077 namespace ThreadImpl {
00078 
00080 
00081 void
00082 sleep(UInt32 milliSeconds)
00083 {
00084    ThreadImpl::testCancel();
00085 #if defined(OW_HAVE_NANOSLEEP)
00086    struct timespec wait;
00087    wait.tv_sec = milliSeconds / 1000;
00088    wait.tv_nsec = (milliSeconds % 1000) * 1000000;
00089    while (nanosleep(&wait, &wait) == -1 && errno == EINTR)
00090    {
00091       ThreadImpl::testCancel();
00092    }
00093 #elif OW_WIN32
00094    Sleep(milliSeconds);
00095 #else
00096    timeval now, end;
00097    unsigned long microSeconds = milliSeconds * 1000;
00098    const UInt32 loopMicroSeconds = 100 * 1000; 
00099    gettimeofday(&now, NULL);
00100    end = now;
00101    end.tv_sec  += microSeconds / 1000000;
00102    end.tv_usec += microSeconds % 1000000;
00103    while ((now.tv_sec < end.tv_sec)
00104        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec)))
00105    {
00106       timeval tv;
00107       tv.tv_sec = end.tv_sec - now.tv_sec;
00108       if (end.tv_usec >= now.tv_usec)
00109       {
00110          tv.tv_usec = end.tv_usec - now.tv_usec;
00111       }
00112       else
00113       {
00114          tv.tv_sec--;
00115          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00116       }
00117       if (tv.tv_sec > 0 || tv.tv_usec > loopMicroSeconds)
00118       {
00119          tv.tv_sec = 0;
00120          tv.tv_usec = loopMicroSeconds;
00121       }
00122       ThreadImpl::testCancel();
00123       select(0, NULL, NULL, NULL, &tv);
00124       gettimeofday(&now, NULL);
00125    }
00126 #endif
00127 }
00129 
00130 void
00131 yield()
00132 {
00133 #if defined(OW_HAVE_SCHED_YIELD)
00134    sched_yield();
00135 #elif defined(OW_WIN32)
00136    ThreadImpl::testCancel();
00137    ::SwitchToThread();
00138 #else
00139    ThreadImpl::sleep(1);
00140 #endif
00141 }
00142 
00143 #if defined(OW_USE_PTHREAD)
00144 namespace {
00145 struct LocalThreadParm
00146 {
00147    ThreadFunction m_func;
00148    void* m_funcParm;
00149 };
00150 extern "C" {
00151 static void*
00152 threadStarter(void* arg)
00153 {
00154    
00155    
00156    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00157    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00158 
00159    
00160    sigset_t signalSet;
00161    int rv = sigfillset(&signalSet);
00162    OW_ASSERT(rv == 0);
00163    rv = sigdelset(&signalSet, SIGUSR1);
00164    OW_ASSERT(rv == 0);
00165    rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00166    OW_ASSERT(rv == 0);
00167 
00168    LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00169    ThreadFunction func = parg->m_func;
00170    void* funcParm = parg->m_funcParm;
00171    delete parg;
00172    Int32 rval = (*func)(funcParm);
00173    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00174    pthread_exit(prval);
00175    return prval;
00176 }
00177 }
00178 
00179 
00180 struct default_stack_size
00181 {
00182    default_stack_size()
00183    {
00184       
00185       val = 0;
00186       needsSetting = false;
00187 
00188 
00189 
00190 
00191 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00192       pthread_attr_t stack_size_attr;
00193       if (pthread_attr_init(&stack_size_attr) != 0)
00194       {
00195          return;
00196       }
00197       if (pthread_attr_getstacksize(&stack_size_attr, &val) != 0)
00198       {
00199          return;
00200       }
00201 
00202       if (val < 1048576) 
00203       {
00204          val = 1048576; 
00205          needsSetting = true;
00206       }
00207 #ifdef PTHREAD_STACK_MIN
00208       if (PTHREAD_STACK_MIN > val) 
00209       {
00210          val = PTHREAD_STACK_MIN;
00211          needsSetting = true;
00212       }
00213 #endif
00214 #endif
00215    }
00216    static size_t val;
00217    static bool needsSetting;
00218 };
00219 size_t default_stack_size::val = 0;
00220 bool default_stack_size::needsSetting(false);
00221 default_stack_size g_theDefaultStackSize;
00223 pthread_once_t once_control = PTHREAD_ONCE_INIT;
00224 pthread_key_t theKey;
00225 extern "C" {
00227 static void initializeTheKey()
00228 {
00229    pthread_key_create(&theKey,NULL);
00230    
00231    struct sigaction temp;
00232    memset(&temp, '\0', sizeof(temp));
00233    sigaction(SIGUSR1, 0, &temp);
00234    if (temp.sa_handler != SIG_IGN)
00235    {
00236       temp.sa_handler = SIG_IGN;
00237       sigemptyset(&temp.sa_mask);
00238       temp.sa_flags = 0;
00239       sigaction(SIGUSR1, &temp, NULL);
00240    }
00241 }
00242 } 
00243 } 
00245 
00246 int
00247 createThread(Thread_t& handle, ThreadFunction func,
00248    void* funcParm, UInt32 threadFlags)
00249 {
00250    int cc = 0;
00251    pthread_attr_t attr;
00252    pthread_attr_init(&attr);
00253    if (!(threadFlags & OW_THREAD_FLG_JOINABLE))
00254    {
00255       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00256    }
00257 
00258 #if !defined(OW_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00259    
00260    if (default_stack_size::needsSetting)
00261    {
00262       pthread_attr_setstacksize(&attr, default_stack_size::val);
00263    }
00264 #endif
00265 
00266    LocalThreadParm* parg = new LocalThreadParm;
00267    parg->m_func = func;
00268    parg->m_funcParm = funcParm;
00269    if (pthread_create(&handle, &attr, threadStarter, parg) != 0)
00270    {
00271       cc = -1;
00272    }
00273    pthread_attr_destroy(&attr);
00274    return cc;
00275 }
00277 
00278 void
00279 exitThread(Thread_t&, Int32 rval)
00280 {
00281    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00282    pthread_exit(prval);
00283 }
00284 
00285 
00286 #if defined(OW_SIZEOF_PTHREAD_T)
00287 #if OW_SIZEOF_PTHREAD_T == 2
00288 #define OW_THREAD_CONVERTER UInt16
00289 #elif OW_SIZEOF_PTHREAD_T == 4
00290 #define OW_THREAD_CONVERTER UInt32
00291 #elif OW_SIZEOF_PTHREAD_T == 8
00292 #define OW_THREAD_CONVERTER UInt64
00293 #else
00294 #error Unexpected size for pthread_t
00295 #endif 
00296 #else
00297 #error No pthread_t size was found!
00298 #endif 
00299 
00300 UInt64 thread_t_ToUInt64(Thread_t thr)
00301 {
00302    return UInt64(OW_THREAD_CONVERTER(thr));
00303 }
00304 #undef OW_THREAD_CONVERTER
00305 
00307 
00308 void
00309 destroyThread(Thread_t& )
00310 {
00311 }
00313 
00314 int
00315 setThreadDetached(Thread_t& handle)
00316 {
00317    int cc = pthread_detach(handle);
00318    if (cc != 0)
00319    {
00320       if (cc != EINVAL)
00321       {
00322          cc = -1;
00323       }
00324    }
00325    return cc;
00326 }
00328 
00329 int
00330 joinThread(Thread_t& handle, Int32& rval)
00331 {
00332    void* prval(0);
00333    if ((errno = pthread_join(handle, &prval)) == 0)
00334    {
00335       rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00336       return 0;
00337    }
00338    else
00339    {
00340       return 1;
00341    }
00342 }
00344 void
00345 testCancel()
00346 {
00347    
00348    pthread_once(&once_control, &initializeTheKey);
00349    Thread* theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00350    if (theThread == 0)
00351    {
00352       return;
00353    }
00354    NonRecursiveMutexLock l(theThread->m_cancelLock);
00355    if (theThread->m_cancelRequested)
00356    {
00357       
00358       
00359       
00360       
00361       
00362       throw ThreadCancelledException();
00363    }
00364 }
00366 void saveThreadInTLS(void* pTheThread)
00367 {
00368    
00369    pthread_once(&once_control, &initializeTheKey);
00370    int rc;
00371    if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00372    {
00373       OW_THROW(ThreadException, Format("pthread_setspecific failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00374    }
00375 }
00377 void sendSignalToThread(Thread_t threadID, int signo)
00378 {
00379    int rc;
00380    if ((rc = pthread_kill(threadID, signo)) != 0)
00381    {
00382       OW_THROW(ThreadException, Format("pthread_kill failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00383    }
00384 }
00386 void cancel(Thread_t threadID)
00387 {
00388    int rc;
00389    if ((rc = pthread_cancel(threadID)) != 0)
00390    {
00391       OW_THROW(ThreadException, Format("pthread_cancel failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00392    }
00393 }
00394 #endif // #ifdef OW_USE_PTHREAD
00395 
00396 #if defined(OW_WIN32)
00397 
00398 namespace {
00399 
00400 struct WThreadInfo
00401 {
00402    HANDLE   handle;
00403    OW_NAMESPACE::Thread* pTheThread;
00404 };
00405 
00406 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00407 Win32ThreadMap g_threads;
00408 Mutex g_threadsGuard;
00409 
00410 struct LocalThreadParm
00411 {
00412    ThreadFunction m_func;
00413    void* m_funcParm;
00414 };
00415 
00417 extern "C" {
00418 unsigned __stdcall threadStarter(void* arg)
00419 {
00420    LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00421    ThreadFunction func = parg->m_func;
00422    void* funcParm = parg->m_funcParm;
00423    delete parg;
00424    Int32 rval = (*func)(funcParm);
00425    ::_endthreadex(static_cast<unsigned>(rval));
00426    return rval;
00427 }
00428 }  
00429 
00431 void
00432 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00433 {
00434    MutexLock ml(g_threadsGuard);
00435    WThreadInfo wi;
00436    wi.handle = threadHandle;
00437    wi.pTheThread = 0;
00438    g_threads[threadId] = wi;
00439 }
00440 
00442 HANDLE
00443 getThreadHandle(DWORD threadId)
00444 {
00445    MutexLock ml(g_threadsGuard);
00446    HANDLE chdl = 0;
00447    Win32ThreadMap::iterator it = g_threads.find(threadId);
00448    if (it != g_threads.end())
00449    {
00450       chdl = it->second.handle;
00451    }
00452    return chdl;
00453 }
00454 
00456 void
00457 setThreadPointer(DWORD threadId, Thread* pTheThread)
00458 {
00459    MutexLock ml(g_threadsGuard);
00460    Win32ThreadMap::iterator it = g_threads.find(threadId);
00461    if (it != g_threads.end())
00462    {
00463       it->second.pTheThread = pTheThread;
00464    }
00465 }
00466 
00468 HANDLE
00469 removeThreadFromMap(DWORD threadId)
00470 {
00471    MutexLock ml(g_threadsGuard);
00472    HANDLE chdl = 0;
00473    Win32ThreadMap::iterator it = g_threads.find(threadId);
00474    if (it != g_threads.end())
00475    {
00476       chdl = it->second.handle;
00477       g_threads.erase(it);
00478    }
00479    return chdl;
00480 }
00481 
00483 Thread*
00484 getThreadObject(DWORD threadId)
00485 {
00486    Thread* pTheThread = 0;
00487    MutexLock ml(g_threadsGuard);
00488    Win32ThreadMap::iterator it = g_threads.find(threadId);
00489    if (it != g_threads.end())
00490    {
00491       pTheThread = it->second.pTheThread;
00492    }
00493    return pTheThread;
00494 }
00495 
00496 }  
00497 
00499 
00500 int
00501 createThread(Thread_t& handle, ThreadFunction func,
00502    void* funcParm, UInt32 threadFlags)
00503 {
00504    int cc = -1;
00505    HANDLE hThread;
00506    unsigned threadId;
00507 
00508    LocalThreadParm* parg = new LocalThreadParm;
00509    parg->m_func = func;
00510    parg->m_funcParm = funcParm;
00511    hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00512       parg, 0, &threadId));
00513    if (hThread != 0)
00514    {
00515       addThreadToMap(threadId, hThread);
00516       handle = threadId;
00517       cc = 0;
00518    }
00519 
00520    return cc;
00521 }
00523 
00524 void
00525 exitThread(Thread_t&, Int32 rval)
00526 {
00527    ::_endthreadex(static_cast<unsigned>(rval));
00528 }
00529 
00531 
00532 UInt64 thread_t_ToUInt64(Thread_t thr)
00533 {
00534    
00535    OW_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t),"  Thread_t truncated!");
00536    return static_cast<UInt64>(thr);
00537 }
00538 
00540 
00541 void
00542 destroyThread(Thread_t& threadId)
00543 {
00544    HANDLE thdl = removeThreadFromMap(threadId);
00545    if (thdl != 0)
00546    {
00547       ::CloseHandle(thdl);
00548    }
00549 }
00551 
00552 int
00553 setThreadDetached(Thread_t& handle)
00554 {
00555    
00556    return 0;
00557 }
00559 
00560 int
00561 joinThread(Thread_t& threadId, Int32& rvalArg)
00562 {
00563    int cc = -1;
00564    DWORD rval;
00565    HANDLE thdl = getThreadHandle(threadId);
00566    if (thdl != 0)
00567    {
00568       if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00569       {
00570          if (::GetExitCodeThread(thdl, &rval) != 0)
00571          {
00572             rvalArg = static_cast<Int32>(rval);
00573             cc = 0;
00574          }
00575       }
00576    }
00577    return cc;
00578 }
00579 
00581 void
00582 testCancel()
00583 {
00584    DWORD threadId = ThreadImpl::currentThread();
00585    Thread* pTheThread = getThreadObject(threadId);
00586    if (pTheThread)
00587    {
00588       NonRecursiveMutexLock l(pTheThread->m_cancelLock);
00589       if (pTheThread->m_cancelRequested)
00590       {
00591          
00592          
00593          
00594          
00595          
00596          throw ThreadCancelledException();
00597       }
00598    }
00599 }
00601 void saveThreadInTLS(void* pThreadArg)
00602 {
00603    Thread* pThread = static_cast<Thread*>(pThreadArg);
00604    DWORD threadId = pThread->getId();
00605     setThreadPointer(threadId, pThread);
00606 }
00608 void sendSignalToThread(Thread_t threadID, int signo)
00609 {
00610 }
00612 void cancel(Thread_t threadId)
00613 {
00614    HANDLE thdl = getThreadHandle(threadId);
00615    if (thdl != 0)
00616    {
00617       ::TerminateThread(thdl, -1);
00618    }
00619 }
00620 
00621 #endif // #ifdef OW_WIN32
00622 } 
00623 
00624 } 
00625