00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00035 #ifndef OW_INDICATION_SERVER_IMPL_HPP_
00036 #define OW_INDICATION_SERVER_IMPL_HPP_
00037 #include "OW_config.h"
00038 #include "OW_Types.hpp"
00039 #include "OW_CIMFwd.hpp"
00040 #include "OW_IfcsFwd.hpp"
00041 #include "OW_List.hpp"
00042 #include "OW_Condition.hpp"
00043 #include "OW_CIMInstance.hpp"
00044 #include "OW_CIMObjectPath.hpp"
00045 #include "OW_IndicationServer.hpp"
00046 #include "OW_HashMultiMap.hpp"
00047 #include "OW_WQLSelectStatement.hpp"
00048 #include "OW_WQLCompile.hpp"
00049 #include "OW_ThreadBarrier.hpp"
00050 #include "OW_ProviderFwd.hpp"
00051 #include "OW_SortedVectorMap.hpp"
00052 #include "OW_Map.hpp"
00053 #include "OW_Mutex.hpp"
00054 #include "OW_Thread.hpp"
00055 
00056 namespace OW_NAMESPACE
00057 {
00058 
// Forward declarations for types this header names before (or without) seeing
// their definitions: NotifyTrans is only taken by reference (getNewTrans),
// LifecycleIndicationPoller only appears as a template argument of the poller
// map, and IndicationServerImplThread is defined further down in this header
// but is referenced earlier by IndicationServerImpl.
00059 class NotifyTrans;
00060 class LifecycleIndicationPoller;
00061 class IndicationServerImplThread;
00062 
// Concrete IndicationServer declared by this header. Its only data member is a
// reference-counted handle to an IndicationServerImplThread, whose public
// interface mirrors the indication/subscription methods declared here.
// NOTE(review): presumably each call is forwarded to that thread object; the
// member definitions are not part of this header -- confirm in the .cpp.
00064 class IndicationServerImpl : public IndicationServer
00065 {
00066 public:
00067    IndicationServerImpl();
00068    ~IndicationServerImpl();
         // Service identification and lifecycle hooks. They are declared virtual
         // here; NOTE(review): presumably overrides of the IndicationServer /
         // service interface -- confirm against OW_IndicationServer.hpp.
00069    virtual String getName() const;
         // NOTE(review): presumably the names of services this one depends on.
00070    virtual StringArray getDependencies() const;
         // Receives the service environment handle.
00071    virtual void init(const ServiceEnvironmentIFCRef& env);
00072    virtual void start();
         // Declared without `virtual` in this class; NOTE(review): may still
         // override a base-class virtual -- confirm against the base.
00073    void shutdown();
         // Hands an indication instance, together with the namespace string it
         // is associated with, to the server.
00074    void processIndication(const CIMInstance& instance,
00075       const String& instNS);
00076    
         // Subscription changes come in two families: the virtual start*()
         // methods below and the non-virtual methods of the same base name.
         // NOTE(review): the start*() variants presumably only initiate the
         // operation and the plain variants perform it -- confirm in the .cpp.
00077    virtual void startDeleteSubscription(const String& ns, const CIMObjectPath& subPath);
00078    virtual void startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00079    virtual void startModifySubscription(const String& ns, const CIMInstance& subInst);
00080    
00081    
         // ns: namespace string; subPath / subInst: object path or instance of
         // the subscription being changed; username: user associated with the
         // creation request.
00082    void deleteSubscription(const String& ns, const CIMObjectPath& subPath);
00083    void createSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00084    void modifySubscription(const String& ns, const CIMInstance& subInst);
00085    
         // Notifies the server of a changed filter instance in namespace ns.
00086    virtual void modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName);
00087 
00088 private:
         // Reference-counted handle to the thread object declared below.
00089    IntrusiveReference<IndicationServerImplThread> m_indicationServerThread;
00090 };
00091 
// Thread subclass that owns the indication server's state: the subscription
// table, the export-provider map, the list of pending indications, the
// lifecycle pollers and two thread pools. Its public methods mirror those of
// IndicationServerImpl.
// NOTE(review): the locking discipline between the public methods and run()
// is not visible in this header -- consult the .cpp before changing members.
00092 class IndicationServerImplThread : public Thread
00093 {
00094 public:
00095    IndicationServerImplThread();
00096    ~IndicationServerImplThread();
         // Receives the CIMOM environment handle (stored type matches m_env).
00097    virtual void init(const CIMOMEnvironmentRef& env);
         // NOTE(review): presumably blocks on m_startedBarrier until run() has
         // finished its start-up -- confirm in the .cpp.
00098    virtual void waitUntilReady();
         // NOTE(review): presumably the Thread entry point override; the Int32
         // result's meaning is defined by Thread -- confirm in OW_Thread.hpp.
00099    virtual Int32 run();
00100    void shutdown();
         // Hands an indication instance and its namespace string to this thread.
00101    void processIndication(const CIMInstance& instance,
00102       const String& instNS);
00103    CIMOMEnvironmentRef getEnvironment() const;
         // Writes into outTrans; NOTE(review): the bool presumably reports
         // whether a transaction was available -- confirm in the .cpp.
00104    bool getNewTrans(NotifyTrans& outTrans);
00105    
         // Virtual start*() family; see the non-virtual counterparts below.
         // NOTE(review): presumably these schedule the corresponding operation
         // (m_subscriptionPool exists for some such purpose) -- confirm.
00106    virtual void startDeleteSubscription(const String& ns, const CIMObjectPath& subPath);
00107    virtual void startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00108    virtual void startModifySubscription(const String& ns, const CIMInstance& subInst);
00109    
00110    
         // Non-virtual subscription operations taking the namespace string and
         // the subscription's object path or instance.
00111    void deleteSubscription(const String& ns, const CIMObjectPath& subPath);
00112    void createSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00113    void modifySubscription(const String& ns, const CIMInstance& subInst);
00114    
00115    virtual void modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName);
00116 
         // NOTE(review): presumably a Thread cancellation hook override --
         // confirm against OW_Thread.hpp.
