00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00036 #include "OW_config.h"
00037 #include "OW_PollingManager.hpp"
00038 #include "OW_NonRecursiveMutexLock.hpp"
00039 #include "OW_DateTime.hpp"
00040 #include "OW_CIMOMHandleIFC.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_ConfigOpts.hpp"
00043 #include "OW_PolledProviderIFC.hpp"
00044 #include "OW_ProviderManager.hpp"
00045 #include "OW_Platform.hpp"
00046 #include "OW_TimeoutException.hpp"
00047 #include "OW_OperationContext.hpp"
00048 #include "OW_RepositoryIFC.hpp"
00049 #include "OW_ServiceIFCNames.hpp"
00050 
00051 
00052 #include <limits>
00053 
00054 namespace
00055 {
00056    template <bool b> struct compile_time_assert;
00057    template <> struct compile_time_assert<true> { };
00058    using OpenWBEM::Int32;
00059 
00060    
00061    time_t safe_add(time_t x, Int32 y)
00062    {
00063       compile_time_assert<(sizeof(time_t) >= sizeof(Int32))> dummy;
00064       time_t const max_time = std::numeric_limits<time_t>::max();
00065       return (
00066          y <= 0 ? x :
00067          x > max_time - y ? max_time :
00068          x + y
00069       );
00070    }
00071 }
00072 
00073 namespace OW_NAMESPACE
00074 {
00075 
00076 namespace
00077 {
00078 const String COMPONENT_NAME("ow.owcimomd.PollingManager");
00079 }
00080 
00082 PollingManager::PollingManager(const ProviderManagerRef& providerManager)
00083    : m_pollingManagerThread(new PollingManagerThread(providerManager))
00084 {
00085 
00086 }
00087 
00089 PollingManager::~PollingManager()
00090 {
00091 }
00092 
00094 void
00095 PollingManager::init(const ServiceEnvironmentIFCRef& env)
00096 {
00097    m_pollingManagerThread->init(env);
00098 }
00099 
00101 void
00102 PollingManager::start()
00103 {
00104    m_pollingManagerThread->start();
00105    m_pollingManagerThread->waitUntilReady();
00106 }
00107 
00109 void
00110 PollingManager::shutdown()
00111 {
00112    m_pollingManagerThread->shutdown();
00113 }
00114 
00116 void
00117 PollingManager::addPolledProvider(const PolledProviderIFCRef& p)
00118 {
00119    m_pollingManagerThread->addPolledProvider(p);
00120 }
00121 
00122 
00123 
00125 PollingManagerThread::PollingManagerThread(const ProviderManagerRef& providerManager)
00126    : Thread()
00127    , m_shuttingDown(false)
00128    , m_providerManager(providerManager)
00129    , m_startedBarrier(2)
00130 {
00131 }
00133 PollingManagerThread::~PollingManagerThread()
00134 {
00135 }
00137 String
00138 PollingManager::getName() const
00139 {
00140    return ServiceIFCNames::PollingManager;
00141 }
00142 
00144 StringArray
00145 PollingManager::getDependencies() const
00146 {
00147    StringArray rv;
00148    rv.push_back(ServiceIFCNames::CIMServer);
00149    return rv;
00150 }
00151 
00153 void
00154 PollingManagerThread::init(const ServiceEnvironmentIFCRef& env)
00155 {
00156    m_env = env;
00157    m_logger = m_env->getLogger(COMPONENT_NAME);
00158    Int32 maxThreads;
00159    try
00160    {
00161       maxThreads = env->getConfigItem(ConfigOpts::POLLING_MANAGER_MAX_THREADS_opt, OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00162    }
00163    catch (const StringConversionException&)
00164    {
00165       maxThreads = String(OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00166    }
00167    
00168    m_triggerRunnerThreadPool = ThreadPoolRef(new ThreadPool(ThreadPool::DYNAMIC_SIZE, maxThreads, maxThreads * 10,
00169       m_logger, "Polling Manager"));
00170 }
00171 
00173 namespace
00174 {
00175    class PollingManagerProviderEnvironment : public ProviderEnvironmentIFC
00176    {
00177    public:
00178       PollingManagerProviderEnvironment(ServiceEnvironmentIFCRef env)
00179          : m_context()
00180          , m_env(env)
00181       {}
00182       virtual CIMOMHandleIFCRef getCIMOMHandle() const
00183       {
00184          return m_env->getCIMOMHandle(m_context);
00185       }
00186       virtual CIMOMHandleIFCRef getRepositoryCIMOMHandle() const
00187       {
00188          return m_env->getCIMOMHandle(m_context, ServiceEnvironmentIFC::E_BYPASS_PROVIDERS);
00189       }
00190       virtual RepositoryIFCRef getRepository() const
00191       {
00192          return m_env->getRepository();
00193       }
00194       virtual String getConfigItem(const String& name, const String& defRetVal="") const
00195       {
00196          return m_env->getConfigItem(name, defRetVal);
00197       }
00198       virtual StringArray getMultiConfigItem(const String &itemName, 
00199          const StringArray& defRetVal, const char* tokenizeSeparator = 0) const
00200       {
00201          return m_env->getMultiConfigItem(itemName, defRetVal, tokenizeSeparator);
00202       }
00203       
00204       virtual LoggerRef getLogger() const
00205       {
00206          return m_env->getLogger(COMPONENT_NAME);
00207       }
00208       virtual LoggerRef getLogger(const String& componentName) const
00209       {
00210          return m_env->getLogger(componentName);
00211       }
00212       virtual String getUserName() const
00213       {
00214          return Platform::getCurrentUserName();
00215       }
00216       virtual OperationContext& getOperationContext()
00217       {
00218          return m_context;
00219       }
00220       virtual ProviderEnvironmentIFCRef clone() const
00221       {
00222          return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(m_env));
00223       }
00224    private:
00225       mutable OperationContext m_context;
00226       ServiceEnvironmentIFCRef m_env;
00227    };
00228    ProviderEnvironmentIFCRef createProvEnvRef(ServiceEnvironmentIFCRef env)
00229    {
00230       return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(env));
00231    }
00232 }
00234 Int32
00235 PollingManagerThread::run()
00236 {
00237    
00238    m_startedBarrier.wait();
00239 
00240    bool doInit = true;
00241 
00242    
00243    PolledProviderIFCRefArray itpra =
00244          m_providerManager->getPolledProviders(createProvEnvRef(m_env));
00245 
00246    OW_LOG_DEBUG(m_logger, Format("PollingManager found %1 polled providers",
00247       itpra.size()));
00248    {
00249       
00250       NonRecursiveMutexLock ml(m_triggerGuard);
00251       for (size_t i = 0; i < itpra.size(); ++i)
00252       {
00253          TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00254          tr->m_pollInterval =
00255             itpra[i]->getInitialPollingInterval(createProvEnvRef(m_env));
00256          OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00257             " %1: %2", i, tr->m_pollInterval));
00258          if (!tr->m_pollInterval)
00259          {
00260             continue;
00261          }
00262          tr->m_itp = itpra[i];
00263          m_triggerRunners.append(tr);
00264       }
00265    }
00266    {
00267       NonRecursiveMutexLock l(m_triggerGuard);
00268       while (!m_shuttingDown)
00269       {
00270          bool rightNow;
00271          UInt32 sleepTime = calcSleepTime(rightNow, doInit);
00272          doInit = false;
00273          if (!rightNow)
00274          {
00275             if (sleepTime == 0)
00276             {
00277                m_triggerCondition.wait(l);
00278             }
00279             else
00280             {
00281                m_triggerCondition.timedWait(l, sleepTime);
00282             }
00283          }
00284          if (m_shuttingDown)
00285          {
00286             break;
00287          }
00288          processTriggers();
00289       }
00290    }
00291    
00292    m_triggerRunnerThreadPool->shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 60);
00293    m_triggerRunners.clear();
00294    return 0;
00295 }
00297 UInt32
00298 PollingManagerThread::calcSleepTime(bool& rightNow, bool doInit)
00299 {
00300    rightNow = false;
00301    DateTime dtm;
00302    dtm.setToCurrent();
00303    time_t tm = dtm.get();
00304 
00305    Int32 const int32_max = std::numeric_limits<Int32>::max();
00306    time_t const time_t_max = std::numeric_limits<time_t>::max();
00307    time_t leastTime = (time_t_max > int32_max ? int32_max : time_t_max);
00308    
00309    
00310 
00311    int checkedCount = 0;
00312    
00313    for (size_t i = 0; i < m_triggerRunners.size(); i++)
00314    {
00315       if (m_triggerRunners[i]->m_isRunning
00316          || m_triggerRunners[i]->m_pollInterval == 0)
00317       {
00318          continue;
00319       }
00320       if (doInit)
00321       {
00322          m_triggerRunners[i]->m_nextPoll =
00323             safe_add(tm, m_triggerRunners[i]->m_pollInterval);
00324       }
00325       else if (m_triggerRunners[i]->m_nextPoll <= tm)
00326       {
00327          rightNow = true;
00328          return 0;
00329       }
00330       
00331       checkedCount++;
00332       time_t diff = m_triggerRunners[i]->m_nextPoll - tm;
00333       if (diff < leastTime)
00334       {
00335          leastTime = diff;
00336       }
00337    }
00338    return (checkedCount == 0) ? 0 : UInt32(leastTime);
00339 }
00341 void
00342 PollingManagerThread::processTriggers()
00343 {
00344    DateTime dtm;
00345    dtm.setToCurrent();
00346    time_t tm = dtm.get();
00347    for (size_t i = 0; i < m_triggerRunners.size(); i++)
00348    {
00349       if (m_triggerRunners[i]->m_isRunning)
00350       {
00351          continue;
00352       }
00353       if (m_triggerRunners[i]->m_pollInterval == 0)
00354       {
00355          
00356          m_triggerRunners.remove(i--);
00357          continue;
00358       }
00359       if (tm >= m_triggerRunners[i]->m_nextPoll)
00360       {
00361          m_triggerRunners[i]->m_isRunning = true;
00362          if (!m_triggerRunnerThreadPool->tryAddWork(m_triggerRunners[i]))
00363          {
00364             OW_LOG_INFO(m_logger, "Failed to run polled provider, because there are too many already running!");
00365          }
00366       }
00367    }
00368 }
00370 void
00371 PollingManagerThread::shutdown()
00372 {
00373    {
00374       NonRecursiveMutexLock l(m_triggerGuard);
00375       m_shuttingDown = true;
00376       m_triggerCondition.notifyAll();
00377    }
00378    
00379    this->join();
00380 
00381    
00382    m_triggerRunners.clear();
00383    m_env = 0;
00384    m_providerManager = 0;
00385    m_triggerRunnerThreadPool = 0;
00386 
00387 }
00389 void
00390 PollingManagerThread::addPolledProvider(const PolledProviderIFCRef& p)
00391 {
00392    NonRecursiveMutexLock l(m_triggerGuard);
00393    if (m_shuttingDown)
00394       return;
00395    TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00396    tr->m_pollInterval = 
00397       p->getInitialPollingInterval(createProvEnvRef(m_env));
00398    OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00399       " %1", tr->m_pollInterval));
00400    if (!tr->m_pollInterval)
00401    {
00402       return;
00403    }
00404    DateTime dtm;
00405    dtm.setToCurrent();
00406    time_t tm = dtm.get();
00407    tr->m_nextPoll = safe_add(tm, tr->m_pollInterval);
00408    tr->m_itp = p;
00409    m_triggerRunners.append(tr);
00410    m_triggerCondition.notifyAll();
00411 }
00413 PollingManagerThread::TriggerRunner::TriggerRunner(PollingManagerThread* svr,
00414    ServiceEnvironmentIFCRef env)
00415    : Runnable()
00416    , m_itp(0)
00417    , m_nextPoll(0)
00418    , m_isRunning(false)
00419    , m_pollInterval(0)
00420    , m_pollMan(svr)
00421    , m_env(env)
00422    , m_logger(env->getLogger(COMPONENT_NAME))
00423 {
00424 }
00426 void
00427 PollingManagerThread::TriggerRunner::run()
00428 {
00429    Int32 nextInterval = 0;
00430    try
00431    {
00432       nextInterval = m_itp->poll(createProvEnvRef(m_env));
00433    }
00434    catch(std::exception& e)
00435    {
00436       OW_LOG_ERROR(m_logger, Format("Caught Exception while running poll: %1",
00437          e.what()));
00438    }
00439    catch(ThreadCancelledException& e)
00440    {
00441       throw;
00442    }
00443    catch(...)
00444    {
00445       OW_LOG_ERROR(m_logger, "Caught Unknown Exception while running poll");
00446    }
00447    NonRecursiveMutexLock l(m_pollMan->m_triggerGuard);
00448    if (nextInterval == 0 || m_pollInterval == 0) 
00449    {
00450       m_pollInterval = 0;
00451       m_nextPoll = 0;
00452    }
00453    else
00454    {
00455       if (nextInterval > 0)
00456       {
00457          m_pollInterval = nextInterval;
00458       }
00459       DateTime dtm;
00460       dtm.setToCurrent();
00461       m_nextPoll = safe_add(dtm.get(), m_pollInterval);
00462    }
00463    m_isRunning = false;
00464    m_pollMan->m_triggerCondition.notifyOne();
00465 }
00466 
00468 void
00469 PollingManagerThread::TriggerRunner::doCooperativeCancel()
00470 {
00471    m_itp->doCooperativeCancel();
00472 }
00473 
00475 void
00476 PollingManagerThread::TriggerRunner::doDefinitiveCancel()
00477 {
00478    m_itp->doDefinitiveCancel();
00479 }
00480 
00482 void
00483 PollingManagerThread::doCooperativeCancel()
00484 {
00485    NonRecursiveMutexLock l(m_triggerGuard);
00486    m_shuttingDown = true;
00487    m_triggerCondition.notifyAll();
00488 }
00489 
00490 } 
00491