00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00035 #include "OW_config.h"
00036 
00037 #if !defined(OW_WIN32)
00038 
00039 #include "OW_SocketBaseImpl.hpp"
00040 #include "OW_SocketUtils.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_Assertion.hpp"
00043 #include "OW_IOException.hpp"
00044 #include "OW_Mutex.hpp"
00045 #include "OW_MutexLock.hpp"
00046 #include "OW_PosixUnnamedPipe.hpp"
00047 #include "OW_Socket.hpp"
00048 #include "OW_Thread.hpp"
00049 #include "OW_DateTime.hpp"
00050 
00051 extern "C"
00052 {
00053 #ifdef OW_HAVE_SYS_SELECT_H
00054 #include <sys/select.h>
00055 #endif
00056 
00057 #include <sys/types.h>
00058 #include <sys/time.h>
00059 #include <sys/socket.h>
00060 #include <sys/stat.h>
00061 #include <netdb.h>
00062 #include <arpa/inet.h>
00063 #include <unistd.h>
00064 #include <fcntl.h>
00065 #include <netinet/in.h>
00066 }
00067 
00068 #include <fstream>
00069 #include <cerrno>
00070 #include <cstdio>
00071 
00072 namespace OW_NAMESPACE
00073 {
00074 
00075 using std::istream;
00076 using std::ostream;
00077 using std::iostream;
00078 using std::ifstream;
00079 using std::ofstream;
00080 using std::fstream;
00081 using std::ios;
00082 String SocketBaseImpl::m_traceFileOut;
00083 String SocketBaseImpl::m_traceFileIn;
00084 
00086 SocketBaseImpl::SocketBaseImpl()
00087    : SelectableIFC()
00088    , IOIFC()
00089    , m_isConnected(false)
00090    , m_sockfd(-1)
00091    , m_localAddress()
00092    , m_peerAddress()
00093    , m_recvTimeoutExprd(false)
00094    , m_streamBuf(this)
00095    , m_in(&m_streamBuf)
00096    , m_out(&m_streamBuf)
00097    , m_inout(&m_streamBuf)
00098    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00099    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00100    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00101 {
00102    m_out.exceptions(std::ios::badbit);
00103    m_inout.exceptions(std::ios::badbit);
00104 }
00106 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00107       SocketAddress::AddressType addrType)
00108    : SelectableIFC()
00109    , IOIFC()
00110    , m_isConnected(true)
00111    , m_sockfd(fd)
00112    , m_localAddress(SocketAddress::getAnyLocalHost())
00113    , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00114    , m_recvTimeoutExprd(false)
00115    , m_streamBuf(this)
00116    , m_in(&m_streamBuf)
00117    , m_out(&m_streamBuf)
00118    , m_inout(&m_streamBuf)
00119    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00120    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00121    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00122 {
00123    m_out.exceptions(std::ios::badbit);
00124    m_inout.exceptions(std::ios::badbit);
00125    if (addrType == SocketAddress::INET)
00126    {
00127       fillInetAddrParms();
00128    }
00129    else if (addrType == SocketAddress::UDS)
00130    {
00131       fillUnixAddrParms();
00132    }
00133    else
00134    {
00135       OW_ASSERT(0);
00136    }
00137 }
00139 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00140    : SelectableIFC()
00141    , IOIFC()
00142    , m_isConnected(false)
00143    , m_sockfd(-1)
00144    , m_localAddress(SocketAddress::getAnyLocalHost())
00145    , m_peerAddress(addr)
00146    , m_recvTimeoutExprd(false)
00147    , m_streamBuf(this)
00148    , m_in(&m_streamBuf)
00149    , m_out(&m_streamBuf)
00150    , m_inout(&m_streamBuf)
00151    , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00152    , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00153    , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00154 {
00155    m_out.exceptions(std::ios::badbit);
00156    m_inout.exceptions(std::ios::badbit);
00157    connect(m_peerAddress);
00158 }
00160 SocketBaseImpl::~SocketBaseImpl()
00161 {
00162    try
00163    {
00164       disconnect();
00165    }
00166    catch (...)
00167    {
00168       
00169    }
00170 }
00172 Select_t
00173 SocketBaseImpl::getSelectObj() const
00174 {
00175    return m_sockfd;
00176 }
00178 void
00179 SocketBaseImpl::connect(const SocketAddress& addr)
00180 {
00181    if (m_isConnected)
00182    {
00183       disconnect();
00184    }
00185    m_streamBuf.reset();
00186    m_in.clear();
00187    m_out.clear();
00188    m_inout.clear();
00189    OW_ASSERT(addr.getType() == SocketAddress::INET
00190          || addr.getType() == SocketAddress::UDS);
00191    if ((m_sockfd = ::socket(addr.getType() == SocketAddress::INET ?
00192       AF_INET : PF_UNIX, SOCK_STREAM, 0)) == -1)
00193    {
00194       OW_THROW_ERRNO_MSG(SocketException,
00195          "Failed to create a socket");
00196    }
00197    
00198    if (::fcntl(m_sockfd, F_SETFD, FD_CLOEXEC) == -1)
00199    {
00200       ::close(m_sockfd);
00201       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00202    }
00203    int n;
00204    int flags = ::fcntl(m_sockfd, F_GETFL, 0);
00205    ::fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
00206    if ((n = ::connect(m_sockfd, addr.getNativeForm(),
00207                addr.getNativeFormSize())) < 0)
00208    {
00209       if (errno != EINPROGRESS)
00210       {
00211 			::close(m_sockfd);
00212          OW_THROW_ERRNO_MSG(SocketException,
00213             Format("Failed to connect to: %1", addr.toString()).c_str());
00214       }
00215    }
00216    if (n == -1)
00217    {
00218       
00219       
00220       PosixUnnamedPipeRef lUPipe;
00221       int pipefd = -1;
00222       if (Socket::getShutDownMechanism())
00223       {
00224          UnnamedPipeRef foo = Socket::getShutDownMechanism();
00225          lUPipe = foo.cast_to<PosixUnnamedPipe>();
00226          OW_ASSERT(lUPipe);
00227          pipefd = lUPipe->getInputHandle();
00228       }
00229       fd_set rset, wset;
00230       
00231       UInt32 remainingMsWait = m_connectTimeout != Socket::INFINITE_TIMEOUT ? m_connectTimeout * 1000 : ~0U;
00232       do
00233       {
00234          FD_ZERO(&rset);
00235          if (m_sockfd < 0 || m_sockfd >= FD_SETSIZE)
00236          {
00237             OW_THROW(SocketException, "Invalid fd (< 0 || >= FD_SETSIZE)");
00238          }
00239          FD_SET(m_sockfd, &rset);
00240          if (pipefd != -1 && pipefd < FD_SETSIZE)
00241          {
00242             FD_SET(pipefd, &rset);
00243          }
00244          FD_ZERO(&wset);
00245          FD_SET(m_sockfd, &wset);
00246          int maxfd = m_sockfd > pipefd ? m_sockfd : pipefd;
00247 
00248          const UInt32 waitMs = 100; 
00249          struct timeval tv;
00250          tv.tv_sec = 0;
00251          tv.tv_usec = std::min((waitMs % 1000), remainingMsWait) * 1000;
00252 
00253          Thread::testCancel();
00254          n = ::select(maxfd+1, &rset, &wset, NULL, &tv);
00255 
00256          if (m_connectTimeout != Socket::INFINITE_TIMEOUT)
00257          {
00258             remainingMsWait -= std::min(waitMs, remainingMsWait);
00259          }
00260       } while (n == 0 && remainingMsWait > 0);
00261 
00262       if (n == 0)
00263       {
00264 			::close(m_sockfd);
00265          OW_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00266       }
00267       else if (n == -1)
00268       {
00269 			::close(m_sockfd);
00270          if (errno == EINTR)
00271          {
00272             Thread::testCancel();
00273          }
00274          OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00275       }
00276       if (pipefd != -1 && FD_ISSET(pipefd, &rset))
00277       {
00278 			::close(m_sockfd);
00279          OW_THROW(SocketException, "Sockets have been shutdown");
00280       }
00281       else if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
00282       {
00283          int error = 0;
00284          socklen_t len = sizeof(error);
00285          if (::getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &error,
00286                   &len) < 0)
00287          {
00288 				::close(m_sockfd);
00289             OW_THROW_ERRNO_MSG(SocketException,
00290                   "SocketBaseImpl::connect() getsockopt() failed");
00291          }
00292          if (error != 0)
00293          {
00294 				::close(m_sockfd);
00295             errno = error;
00296             OW_THROW_ERRNO_MSG(SocketException,
00297                   "SocketBaseImpl::connect() failed");
00298          }
00299       }
00300       else
00301       {
00302 			::close(m_sockfd);
00303          OW_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, m_sockfd not in FD set.");
00304       }
00305    }
00306    ::fcntl(m_sockfd, F_SETFL, flags);
00307    m_isConnected = true;
00308    m_peerAddress = addr; 
00309    if (addr.getType() == SocketAddress::INET)
00310    {
00311       fillInetAddrParms();
00312    }
00313    else if (addr.getType() == SocketAddress::UDS)
00314    {
00315       fillUnixAddrParms();
00316    }
00317    else
00318    {
00319       OW_ASSERT(0);
00320    }
00321 }
00323 void
00324 SocketBaseImpl::disconnect()
00325 {
00326    if (m_in)
00327    {
00328       m_in.clear(ios::eofbit);
00329    }
00330    if (m_out)
00331    {
00332       m_out.clear(ios::eofbit);
00333    }
00334    if (m_inout)
00335    {
00336       m_inout.clear(ios::eofbit);
00337    }
00338    if (m_sockfd != -1 && m_isConnected)
00339    {
00340 		::close(m_sockfd);
00341       m_isConnected = false;
00342       m_sockfd = -1;
00343    }
00344 }
00346 
00347 void
00348 SocketBaseImpl::fillInetAddrParms()
00349 {
00350    socklen_t len;
00351    InetSocketAddress_t addr;
00352    memset(&addr, 0, sizeof(addr));
00353    len = sizeof(addr);
00354    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00355    {
00356 
00357 
00358 
00359    }
00360    else
00361    {
00362       m_localAddress.assignFromNativeForm(&addr, len);
00363    }
00364    len = sizeof(addr);
00365    if (getpeername(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00366    {
00367 
00368 
00369 
00370    }
00371    else
00372    {
00373       m_peerAddress.assignFromNativeForm(&addr, len);
00374    }
00375 }
00377 void
00378 SocketBaseImpl::fillUnixAddrParms()
00379 {
00380    socklen_t len;
00381    UnixSocketAddress_t addr;
00382    memset(&addr, 0, sizeof(addr));
00383    len = sizeof(addr);
00384    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00385    {
00386       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00387    }
00388    m_localAddress.assignFromNativeForm(&addr, len);
00389    m_peerAddress.assignFromNativeForm(&addr, len);
00390 }
00391 static Mutex guard;
00393 int
00394 SocketBaseImpl::write(const void* dataOut, int dataOutLen, bool errorAsException)
00395 {
00396    int rc = 0;
00397    bool isError = false;
00398    if (m_isConnected)
00399    {
00400       isError = waitForOutput(m_sendTimeout);
00401       if (isError)
00402       {
00403          rc = -1;
00404       }
00405       else
00406       {
00407          rc = writeAux(dataOut, dataOutLen);
00408          if (!m_traceFileOut.empty() && rc > 0)
00409          {
00410             MutexLock ml(guard);
00411             ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00412             if (!traceFile)
00413             {
00414                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00415             }
00416             if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00417             {
00418                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00419             }
00420 
00421             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00422             if (!comboTraceFile)
00423             {
00424                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00425             }
00426             DateTime curDateTime;
00427             curDateTime.setToCurrent();
00428             comboTraceFile << "\n--->Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00429                '.' << curDateTime.getMicrosecond() << "<---\n";
00430             if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00431             {
00432                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00433             }
00434          }
00435       }
00436    }
00437    else
00438    {
00439       rc = -1;
00440    }
00441    if (rc < 0 && errorAsException)
00442    {
00443       OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00444    }
00445    return rc;
00446 }
00448 int
00449 SocketBaseImpl::read(void* dataIn, int dataInLen, bool errorAsException)   
00450 {
00451    int rc = 0;
00452    bool isError = false;
00453    if (m_isConnected)
00454    {
00455       isError = waitForInput(m_recvTimeout);
00456       if (isError)
00457       {
00458          rc = -1;
00459       }
00460       else
00461       {
00462          rc = readAux(dataIn, dataInLen);
00463          if (!m_traceFileIn.empty() && rc > 0)
00464          {
00465             MutexLock ml(guard);
00466             ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00467             if (!traceFile)
00468             {
00469                OW_THROW_ERRNO_MSG(IOException, "Failed opening tracefile");
00470             }
00471             if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00472             {
00473                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00474             }
00475 
00476             ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00477             if (!comboTraceFile)
00478             {
00479                OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00480             }
00481             DateTime curDateTime;
00482             curDateTime.setToCurrent();
00483             comboTraceFile << "\n--->In " << rc << " bytes at " << curDateTime.toString("%X") <<
00484                '.' << curDateTime.getMicrosecond() << "<---\n";
00485             if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00486             {
00487                OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00488             }
00489          }
00490       }
00491    }
00492    else
00493    {
00494       rc = -1;
00495    }
00496    if (rc < 0)
00497    {
00498       if (errorAsException)
00499       {
00500          OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00501       }
00502    }
00503    return rc;
00504 }
00506 bool
00507 SocketBaseImpl::waitForInput(int timeOutSecs)
00508 {
00509    int rval = SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_INPUT);
00510    if (rval == ETIMEDOUT)
00511    {
00512       m_recvTimeoutExprd = true;
00513    }
00514    else
00515    {
00516       m_recvTimeoutExprd = false;
00517    }
00518    return (rval != 0);
00519 }
00521 bool
00522 SocketBaseImpl::waitForOutput(int timeOutSecs)
00523 {
00524    return SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00525 }
00527 istream&
00528 SocketBaseImpl::getInputStream()
00529 {
00530    return m_in;
00531 }
00533 ostream&
00534 SocketBaseImpl::getOutputStream()
00535 {
00536    return m_out;
00537 }
00539 iostream&
00540 SocketBaseImpl::getIOStream()
00541 {
00542    return m_inout;
00543 }
00545 
00546 void
00547 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00548 {
00549    m_traceFileOut = out;
00550    m_traceFileIn = in;
00551 }
00552 
00553 } 
00554 
00555 #endif   // #if !defined(OW_WIN32)
00556