00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00036 #include "OW_config.h"
00037 #include "OW_Select.hpp"
00038 #include "OW_AutoPtr.hpp"
00039 #include "OW_Assertion.hpp"
00040 #include "OW_Thread.hpp" 
00041 
00042 #if defined(OW_WIN32)
00043 #include <cassert>
00044 #endif
00045 
00046 extern "C"
00047 {
00048 
00049 #ifndef OW_WIN32
00050  #ifdef OW_HAVE_SYS_EPOLL_H
00051   #include <sys/epoll.h>
00052  #endif
00053  #if defined (OW_HAVE_SYS_POLL_H)
00054   #include <sys/poll.h>
00055  #endif
00056  #if defined (OW_HAVE_SYS_SELECT_H)
00057   #include <sys/select.h>
00058  #endif
00059 #endif
00060 
00061 #ifdef OW_HAVE_SYS_TIME_H
00062  #include <sys/time.h>
00063 #endif
00064 
00065 #include <sys/types.h>
00066 
00067 #ifdef OW_HAVE_UNISTD_H
00068  #include <unistd.h>
00069 #endif
00070 
00071 #include <errno.h>
00072 }
00073 
00074 namespace OW_NAMESPACE
00075 {
00076 
00077 namespace Select
00078 {
00079 #if defined(OW_WIN32)
00080 
00081 int
00082 selectRW(SelectObjectArray& selarray, UInt32 ms)
00083 {
00084    int rc;
00085    size_t hcount = static_cast<DWORD>(selarray.size());
00086    AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00087 
00088    size_t handleidx = 0;
00089    for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00090    {
00091       if(selarray[i].s.sockfd != INVALID_SOCKET
00092          && selarray[i].s.networkevents)
00093       {
00094          ::WSAEventSelect(selarray[i].s.sockfd, 
00095             selarray[i].s.event, selarray[i].s.networkevents);
00096       }
00097             
00098       hdls[handleidx] = selarray[i].s.event;
00099    }
00100 
00101    DWORD timeout = (ms != ~0U) ? ms : INFINITE;
00102    DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);
00103 
00104    assert(cc != WAIT_ABANDONED);
00105 
00106    switch (cc)
00107    {
00108       case WAIT_FAILED:
00109          rc = Select::SELECT_ERROR;
00110          break;
00111       case WAIT_TIMEOUT:
00112          rc = Select::SELECT_TIMEOUT;
00113          break;
00114       default:
00115          rc = cc - WAIT_OBJECT_0;
00116          
00117          
00118          
00119          if(selarray[rc].s.sockfd != INVALID_SOCKET)
00120          {
00121             if(selarray[rc].s.networkevents
00122                && selarray[rc].s.doreset == false)
00123             {
00124                ::WSAEventSelect(selarray[rc].s.sockfd, 
00125                   selarray[rc].s.event, selarray[rc].s.networkevents);
00126             }
00127             else
00128             {
00129                
00130                ::WSAEventSelect(selarray[rc].s.sockfd, 
00131                   selarray[rc].s.event, 0);
00132                u_long ioctlarg = 0;
00133                ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00134             }
00135          }
00136          break;
00137    }
00138 
00139    if( rc < 0 )
00140       return rc;
00141 
00142    int availableCount = 0;
00143    for (size_t i = 0; i < selarray.size(); i++)
00144    {
00145       if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00146       {
00147          if( selarray[i].waitForRead )
00148             selarray[i].readAvailable = true;
00149          if( selarray[i].waitForWrite )
00150             selarray[i].writeAvailable = true;
00151          ++availableCount;
00152       }
00153       else
00154       {
00155          selarray[i].readAvailable = false;
00156          selarray[i].writeAvailable = false;
00157       }
00158    }
00159    return availableCount;
00160 }
00161 
00162 
00163 #else
00164 
00166 
00167 int
00168 selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
00169 {
00170 #ifdef OW_HAVE_SYS_EPOLL_H
00171    int lerrno, ecc = 0;
00172    int timeout;
00173    AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00174    int epfd = epoll_create(selarray.size());
00175    if(epfd == -1)
00176    {
00177       if (errno == ENOSYS) 
00178       {
00179          return SELECT_NOT_IMPLEMENTED;
00180       }
00181       
00182       return Select::SELECT_ERROR;
00183    }
00184 
00185    UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
00186    UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00187    for (size_t i = 0; i < selarray.size(); i++)
00188    {
00189       OW_ASSERT(selarray[i].s >= 0);
00190       selarray[i].readAvailable = false;
00191       selarray[i].writeAvailable = false;
00192       selarray[i].wasError = false;
00193       events[i].data.u32 = i;
00194       events[i].events = 0;
00195       if(selarray[i].waitForRead)
00196       {
00197          events[i].events |= read_events;
00198       }
00199       if(selarray[i].waitForWrite)
00200       {
00201          events[i].events |= write_events;
00202       }
00203 
00204       if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00205       {
00206          int errnum = errno;
00207 			::close(epfd);
00208          
00209          return errnum == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
00210       }
00211    }
00212 
00213    
00214    const Int32 loopMicroSeconds = 100 * 1000; 
00215    timeval now, end;
00216    gettimeofday(&now, NULL);
00217    end = now;
00218    end.tv_sec  += ms / 1000;
00219    end.tv_usec += (ms % 1000) * 1000;
00220 
00221    while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00222        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00223    {
00224       timeval tv;
00225       tv.tv_sec = end.tv_sec - now.tv_sec;
00226       if (end.tv_usec >= now.tv_usec)
00227       {
00228          tv.tv_usec = end.tv_usec - now.tv_usec;
00229       }
00230       else
00231       {
00232          tv.tv_sec--;
00233          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00234       }
00235 
00236       if ((tv.tv_sec != 0) 
00237          || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00238       {
00239          tv.tv_sec = 0;
00240          tv.tv_usec = loopMicroSeconds;
00241       }
00242 
00243       timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00244       Thread::testCancel();
00245       ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
00246       lerrno = errno;
00247       Thread::testCancel();
00248       gettimeofday(&now, NULL);
00249    }
00250 
00251 	::close(epfd);
00252    if (ecc < 0)
00253    {
00254       return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
00255    }
00256    if (ecc == 0)
00257    {
00258       return Select::SELECT_TIMEOUT;
00259    }
00260 
00261    for(int i = 0; i < ecc; i++)
00262    {
00263       SelectObject & so = selarray[events[i].data.u32];
00264       so.readAvailable = so.waitForRead && (events[i].events & read_events);
00265       so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
00266    }
00267 
00268    return ecc;
00269 #else
00270    return SELECT_NOT_IMPLEMENTED;
00271 #endif
00272 }
00273 
00275 
00276 int
00277 selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
00278 {
00279 #if defined (OW_HAVE_SYS_POLL_H)
00280    int lerrno, rc = 0;
00281 
00282    AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00283 
00284    
00285    timeval now, end;
00286    const Int32 loopMicroSeconds = 100 * 1000; 
00287    gettimeofday(&now, NULL);
00288    end = now;
00289    end.tv_sec  += ms / 1000;
00290    end.tv_usec += (ms % 1000) * 1000;
00291    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00292        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00293    {
00294       for (size_t i = 0; i < selarray.size(); i++)
00295       {
00296          OW_ASSERT(selarray[i].s >= 0);
00297          selarray[i].readAvailable = false;
00298          selarray[i].writeAvailable = false;
00299          selarray[i].wasError = false;
00300          pfds[i].revents = 0;
00301          pfds[i].fd = selarray[i].s;
00302          pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00303          if(selarray[i].waitForWrite)
00304             pfds[i].events |= POLLOUT;
00305       }
00306 
00307       timeval tv;
00308       tv.tv_sec = end.tv_sec - now.tv_sec;
00309       if (end.tv_usec >= now.tv_usec)
00310       {
00311          tv.tv_usec = end.tv_usec - now.tv_usec;
00312       }
00313       else
00314       {
00315          tv.tv_sec--;
00316          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00317       }
00318 
00319       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00320       {
00321          tv.tv_sec = 0;
00322          tv.tv_usec = loopMicroSeconds;
00323       }
00324 
00325       
00326       int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000; 
00327 
00328       Thread::testCancel();
00329       rc = ::poll(pfds.get(), selarray.size(), loopMSecs); 
00330       lerrno = errno;
00331       Thread::testCancel();
00332 
00333       gettimeofday(&now, NULL);
00334    }
00335    
00336    if (rc < 0)
00337    {
00338       if (lerrno == EINTR)
00339       {
00340 #ifdef OW_NETWARE
00341          
00342          
00343          
00344          
00345          pthread_yield();
00346 #endif
00347          return Select::SELECT_INTERRUPTED;
00348       }
00349       else
00350       {
00351          return Select::SELECT_ERROR;
00352       }
00353    }
00354    if (rc == 0)
00355    {
00356       return Select::SELECT_TIMEOUT;
00357    }
00358    for (size_t i = 0; i < selarray.size(); i++)
00359    {
00360       if (pfds[i].revents & (POLLERR | POLLNVAL))
00361       {
00362          selarray[i].wasError = true;
00363       }
00364       else
00365       {
00366          if(selarray[i].waitForRead)
00367          {
00368             selarray[i].readAvailable = (pfds[i].revents & 
00369                (POLLIN | POLLPRI | POLLHUP));
00370          }
00371 
00372          if(selarray[i].waitForWrite)
00373          {
00374             selarray[i].writeAvailable = (pfds[i].revents &
00375                (POLLOUT | POLLHUP));
00376          }
00377       }
00378    }
00379 
00380    return rc;
00381 #else
00382    return SELECT_NOT_IMPLEMENTED;
00383 #endif
00384 }
00386 
00387 int
00388 selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
00389 {
00390 #if defined (OW_HAVE_SYS_SELECT_H)
00391    int lerrno, rc = 0;
00392    fd_set ifds;
00393    fd_set ofds;
00394 
00395    
00396    timeval now, end;
00397    const Int32 loopMicroSeconds = 100 * 1000; 
00398    gettimeofday(&now, NULL);
00399    end = now;
00400    end.tv_sec  += ms / 1000;
00401    end.tv_usec += (ms % 1000) * 1000;
00402    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00403        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00404    {
00405       int maxfd = 0;
00406       FD_ZERO(&ifds);
00407       FD_ZERO(&ofds);
00408       for (size_t i = 0; i < selarray.size(); ++i)
00409       {
00410          int fd = selarray[i].s;
00411          OW_ASSERT(fd >= 0);
00412          if (maxfd < fd)
00413          {
00414             maxfd = fd;
00415          }
00416          if (fd < 0 || fd >= FD_SETSIZE)
00417          {
00418             return Select::SELECT_ERROR;
00419          }
00420          if (selarray[i].waitForRead)
00421          {
00422             FD_SET(fd, &ifds);
00423          }
00424          if (selarray[i].waitForWrite)
00425          {
00426             FD_SET(fd, &ofds);
00427          }
00428       }
00429 
00430       timeval tv;
00431       tv.tv_sec = end.tv_sec - now.tv_sec;
00432       if (end.tv_usec >= now.tv_usec)
00433       {
00434          tv.tv_usec = end.tv_usec - now.tv_usec;
00435       }
00436       else
00437       {
00438          tv.tv_sec--;
00439          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00440       }
00441 
00442       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00443       {
00444          tv.tv_sec = 0;
00445          tv.tv_usec = loopMicroSeconds;
00446       }
00447 
00448       Thread::testCancel();
00449       rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
00450       lerrno = errno;
00451       Thread::testCancel();
00452 
00453       gettimeofday(&now, NULL);
00454    }
00455    
00456    if (rc < 0)
00457    {
00458       if (lerrno == EINTR)
00459       {
00460 #ifdef OW_NETWARE
00461          
00462          
00463          
00464          
00465          pthread_yield();
00466 #endif
00467          return Select::SELECT_INTERRUPTED;
00468       }
00469       else
00470       {
00471          return Select::SELECT_ERROR;
00472       }
00473    }
00474    if (rc == 0)
00475    {
00476       return Select::SELECT_TIMEOUT;
00477    }
00478    int availableCount = 0;
00479    int cval;
00480    for (size_t i = 0; i < selarray.size(); i++)
00481    {
00482       selarray[i].wasError = false;
00483       cval = 0;
00484       if (FD_ISSET(selarray[i].s, &ifds))
00485       {
00486          selarray[i].readAvailable = true;
00487          cval = 1;
00488       }
00489       else
00490       {
00491          selarray[i].readAvailable = false;
00492       }
00493 
00494       if (FD_ISSET(selarray[i].s, &ofds))
00495       {
00496          selarray[i].writeAvailable = true;
00497          cval = 1;
00498       }
00499       else
00500       {
00501          selarray[i].writeAvailable = false;
00502       }
00503 
00504       availableCount += cval;
00505 
00506    }
00507       
00508    return availableCount;
00509 #else
00510    return SELECT_NOT_IMPLEMENTED;
00511 #endif
00512 }
00513 
00514 int
00515 selectRW(SelectObjectArray& selarray, UInt32 ms)
00516 {
00517    int rv = selectRWEpoll(selarray, ms);
00518    if (rv != SELECT_NOT_IMPLEMENTED)
00519    {
00520       return rv;
00521    }
00522 
00523    rv = selectRWPoll(selarray, ms);
00524    if (rv != SELECT_NOT_IMPLEMENTED)
00525    {
00526       return rv;
00527    }
00528 
00529    rv = selectRWSelect(selarray, ms);
00530    OW_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00531    return rv;
00532 }
00533 
00535 #endif   // #else OW_WIN32
00536 
00537 int
00538 select(const SelectTypeArray& selarray, UInt32 ms)
00539 {
00540    SelectObjectArray soa;
00541    soa.reserve(selarray.size());
00542    for (size_t i = 0; i < selarray.size(); ++i)
00543    {
00544       SelectObject curObj(selarray[i]);
00545       curObj.waitForRead = true;
00546       soa.push_back(curObj);
00547    }
00548    int rv = selectRW(soa, ms);
00549    if (rv < 0)
00550    {
00551       return rv;
00552    }
00553 
00554    
00555    for (size_t i = 0; i < soa.size(); ++i)
00556    {
00557       if (soa[i].readAvailable)
00558       {
00559          return i;
00560       }
00561    }
00562    return SELECT_ERROR;
00563 }
00564 
00565 } 
00566 
00567 } 
00568