OW_IndicationServerImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00035 #include "OW_config.h"
00036 #include "OW_IndicationServerImpl.hpp"
00037 #include "OW_DateTime.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Format.hpp"
00040 #include "OW_ProviderManager.hpp"
00041 #include "OW_ConfigOpts.hpp"
00042 #include "OW_WQLIFC.hpp"
00043 #include "OW_CIMInstanceEnumeration.hpp"
00044 #include "OW_CIMValueCast.hpp"
00045 #include "OW_SortedVectorSet.hpp"
00046 #include "OW_NULLValueException.hpp"
00047 #include "OW_PollingManager.hpp"
00048 #include "OW_CppProxyProvider.hpp"
00049 #include "OW_Platform.hpp"
00050 #include "OW_CIMNameSpaceUtils.hpp"
00051 #include "OW_MutexLock.hpp"
00052 #include "OW_CIMClass.hpp"
00053 #include "OW_WQLInstancePropertySource.hpp"
00054 #include "OW_OperationContext.hpp"
00055 #include "OW_LocalCIMOMHandle.hpp"
00056 #include "OW_ExceptionIds.hpp"
00057 #include "OW_CIMException.hpp"
00058 #include "OW_CIMDateTime.hpp"
00059 #include "OW_LifecycleIndicationPoller.hpp"
00060 #include "OW_ServiceIFCNames.hpp"
00061 #include "OW_CIMNameSpaceUtils.hpp"
00062 
00063 #include <iterator>
00064 #include <set>
00065 
00066 #if defined(OW_THREADS_RUN_AS_USER)
00067    #if defined(OW_NETWARE)
00068       #include <client.h>
00069       #include <fsio.h>
00070       #include <nks/dirio.h>
00071       #define USE_CIMOM_UID setcwd2(0)
00072    #elif defined(OW_GNU_LINUX)
00073       #ifdef OW_HAVE_UNISTD_H
00074          #include <unistd.h>
00075       #endif
00076       #ifdef OW_HAVE_SYS_TYPES_H
00077          #include <sys/types.h>
00078       #endif
00079       #define USE_CIMOM_UID seteuid(getuid())
00080    #else
00081       #define USE_CIMOM_UID
00082    #endif
00083 #else
00084    #define USE_CIMOM_UID
00085 #endif
00086 
00087 namespace OW_NAMESPACE
00088 {
00089 
// Declare and define the IndicationServer exception type for this translation
// unit; it is thrown below as IndicationServerException via OW_THROW.
OW_DECLARE_EXCEPTION(IndicationServer);
OW_DEFINE_EXCEPTION_WITH_ID(IndicationServer);
00092 
00093 using namespace WBEMFlags;
00094 
00095 namespace
00096 {
00097 String COMPONENT_NAME("ow.owcimomd.indication.Server");
00098 }
00099 
00101 IndicationServerImpl::IndicationServerImpl()
00102    : m_indicationServerThread(new IndicationServerImplThread)
00103 {
00104 }
00105 
00107 IndicationServerImpl::~IndicationServerImpl()
00108 {
00109 }
00110 
00112 String
00113 IndicationServerImpl::getName() const
00114 {
00115    return ServiceIFCNames::IndicationServer;
00116 }
00117 
00119 StringArray
00120 IndicationServerImpl::getDependencies() const
00121 {
00122    StringArray rv;
00123    rv.push_back(ServiceIFCNames::CIMServer);
00124    rv.push_back(ServiceIFCNames::ProviderManager);
00125    return rv;
00126 }
00127 
00129 void
00130 IndicationServerImpl::init(const ServiceEnvironmentIFCRef& env)
00131 {
00132    CIMOMEnvironmentRef cimomEnv(env.cast_to<CIMOMEnvironment>());
00133    OW_ASSERT(cimomEnv);
00134    m_indicationServerThread->init(cimomEnv);
00135 }
00136 
00138 void
00139 IndicationServerImpl::start()
00140 {
00141    m_indicationServerThread->start();
00142    m_indicationServerThread->waitUntilReady();
00143 }
00144 
00146 void
00147 IndicationServerImpl::shutdown()
00148 {
00149    m_indicationServerThread->shutdown();
00150 }
00151 
00153 void
00154 IndicationServerImpl::processIndication(const CIMInstance& instance,const String& instNS)
00155 {
00156    m_indicationServerThread->processIndication(instance, instNS);
00157 }
00158 
00160 void
00161 IndicationServerImpl::startDeleteSubscription(const String& ns, const CIMObjectPath& subPath)
00162 {
00163    m_indicationServerThread->startDeleteSubscription(ns, subPath);
00164 }
00165 
00167 void
00168 IndicationServerImpl::startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username)
00169 {
00170    m_indicationServerThread->startCreateSubscription(ns, subInst, username);
00171 }
00172 
00174 void
00175 IndicationServerImpl::startModifySubscription(const String& ns, const CIMInstance& subInst)
00176 {
00177    m_indicationServerThread->startModifySubscription(ns, subInst);
00178 }
00179    
00181 void
00182 IndicationServerImpl::modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName)
00183 {
00184    m_indicationServerThread->modifyFilter(ns, filterInst, userName);
00185 }
00186 
00188 struct NotifyTrans
00189 {
00190    NotifyTrans(
00191       const String& ns,
00192       const CIMInstance& indication,
00193       const CIMInstance& handler,
00194       const CIMInstance& subscription,
00195       const IndicationExportProviderIFCRef provider) :
00196          m_ns(ns), m_indication(indication), m_handler(handler), m_subscription(subscription), m_provider(provider) {}
00197    String m_ns;
00198    CIMInstance m_indication;
00199    CIMInstance m_handler;
00200    CIMInstance m_subscription;
00201    IndicationExportProviderIFCRef m_provider;
00202 };
00204 namespace
00205 {
00207 class Notifier : public Runnable
00208 {
00209 public:
00210    Notifier(IndicationServerImplThread* pmgr, NotifyTrans& ntrans) :
00211       m_pmgr(pmgr), m_trans(ntrans) {}
00212    virtual void run();
00213 private:
00214    virtual void doCooperativeCancel();
00215    virtual void doDefinitiveCancel();
00216    IndicationServerImplThread* m_pmgr;
00217    NotifyTrans m_trans;
00218 };
00219 class IndicationServerProviderEnvironment : public ProviderEnvironmentIFC
00220 {
00221 public:
00222    IndicationServerProviderEnvironment(
00223       const CIMOMEnvironmentRef& env)
00224       : ProviderEnvironmentIFC()
00225       , m_opctx()
00226       , m_env(env)
00227    {
00228    }
00229    virtual CIMOMHandleIFCRef getCIMOMHandle() const
00230    {
00231       return m_env->getCIMOMHandle(m_opctx);;
00232    }
00233    
00234    virtual CIMOMHandleIFCRef getRepositoryCIMOMHandle() const
00235    {
00236       return m_env->getCIMOMHandle(m_opctx, ServiceEnvironmentIFC::E_BYPASS_PROVIDERS);;
00237    }
00238    virtual RepositoryIFCRef getRepository() const
00239    {
00240       return m_env->getRepository();
00241    }
00242    virtual String getConfigItem(const String& name, const String& defRetVal="") const
00243    {
00244       return m_env->getConfigItem(name, defRetVal);
00245    }
00246    virtual StringArray getMultiConfigItem(const String &itemName, 
00247       const StringArray& defRetVal, const char* tokenizeSeparator = 0) const
00248    {
00249       return m_env->getMultiConfigItem(itemName, defRetVal, tokenizeSeparator);
00250    }
00251    
00252    virtual LoggerRef getLogger() const
00253    {
00254       return m_env->getLogger(COMPONENT_NAME);
00255    }
00256    virtual LoggerRef getLogger(const String& componentName) const
00257    {
00258       return m_env->getLogger(componentName);
00259    }
00260    virtual String getUserName() const
00261    {
00262       return Platform::getCurrentUserName();
00263    }
00264    virtual OperationContext& getOperationContext()
00265    {
00266       return m_opctx;
00267    }
00268    virtual ProviderEnvironmentIFCRef clone() const
00269    {
00270       return ProviderEnvironmentIFCRef(new IndicationServerProviderEnvironment(m_env));
00271    }
00272 private:
00273    mutable OperationContext m_opctx;
00274    CIMOMEnvironmentRef m_env;
00275 };
00276 ProviderEnvironmentIFCRef createProvEnvRef(CIMOMEnvironmentRef env)
00277 {
00278    return ProviderEnvironmentIFCRef(new IndicationServerProviderEnvironment(env));
00279 }
00281 void
00282 Notifier::run()
00283 {
00284    // TODO: process the subscription error handling here
00285    CIMOMEnvironmentRef env = m_pmgr->getEnvironment();
00286    try
00287    {
00288       m_trans.m_provider->exportIndication(createProvEnvRef(env),
00289          m_trans.m_ns, m_trans.m_handler, m_trans.m_indication);
00290    }
00291    catch(Exception& e)
00292    {
00293       OW_LOG_ERROR(env->getLogger(COMPONENT_NAME), Format("Caught exception while exporting indication: %1", e));
00294    }
00295    catch(ThreadCancelledException&)
00296    {
00297       throw;
00298    }
00299    catch(...)
00300    {
00301       OW_LOG_ERROR(env->getLogger(COMPONENT_NAME), "Unknown exception caught while exporting indication");
00302    }
00303 }
00305 void
00306 Notifier::doCooperativeCancel()
00307 {
00308    m_trans.m_provider->doCooperativeCancel();
00309 }
00311 void
00312 Notifier::doDefinitiveCancel()
00313 {
00314    m_trans.m_provider->doDefinitiveCancel();
00315 }
00316 
00317 } // end anonymous namespace
00319 IndicationServerImplThread::IndicationServerImplThread()
00320    : m_shuttingDown(false)
00321    , m_startedBarrier(2)
00322 {
00323 }
00324 namespace
00325 {
00327 class instanceEnumerator : public CIMInstanceResultHandlerIFC
00328 {
00329 public:
00330    instanceEnumerator(IndicationServerImplThread* is_,
00331       const String& ns_)
00332       : is(is_)
00333       , ns(ns_)
00334    {}
00335 private:
00336    void doHandle(const CIMInstance& i)
00337    {
00338       // try and get the name of whoever first created the subscription
00339       String username;
00340       CIMProperty p = i.getProperty("__Subscription_UserName");
00341       if (p)
00342       {
00343          CIMValue v = p.getValue();
00344          if (v)
00345          {
00346             username = v.toString();
00347          }
00348       }
00349 
00350       try
00351       {
00352          // TODO: If the provider rejects the subscription, we need to disable it!
00353          is->createSubscription(ns, i, username);
00354       }
00355       catch(Exception& e)
00356       {
00357          // Something was wrong with the last subscription.
00358          // If we allow this exception to pass through, all subsequent
00359          // subscriptions will be ignored at init time.
00360          // We'll ignore it here to keep that from happening.
00361       }
00362    }
00363    IndicationServerImplThread* is;
00364    String ns;
00365 };
00367 class namespaceEnumerator : public StringResultHandlerIFC
00368 {
00369 public:
00370    namespaceEnumerator(
00371       const CIMOMHandleIFCRef& ch_,
00372       IndicationServerImplThread* is_)
00373       : ch(ch_)
00374       , is(is_)
00375    {}
00376 private:
00377    void doHandle(const String& ns)
00378    {
00379       instanceEnumerator ie(is, ns);
00380       try
00381       {
00382          ch->enumInstances(ns,"CIM_IndicationSubscription", ie);
00383       }
00384       catch (const CIMException& ce)
00385       {
00386          // do nothing, class probably doesn't exist in the namespace
00387       }
00388    }
00389    CIMOMHandleIFCRef ch;
00390    IndicationServerImplThread* is;
00391 };
00392 } // end anonymous namespace
00394 void
00395 IndicationServerImplThread::init(const CIMOMEnvironmentRef& env)
00396 {
00397    m_env = env;
00398    m_logger = env->getLogger(COMPONENT_NAME);
00399    // set up the thread pool
00400    Int32 maxIndicationExportThreads;
00401    try
00402    {
00403       maxIndicationExportThreads = env->getConfigItem(ConfigOpts::MAX_INDICATION_EXPORT_THREADS_opt, OW_DEFAULT_MAX_INDICATION_EXPORT_THREADS).toInt32();
00404    }
00405    catch (const StringConversionException&)
00406    {
00407          maxIndicationExportThreads = String(OW_DEFAULT_MAX_INDICATION_EXPORT_THREADS).toInt32();
00408    }
00409    m_notifierThreadPool = ThreadPoolRef(new ThreadPool(ThreadPool::DYNAMIC_SIZE,
00410             maxIndicationExportThreads, maxIndicationExportThreads * 100, m_logger, "Indication Server Notifiers"));
00411    
00412    // pool to handle threads modifying subscriptions
00413    m_subscriptionPool = ThreadPoolRef(new ThreadPool(ThreadPool::DYNAMIC_SIZE,
00414       1, // 1 thread because only 1 can run at a time because of mutex locking.
00415          // Also modifyFilter() takes advantage of this detail to make sure a delete/create are processed in order.
00416       0, // unlimited size queue
00417       m_logger, "Indication Server Subscriptions"));
00418 
00419    //-----------------
00420    // Load map with available indication export providers
00421    //-----------------
00422    ProviderManagerRef pProvMgr = m_env->getProviderManager();
00423    IndicationExportProviderIFCRefArray pra =
00424       pProvMgr->getIndicationExportProviders(createProvEnvRef(m_env));
00425    OW_LOG_DEBUG(m_logger, Format("IndicationServerImplThread: %1 export providers found",
00426       pra.size()));
00427    for (size_t i = 0; i < pra.size(); i++)
00428    {
00429       StringArray clsNames = pra[i]->getHandlerClassNames();
00430       for (size_t j = 0; j < clsNames.size(); j++)
00431       {
00432          m_providers[clsNames[j]] = pra[i];
00433          OW_LOG_DEBUG(m_logger, Format("IndicationServerImplThread: Handling"
00434             " indication type %1", clsNames[j]));
00435       }
00436    }
00437 
00438    // get the wql lib
00439    m_wqlRef = m_env->getWQLRef();
00440    if (!m_wqlRef)
00441    {
00442       const char* const err = "Cannot process indications, because there is no "
00443          "WQL library.";
00444       OW_LOG_FATAL_ERROR(m_logger, err);
00445       OW_THROW(IndicationServerException, err);
00446    }
00447 
00448 }
00449 
00451 CIMOMEnvironmentRef
00452 IndicationServerImplThread::getEnvironment() const
00453 {
00454    return m_env;
00455 }
00456 
00458 void
00459 IndicationServerImplThread::waitUntilReady()
00460 {
00461    m_startedBarrier.wait();
00462 }
00464 IndicationServerImplThread::~IndicationServerImplThread()
00465 {
00466    try
00467    {
00468       m_providers.clear();
00469    }
00470    catch (...)
00471    {
00472       // don't let exceptions escape
00473    }
00474 }
00476 Int32
00477 IndicationServerImplThread::run()
00478 {
00479    // let CIMOMEnvironment know we're running and ready to go.
00480    m_startedBarrier.wait();
00481 
00482    // Now initialize for all the subscriptions that exist in the repository.
00483    // This calls createSubscription for every instance of
00484    // CIM_IndicationSubscription in all namespaces.
00485    // TODO: If the provider rejects the subscription, we need to disable it!
00486    OperationContext context;
00487    CIMOMHandleIFCRef lch = m_env->getCIMOMHandle(context);
00488    namespaceEnumerator nsHandler(lch, this);
00489    m_env->getRepository()->enumNameSpace(nsHandler, context);
00490 
00491    {
00492       NonRecursiveMutexLock l(m_mainLoopGuard);
00493       while (!m_shuttingDown)
00494       {
00495          m_mainLoopCondition.wait(l);
00496          
00497          try
00498          {
00499             while (!m_procTrans.empty() && !m_shuttingDown)
00500             {
00501                ProcIndicationTrans trans = m_procTrans.front();
00502                m_procTrans.pop_front();
00503                l.release();
00504                _processIndication(trans.instance, trans.nameSpace);
00505                l.lock();
00506             }
00507          }
00508          catch (const Exception& e)
00509          {
00510             OW_LOG_ERROR(m_logger, Format("IndicationServerImplThread::run caught "
00511                " exception %1", e));
00512          }
00513          catch(ThreadCancelledException&)
00514          {
00515             throw;
00516          }
00517          catch(...)
00518          {
00519             OW_LOG_ERROR(m_logger, "IndicationServerImplThread::run caught unknown"
00520                " exception");
00521             // Ignore?
00522          }
00523       }
00524    }
00525    OW_LOG_DEBUG(m_logger, "IndicationServerImplThread::run shutting down");
00526    m_subscriptionPool->shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 5);
00527    m_notifierThreadPool->shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 60);
00528    return 0;
00529 }
00530 
00532 void
00533 IndicationServerImplThread::deactivateAllSubscriptions()
00534 {
00535    typedef std::set<SubscriptionRef> SubSet;
00536    SubSet uniqueSubscriptions;
00537 
00538    for (subscriptions_t::iterator curSubscription = m_subscriptions.begin();
00539         curSubscription != m_subscriptions.end(); ++curSubscription)
00540    {
00541       uniqueSubscriptions.insert(curSubscription->second);
00542    }
00543 
00544    for (SubSet::iterator curSubscription = uniqueSubscriptions.begin(); curSubscription != uniqueSubscriptions.end(); ++curSubscription)
00545    {
00546       Subscription& sub(**curSubscription);
00547       IndicationProviderIFCRefArray& providers(sub.m_providers);
00548       for (IndicationProviderIFCRefArray::iterator curProvider = providers.begin();
00549            curProvider != providers.end(); ++curProvider)
00550       {
00551          try
00552          {
00553             OW_LOG_DEBUG(m_logger, Format("About to call deActivateFilter() for subscription %1, provider %2",
00554                sub.m_subPath.toString(), curProvider - providers.begin()));
00555             (*curProvider)->deActivateFilter(createProvEnvRef(m_env), sub.m_selectStmt, sub.m_selectStmt.getClassName(),
00556                sub.m_subPath.getNameSpace(), sub.m_classes);
00557             OW_LOG_DEBUG(m_logger, "deActivateFilter() done");
00558          }
00559          catch (Exception& e)
00560          {
00561             OW_LOG_ERROR(m_logger, Format("Caught exception while calling deActivateFilter(): %1", e));
00562          }
00563       }
00564    }
00565 }
00566 
00568 void
00569 IndicationServerImplThread::shutdown()
00570 {
00571    {
00572       NonRecursiveMutexLock l(m_mainLoopGuard);
00573       m_shuttingDown = true;
00574       m_mainLoopCondition.notifyAll();
00575    }
00576    // wait until the main thread exits.
00577    this->join();
00578    
00579    deactivateAllSubscriptions();
00580 
00581    // clear out variables to avoid circular reference counts.
00582    m_providers.clear();
00583    m_procTrans.clear();
00584    m_env = 0;
00585    m_subscriptions.clear();
00586    m_pollers.clear();
00587    m_notifierThreadPool = 0;
00588    m_subscriptionPool = 0;
00589    m_wqlRef.setNull();
00590 }
00592 void
00593 IndicationServerImplThread::processIndication(const CIMInstance& instanceArg,
00594    const String& instNS)
00595 {
00596    NonRecursiveMutexLock ml(m_mainLoopGuard);
00597    if (m_shuttingDown)
00598    {
00599       return;
00600    }
00601    ProcIndicationTrans trans(instanceArg, instNS);
00602    m_procTrans.push_back(trans);
00603    m_mainLoopCondition.notifyOne();
00604 }
00606 namespace
00607 {
00608 void splitUpProps(const StringArray& props,
00609    HashMap<String, StringArray>& map)
00610 {
00611    // This function may appear a little complicated...
00612    // It's handling the many cases needed to split up
00613    // the props so they can be quickly accessed in
00614    // filterInstance().
00615    // The props that are possible are:
00616    // *
00617    // PropertyName
00618    // ClassName.PropertyName
00619    // ClassName.*
00620    // PropertyName.*
00621    // PropertyName.EmbedName
00622    // ClassName.PropertyName.*
00623    // ClassName.PropertyName.EmbedName
00624    for (size_t i = 0; i < props.size(); ++i)
00625    {
00626       String prop = props[i];
00627       prop.toLowerCase();
00628       size_t idx = prop.indexOf('.');
00629       map[""].push_back(prop); // for no ClassName
00630       if (idx != String::npos)
00631       {
00632          String key = prop.substring(0, idx);
00633          String val = prop.substring(idx+1);
00634          map[""].push_back(key); // Store PropertyName for PropertyName.EmbedName
00635          map[key].push_back(val); // Store PropertyName for ClassName.PropertyName and EmbedName for PropertyName.EmbedName
00636          // now remove trailing periods.
00637          idx = val.indexOf('.');
00638          if (idx != String::npos)
00639          {
00640             val = val.substring(0, idx);
00641          }
00642          map[key].push_back(val); // Store PropertyName for ClassName.PropertyName.EmbedName
00643       }
00644    }
00645 }
00646 CIMInstance filterInstance(const CIMInstance& toFilter, const StringArray& props)
00647 {
00648    CIMInstance rval(toFilter.clone(E_NOT_LOCAL_ONLY,
00649       E_EXCLUDE_QUALIFIERS,
00650       E_EXCLUDE_CLASS_ORIGIN));
00651    if (props.empty())
00652    {
00653       return rval;
00654    }
00655    HashMap<String, StringArray> propMap;
00656    splitUpProps(props, propMap);
00657    // find "" and toFilter.getClassName() and keep those properties.
00658    StringArray propsToKeepArray(propMap[""]);
00659    
00660    String lowerClassName(toFilter.getClassName());
00661    lowerClassName.toLowerCase();
00662    propsToKeepArray.appendArray(propMap[lowerClassName]);
00663    // create a sorted set to get faster look-up time.
00664    SortedVectorSet<String> propsToKeep(propsToKeepArray.begin(),
00665       propsToKeepArray.end());
00666    CIMPropertyArray propArray = toFilter.getProperties();
00667    CIMPropertyArray propArrayToKeep;
00668    for (size_t i = 0; i < propArray.size(); ++i)
00669    {
00670       String lowerPropName(propArray[i].getName());
00671       lowerPropName.toLowerCase();
00672       if (propsToKeep.count(lowerPropName) > 0 || propsToKeep.count("*") > 0)
00673       {
00674          CIMProperty thePropToKeep(propArray[i]);
00675          // if it's an embedded instance, we need to recurse on it.
00676          if (thePropToKeep.getDataType().getType() == CIMDataType::EMBEDDEDINSTANCE)
00677          {
00678             CIMValue v = thePropToKeep.getValue();
00679             if (v)
00680             {
00681                CIMInstance embed;
00682                v.get(embed);
00683                if (embed)
00684                {
00685                   StringArray embeddedProps;
00686                   for (size_t i = 0; i < propsToKeepArray.size(); ++i)
00687                   {
00688                      const String& curPropName = propsToKeepArray[i];
00689                      if (curPropName.startsWith(lowerPropName))
00690                      {
00691                         size_t idx = curPropName.indexOf('.');
00692                         if (idx != String::npos)
00693                         {
00694                            embeddedProps.push_back(curPropName.substring(idx));
00695                         }
00696                      }
00697                   }
00698                   thePropToKeep.setValue(CIMValue(
00699                      filterInstance(embed, embeddedProps)));
00700                }
00701             }
00702          }
00703          propArrayToKeep.push_back(thePropToKeep);
00704       }
00705    }
00706    rval.setProperties(propArrayToKeep);
00707    return rval;
00708 }
00709 } // end anonymous namespace
00711 void
00712 IndicationServerImplThread::_processIndication(const CIMInstance& instanceArg,
00713    const String& instNS)
00714 {
00715    OW_LOG_DEBUG(m_logger, Format("IndicationServerImplThread::_processIndication "
00716       "instanceArg = %1 instNS = %2", instanceArg.toString(), instNS));
00717    
00718    // If the provider didn't set the IndicationTime property, then we'll set it.
00719    // DN 01/25/2005: removing this, since not all indications may have the IndicationTime property, and it's not required anyway.
00720    // The indication producers should set it if necessary.
00721    //CIMInstance instanceArg(instanceArg_);
00722    //if (!instanceArg.getProperty("IndicationTime"))
00723    //{
00724    // DateTime dtm;
00725    // dtm.setToCurrent();
00726    // CIMDateTime cdt(dtm);
00727    // instanceArg.setProperty("IndicationTime", CIMValue(cdt));
00728    //}
00729 
00730    CIMName curClassName = instanceArg.getClassName();
00731    if (curClassName == CIMName())
00732    {
00733       OW_LOG_ERROR(m_logger, "Cannot process indication, because it has no "
00734          "class name.");
00735    }
00736    while (curClassName != CIMName())
00737    {
00738       String key = curClassName.toString();
00739       key.toLowerCase();
00740       {
00741          MutexLock lock(m_subGuard);
00742          OW_LOG_DEBUG(m_logger, Format("searching for key %1", key));
00743          std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
00744             m_subscriptions.equal_range(key);
00745          OW_LOG_DEBUG(m_logger, Format("found %1 items", distance(range.first, range.second)));
00746          
00747          // make a copy so we can free the lock, otherwise we may cause a deadlock.
00748          subscriptions_copy_t subs(range.first, range.second);
00749          lock.release();
00750          _processIndicationRange(instanceArg, instNS, subs.begin(), subs.end());
00751       }
00752       CIMProperty prop = instanceArg.getProperty("SourceInstance");
00753       if (prop)
00754       {
00755          CIMValue v = prop.getValue();
00756          if (v && v.getType() == CIMDataType::EMBEDDEDINSTANCE)
00757          {
00758             CIMInstance embed;
00759             v.get(embed);
00760             key += ":";
00761             key += embed.getClassName();
00762             key.toLowerCase();
00763             {
00764                MutexLock lock(m_subGuard);
00765                OW_LOG_DEBUG(m_logger, Format("searching for key %1", key));
00766                std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
00767                   m_subscriptions.equal_range(key);
00768                OW_LOG_DEBUG(m_logger, Format("found %1 items", distance(range.first, range.second)));
00769                
00770                // make a copy of the subscriptions so we can free the lock, otherwise we may cause a deadlock.
00771                subscriptions_copy_t subs;
00772                for (subscriptions_t::iterator curSub = range.first; curSub != range.second; ++curSub)
00773                {
00774                   subs.insert(subscriptions_copy_t::value_type(curSub->first, SubscriptionRef(new Subscription(*curSub->second))));
00775                }
00776 
00777                lock.release();
00778                _processIndicationRange(instanceArg, instNS, subs.begin(), subs.end());
00779             }
00780          }
00781       }
00782       CIMClass cc;
00783       try
00784       {
00785          OperationContext context;
00786          cc = m_env->getRepositoryCIMOMHandle(context)->getClass(instNS, curClassName.toString());
00787          curClassName = cc.getSuperClass();
00788       }
00789       catch (const CIMException& e)
00790       {
00791          curClassName = CIMName();
00792       }
00793    }
00794 }
00796 void
00797 IndicationServerImplThread::_processIndicationRange(
00798    const CIMInstance& instanceArg, const String instNS,
00799    IndicationServerImplThread::subscriptions_iterator first,
00800    IndicationServerImplThread::subscriptions_iterator last)
00801 {
00802    OperationContext context;
00803    CIMOMHandleIFCRef hdl = m_env->getCIMOMHandle(context, CIMOMEnvironment::E_DONT_SEND_INDICATIONS);
00804    for ( ;first != last; ++first)
00805    {
00806       try
00807       {
00808          Subscription& sub = *(first->second);
00809          CIMInstance filterInst = sub.m_filter;
00810          String queryLanguage = sub.m_filter.getPropertyT("QueryLanguage").getValueT().toString();
00811          if (!sub.m_filterSourceNameSpace.equalsIgnoreCase(instNS))
00812          {
00813             OW_LOG_DEBUG(m_logger, Format("skipping sub because namespace doesn't match. Filter ns = %1, Sub ns = %2", sub.m_filterSourceNameSpace, instNS));
00814             continue;
00815          }
00816          //-----------------------------------------------------------------
00817          // Here we need to call into the WQL process with the query string
00818          // and the indication instance
00819          //-----------------------------------------------------------------
00820          WQLInstancePropertySource propSource(instanceArg, hdl, instNS);
00821          if (!sub.m_compiledStmt.evaluate(propSource))
00822          {
00823             OW_LOG_DEBUG(m_logger, "skipping sub because wql.evaluate doesn't match");
00824             continue;
00825          }
00826          CIMInstance filteredInstance(filterInstance(instanceArg,
00827             sub.m_selectStmt.getSelectPropertyNames()));
00828          // Now get the export handler for this indication subscription
00829          // TODO: get this when the subscription is created. No reason to keep fetching it whenever an indication is exported. We'll have to watch it for changes.
00830          CIMObjectPath handlerCOP =
00831             sub.m_subPath.getKeyT("Handler").getValueT().toCIMObjectPath();
00832 
00833          String handlerNS = handlerCOP.getNameSpace();
00834          if (handlerNS.empty())
00835             handlerNS = instNS;
00836 
00837          CIMInstance handler = hdl->getInstance(handlerNS,
00838             handlerCOP);
00839          if (!handler)
00840          {
00841             OW_LOG_ERROR(m_logger, Format("Handler does not exist: %1",
00842                handlerCOP.toString()));
00843             continue;
00844          }
00845          // Get the appropriate export provider for the subscription
00846          IndicationExportProviderIFCRef pref = getProvider(
00847             handler.getClassName());
00848          
00849          if (!pref)
00850          {
00851             OW_LOG_ERROR(m_logger, Format("No indication handler for class name:"
00852                " %1", handler.getClassName()));
00853          
00854             continue;
00855          }
00856          addTrans(instNS, filteredInstance, handler, sub.m_sub, pref);
00857       }
00858       catch(Exception& e)
00859       {
00860          OW_LOG_ERROR(m_logger, Format("Error occurred while exporting indications:"
00861             " %1", e).c_str());
00862       }
00863    }
00864 }
00866 void
00867 IndicationServerImplThread::addTrans(
00868    const String& ns,
00869    const CIMInstance& indication,
00870    const CIMInstance& handler,
00871    const CIMInstance& subscription,
00872    IndicationExportProviderIFCRef provider)
00873 {
00874    NotifyTrans trans(ns, indication, handler, subscription, provider);
00875    if (!m_notifierThreadPool->tryAddWork(RunnableRef(new Notifier(this, trans))))
00876    {
00877       OW_LOG_ERROR(m_logger, Format("Indication export notifier pool overloaded.  Dropping indication: %1", indication.toMOF()));
00878    }
00879 }
00881 IndicationExportProviderIFCRef
00882 IndicationServerImplThread::getProvider(const CIMName& className)
00883 {
00884    IndicationExportProviderIFCRef pref(0);
00885    provider_map_t::iterator it =
00886       m_providers.find(className);
00887    if (it != m_providers.end())
00888    {
00889       pref = it->second;
00890    }
00891    return pref;
00892 }
00894 void
00895 IndicationServerImplThread::deleteSubscription(const String& ns, const CIMObjectPath& subPath)
00896 {
00897    OW_LOG_DEBUG(m_logger, Format("IndicationServerImplThread::deleteSubscription ns = %1, subPath = %2", ns, subPath.toString()));
00898    CIMObjectPath cop(subPath);
00899    cop.setNameSpace(ns);
00900    OW_LOG_DEBUG(m_logger, Format("cop = %1", cop));
00901    
00902    typedef std::set<SubscriptionRef> SubSet;
00903    SubSet uniqueSubscriptions;
00904 
00905    // The hash map m_subscriptions has duplicate entries for the same subscription, so we have to create a unique set, which
00906    // should end up containing only one entry for the subscription that is being deleted.
00907    {
00908       MutexLock l(m_subGuard);
00909       subscriptions_t::iterator curSubscription = m_subscriptions.begin();
00910       while (curSubscription != m_subscriptions.end())
00911       {
00912          OW_LOG_DEBUG(m_logger, Format("subPath = %1", curSubscription->second->m_subPath));
00913          if (cop.equals(curSubscription->second->m_subPath))
00914          {
00915             OW_LOG_DEBUG(m_logger, "found a match");
00916             uniqueSubscriptions.insert(curSubscription->second);
00917             m_subscriptions.erase(curSubscription++);
00918          }
00919          else
00920          {
00921             ++curSubscription;
00922          }
00923       }
00924    }
00925 
00926    OW_ASSERT(uniqueSubscriptions.size() == 1);
00927 
00928    for (SubSet::iterator curSubscription = uniqueSubscriptions.begin(); curSubscription != uniqueSubscriptions.end(); ++curSubscription)
00929    {
00930       Subscription& sub(**curSubscription);
00931       for (size_t i = 0; i < sub.m_providers.size(); ++i)
00932       {
00933          try
00934          {
00935             if (sub.m_isPolled[i])
00936             {
00937                // loop through all the classes in the subscription.
00938                // TODO: This is slightly less than optimal, since
00939                // m_classes may contain a class that isn't handled by
00940                // the provider
00941                for (size_t j = 0; j < sub.m_classes.size(); ++j)
00942                {
00943                   CIMName key = sub.m_classes[j];
00944                   poller_map_t::iterator iter = m_pollers.find(key);
00945                   if (iter != m_pollers.end())
00946                   {
00947                      LifecycleIndicationPollerRef p = iter->second;
00948                      CIMName subClsName = sub.m_selectStmt.getClassName();
00949                      bool removePoller = false;
00950                      if (subClsName == "CIM_InstCreation")
00951                      {
00952                         removePoller = p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_CREATION);
00953                      }
00954                      else if (subClsName == "CIM_InstModification")
00955                      {
00956                         removePoller = p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_MODIFICATION);
00957                      }
00958                      else if (subClsName == "CIM_InstDeletion")
00959                      {
00960                         removePoller = p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_DELETION);
00961                      }
00962                      else if (subClsName == "CIM_InstIndication" || subClsName == "CIM_Indication")
00963                      {
00964                         p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_CREATION);
00965                         p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_MODIFICATION);
00966                         removePoller = p->removePollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_DELETION);
00967                      }
00968                      if (removePoller)
00969                      {
00970                         m_pollers.erase(iter);
00971                      }
00972                   }
00973                }
00974             }
00975             else
00976             {
00977                IndicationProviderIFCRef p = sub.m_providers[i];
00978                p->deActivateFilter(createProvEnvRef(m_env), sub.m_selectStmt, sub.m_selectStmt.getClassName(), ns, sub.m_classes);
00979             }
00980             
00981          }
00982          catch (const Exception& e)
00983          {
00984             OW_LOG_ERROR(m_logger, Format("Caught exception while calling deActivateFilter for provider: %1", e));
00985          }
00986          catch(ThreadCancelledException&)
00987          {
00988             throw;
00989          }
00990          catch (...)
00991          {
00992             OW_LOG_ERROR(m_logger, "Caught unknown exception while calling deActivateFilter for provider");
00993          }
00994       }
00995    }
00996 }
00998 namespace // unnamed
00999 {
01000 String getSourceNameSpace(const CIMInstance& inst)
01001 {
01002    try
01003    {
01004       return CIMNameSpaceUtils::prepareNamespace(inst.getPropertyT("SourceNamespace").getValueT().toString());
01005    }
01006    catch (const NoSuchPropertyException& e)
01007    {
01008       return "";
01009    }
01010    catch (const NULLValueException& e)
01011    {
01012       return "";
01013    }
01014 }
01015 
01016 class createSubscriptionRunnable : public Runnable
01017 {
01018    String ns;
01019    CIMInstance subInst;
01020    String username;
01021    IndicationServerImplThread* is;
01022 public:
01023    createSubscriptionRunnable(const String& ns_, const CIMInstance& subInst_, const String& username_, IndicationServerImplThread* is_)
01024    : ns(ns_)
01025    , subInst(subInst_)
01026    , username(username_)
01027    , is(is_)
01028    {}
01029 
01030    virtual void run()
01031    {
01032       USE_CIMOM_UID;
01033 
01034       is->createSubscription(ns, subInst, username);
01035    }
01036 }; // end class createSubscriptionRunnable
01037 
01038 class modifySubscriptionRunnable : public Runnable
01039 {
01040    String ns;
01041    CIMInstance subInst;
01042    IndicationServerImplThread* is;
01043 public:
01044    modifySubscriptionRunnable(const String& ns_, const CIMInstance& subInst_, IndicationServerImplThread* is_)
01045    : ns(ns_)
01046    , subInst(subInst_)
01047    , is(is_)
01048    {}
01049 
01050    virtual void run()
01051    {
01052       USE_CIMOM_UID;
01053       is->modifySubscription(ns, subInst);
01054    }
01055 }; // end class modifySubscriptionRunnable
01056 
01057 class deleteSubscriptionRunnable : public Runnable
01058 {
01059    String ns;
01060    CIMObjectPath sub;
01061    IndicationServerImplThread* is;
01062 public:
01063    deleteSubscriptionRunnable(const String& ns_, const CIMObjectPath& sub_, IndicationServerImplThread* is_)
01064    : ns(ns_)
01065    , sub(sub_)
01066    , is(is_)
01067    {}
01068 
01069    virtual void run()
01070    {
01071       USE_CIMOM_UID;
01072       is->deleteSubscription(ns, sub);
01073    }
01074 }; // end class deleteSubscriptionRunnable
01075 
01076 } // end unnamed namespace
01077 
01079 void
01080 IndicationServerImplThread::startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username)
01081 {
01082    RunnableRef rr(new createSubscriptionRunnable(ns, subInst, username, this));
01083    m_subscriptionPool->addWork(rr);
01084 }
01085 
01087 void
01088 IndicationServerImplThread::startModifySubscription(const String& ns, const CIMInstance& subInst)
01089 {
01090    RunnableRef rr(new modifySubscriptionRunnable(ns, subInst, this));
01091    m_subscriptionPool->addWork(rr);
01092 }
01093 
01095 void
01096 IndicationServerImplThread::startDeleteSubscription(const String& ns, const CIMObjectPath& sub)
01097 {
01098    RunnableRef rr(new deleteSubscriptionRunnable(ns, sub, this));
01099    m_subscriptionPool->addWork(rr);
01100 }
01101 
01103 void
01104 IndicationServerImplThread::createSubscription(const String& ns, const CIMInstance& subInst, const String& username)
01105 {
01106    OW_LOG_DEBUG(m_logger, Format("IndicationServerImplThread::createSubscription ns = %1, subInst = %2", ns, subInst.toString()));
01107    
01108    // get the filter
01109    OperationContext context;
01110    CIMOMHandleIFCRef hdl = m_env->getRepositoryCIMOMHandle(context);
01111    CIMObjectPath filterPath = subInst.getProperty("Filter").getValueT().toCIMObjectPath();
01112    String filterNS = filterPath.getNameSpace();
01113    if (filterNS.empty())
01114    {
01115       filterNS = ns;
01116    }
01117 
01118    CIMInstance filterInst = hdl->getInstance(filterNS, filterPath);
01119    String filterQuery = filterInst.getPropertyT("Query").getValueT().toString();
01120    
01121    // parse the filter
01122    // Get query language
01123    String queryLanguage = filterInst.getPropertyT("QueryLanguage").getValueT().toString();
01124    OW_LOG_DEBUG(m_logger, Format("Got query statement (%1) in %2", filterQuery, queryLanguage));
01125    if (!m_wqlRef->supportsQueryLanguage(queryLanguage))
01126    {
01127       OW_THROWCIMMSG(CIMException::FAILED, Format("Filter uses queryLanguage %1, which is"
01128          " not supported", queryLanguage).c_str());
01129    }
01130 
01131    WQLSelectStatement selectStmt(m_wqlRef->createSelectStatement(filterQuery));
01132    WQLCompile compiledStmt(selectStmt);
01133    const WQLCompile::Tableau& tableau(compiledStmt.getTableau());
01134    CIMName indicationClassName = selectStmt.getClassName();
01135    OW_LOG_DEBUG(m_logger, Format("query is for indication class: %1", indicationClassName));
01136 
01137    // collect up all the class names
01138    CIMNameArray isaClassNames;
01139    for (size_t i = 0; i < tableau.size(); ++i)
01140    {
01141       for (size_t j = 0; j < tableau[i].size(); ++j)
01142       {
01143          if (tableau[i][j].op == WQL_ISA)
01144          {
01145             const WQLOperand& opn1(tableau[i][j].opn1);
01146             const WQLOperand& opn2(tableau[i][j].opn2);
01147             if (opn1.getType() == WQLOperand::PROPERTY_NAME && opn1.getPropertyName().equalsIgnoreCase("SourceInstance"))
01148             {
01149                if (opn2.getType() == WQLOperand::PROPERTY_NAME)
01150                {
01151                   isaClassNames.push_back(opn2.getPropertyName());
01152                   OW_LOG_DEBUG(m_logger, Format("Found ISA class name: %1", opn2.getPropertyName()));
01153                }
01154                else if (opn2.getType() == WQLOperand::STRING_VALUE)
01155                {
01156                   isaClassNames.push_back(opn2.getStringValue());
01157                   OW_LOG_DEBUG(m_logger, Format("Found ISA class name: %1", opn2.getStringValue()));
01158                }
01159             }
01160          }
01161       }
01162    }
01163 
01164    //"The path to a local namespace where the Indications "
01165    //"originate. If NULL, the namespace of the Filter registration "
01166    //"is assumed."
01167    // first try to get it from the property
01168    String filterSourceNameSpace = getSourceNameSpace(filterInst);
01169    if (filterSourceNameSpace.empty())
01170    {
01171       filterSourceNameSpace = filterNS;
01172    }
01173 
01174    // look up all the subclasses of the classes in isaClassNames.
01175    CIMNameArray subClasses;
01176    for (size_t i = 0; i < isaClassNames.size(); ++i)
01177    {
01178       try
01179       {
01180          StringArray tmp(hdl->enumClassNamesA(filterSourceNameSpace, isaClassNames[i].toString()));
01181          subClasses.insert(subClasses.end(), tmp.begin(), tmp.end());
01182       }
01183       catch (CIMException& e)
01184       {
01185          String msg = Format("Indication Server (subscription creation): failed to get subclass names of %1:%2 because: %3",
01186             filterSourceNameSpace, isaClassNames[i], e.getMessage());
01187          OW_LOG_ERROR(m_logger, msg);
01188          OW_THROWCIMMSG_SUBEX(CIMException::FAILED, msg.c_str(), e);
01189       }
01190    }
01191    
01192    isaClassNames.appendArray(subClasses);
01193 
01194    // get rid of duplicates - unique() requires that the range be sorted
01195    std::sort(isaClassNames.begin(), isaClassNames.end());
01196    isaClassNames.erase(std::unique(isaClassNames.begin(), isaClassNames.end()), isaClassNames.end());
01197 
01198    OStringStream ss;
01199    std::copy(isaClassNames.begin(), isaClassNames.end(), std::ostream_iterator<CIMName>(ss, ", "));
01200    OW_LOG_DEBUG(m_logger, Format("isaClassNames = %1", ss.toString()));
01201 
01202    // we need to make a copy of this to pass to indication provider.  Darn backward compatibility :(
01203    StringArray strIsaClassNames;
01204    strIsaClassNames.reserve(isaClassNames.size());
01205    for (size_t i = 0; i < isaClassNames.size(); ++i)
01206    {
01207       strIsaClassNames.push_back(isaClassNames[i].toString());
01208    }
01209 
01210    // find providers that support this query. If none are found, throw an exception.
01211    ProviderManagerRef pm (m_env->getProviderManager());
01212    IndicationProviderIFCRefArray providers;
01213 
01214    if (!isaClassNames.empty())
01215    {
01216       providers = pm->getIndicationProviders(createProvEnvRef(m_env),
01217          ns, indicationClassName, isaClassNames);
01218    }
01219    else
01220    {
01221       providers = pm->getIndicationProviders(createProvEnvRef(m_env), ns,
01222          indicationClassName, CIMNameArray());
01223    }
01224    
01225    OW_LOG_DEBUG(m_logger, Format("Found %1 providers for the subscription", providers.size()));
01226    if (providers.empty())
01227    {
01228       OW_THROWCIMMSG(CIMException::FAILED, "No indication provider found for this subscription");
01229    }
01230 
01231    // verify that there is an indication export provider that can handle the handler for the subscription
01232    CIMObjectPath handlerPath = subInst.getProperty("Handler").getValueT().toCIMObjectPath();
01233    CIMName handlerClass = handlerPath.getClassName();
01234    if (!getProvider(handlerClass))
01235    {
01236       OW_THROWCIMMSG(CIMException::FAILED, "No indication export provider found for this subscription");
01237    }
01238    // call authorizeFilter on all the indication providers
01239    for (size_t i = 0; i < providers.size(); ++i)
01240    {
01241       OW_LOG_DEBUG(m_logger, Format("Calling authorizeFilter for provider %1", i));
01242       providers[i]->authorizeFilter(createProvEnvRef(m_env),
01243          selectStmt, indicationClassName.toString(), ns, strIsaClassNames, username);
01244    }
01245    // Call mustPoll on all the providers
01246    Array<bool> isPolled(providers.size(), false);
01247    for (size_t i = 0; i < providers.size(); ++i)
01248    {
01249       try
01250       {
01251          OW_LOG_DEBUG(m_logger, Format("Calling mustPoll for provider %1", i));
01252          int pollInterval = providers[i]->mustPoll(createProvEnvRef(m_env),
01253             selectStmt, indicationClassName.toString(), ns, strIsaClassNames);
01254          OW_LOG_DEBUG(m_logger, Format("got pollInterval %1", pollInterval));
01255          if (pollInterval > 0)
01256          {
01257             isPolled[i] = true;
01258             for (size_t j = 0; j < isaClassNames.size(); ++j)
01259             {
01260                CIMName key = isaClassNames[j];
01261                OW_LOG_DEBUG(m_logger, Format("searching on class key %1", isaClassNames[j]));
01262                poller_map_t::iterator iter = m_pollers.find(key);
01263                LifecycleIndicationPollerRef p;
01264                if (iter != m_pollers.end())
01265                {
01266                   OW_LOG_DEBUG(m_logger, Format("found on class key %1: %2", isaClassNames[j], iter->first));
01267                   p = iter->second;
01268                }
01269                else
01270                {
01271                   OW_LOG_DEBUG(m_logger, Format("not found on class key %1", isaClassNames[j]));
01272                   p = LifecycleIndicationPollerRef(SharedLibraryRef(0),
01273                      LifecycleIndicationPollerRef::element_type(new LifecycleIndicationPoller(ns, key, pollInterval)));
01274                }
01275                CIMName subClsName = selectStmt.getClassName();
01276                if (subClsName == "CIM_InstCreation")
01277                {
01278                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_CREATION);
01279                }
01280                else if (subClsName == "CIM_InstModification")
01281                {
01282                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_MODIFICATION);
01283                }
01284                else if (subClsName == "CIM_InstDeletion")
01285                {
01286                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_DELETION);
01287                }
01288                else if (subClsName == "CIM_InstIndication" || subClsName == "CIM_Indication")
01289                {
01290                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_CREATION);
01291                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_MODIFICATION);
01292                   p->addPollOp(LifecycleIndicationPoller::POLL_FOR_INSTANCE_DELETION);
01293                }
01294                p->addPollInterval(pollInterval);
01295                if (iter == m_pollers.end())
01296                {
01297                   OW_LOG_DEBUG(m_logger, Format("Inserting %1 into m_pollers", key));
01298                   m_pollers.insert(std::make_pair(key, p));
01299                   m_env->getPollingManager()->addPolledProvider(
01300                      PolledProviderIFCRef(
01301                         new CppPolledProviderProxy(
01302                            CppPolledProviderIFCRef(p))));
01303                }
01304             }
01305          }
01306 
01307       }
01308       catch (CIMException& ce)
01309       {
01310          OW_LOG_ERROR(m_logger, Format("Caught exception while calling mustPoll for provider: %1", ce));
01311       }
01312       catch(ThreadCancelledException&)
01313       {
01314          throw;
01315       }
01316       catch (...)
01317       {
01318          OW_LOG_ERROR(m_logger, "Caught unknown exception while calling mustPoll for provider");
01319       }
01320    }
01321 
01322    // create a subscription (save the compiled filter and other info)
01323    SubscriptionRef sub(new Subscription);
01324    sub->m_subPath = CIMObjectPath(ns, subInst);
01325    sub->m_sub = subInst;
01326    sub->m_providers = providers;
01327    sub->m_isPolled = isPolled;
01328    sub->m_filter = filterInst;
01329    sub->m_selectStmt = selectStmt;
01330    sub->m_compiledStmt = compiledStmt;
01331    sub->m_classes = strIsaClassNames;
01332 
01333    // m_filterSourceNamespace is saved so _processIndication can do what the
01334    // schema says:
01335    //"The path to a local namespace where the Indications "
01336    //"originate. If NULL, the namespace of the Filter registration "
01337    //"is assumed."
01338    // first try to get it from the property
01339    sub->m_filterSourceNameSpace = filterSourceNameSpace;
01340 
01341    // get the lock and put it in m_subscriptions
01342    {
01343       MutexLock l(m_subGuard);
01344       if (isaClassNames.empty())
01345       {
01346          String subKey = indicationClassName.toString();
01347          subKey.toLowerCase();
01348          m_subscriptions.insert(std::make_pair(subKey, sub));
01349       }
01350       else
01351       {
01352          for (size_t i = 0; i < isaClassNames.size(); ++i)
01353          {
01354             String subKey = indicationClassName.toString() + ':' + isaClassNames[i].toString();
01355             subKey.toLowerCase();
01356             m_subscriptions.insert(std::make_pair(subKey, sub));
01357          }
01358       }
01359    }
01360    // call activateFilter on all the providers
01361    // If activateFilter calls fail or throw, just ignore it and keep going.
01362    // If none succeed, we need to remove it from m_subscriptions and throw
01363    // to indicate that subscription creation failed.
01364    int successfulActivations = 0;
01365    for (size_t i = 0; i < providers.size(); ++i)
01366    {
01367       try
01368       {
01369          providers[i]->activateFilter(createProvEnvRef(m_env),
01370             selectStmt, indicationClassName.toString(), ns, strIsaClassNames);
01371 
01372          ++successfulActivations;
01373       }
01374       catch (CIMException& ce)
01375       {
01376          OW_LOG_ERROR(m_logger, Format("Caught exception while calling activateFilter for provider: %1", ce));
01377       }
01378       catch(ThreadCancelledException&)
01379       {
01380          throw;
01381       }
01382       catch (...)
01383       {
01384          OW_LOG_ERROR(m_logger, "Caught unknown exception while calling activateFilter for provider");
01385       }
01386    }
01387 
01388    if (successfulActivations == 0)
01389    {
01390       // Remove it and throw
01391       MutexLock l(m_subGuard);
01392       if (isaClassNames.empty())
01393       {
01394          String subKey = indicationClassName.toString();
01395          subKey.toLowerCase();
01396          m_subscriptions.erase(subKey);
01397       }
01398       else
01399       {
01400          for (size_t i = 0; i < isaClassNames.size(); ++i)
01401          {
01402             String subKey = indicationClassName.toString() + ':' + isaClassNames[i].toString();
01403             subKey.toLowerCase();
01404             m_subscriptions.erase(subKey);
01405          }
01406       }
01407       OW_THROWCIMMSG(CIMException::FAILED, "activateFilter failed for all providers");
01408    }
01409 }
01411 void
01412 IndicationServerImplThread::modifySubscription(const String& ns, const CIMInstance& subInst)
01413 {
01414    // since you can't modify an instance's path which includes the paths to
01415    // the filter and the handler, if a subscription was modified, it will
01416    // have only really changed the non-key, non-ref properties, so we can just
01417    // find it in the subscriptions map and update it.
01418    CIMObjectPath cop(ns, subInst);
01419    
01420    MutexLock l(m_subGuard);
01421    for (subscriptions_t::iterator iter = m_subscriptions.begin();
01422        iter != m_subscriptions.end(); ++iter)
01423    {
01424       Subscription& sub = *(iter->second);
01425       if (cop.equals(sub.m_subPath))
01426       {
01427          sub.m_sub = subInst;
01428          break; // should only be one subscription to update
01429       }
01430    }
01431 }
01433 void
01434 IndicationServerImplThread::modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName)
01435 {
01436 #ifndef OW_DISABLE_ASSOCIATION_TRAVERSAL
01437    // Implementation note: This depends on the fact that the indication subscription creation/deletion events are
01438    // processed sequentially (the thread pool only has 1 worker thread), so that the deletion is processed
01439    // before the creation.
01440    try
01441    {
01442       OperationContext context;
01443       CIMOMHandleIFCRef hdl(m_env->getRepositoryCIMOMHandle(context));
01444       // get all the CIM_IndicationSubscription instances referencing the filter
01445       CIMObjectPath filterPath(ns, filterInst);
01446       CIMInstanceArray subscriptions(hdl->referencesA(ns, filterPath, "CIM_IndicationSubscription", "Filter"));
01447 
01448       // call startDeleteSubscription on the old instances
01449       for (size_t i = 0; i < subscriptions.size(); ++i)
01450       {
01451          startDeleteSubscription(ns, CIMObjectPath(ns, subscriptions[i]));
01452       }
01453       
01454       // call startCreateSubscription on the new instances
01455       for (size_t i = 0; i < subscriptions.size(); ++i)
01456       {
01457          startCreateSubscription(ns, subscriptions[i], userName);
01458       }
01459 
01460    }
01461    catch (CIMException& e)
01462    {
01463       OW_THROWCIMMSG_SUBEX(CIMException::FAILED, "modifying the filter failed", e);
01464    }
01465 
01466 #else
01467    OW_THROWCIMMSG(CIMException::FAILED, "Modifying the filter not allowed because association traversal is disabled");
01468 #endif
01469 }
01470 
01471 void
01472 IndicationServerImplThread::doCooperativeCancel()
01473 {
01474    NonRecursiveMutexLock l(m_mainLoopGuard);
01475    m_shuttingDown = true;
01476    m_mainLoopCondition.notifyAll();
01477 }
01478 
01479 } // end namespace OW_NAMESPACE
01480 
01482 extern "C" OW_EXPORT OW_NAMESPACE::IndicationServer*
01483 createIndicationServer()
01484 {
01485    return new OW_NAMESPACE::IndicationServerImpl();
01486 }
01488 #if !defined(OW_STATIC_SERVICES)
01489 extern "C" OW_EXPORT const char*
01490 getOWVersion()
01491 {
01492    return OW_VERSION;
01493 }
01494 #endif /* !defined(OW_STATIC_SERVICES) */

Generated on Thu Feb 9 08:48:01 2006 for openwbem by  doxygen 1.4.6