OW_SocketBaseImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00035 #include "OW_config.h"
00036 
00037 #if !defined(OW_WIN32)
00038 
00039 #include "OW_SocketBaseImpl.hpp"
00040 #include "OW_SocketUtils.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_Assertion.hpp"
00043 #include "OW_IOException.hpp"
00044 #include "OW_Mutex.hpp"
00045 #include "OW_MutexLock.hpp"
00046 #include "OW_PosixUnnamedPipe.hpp"
00047 #include "OW_Socket.hpp"
00048 #include "OW_Thread.hpp"
00049 #include "OW_DateTime.hpp"
00050 
00051 extern "C"
00052 {
00053 #ifdef OW_HAVE_SYS_SELECT_H
00054 #include <sys/select.h>
00055 #endif
00056 
00057 #include <sys/types.h>
00058 #include <sys/time.h>
00059 #include <sys/socket.h>
00060 #include <sys/stat.h>
00061 #include <netdb.h>
00062 #include <arpa/inet.h>
00063 #include <unistd.h>
00064 #include <fcntl.h>
00065 #include <netinet/in.h>
00066 }
00067 
00068 #include <fstream>
00069 #include <cerrno>
00070 #include <cstdio>
00071 
00072 namespace OW_NAMESPACE
00073 {
00074 
00075 using std::istream;
00076 using std::ostream;
00077 using std::iostream;
00078 using std::ifstream;
00079 using std::ofstream;
00080 using std::fstream;
00081 using std::ios;
// Definitions of the static trace ("dump") file paths shared by all
// SocketBaseImpl objects; both are assigned by setDumpFiles().
// While m_traceFileOut is non-empty, write() appends the bytes it sent to it.
00082 String SocketBaseImpl::m_traceFileOut;
// While m_traceFileIn is non-empty, read() appends the bytes it received to it.
00083 String SocketBaseImpl::m_traceFileIn;
00084 
00086 SocketBaseImpl::SocketBaseImpl()
00087    : SelectableIFC()
00088    , IOIFC()
00089    , m_isConnected(false)
00090    , m_sockfd(-1)
00091    , m_localAddress()
00092    , m_peerAddress()
00093    , m_recvTimeoutExprd(false)
00094    , m_streamBuf(this)
00095    , m_in(&m_streamBuf)
00096    , m_out(&m_streamBuf)
00097    , m_inout(&m_streamBuf)
00098    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00099    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00100    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00101 {
00102    m_out.exceptions(std::ios::badbit);
00103    m_inout.exceptions(std::ios::badbit);
00104 }
00106 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00107       SocketAddress::AddressType addrType)
00108    : SelectableIFC()
00109    , IOIFC()
00110    , m_isConnected(true)
00111    , m_sockfd(fd)
00112    , m_localAddress(SocketAddress::getAnyLocalHost())
00113    , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00114    , m_recvTimeoutExprd(false)
00115    , m_streamBuf(this)
00116    , m_in(&m_streamBuf)
00117    , m_out(&m_streamBuf)
00118    , m_inout(&m_streamBuf)
00119    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00120    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00121    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00122 {
00123    m_out.exceptions(std::ios::badbit);
00124    m_inout.exceptions(std::ios::badbit);
00125    if (addrType == SocketAddress::INET)
00126    {
00127       fillInetAddrParms();
00128    }
00129    else if (addrType == SocketAddress::UDS)
00130    {
00131       fillUnixAddrParms();
00132    }
00133    else
00134    {
00135       OW_ASSERT(0);
00136    }
00137 }
00139 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00140    : SelectableIFC()
00141    , IOIFC()
00142    , m_isConnected(false)
00143    , m_sockfd(-1)
00144    , m_localAddress(SocketAddress::getAnyLocalHost())
00145    , m_peerAddress(addr)
00146    , m_recvTimeoutExprd(false)
00147    , m_streamBuf(this)
00148    , m_in(&m_streamBuf)
00149    , m_out(&m_streamBuf)
00150    , m_inout(&m_streamBuf)
00151    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00152    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00153    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00154 {
00155    m_out.exceptions(std::ios::badbit);
00156    m_inout.exceptions(std::ios::badbit);
00157    connect(m_peerAddress);
00158 }
00160 SocketBaseImpl::~SocketBaseImpl()
00161 {
00162    try
00163    {
00164       disconnect();
00165    }
00166    catch (...)
00167    {
00168       // don't let exceptions escape
00169    }
00170 }
00172 Select_t
00173 SocketBaseImpl::getSelectObj() const
00174 {
00175    return m_sockfd;
00176 }
00178 void
00179 SocketBaseImpl::connect(const SocketAddress& addr)
00180 {
00181    if (m_isConnected)
00182    {
00183       disconnect();
00184    }
00185    m_streamBuf.reset();
00186    m_in.clear();
00187    m_out.clear();
00188    m_inout.clear();
00189    OW_ASSERT(addr.getType() == SocketAddress::INET
00190          || addr.getType() == SocketAddress::UDS);
00191    if ((m_sockfd = ::socket(addr.getType() == SocketAddress::INET ?
00192       AF_INET : PF_UNIX, SOCK_STREAM, 0)) == -1)
00193    {
00194       OW_THROW_ERRNO_MSG(SocketException,
00195          "Failed to create a socket");
00196    }
00197    // set the close on exec flag so child process can't keep the socket.
00198    if (::fcntl(m_sockfd, F_SETFD, FD_CLOEXEC) == -1)
00199    {
00200       ::close(m_sockfd);
00201       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00202    }
00203    int n;
00204    int flags = ::fcntl(m_sockfd, F_GETFL, 0);
00205    ::fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
00206    if ((n = ::connect(m_sockfd, addr.getNativeForm(),
00207                addr.getNativeFormSize())) < 0)
00208    {
00209       if (errno != EINPROGRESS)
00210       {
00211 			::close(m_sockfd);
00212          OW_THROW_ERRNO_MSG(SocketException,
00213             Format("Failed to connect to: %1", addr.toString()).c_str());
00214       }
00215    }
00216    if (n == -1)
00217    {
00218       // because of the above check for EINPROGRESS
00219       // not connected yet, need to select and wait for connection to complete.
00220       PosixUnnamedPipeRef lUPipe;
00221       int pipefd = -1;
00222       if (Socket::getShutDownMechanism())
00223       {
00224          UnnamedPipeRef foo = Socket::getShutDownMechanism();
00225          lUPipe = foo.cast_to<PosixUnnamedPipe>();
00226          OW_ASSERT(lUPipe);
00227          pipefd = lUPipe->getInputHandle();
00228       }
00229       fd_set rset, wset;
00230       // here we spin checking for thread cancellation every so often.
00231       UInt32 remainingMsWait = m_connectTimeout != Socket::INFINITE_TIMEOUT ? m_connectTimeout * 1000 : ~0U;
00232       do
00233       {
00234          FD_ZERO(&rset);
00235          if (m_sockfd < 0 || m_sockfd >= FD_SETSIZE)
00236          {
00237             OW_THROW(SocketException, "Invalid fd (< 0 || >= FD_SETSIZE)");
00238          }
00239          FD_SET(m_sockfd, &rset);
00240          if (pipefd != -1 && pipefd < FD_SETSIZE)
00241          {
00242             FD_SET(pipefd, &rset);
00243          }
00244          FD_ZERO(&wset);
00245          FD_SET(m_sockfd, &wset);
00246          int maxfd = m_sockfd > pipefd ? m_sockfd : pipefd;
00247 
00248          const UInt32 waitMs = 100; // 1/10 of a second
00249          struct timeval tv;
00250          tv.tv_sec = 0;
00251          tv.tv_usec = std::min((waitMs % 1000), remainingMsWait) * 1000;
00252 
00253          Thread::testCancel();
00254          n = ::select(maxfd+1, &rset, &wset, NULL, &tv);
00255 
00256          if (m_connectTimeout != Socket::INFINITE_TIMEOUT)
00257          {
00258             remainingMsWait -= std::min(waitMs, remainingMsWait);
00259          }
00260       } while (n == 0 && remainingMsWait > 0);
00261 
00262       if (n == 0)
00263       {
00264 			::close(m_sockfd);
00265          OW_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00266       }
00267       else if (n == -1)
00268       {
00269 			::close(m_sockfd);
00270          if (errno == EINTR)
00271          {
00272             Thread::testCancel();
00273          }
00274          OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00275       }
00276       if (pipefd != -1 && FD_ISSET(pipefd, &rset))
00277       {
00278 			::close(m_sockfd);
00279          OW_THROW(SocketException, "Sockets have been shutdown");
00280       }
00281       else if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
00282       {
00283          int error = 0;
00284          socklen_t len = sizeof(error);
00285          if (::getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &error,
00286                   &len) < 0)
00287          {
00288 				::close(m_sockfd);
00289             OW_THROW_ERRNO_MSG(SocketException,
00290                   "SocketBaseImpl::connect() getsockopt() failed");
00291          }
00292          if (error != 0)
00293          {
00294 				::close(m_sockfd);
00295             errno = error;
00296             OW_THROW_ERRNO_MSG(SocketException,
00297                   "SocketBaseImpl::connect() failed");
00298          }
00299       }
00300       else
00301       {
00302 			::close(m_sockfd);
00303          OW_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, m_sockfd not in FD set.");
00304       }
00305    }
00306    ::fcntl(m_sockfd, F_SETFL, flags);
00307    m_isConnected = true;
00308    m_peerAddress = addr; // To get the hostname from addr
00309    if (addr.getType() == SocketAddress::INET)
00310    {
00311       fillInetAddrParms();
00312    }
00313    else if (addr.getType() == SocketAddress::UDS)
00314    {
00315       fillUnixAddrParms();
00316    }
00317    else
00318    {
00319       OW_ASSERT(0);
00320    }
00321 }
00323 void
00324 SocketBaseImpl::disconnect()
00325 {
00326    if (m_in)
00327    {
00328       m_in.clear(ios::eofbit);
00329    }
00330    if (m_out)
00331    {
00332       m_out.clear(ios::eofbit);
00333    }
00334    if (m_inout)
00335    {
00336       m_inout.clear(ios::eofbit);
00337    }
00338    if (m_sockfd != -1 && m_isConnected)
00339    {
00340 		::close(m_sockfd);
00341       m_isConnected = false;
00342       m_sockfd = -1;
00343    }
00344 }
00346 // JBW this needs reworked.
00347 void
00348 SocketBaseImpl::fillInetAddrParms()
00349 {
00350    socklen_t len;
00351    InetSocketAddress_t addr;
00352    memset(&addr, 0, sizeof(addr));
00353    len = sizeof(addr);
00354    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00355    {
00356 // Don't error out here, we can still operate without working DNS.
00357 //    OW_THROW_ERRNO_MSG(SocketException,
00358 //          "SocketBaseImpl::fillInetAddrParms: getsockname");
00359    }
00360    else
00361    {
00362       m_localAddress.assignFromNativeForm(&addr, len);
00363    }
00364    len = sizeof(addr);
00365    if (getpeername(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00366    {
00367 // Don't error out here, we can still operate without working DNS.
00368 //    OW_THROW_ERRNO_MSG(SocketException,
00369 //          "SocketBaseImpl::fillInetAddrParms: getpeername");
00370    }
00371    else
00372    {
00373       m_peerAddress.assignFromNativeForm(&addr, len);
00374    }
00375 }
00377 void
00378 SocketBaseImpl::fillUnixAddrParms()
00379 {
00380    socklen_t len;
00381    UnixSocketAddress_t addr;
00382    memset(&addr, 0, sizeof(addr));
00383    len = sizeof(addr);
00384    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00385    {
00386       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00387    }
00388    m_localAddress.assignFromNativeForm(&addr, len);
00389    m_peerAddress.assignFromNativeForm(&addr, len);
00390 }
// File-local mutex locked by read() and write() while they append to the
// socket trace ("dump") files.
00391 static Mutex guard;
00393 int
00394 SocketBaseImpl::write(const void* dataOut, int dataOutLen, bool errorAsException)
00395 {
00396    int rc = 0;
00397    bool isError = false;
00398    if (m_isConnected)
00399    {
00400       isError = waitForOutput(m_sendTimeout);
00401       if (isError)
00402       {
00403          rc = -1;
00404       }
00405       else
00406       {
00407          rc = writeAux(dataOut, dataOutLen);
00408          if (!m_traceFileOut.empty() && rc > 0)
00409          {
00410             MutexLock ml(guard);
00411             ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00412             if (!traceFile)
00413             {
00414                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00415             }
00416             if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00417             {
00418                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00419             }
00420 
00421             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00422             if (!comboTraceFile)
00423             {
00424                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00425             }
00426             DateTime curDateTime;
00427             curDateTime.setToCurrent();
00428             comboTraceFile << "\n--->Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00429                '.' << curDateTime.getMicrosecond() << "<---\n";
00430             if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00431             {
00432                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00433             }
00434          }
00435       }
00436    }
00437    else
00438    {
00439       rc = -1;
00440    }
00441    if (rc < 0 && errorAsException)
00442    {
00443       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00444    }
00445    return rc;
00446 }
00448 int
00449 SocketBaseImpl::read(void* dataIn, int dataInLen, bool errorAsException)   
00450 {
00451    int rc = 0;
00452    bool isError = false;
00453    if (m_isConnected)
00454    {
00455       isError = waitForInput(m_recvTimeout);
00456       if (isError)
00457       {
00458          rc = -1;
00459       }
00460       else
00461       {
00462          rc = readAux(dataIn, dataInLen);
00463          if (!m_traceFileIn.empty() && rc > 0)
00464          {
00465             MutexLock ml(guard);
00466             ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00467             if (!traceFile)
00468             {
00469                OW_THROW_ERRNO_MSG(IOException, "Failed opening tracefile");
00470             }
00471             if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00472             {
00473                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00474             }
00475 
00476             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00477             if (!comboTraceFile)
00478             {
00479                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00480             }
00481             DateTime curDateTime;
00482             curDateTime.setToCurrent();
00483             comboTraceFile << "\n--->In " << rc << " bytes at " << curDateTime.toString("%X") <<
00484                '.' << curDateTime.getMicrosecond() << "<---\n";
00485             if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00486             {
00487                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00488             }
00489          }
00490       }
00491    }
00492    else
00493    {
00494       rc = -1;
00495    }
00496    if (rc < 0)
00497    {
00498       if (errorAsException)
00499       {
00500          OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00501       }
00502    }
00503    return rc;
00504 }
00506 bool
00507 SocketBaseImpl::waitForInput(int timeOutSecs)
00508 {
00509    int rval = SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_INPUT);
00510    if (rval == ETIMEDOUT)
00511    {
00512       m_recvTimeoutExprd = true;
00513    }
00514    else
00515    {
00516       m_recvTimeoutExprd = false;
00517    }
00518    return (rval != 0);
00519 }
00521 bool
00522 SocketBaseImpl::waitForOutput(int timeOutSecs)
00523 {
00524    return SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00525 }
00527 istream&
00528 SocketBaseImpl::getInputStream()
00529 {
00530    return m_in;
00531 }
00533 ostream&
00534 SocketBaseImpl::getOutputStream()
00535 {
00536    return m_out;
00537 }
00539 iostream&
00540 SocketBaseImpl::getIOStream()
00541 {
00542    return m_inout;
00543 }
00545 // STATIC
00546 void
00547 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00548 {
00549    m_traceFileOut = out;
00550    m_traceFileIn = in;
00551 }
00552 
00553 } // end namespace OW_NAMESPACE
00554 
00555 #endif   // #if !defined(OW_WIN32)
00556 

Generated on Thu Feb 9 08:48:11 2006 for openwbem by  doxygen 1.4.6