OW_Select.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00036 #include "OW_config.h"
00037 #include "OW_Select.hpp"
00038 #include "OW_AutoPtr.hpp"
00039 #include "OW_Assertion.hpp"
00040 #include "OW_Thread.hpp" // for testCancel()
00041 
00042 #if defined(OW_WIN32)
00043 #include <cassert>
00044 #endif
00045 
00046 extern "C"
00047 {
00048 
00049 #ifndef OW_WIN32
00050  #ifdef OW_HAVE_SYS_EPOLL_H
00051   #include <sys/epoll.h>
00052  #endif
00053  #if defined (OW_HAVE_SYS_POLL_H)
00054   #include <sys/poll.h>
00055  #endif
00056  #if defined (OW_HAVE_SYS_SELECT_H)
00057   #include <sys/select.h>
00058  #endif
00059 #endif
00060 
00061 #ifdef OW_HAVE_SYS_TIME_H
00062  #include <sys/time.h>
00063 #endif
00064 
00065 #include <sys/types.h>
00066 
00067 #ifdef OW_HAVE_UNISTD_H
00068  #include <unistd.h>
00069 #endif
00070 
00071 #include <errno.h>
00072 }
00073 
00074 namespace OW_NAMESPACE
00075 {
00076 
00077 namespace Select
00078 {
00079 #if defined(OW_WIN32)
00080 
00081 int
00082 selectRW(SelectObjectArray& selarray, UInt32 ms)
00083 {
00084    int rc;
00085    size_t hcount = static_cast<DWORD>(selarray.size());
00086    AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00087 
00088    size_t handleidx = 0;
00089    for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00090    {
00091       if(selarray[i].s.sockfd != INVALID_SOCKET
00092          && selarray[i].s.networkevents)
00093       {
00094          ::WSAEventSelect(selarray[i].s.sockfd, 
00095             selarray[i].s.event, selarray[i].s.networkevents);
00096       }
00097             
00098       hdls[handleidx] = selarray[i].s.event;
00099    }
00100 
00101    DWORD timeout = (ms != ~0U) ? ms : INFINITE;
00102    DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);
00103 
00104    assert(cc != WAIT_ABANDONED);
00105 
00106    switch (cc)
00107    {
00108       case WAIT_FAILED:
00109          rc = Select::SELECT_ERROR;
00110          break;
00111       case WAIT_TIMEOUT:
00112          rc = Select::SELECT_TIMEOUT;
00113          break;
00114       default:
00115          rc = cc - WAIT_OBJECT_0;
00116          
00117          // If this is a socket, set it back to 
00118          // blocking mode
00119          if(selarray[rc].s.sockfd != INVALID_SOCKET)
00120          {
00121             if(selarray[rc].s.networkevents
00122                && selarray[rc].s.doreset == false)
00123             {
00124                ::WSAEventSelect(selarray[rc].s.sockfd, 
00125                   selarray[rc].s.event, selarray[rc].s.networkevents);
00126             }
00127             else
00128             {
00129                // Set socket back to blocking
00130                ::WSAEventSelect(selarray[rc].s.sockfd, 
00131                   selarray[rc].s.event, 0);
00132                u_long ioctlarg = 0;
00133                ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00134             }
00135          }
00136          break;
00137    }
00138 
00139    if( rc < 0 )
00140       return rc;
00141 
00142    int availableCount = 0;
00143    for (size_t i = 0; i < selarray.size(); i++)
00144    {
00145       if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00146       {
00147          if( selarray[i].waitForRead )
00148             selarray[i].readAvailable = true;
00149          if( selarray[i].waitForWrite )
00150             selarray[i].writeAvailable = true;
00151          ++availableCount;
00152       }
00153       else
00154       {
00155          selarray[i].readAvailable = false;
00156          selarray[i].writeAvailable = false;
00157       }
00158    }
00159    return availableCount;
00160 }
00161 
00162 
00163 #else
00164 
00166 // epoll version
00167 int
00168 selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
00169 {
00170 #ifdef OW_HAVE_SYS_EPOLL_H
00171    int lerrno, ecc = 0;
00172    int timeout;
00173    AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00174    int epfd = epoll_create(selarray.size());
00175    if(epfd == -1)
00176    {
00177       if (errno == ENOSYS) // kernel doesn't support it
00178       {
00179          return SELECT_NOT_IMPLEMENTED;
00180       }
00181       // Need to return something else?
00182       return Select::SELECT_ERROR;
00183    }
00184 
00185    UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
00186    UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00187    for (size_t i = 0; i < selarray.size(); i++)
00188    {
00189       OW_ASSERT(selarray[i].s >= 0);
00190       selarray[i].readAvailable = false;
00191       selarray[i].writeAvailable = false;
00192       selarray[i].wasError = false;
00193       events[i].data.u32 = i;
00194       events[i].events = 0;
00195       if(selarray[i].waitForRead)
00196       {
00197          events[i].events |= read_events;
00198       }
00199       if(selarray[i].waitForWrite)
00200       {
00201          events[i].events |= write_events;
00202       }
00203 
00204       if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00205       {
00206          int errnum = errno;
00207 			::close(epfd);
00208          // Need to return something else?
00209          return errnum == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
00210       }
00211    }
00212 
00213    // here we spin checking for thread cancellation every so often.
00214    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00215    timeval now, end;
00216    gettimeofday(&now, NULL);
00217    end = now;
00218    end.tv_sec  += ms / 1000;
00219    end.tv_usec += (ms % 1000) * 1000;
00220 
00221    while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00222        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00223    {
00224       timeval tv;
00225       tv.tv_sec = end.tv_sec - now.tv_sec;
00226       if (end.tv_usec >= now.tv_usec)
00227       {
00228          tv.tv_usec = end.tv_usec - now.tv_usec;
00229       }
00230       else
00231       {
00232          tv.tv_sec--;
00233          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00234       }
00235 
00236       if ((tv.tv_sec != 0) 
00237          || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00238       {
00239          tv.tv_sec = 0;
00240          tv.tv_usec = loopMicroSeconds;
00241       }
00242 
00243       timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00244       Thread::testCancel();
00245       ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
00246       lerrno = errno;
00247       Thread::testCancel();
00248       gettimeofday(&now, NULL);
00249    }
00250 
00251 	::close(epfd);
00252    if (ecc < 0)
00253    {
00254       return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
00255    }
00256    if (ecc == 0)
00257    {
00258       return Select::SELECT_TIMEOUT;
00259    }
00260 
00261    for(int i = 0; i < ecc; i++)
00262    {
00263       SelectObject & so = selarray[events[i].data.u32];
00264       so.readAvailable = so.waitForRead && (events[i].events & read_events);
00265       so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
00266    }
00267 
00268    return ecc;
00269 #else
00270    return SELECT_NOT_IMPLEMENTED;
00271 #endif
00272 }
00273 
00275 // poll() version
00276 int
00277 selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
00278 {
00279 #if defined (OW_HAVE_SYS_POLL_H)
00280    int lerrno, rc = 0;
00281 
00282    AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00283 
00284    // here we spin checking for thread cancellation every so often.
00285    timeval now, end;
00286    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00287    gettimeofday(&now, NULL);
00288    end = now;
00289    end.tv_sec  += ms / 1000;
00290    end.tv_usec += (ms % 1000) * 1000;
00291    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00292        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00293    {
00294       for (size_t i = 0; i < selarray.size(); i++)
00295       {
00296          OW_ASSERT(selarray[i].s >= 0);
00297          selarray[i].readAvailable = false;
00298          selarray[i].writeAvailable = false;
00299          selarray[i].wasError = false;
00300          pfds[i].revents = 0;
00301          pfds[i].fd = selarray[i].s;
00302          pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00303          if(selarray[i].waitForWrite)
00304             pfds[i].events |= POLLOUT;
00305       }
00306 
00307       timeval tv;
00308       tv.tv_sec = end.tv_sec - now.tv_sec;
00309       if (end.tv_usec >= now.tv_usec)
00310       {
00311          tv.tv_usec = end.tv_usec - now.tv_usec;
00312       }
00313       else
00314       {
00315          tv.tv_sec--;
00316          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00317       }
00318 
00319       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00320       {
00321          tv.tv_sec = 0;
00322          tv.tv_usec = loopMicroSeconds;
00323       }
00324 
00325       // TODO optimize this. 
00326       int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000; 
00327 
00328       Thread::testCancel();
00329       rc = ::poll(pfds.get(), selarray.size(), loopMSecs); 
00330       lerrno = errno;
00331       Thread::testCancel();
00332 
00333       gettimeofday(&now, NULL);
00334    }
00335    
00336    if (rc < 0)
00337    {
00338       if (lerrno == EINTR)
00339       {
00340 #ifdef OW_NETWARE
00341          // When the NetWare server is shutting down, select will
00342          // set errno to EINTR on return. If this thread does not
00343          // yield control (cooperative multitasking) then we end
00344          // up in a very tight loop and get a CPUHog server abbend.
00345          pthread_yield();
00346 #endif
00347          return Select::SELECT_INTERRUPTED;
00348       }
00349       else
00350       {
00351          return Select::SELECT_ERROR;
00352       }
00353    }
00354    if (rc == 0)
00355    {
00356       return Select::SELECT_TIMEOUT;
00357    }
00358    for (size_t i = 0; i < selarray.size(); i++)
00359    {
00360       if (pfds[i].revents & (POLLERR | POLLNVAL))
00361       {
00362          selarray[i].wasError = true;
00363       }
00364       else
00365       {
00366          if(selarray[i].waitForRead)
00367          {
00368             selarray[i].readAvailable = (pfds[i].revents & 
00369                (POLLIN | POLLPRI | POLLHUP));
00370          }
00371 
00372          if(selarray[i].waitForWrite)
00373          {
00374             selarray[i].writeAvailable = (pfds[i].revents &
00375                (POLLOUT | POLLHUP));
00376          }
00377       }
00378    }
00379 
00380    return rc;
00381 #else
00382    return SELECT_NOT_IMPLEMENTED;
00383 #endif
00384 }
00386 // ::select() version
00387 int
00388 selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
00389 {
00390 #if defined (OW_HAVE_SYS_SELECT_H)
00391    int lerrno, rc = 0;
00392    fd_set ifds;
00393    fd_set ofds;
00394 
00395    // here we spin checking for thread cancellation every so often.
00396    timeval now, end;
00397    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00398    gettimeofday(&now, NULL);
00399    end = now;
00400    end.tv_sec  += ms / 1000;
00401    end.tv_usec += (ms % 1000) * 1000;
00402    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00403        || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00404    {
00405       int maxfd = 0;
00406       FD_ZERO(&ifds);
00407       FD_ZERO(&ofds);
00408       for (size_t i = 0; i < selarray.size(); ++i)
00409       {
00410          int fd = selarray[i].s;
00411          OW_ASSERT(fd >= 0);
00412          if (maxfd < fd)
00413          {
00414             maxfd = fd;
00415          }
00416          if (fd < 0 || fd >= FD_SETSIZE)
00417          {
00418             return Select::SELECT_ERROR;
00419          }
00420          if (selarray[i].waitForRead)
00421          {
00422             FD_SET(fd, &ifds);
00423          }
00424          if (selarray[i].waitForWrite)
00425          {
00426             FD_SET(fd, &ofds);
00427          }
00428       }
00429 
00430       timeval tv;
00431       tv.tv_sec = end.tv_sec - now.tv_sec;
00432       if (end.tv_usec >= now.tv_usec)
00433       {
00434          tv.tv_usec = end.tv_usec - now.tv_usec;
00435       }
00436       else
00437       {
00438          tv.tv_sec--;
00439          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00440       }
00441 
00442       if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00443       {
00444          tv.tv_sec = 0;
00445          tv.tv_usec = loopMicroSeconds;
00446       }
00447 
00448       Thread::testCancel();
00449       rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
00450       lerrno = errno;
00451       Thread::testCancel();
00452 
00453       gettimeofday(&now, NULL);
00454    }
00455    
00456    if (rc < 0)
00457    {
00458       if (lerrno == EINTR)
00459       {
00460 #ifdef OW_NETWARE
00461          // When the NetWare server is shutting down, select will
00462          // set errno to EINTR on return. If this thread does not
00463          // yield control (cooperative multitasking) then we end
00464          // up in a very tight loop and get a CPUHog server abbend.
00465          pthread_yield();
00466 #endif
00467          return Select::SELECT_INTERRUPTED;
00468       }
00469       else
00470       {
00471          return Select::SELECT_ERROR;
00472       }
00473    }
00474    if (rc == 0)
00475    {
00476       return Select::SELECT_TIMEOUT;
00477    }
00478    int availableCount = 0;
00479    int cval;
00480    for (size_t i = 0; i < selarray.size(); i++)
00481    {
00482       selarray[i].wasError = false;
00483       cval = 0;
00484       if (FD_ISSET(selarray[i].s, &ifds))
00485       {
00486          selarray[i].readAvailable = true;
00487          cval = 1;
00488       }
00489       else
00490       {
00491          selarray[i].readAvailable = false;
00492       }
00493 
00494       if (FD_ISSET(selarray[i].s, &ofds))
00495       {
00496          selarray[i].writeAvailable = true;
00497          cval = 1;
00498       }
00499       else
00500       {
00501          selarray[i].writeAvailable = false;
00502       }
00503 
00504       availableCount += cval;
00505 
00506    }
00507       
00508    return availableCount;
00509 #else
00510    return SELECT_NOT_IMPLEMENTED;
00511 #endif
00512 }
00513 
00514 int
00515 selectRW(SelectObjectArray& selarray, UInt32 ms)
00516 {
00517    int rv = selectRWEpoll(selarray, ms);
00518    if (rv != SELECT_NOT_IMPLEMENTED)
00519    {
00520       return rv;
00521    }
00522 
00523    rv = selectRWPoll(selarray, ms);
00524    if (rv != SELECT_NOT_IMPLEMENTED)
00525    {
00526       return rv;
00527    }
00528 
00529    rv = selectRWSelect(selarray, ms);
00530    OW_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00531    return rv;
00532 }
00533 
00535 #endif   // #else OW_WIN32
00536 
00537 int
00538 select(const SelectTypeArray& selarray, UInt32 ms)
00539 {
00540    SelectObjectArray soa;
00541    soa.reserve(selarray.size());
00542    for (size_t i = 0; i < selarray.size(); ++i)
00543    {
00544       SelectObject curObj(selarray[i]);
00545       curObj.waitForRead = true;
00546       soa.push_back(curObj);
00547    }
00548    int rv = selectRW(soa, ms);
00549    if (rv < 0)
00550    {
00551       return rv;
00552    }
00553 
00554    // find the first selected object
00555    for (size_t i = 0; i < soa.size(); ++i)
00556    {
00557       if (soa[i].readAvailable)
00558       {
00559          return i;
00560       }
00561    }
00562    return SELECT_ERROR;
00563 }
00564 
00565 } // end namespace Select
00566 
00567 } // end namespace OW_NAMESPACE
00568 

Generated on Thu Feb 9 08:48:10 2006 for openwbem by  doxygen 1.4.6