00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00036 #include "OW_config.h"
00037 #include "OW_Exec.hpp"
00038 #include "OW_Format.hpp"
00039 #include "OW_Assertion.hpp"
00040 #include "OW_PosixUnnamedPipe.hpp"
00041 #include "OW_Array.hpp"
00042 #include "OW_IOException.hpp"
00043 #include "OW_Thread.hpp"
00044 #include "OW_Select.hpp"
00045 #include "OW_ExceptionIds.hpp"
00046 #include "OW_IntrusiveCountableBase.hpp"
00047 #include "OW_DateTime.hpp"
00048 #include "OW_AutoPtr.hpp"
00049 
00050 #include <map>
00051 
00052 extern "C"
00053 {
00054 #ifdef OW_HAVE_SYS_RESOURCE_H
00055 #include <sys/resource.h>
00056 #endif
00057 #ifndef OW_WIN32
00058 #include <unistd.h>
00059 #include <sys/wait.h>
00060 #include <fcntl.h>
00061 #endif
00062 #include <errno.h>
00063 #include <stdio.h> 
00064 #include <signal.h>
00065 }
00066 
00067 #include <cerrno>
00068 #include <iostream>  
00069 
00070 
00071 #ifndef NSIG
00072 #define NSIG 64
00073 #endif
00074 
00075 namespace OW_NAMESPACE
00076 {
00077 
00078 using std::cerr;
00079 using std::endl;
00080 OW_DEFINE_EXCEPTION_WITH_ID(ExecTimeout);
00081 OW_DEFINE_EXCEPTION_WITH_ID(ExecBufferFull);
00082 OW_DEFINE_EXCEPTION_WITH_ID(ExecError);
00083 
00084 #ifndef OW_WIN32
00085 class PopenStreamsImpl : public IntrusiveCountableBase
00086 {
00087 public:
00088    PopenStreamsImpl();
00089    ~PopenStreamsImpl();
00090    UnnamedPipeRef in() const;
00091    void in(const UnnamedPipeRef& pipe);
00092    UnnamedPipeRef out() const;
00093    void out(const UnnamedPipeRef& pipe);
00094    UnnamedPipeRef err() const;
00095    void err(const UnnamedPipeRef& pipe);
00096    Array<UnnamedPipeRef> extraPipes() const;
00097    void setExtraPipes(const Array<UnnamedPipeRef>& pipes);
00098 
00099    pid_t pid();
00100    void pid(pid_t newPid);
00101    int getExitStatus();
00102    int getExitStatus(UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term);
00103    void setProcessStatus(int ps)
00104    {
00105       m_processstatus = ps;
00106    }
00107 private:
00108    UnnamedPipeRef m_in;
00109    UnnamedPipeRef m_out;
00110    UnnamedPipeRef m_err;
00111    Array<UnnamedPipeRef> m_extraPipes;
00112    pid_t m_pid;
00113    int m_processstatus;
00114 };
00116 PopenStreamsImpl::PopenStreamsImpl()
00117    : m_pid(-1)
00118    , m_processstatus(-1)
00119 {
00120 }
00122 UnnamedPipeRef PopenStreamsImpl::in() const
00123 {
00124    return m_in;
00125 }
00127 void PopenStreamsImpl::in(const UnnamedPipeRef& pipe)
00128 {
00129    m_in = pipe;
00130 }
00132 UnnamedPipeRef PopenStreamsImpl::out() const
00133 {
00134    return m_out;
00135 }
00137 void PopenStreamsImpl::out(const UnnamedPipeRef& pipe)
00138 {
00139    m_out = pipe;
00140 }
00142 UnnamedPipeRef PopenStreamsImpl::err() const
00143 {
00144    return m_err;
00145 }
00147 void PopenStreamsImpl::err(const UnnamedPipeRef& pipe)
00148 {
00149    m_err = pipe;
00150 }
00152 Array<UnnamedPipeRef> PopenStreamsImpl::extraPipes() const
00153 {
00154    return m_extraPipes;
00155 }
00157 void PopenStreamsImpl::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00158 {
00159    m_extraPipes = pipes;
00160 }
00162 pid_t PopenStreamsImpl::pid()
00163 {
00164    return m_pid;
00165 }
00167 void PopenStreamsImpl::pid(pid_t newPid)
00168 {
00169    m_pid = newPid;
00170 }
00172 static inline ProcId safeWaitPid(ProcId pid, int* status, int options)
00173 {
00174    
00175    
00176    int localReturnValue = -1;
00177    pid_t returnedPID = ::waitpid(pid, &localReturnValue, options);
00178    if( returnedPID > 0 )
00179    {
00180       *status = localReturnValue;
00181    }  
00182    return returnedPID;
00183 }
00184 
00186 static ProcId noIntrWaitPid(ProcId pid, int* status, int options)
00187 {
00188    pid_t waitpidrv;
00189    do
00190    {
00191       Thread::testCancel();
00192       waitpidrv = safeWaitPid(pid, status, options);
00193    } while (waitpidrv == -1 && errno == EINTR);
00194    return waitpidrv;
00195 }
00196 
00198 static inline void
00199 milliSleep(UInt32 milliSeconds)
00200 {
00201    Thread::sleep(milliSeconds);
00202 }
00204 static inline void
00205 secSleep(UInt32 seconds)
00206 {
00207    Thread::sleep(seconds * 1000);
00208 }
00210 static bool
00211 timedWaitPid(ProcId pid, int * pstatus, UInt32 wait_time)
00212 {
00213    UInt32 const N = 154;
00214    UInt32 const M = 128;  
00215    UInt32 const MAXPERIOD = 5000;
00216    UInt32 period = 100;
00217    UInt32 t = 0;
00218    ProcId waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00219    while (t < wait_time && waitpidrv == 0) {
00220       milliSleep(period);
00221       t += period;
00222       period *= N;
00223       period /= M; 
00224       if (period > MAXPERIOD)
00225       {
00226          period = MAXPERIOD;
00227       }
00228       waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00229    }
00230    if (waitpidrv < 0) {
00231       OW_THROW_ERRNO_MSG(ExecErrorException, "waitpid() failed.");
00232    }
00233    return waitpidrv != 0;
00234 }
00235 
00237 
00238 
00239 
00240 static bool killWait(
00241    ProcId pid, int * pstatus, UInt32 wait_time, int sig, char const * signame
00242 )
00243 {
00244    if (::kill(pid, sig) == -1) {
00245       
00246       int errnum = errno;
00247       
00248       if (noIntrWaitPid(pid, pstatus, WNOHANG) > 0) {
00249          return true;
00250       }
00251       else {
00252          Format fmt("Failed sending %1 to process %2.", signame, pid);
00253          char const * msg = fmt.c_str();
00254          errno = errnum;
00255          OW_THROW_ERRNO_MSG(ExecErrorException, msg);
00256       }
00257    }
00258    return timedWaitPid(pid, pstatus, wait_time);
00259 }
00260 
00262 int PopenStreamsImpl::getExitStatus()
00263 {
00264    return this->getExitStatus(0, 10 *1000, 10 * 1000);
00265 }
00266 
00268 int PopenStreamsImpl::getExitStatus(
00269    UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term)
00270 {
00271    if (m_pid < 0)
00272    {
00273       return m_processstatus;
00274    }
00275    if (m_pid == ::getpid())
00276    {
00277       OW_THROW(ExecErrorException, "PopenStreamsImpl::getExitStatus: m_pid == getpid()");
00278    }
00279 
00280    ProcId pid = m_pid;
00281    m_pid = -1;
00282    int * pstatus = &m_processstatus;
00283 
00284    
00285    wait_initial *= 1000;
00286    wait_close *= 1000;
00287    wait_term *= 1000;
00288 
00289    if (wait_initial > 0 && timedWaitPid(pid, pstatus, wait_initial))
00290    {
00291       return m_processstatus;
00292    }
00293 
00294    if (wait_close > 0)
00295    {
00296       
00297       
00298       
00299       
00300       UnnamedPipeRef upr;
00301       if (upr = in())
00302       {
00303          upr->close();
00304       }
00305       if (upr = out())
00306       {
00307          upr->close();
00308       }
00309       if (upr = err())
00310       {
00311          upr->close();
00312       }
00313       if (timedWaitPid(pid, pstatus, wait_close))
00314       {
00315          return m_processstatus;
00316       }
00317    }
00318 
00319    if (wait_term > 0 && killWait(pid, pstatus, wait_term, SIGTERM, "SIGTERM"))
00320    {
00321       return m_processstatus;
00322    }
00323    if (!killWait(pid, pstatus, 5000, SIGKILL, "SIGKILL")) {
00324       OW_THROW(
00325          ExecErrorException, "PopenStreamsImpl::getExitStatus: Child process has not exited after sending it a SIGKILL."
00326       );
00327    }
00328    return m_processstatus;
00329 }
00331 PopenStreamsImpl::~PopenStreamsImpl()
00332 {
00333    try 
00334    {
00335       
00336       getExitStatus();
00337    }
00338    catch (...)
00339    {
00340    }
00341 }
00342 
00344 PopenStreams::PopenStreams()
00345    : m_impl(new PopenStreamsImpl)
00346 {
00347 }
00349 PopenStreams::~PopenStreams()
00350 {
00351 }
00353 UnnamedPipeRef PopenStreams::in() const
00354 {
00355    return m_impl->in();
00356 }
00358 void PopenStreams::in(const UnnamedPipeRef& pipe)
00359 {
00360    m_impl->in(pipe);
00361 }
00363 UnnamedPipeRef PopenStreams::out() const
00364 {
00365    return m_impl->out();
00366 }
00368 void PopenStreams::out(const UnnamedPipeRef& pipe)
00369 {
00370    m_impl->out(pipe);
00371 }
00373 UnnamedPipeRef PopenStreams::err() const
00374 {
00375    return m_impl->err();
00376 }
00378 void PopenStreams::err(const UnnamedPipeRef& pipe)
00379 {
00380    m_impl->err(pipe);
00381 }
00383 Array<UnnamedPipeRef> PopenStreams::extraPipes() const
00384 {
00385    return m_impl->extraPipes();
00386 }
00388 void PopenStreams::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00389 {
00390    m_impl->setExtraPipes(pipes);
00391 }
00393 pid_t PopenStreams::pid() const
00394 {
00395    return m_impl->pid();
00396 }
00398 void PopenStreams::pid(pid_t newPid)
00399 {
00400    m_impl->pid(newPid);
00401 }
00403 int PopenStreams::getExitStatus()
00404 {
00405    return m_impl->getExitStatus();
00406 }
00408 int PopenStreams::getExitStatus(UInt32 wait0, UInt32 wait1, UInt32 wait2)
00409 {
00410    return m_impl->getExitStatus(wait0, wait1, wait2);
00411 }
00413 void PopenStreams::setProcessStatus(int ps)
00414 {
00415    m_impl->setProcessStatus(ps);
00416 }
00418 PopenStreams::PopenStreams(const PopenStreams& src)
00419    : m_impl(src.m_impl)
00420 {
00421 }
00423 PopenStreams& PopenStreams::operator=(const PopenStreams& src)
00424 {
00425    m_impl = src.m_impl;
00426    return *this;
00427 }
00428 
00430 bool operator==(const PopenStreams& x, const PopenStreams& y)
00431 {
00432    return x.m_impl == y.m_impl;
00433 }
00434 
00436 namespace Exec
00437 {
00438 
00440 int 
00441 safeSystem(const Array<String>& command, const EnvVars& envVars)
00442 {
00443    const char* const* envp = (envVars.size() > 0) ? envVars.getenvp() : 0;
00444    return safeSystem(command, envp);
00445 }
00446 
00448 int
00449 safeSystem(const Array<String>& command, const char* const envp[])
00450 {
00451    int status;
00452    pid_t pid;
00453    if (command.size() == 0)
00454    {
00455       return 1;
00456    }
00457 
00458    
00459    AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00460    for (size_t i = 0; i < command.size(); i++)
00461    {
00462       argv[i] = command[i].c_str();
00463    }
00464    argv[command.size()] = 0;
00465 
00466    pid = ::fork();
00467    if (pid == -1)
00468    {
00469       return -1;
00470    }
00471    if (pid == 0)
00472    {
00473       try
00474       {
00475 
00476          
00477          
00478          
00479          
00480          
00481          
00482          
00483          
00484          
00485          
00486          
00487          
00488          
00489          
00490 
00491          
00492          
00493          sigset_t emptymask;
00494          sigemptyset(&emptymask);
00495          ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00496 
00497          for (size_t sig = 1; sig <= NSIG; ++sig)
00498          {
00499             struct sigaction temp;
00500             sigaction(sig, 0, &temp);
00501             temp.sa_handler = SIG_DFL;
00502             sigaction(sig, &temp, NULL);
00503          }
00504 
00505          
00506          rlimit rl;
00507          int i = sysconf(_SC_OPEN_MAX);
00508          if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00509          {
00510             if ( i < 0 )
00511             {
00512                i = rl.rlim_max;
00513             }
00514             else
00515             {
00516                i = std::min<int>(rl.rlim_max, i);
00517             }
00518          }
00519          while (i > 2)
00520          {
00521             
00522             ::fcntl(i, F_SETFD, FD_CLOEXEC);
00523             i--;
00524          }
00525 
00526          int rval; 
00527          if (envp)
00528          {
00529             rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00530          }
00531          else
00532          {
00533             rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00534          }
00535          cerr << Format( "Exec::safeSystem: execv failed for program "
00536                "%1, rval is %2", argv[0], rval);
00537       }
00538       catch (...)
00539       {
00540          cerr << "something threw an exception after fork()!";
00541       }
00542       _exit(127);
00543    }
00544    do
00545    {
00546       Thread::testCancel();
00547       if (waitpid(pid, &status, 0) == -1)
00548       {
00549          if (errno != EINTR)
00550          {
00551             return -1;
00552          }
00553       }
00554       else
00555       {
00556          return WEXITSTATUS(status);
00557       }
00558    } while (1);
00559 }
00560 
00562 PopenStreams
00563 safePopen(const Array<String>& command,
00564       const String& initialInput)
00565 {
00566    PopenStreams retval = safePopen(command);
00567 
00568    if (initialInput != "")
00569    {
00570       if (retval.in()->write(initialInput.c_str(), initialInput.length()) == -1)
00571       {
00572          OW_THROW_ERRNO_MSG(IOException, "Exec::safePopen: Failed writing input to process");
00573       }
00574    }
00575 
00576    return retval;
00577 }
00578 
00580 PopenStreams 
00581 safePopen(const Array<String>& command, const EnvVars& envVars)
00582 {
00583    const char* const* envp = (envVars.size() > 0) ? envVars.getenvp() : 0;
00584    return safePopen(command, envp);
00585 }
00586 
00588 PopenStreams
00589 safePopen(const Array<String>& command, const char* const envp[])
00590 {
00591    
00592    
00593    
00594    const int UNKNOWN_EXCEPTION = -2000; 
00595 
00596    if (command.size() == 0)
00597    {
00598       OW_THROW(ExecErrorException, "Exec::safePopen: command is empty");
00599    }
00600    
00601    PopenStreams retval;
00602    retval.in( UnnamedPipe::createUnnamedPipe() );
00603    UnnamedPipeRef upipeOut = UnnamedPipe::createUnnamedPipe();
00604    retval.out( upipeOut );
00605    UnnamedPipeRef upipeErr = UnnamedPipe::createUnnamedPipe();
00606    retval.err( upipeErr );
00607 
00608    UnnamedPipeRef execErrorPipe = UnnamedPipe::createUnnamedPipe();
00609 
00610    
00611    AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00612    for (size_t i = 0; i < command.size(); i++)
00613    {
00614       argv[i] = command[i].c_str();
00615    }
00616    argv[command.size()] = 0;
00617 
00618    pid_t forkrv = ::fork();
00619    if (forkrv == -1)
00620    {
00621       OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::safePopen: fork() failed");
00622    }
00623    if (forkrv == 0)
00624    {
00625       int execErrorFd = -1;
00626       try
00627       {
00628 
00629          
00630          
00631          
00632          
00633          
00634          
00635          
00636          
00637          
00638          
00639          
00640          
00641          
00642          
00643          
00644    
00645          
00646          
00647          sigset_t emptymask;
00648          sigemptyset(&emptymask);
00649          ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00650    
00651          for (size_t sig = 1; sig <= NSIG; ++sig)
00652          {
00653             struct sigaction temp;
00654             sigaction(sig, 0, &temp);
00655             temp.sa_handler = SIG_DFL;
00656             sigaction(sig, &temp, NULL);
00657          }
00658    
00659          
00660          close(0);
00661          close(1);
00662          close(2);
00663 
00664          
00665          UnnamedPipeRef foo1 = retval.in();
00666          PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00667    
00668          UnnamedPipeRef foo2 = retval.out();
00669          PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00670    
00671          UnnamedPipeRef foo3 = retval.err();
00672          PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00673 
00674          
00675          OW_ASSERT(in);
00676          OW_ASSERT(out);
00677          OW_ASSERT(err);
00678          
00679          int rv = dup2(in->getInputHandle(), 0);
00680          OW_ASSERT(rv != -1);
00681          rv = dup2(out->getOutputHandle(), 1);
00682          OW_ASSERT(rv != -1);
00683          rv = dup2(err->getOutputHandle(), 2);
00684          OW_ASSERT(rv != -1);
00685 
00686          
00687          PosixUnnamedPipeRef execError = execErrorPipe.cast_to<PosixUnnamedPipe>();
00688          OW_ASSERT(execError);
00689          execErrorFd = execError->getOutputHandle();
00690 
00691 
00692          
00693          rlimit rl;
00694          int i = sysconf(_SC_OPEN_MAX);
00695          if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00696          {
00697             if ( i < 0 )
00698             {
00699                i = rl.rlim_max;
00700             }
00701             else
00702             {
00703                i = std::min<int>(rl.rlim_max, i);
00704             }
00705          }
00706          while (i > 2)
00707          {
00708             
00709             ::fcntl(i, F_SETFD, FD_CLOEXEC);
00710             i--;
00711          }
00712    
00713          int rval = 0;
00714          if (envp)
00715          {
00716             rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00717          }
00718          else
00719          {
00720             rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00721          }
00722          
00723          int lerrno = errno;
00724          write(execErrorFd, &lerrno, sizeof(lerrno));
00725       }
00726       catch (...)
00727       {
00728          int errorVal = UNKNOWN_EXCEPTION;
00729          write(execErrorFd, &errorVal, sizeof(errorVal));
00730       }
00731       _exit(127);
00732    }
00733 
00734    
00735    retval.pid (forkrv);
00736 
00737    
00738    UnnamedPipeRef foo1 = retval.in();
00739    PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00740    UnnamedPipeRef foo2 = retval.out(); 
00741    PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00742    UnnamedPipeRef foo3 = retval.err(); 
00743    PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00744    OW_ASSERT(in);
00745    OW_ASSERT(out);
00746    OW_ASSERT(err);
00747    
00748    in->closeInputHandle();
00749    out->closeOutputHandle();
00750    err->closeOutputHandle();
00751    
00752    PosixUnnamedPipeRef execErrorPosixPipe = execErrorPipe.cast_to<PosixUnnamedPipe>();
00753    OW_ASSERT(execErrorPosixPipe);
00754    
00755    execErrorPosixPipe->closeOutputHandle();
00756 
00757    const int SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC = 10; 
00758    execErrorPipe->setReadTimeout(SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC);
00759 
00760    int childErrorCode = 0;
00761    int bytesRead = execErrorPipe->read(&childErrorCode, sizeof(childErrorCode));
00762    
00763    if (bytesRead == ETIMEDOUT) 
00764    {
00765       
00766       
00767       kill(forkrv, SIGKILL);
00768       OW_THROW(ExecErrorException, "Exec::safePopen: timed out waiting for child process to exec()");
00769    }
00770    if (bytesRead > 0)
00771    {
00772       
00773       if (childErrorCode == UNKNOWN_EXCEPTION)
00774       {
00775          OW_THROW(ExecErrorException, "Exec::safePopen: child process caught an exception before reaching exec()");
00776       }
00777       else
00778       {
00779          errno = childErrorCode;
00780          OW_THROW_ERRNO_MSG(ExecErrorException, Format("Exec::safePopen: child process failed running exec() process = %1", command[0]));
00781       }
00782    }
00783 
00784    return retval;
00785 }
00786 
00787 namespace
00788 {
00789 
00790 #ifndef OW_MIN
00791 #define OW_MIN(x, y) (x) < (y) ? (x) : (y)
00792 #endif
00793 
00795 class StringOutputGatherer : public OutputCallback
00796 {
00797 public:
00798    StringOutputGatherer(String& output, int outputLimit)
00799       : m_output(output)
00800       , m_outputLimit(outputLimit)
00801    {
00802    }
00803 private:
00804    virtual void doHandleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00805    {
00806       if (m_outputLimit >= 0 && m_output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00807       {
00808          
00809          int lentocopy = OW_MIN(m_outputLimit - m_output.length(), dataLen);
00810          if (lentocopy >= 0)
00811          {
00812             m_output += String(data, lentocopy);
00813          }
00814          OW_THROW(ExecBufferFullException, "Exec::StringOutputGatherer::doHandleData(): buffer full");
00815       }
00816 
00817       m_output += data;
00818    }
00819    String& m_output;
00820    int m_outputLimit;
00821 };
00822 
00824 class SingleStringInputCallback : public InputCallback
00825 {
00826 public:
00827    SingleStringInputCallback(const String& s)
00828       : m_s(s)
00829    {
00830    }
00831 private:
00832    virtual void doGetData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00833    {
00834       if (m_s.length() > 0)
00835       {
00836          inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00837          m_s.erase();
00838       }
00839       else if (theStream.in()->isOpen())
00840       {
00841          theStream.in()->close();
00842       }
00843    }
00844    String m_s;
00845 };
00846 
00847 }
00848 
00850 void
00851 executeProcessAndGatherOutput(const Array<String>& command,
00852    String& output, int& processStatus,
00853    int timeoutSecs, int outputLimit, const String& input)
00854 {
00855    executeProcessAndGatherOutput(command, output, processStatus, EnvVars(),
00856       timeoutSecs, outputLimit, input);
00857 }
00858 
00860 void executeProcessAndGatherOutput(
00861    const Array<String>& command,
00862    String& output, 
00863    int& processStatus, 
00864    const EnvVars& envVars,
00865    int timeoutSecs, 
00866    int outputLimit, 
00867    const String& input)
00868 {
00869    processStatus = -1;
00870    Array<PopenStreams> streams;
00871    streams.push_back(safePopen(command, envVars));
00872    Array<ProcessStatus> processStatuses(1);
00873    SingleStringInputCallback singleStringInputCallback(input);
00874 
00875    StringOutputGatherer gatherer(output, outputLimit);
00876    processInputOutput(gatherer, streams, processStatuses, 
00877       singleStringInputCallback, timeoutSecs);
00878 
00879    if (processStatuses[0].hasExited())
00880    {
00881       processStatus = processStatuses[0].getStatus();
00882    }
00883    else
00884    {
00885       processStatus = streams[0].getExitStatus();
00886    }
00887 }
00888 
00890 void
00891 gatherOutput(String& output, PopenStreams& stream, int& processStatus, int timeoutSecs, int outputLimit)
00892 {
00893    Array<PopenStreams> streams;
00894    streams.push_back(stream);
00895    Array<ProcessStatus> processStatuses(1);
00896 
00897    StringOutputGatherer gatherer(output, outputLimit);
00898    SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00899    processInputOutput(gatherer, streams, processStatuses, singleStringInputCallback, timeoutSecs);
00900    if (processStatuses[0].hasExited())
00901    {
00902       processStatus = processStatuses[0].getStatus();
00903    }
00904 }
00905 
00907 OutputCallback::~OutputCallback()
00908 {
00909 
00910 }
00911 
00913 void
00914 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00915 {
00916    doHandleData(data, dataLen, outputSource, theStream, streamIndex, inputBuffer);
00917 }
00918 
00920 InputCallback::~InputCallback()
00921 {
00922 }
00923 
00925 void
00926 InputCallback::getData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00927 {
00928    doGetData(inputBuffer, theStream, streamIndex);
00929 }
00930 
00931 namespace
00932 {
00933    struct ProcessOutputState
00934    {
00935       bool inIsOpen;
00936       bool outIsOpen;
00937       bool errIsOpen;
00938       size_t availableDataLen;
00939 
00940       ProcessOutputState()
00941          : inIsOpen(true)
00942          , outIsOpen(true)
00943          , errIsOpen(true)
00944          , availableDataLen(0)
00945       {
00946       }
00947    };
00948 
00949 }
00951 void
00952 processInputOutput(OutputCallback& output, Array<PopenStreams>& streams, Array<ProcessStatus>& processStatuses, InputCallback& input, int timeoutsecs)
00953 {
00954    processStatuses.clear();
00955    processStatuses.resize(streams.size());
00956 
00957    Array<ProcessOutputState> processStates(streams.size());
00958    int numOpenPipes(streams.size() * 2); 
00959 
00960    DateTime curTime;
00961    curTime.setToCurrent();
00962    DateTime timeoutEnd(curTime);
00963    timeoutEnd += timeoutsecs;
00964 
00965    Array<Array<char> > inputs(processStates.size());
00966    for (size_t i = 0; i < processStates.size(); ++i)
00967    {
00968       input.getData(inputs[i], streams[i], i);
00969       processStates[i].availableDataLen = inputs[i].size();
00970       if (!streams[i].out()->isOpen())
00971       {
00972          processStates[i].outIsOpen = false;
00973       }
00974       if (!streams[i].err()->isOpen())
00975       {
00976          processStates[i].errIsOpen = false;
00977       }
00978       if (!streams[i].in()->isOpen())
00979       {
00980          processStates[i].inIsOpen = false;
00981       }
00982 
00983    }
00984 
00985    while (numOpenPipes > 0)
00986    {
00987       Select::SelectObjectArray selObjs; 
00988       std::map<int, int> inputIndexProcessIndex;
00989       std::map<int, int> outputIndexProcessIndex;
00990       for (size_t i = 0; i < streams.size(); ++i)
00991       {
00992          if (processStates[i].outIsOpen)
00993          {
00994             Select::SelectObject selObj(streams[i].out()->getSelectObj()); 
00995             selObj.waitForRead = true; 
00996             selObjs.push_back(selObj); 
00997             inputIndexProcessIndex[selObjs.size() - 1] = i;
00998          }
00999          if (processStates[i].errIsOpen)
01000          {
01001             Select::SelectObject selObj(streams[i].err()->getSelectObj()); 
01002             selObj.waitForRead = true; 
01003             selObjs.push_back(selObj); 
01004             inputIndexProcessIndex[selObjs.size() - 1] = i;
01005          }
01006          if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
01007          {
01008             Select::SelectObject selObj(streams[i].in()->getWriteSelectObj()); 
01009             selObj.waitForWrite = true; 
01010             selObjs.push_back(selObj); 
01011             outputIndexProcessIndex[selObjs.size() - 1] = i;
01012          }
01013 
01014          
01015          if (streams[i].pid() != -1)
01016          {
01017             pid_t waitpidrv;
01018             int processStatus(-1);
01019             waitpidrv = noIntrWaitPid(streams[i].pid(), &processStatus, WNOHANG);
01020             if (waitpidrv == -1)
01021             {
01022                streams[i].pid(-1);
01023                OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: waitpid() failed");
01024             }
01025             else if (waitpidrv != 0)
01026             {
01027                streams[i].pid(-1);
01028                streams[i].setProcessStatus(processStatus);
01029                processStatuses[i] = ProcessStatus(processStatus);
01030             }
01031          }
01032       }
01033 
01034       const int mstimeout = 100; 
01035       int selectrval = Select::selectRW(selObjs, mstimeout);
01036       switch (selectrval)
01037       {
01038          case Select::SELECT_INTERRUPTED:
01039             
01040             break;
01041          case Select::SELECT_ERROR:
01042          {
01043             OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
01044          }
01045          break;
01046          case Select::SELECT_TIMEOUT:
01047          {
01048             
01049             
01050             for (size_t i = 0; i < streams.size(); ++i)
01051             {
01052                if (streams[i].pid() == -1)
01053                {
01054                   if (processStates[i].inIsOpen)
01055                   {
01056                      processStates[i].inIsOpen = false;
01057                      streams[i].in()->close();
01058                   }
01059                   if (processStates[i].outIsOpen)
01060                   {
01061                      processStates[i].outIsOpen = false;
01062                      streams[i].out()->close();
01063                      --numOpenPipes;
01064                   }
01065                   if (processStates[i].errIsOpen)
01066                   {
01067                      processStates[i].errIsOpen = false;
01068                      streams[i].err()->close();
01069                      --numOpenPipes;
01070                   }
01071                }
01072             }
01073 
01074             curTime.setToCurrent();
01075             if (timeoutsecs >= 0 && curTime > timeoutEnd)
01076             {
01077                OW_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
01078             }
01079          }
01080          break;
01081          default:
01082          {
01083             int availableToFind = selectrval;
01084             
01085             curTime.setToCurrent();
01086             timeoutEnd = curTime;
01087             timeoutEnd += timeoutsecs;
01088 
01089             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01090             {
01091                if (!selObjs[i].readAvailable)
01092                {
01093                   continue;
01094                }
01095                else
01096                {
01097                   --availableToFind;
01098                }
01099                int streamIndex = inputIndexProcessIndex[i];
01100                UnnamedPipeRef readstream;
01101                if (processStates[streamIndex].outIsOpen)
01102                {
01103                   if (streams[streamIndex].out()->getSelectObj() == selObjs[i].s)
01104                   {
01105                      readstream = streams[streamIndex].out();
01106                   }
01107                }
01108 
01109                if (!readstream && processStates[streamIndex].errIsOpen)
01110                {
01111                   if (streams[streamIndex].err()->getSelectObj() == selObjs[i].s)
01112                   {
01113                      readstream = streams[streamIndex].err();
01114                   }
01115                }
01116 
01117                if (!readstream)
01118                {
01119                   continue; 
01120                }
01121 
01122                char buff[1024];
01123                int readrc = readstream->read(buff, sizeof(buff) - 1);
01124                if (readrc == 0)
01125                {
01126                   if (readstream == streams[streamIndex].out())
01127                   {
01128                      processStates[streamIndex].outIsOpen = false;
01129                      streams[streamIndex].out()->close();
01130                   }
01131                   else
01132                   {
01133                      processStates[streamIndex].errIsOpen = false;
01134                      streams[streamIndex].err()->close();
01135                   }
01136                   --numOpenPipes;
01137                }
01138                else if (readrc == -1)
01139                {
01140                   OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
01141                }
01142                else
01143                {
01144                   buff[readrc] = '\0';
01145                   output.handleData(buff, readrc, readstream == streams[streamIndex].out() ? E_STDOUT : E_STDERR, streams[streamIndex],
01146                      streamIndex, inputs[streamIndex]);
01147                }
01148             }
01149 
01150             
01151             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01152             {
01153                if (!selObjs[i].writeAvailable)
01154                {
01155                   continue;
01156                }
01157                else
01158                {
01159                   --availableToFind;
01160                }
01161                int streamIndex = outputIndexProcessIndex[i];
01162                UnnamedPipeRef writestream;
01163                if (processStates[streamIndex].inIsOpen)
01164                {
01165                   writestream = streams[streamIndex].in();
01166                }
01167 
01168                if (!writestream)
01169                {
01170                   continue; 
01171                }
01172 
01173                size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
01174                int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
01175                if (writerc == 0)
01176                {
01177                   processStates[streamIndex].inIsOpen = false;
01178                   streams[streamIndex].in()->close();
01179                }
01180                else if (writerc == -1)
01181                {
01182                   OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
01183                }
01184                else
01185                {
01186                   inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
01187                   input.getData(inputs[streamIndex], streams[streamIndex], streamIndex);
01188                   processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
01189                }
01190             }
01191          }
01192          break;
01193       }
01194    }
01195 }
01196 
01197 } 
01198 #endif
01199 } 
01200