00117    virtual void doCooperativeCancel();
00118 
00119 private:
         // Reference-counted record holding everything stored per subscription.
00120    struct Subscription : public IntrusiveCountableBase
00121    {
            // The three CIM members are explicitly constructed from CIMNULL;
            // all remaining members are default-constructed.
00122       Subscription()
00123          : m_subPath(CIMNULL)
00124          , m_sub(CIMNULL)
00125          , m_filter(CIMNULL)
00126       {}
00127       CIMObjectPath m_subPath;
00128       CIMInstance m_sub;
00129       IndicationProviderIFCRefArray m_providers;
00130       CIMInstance m_filter;
00131       WQLSelectStatement m_selectStmt;
00132       WQLCompile m_compiledStmt;
00133       StringArray m_classes;
00134       String m_filterSourceNameSpace;
            // NOTE(review): presumably parallel to m_providers (one flag per
            // provider) -- confirm where it is filled in.
00135       Array<bool> m_isPolled; 
00136    };
00137    typedef IntrusiveReference<Subscription> SubscriptionRef;
00138 
00139    
00140    
00141    
         // Multimap from a String key to subscription records.
         // NOTE(review): the key's exact contents are not visible here.
00142    typedef HashMultiMap<String, SubscriptionRef> subscriptions_t;
00143 
         // On both branches subscriptions_copy_t names the same type as
         // subscriptions_t (std::vector<T>::value_type is T); only the spelling
         // differs for AIX. NOTE(review): presumably a compiler workaround; also
         // no #include <vector> is visible in this header, so std::vector is
         // presumably provided by one of the project headers.
00144 #if defined(OW_AIX)
00145    typedef subscriptions_t subscriptions_copy_t;
00146 #else
00147    
00148    typedef std::vector<subscriptions_t>::value_type subscriptions_copy_t;
00149 #endif // AIX
00150    typedef subscriptions_copy_t::iterator subscriptions_iterator;
00151 
         // Private counterpart of processIndication().
00152    void _processIndication(const CIMInstance& instance,
00153       const String& instNS);
00154    
         // Handles an indication against the subscriptions in [first, last).
         // NOTE(review): instNS is taken by value here, unlike every other
         // method in this class; switching to const String& would also require
         // changing the out-of-line definition.
00155    void _processIndicationRange(
00156       const CIMInstance& instanceArg, const String instNS,
00157       subscriptions_iterator first, subscriptions_iterator last);
00158    
         // Registers a notification built from an indication, its handler and
         // subscription instances, and the export provider to deliver through.
00159    void addTrans(const String& ns, const CIMInstance& indication,
00160       const CIMInstance& handler,
00161       const CIMInstance& subscription,
00162       IndicationExportProviderIFCRef provider);
00163    
         // Looks up an export provider by class name (m_providers is keyed by
         // CIMName).
00164    IndicationExportProviderIFCRef getProvider(const CIMName& className);
00165    
00166    void deactivateAllSubscriptions();
00167 
         // Value pair of an indication instance and its namespace string; the
         // element type of m_procTrans. The constructor copies both arguments.
00168    struct ProcIndicationTrans
00169    {
00170       ProcIndicationTrans(const CIMInstance& inst,
00171          const String& ns)
00172          : instance(inst)
00173          , nameSpace(ns) {}
00174       CIMInstance instance;
00175       String nameSpace;
00176    };
         // Export providers keyed by CIMName.
00177    typedef SortedVectorMap<CIMName, IndicationExportProviderIFCRef> provider_map_t;
00178    provider_map_t m_providers;
00179    
00180    
00181    
         // List of indication/namespace pairs.
         // NOTE(review): presumably the queue run() drains -- confirm.
00182    List<ProcIndicationTrans> m_procTrans;
00183    bool m_shuttingDown;
         // NOTE(review): presumably the mutex/condition pair protects
         // m_procTrans and m_shuttingDown and wakes run() -- confirm.
00184    NonRecursiveMutex m_mainLoopGuard;
00185    Condition m_mainLoopCondition;
00186    CIMOMEnvironmentRef m_env;
00187    ThreadBarrier m_startedBarrier;
00188    subscriptions_t m_subscriptions;
         // NOTE(review): presumably guards m_subscriptions -- confirm.
00189    Mutex m_subGuard;
         // Lifecycle indication pollers keyed by CIMName.
00190    typedef SharedLibraryReference< IntrusiveReference<LifecycleIndicationPoller> > LifecycleIndicationPollerRef;
00191    typedef Map<CIMName, LifecycleIndicationPollerRef > poller_map_t;
00192    poller_map_t m_pollers;
00193    ThreadPoolRef m_notifierThreadPool;
00194    ThreadPoolRef m_subscriptionPool;
00195    WQLIFCRef m_wqlRef;
00196    LoggerRef m_logger;
00197 };
00198 
00199 } 
00200 
00201 #endif