EC_ObserverStrategy.cpp

Go to the documentation of this file.
00001 // $Id: EC_ObserverStrategy.cpp 78710 2007-07-02 07:33:53Z johnnyw $
00002 
00003 #include "orbsvcs/Event/EC_ObserverStrategy.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_ProxySupplier.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00008 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00009 #include "orbsvcs/Event_Service_Constants.h"
00010 #include "ace/Auto_Ptr.h"
00011 
00012 #if ! defined (__ACE_INLINE__)
00013 #include "orbsvcs/Event/EC_ObserverStrategy.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID(Event, EC_ObserverStrategy, "$Id: EC_ObserverStrategy.cpp 78710 2007-07-02 07:33:53Z johnnyw $")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_EC_ObserverStrategy::~TAO_EC_ObserverStrategy (void)
00021 {
00022 }
00023 
00024 // ****************************************************************
00025 
00026 RtecEventChannelAdmin::Observer_Handle
00027 TAO_EC_Null_ObserverStrategy::append_observer (
00028        RtecEventChannelAdmin::Observer_ptr)
00029 {
00030   throw RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER();
00031 }
00032 
00033 void
00034 TAO_EC_Null_ObserverStrategy::remove_observer (
00035        RtecEventChannelAdmin::Observer_Handle)
00036 {
00037   throw RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER();
00038 }
00039 
00040 void
00041 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*)
00042 {
00043 }
00044 
00045 void
00046 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*)
00047 {
00048 }
00049 
00050 void
00051 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*)
00052 {
00053 }
00054 
00055 void
00056 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*)
00057 {
00058 }
00059 
00060 // ****************************************************************
00061 
00062 TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void)
00063 {
00064   delete this->lock_;
00065   this->lock_ = 0;
00066 }
00067 
00068 RtecEventChannelAdmin::Observer_Handle
00069 TAO_EC_Basic_ObserverStrategy::append_observer (
00070        RtecEventChannelAdmin::Observer_ptr obs)
00071 {
00072   {
00073     ACE_GUARD_THROW_EX (
00074         ACE_Lock, ace_mon, *this->lock_,
00075         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00076 
00077     this->handle_generator_++;
00078     Observer_Entry entry (this->handle_generator_,
00079                           RtecEventChannelAdmin::Observer::_duplicate (obs));
00080 
00081     if (this->observers_.bind (entry.handle, entry) == -1)
00082       throw RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER();
00083   }
00084 
00085   RtecEventChannelAdmin::ConsumerQOS c_qos;
00086   this->fill_qos (c_qos);
00087   obs->update_consumer (c_qos);
00088 
00089   RtecEventChannelAdmin::SupplierQOS s_qos;
00090   this->fill_qos (s_qos);
00091   obs->update_supplier (s_qos);
00092 
00093   return this->handle_generator_;
00094 }
00095 
00096 
00097 void
00098 TAO_EC_Basic_ObserverStrategy::remove_observer (
00099        RtecEventChannelAdmin::Observer_Handle handle)
00100 {
00101   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00102       RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00103 
00104   if (this->observers_.unbind (handle) == -1)
00105     throw RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER();
00106 }
00107 
00108 int
00109 TAO_EC_Basic_ObserverStrategy::create_observer_list (
00110                                 RtecEventChannelAdmin::Observer_var *&lst)
00111 {
00112   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00113                  RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00114 
00115   int size = static_cast<int> (this->observers_.current_size ());
00116   RtecEventChannelAdmin::Observer_var *tmp = 0;
00117   ACE_NEW_RETURN (tmp,
00118                   RtecEventChannelAdmin::Observer_var[size],
00119                   0);
00120   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00121 
00122   Observer_Map_Iterator end = this->observers_.end ();
00123   int j = 0;
00124   for (Observer_Map_Iterator i  = this->observers_.begin ();
00125        i != end;
00126        ++i)
00127     {
00128       Observer_Entry& entry = (*i).int_id_;
00129       copy[j++] =
00130         RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ());
00131     }
00132 
00133   lst = copy.release ();
00134   return size;
00135 }
00136 
00137 void
00138 TAO_EC_Basic_ObserverStrategy::connected (
00139     TAO_EC_ProxyPushConsumer *consumer)
00140 {
00141   this->supplier_qos_update (consumer);
00142 }
00143 
00144 void
00145 TAO_EC_Basic_ObserverStrategy::disconnected (
00146     TAO_EC_ProxyPushConsumer* consumer)
00147 {
00148   this->supplier_qos_update (consumer);
00149 }
00150 
00151 void
00152 TAO_EC_Basic_ObserverStrategy::supplier_qos_update (
00153                                         TAO_EC_ProxyPushConsumer *consumer)
00154 {
00155   if (consumer->publications ().is_gateway)
00156     return;
00157 
00158   RtecEventChannelAdmin::SupplierQOS s_qos;
00159   this->fill_qos (s_qos);
00160 
00161   RtecEventChannelAdmin::Observer_var *tmp = 0;
00162   int size = this->create_observer_list (tmp);
00163   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00164 
00165   for (int i = 0; i != size; ++i)
00166     {
00167       try
00168         {
00169           copy[i]->update_supplier (s_qos);
00170         }
00171       catch (const CORBA::Exception&)
00172         {
00173           // Ignore exceptions, we must isolate other observers from
00174           // failures on this one.
00175         }
00176     }
00177 }
00178 
00179 void
00180 TAO_EC_Basic_ObserverStrategy::connected (
00181     TAO_EC_ProxyPushSupplier* supplier)
00182 {
00183   this->consumer_qos_update (supplier);
00184 }
00185 
00186 void
00187 TAO_EC_Basic_ObserverStrategy::disconnected (
00188     TAO_EC_ProxyPushSupplier* supplier)
00189 {
00190   this->consumer_qos_update (supplier);
00191 }
00192 
00193 void
00194 TAO_EC_Basic_ObserverStrategy::consumer_qos_update (
00195                                         TAO_EC_ProxyPushSupplier *supplier)
00196 {
00197   if (supplier->subscriptions ().is_gateway)
00198     return;
00199 
00200   RtecEventChannelAdmin::ConsumerQOS c_qos;
00201   this->fill_qos (c_qos);
00202 
00203   RtecEventChannelAdmin::Observer_var *tmp = 0;
00204   int size = this->create_observer_list (tmp);
00205   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00206 
00207   for (int i = 0; i != size; ++i)
00208     {
00209       try
00210         {
00211           copy[i]->update_consumer (c_qos);
00212         }
00213       catch (const CORBA::Exception&)
00214         {
00215           // Ignore exceptions, we must isolate other observers from
00216           // failures on this one.
00217         }
00218     }
00219 }
00220 
00221 void
00222 TAO_EC_Basic_ObserverStrategy::fill_qos (
00223       RtecEventChannelAdmin::ConsumerQOS &qos)
00224 {
00225   Headers headers;
00226 
00227   TAO_EC_Accumulate_Supplier_Headers worker (headers);
00228   this->event_channel_->for_each_consumer (&worker);
00229 
00230   RtecEventChannelAdmin::DependencySet& dep = qos.dependencies;
00231 
00232   dep.length (static_cast<CORBA::ULong> (headers.current_size () + 1));
00233 
00234   dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
00235   dep[0].event.header.source = static_cast<CORBA::ULong> (headers.current_size ());
00236   dep[0].event.header.creation_time = ORBSVCS_Time::zero ();
00237   dep[0].rt_info = 0;
00238 
00239   CORBA::ULong count = 1;
00240   for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00241     {
00242       dep[count++].event.header = (*i).key ();
00243     }
00244 }
00245 
00246 void
00247 TAO_EC_Basic_ObserverStrategy::fill_qos (
00248       RtecEventChannelAdmin::SupplierQOS &qos)
00249 {
00250   Headers headers;
00251 
00252   TAO_EC_Accumulate_Consumer_Headers worker (headers);
00253   this->event_channel_->for_each_supplier (&worker);
00254 
00255   qos.publications.length (static_cast<CORBA::ULong> (headers.current_size ()));
00256 
00257   CORBA::ULong count = 0;
00258   for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00259     {
00260       qos.publications[count++].event.header = (*i).key ();
00261     }
00262 }
00263 
00264 
00265 // ****************************************************************
00266 
00267 TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void)
00268 {
00269 }
00270 
00271 void
00272 TAO_EC_Reactive_ObserverStrategy::supplier_qos_update (
00273                                         TAO_EC_ProxyPushConsumer *consumer)
00274 {
00275   if (consumer->publications ().is_gateway)
00276     return;
00277 
00278   RtecEventChannelAdmin::SupplierQOS s_qos;
00279   this->fill_qos (s_qos);
00280 
00281   Observer_Map copy;
00282   this->create_observer_map (copy);
00283 
00284   Observer_Map_Iterator end = copy.end ();
00285   for (Observer_Map_Iterator i  = copy.begin ();
00286        i != end;
00287        ++i)
00288     {
00289       Observer_Entry& entry = (*i).int_id_;
00290       try
00291         {
00292           entry.observer->update_supplier (s_qos);
00293         }
00294       catch (const CORBA::OBJECT_NOT_EXIST&)
00295         {
00296           // Exception occured while updating observer, so remove it from the
00297           // observer list
00298           this->observer_not_exists (entry);
00299         }
00300       catch (const CORBA::TRANSIENT&)
00301         {
00302           // Exception occured while updating observer, so remove it from the
00303           // observer list
00304           this->observer_not_exists (entry);
00305         }
00306       catch (const CORBA::Exception&)
00307         {
00308           // Ignore all exceptions
00309         }
00310     }
00311 }
00312 
00313 void
00314 TAO_EC_Reactive_ObserverStrategy::consumer_qos_update (
00315                                         TAO_EC_ProxyPushSupplier *supplier)
00316 {
00317   if (supplier->subscriptions ().is_gateway)
00318     return;
00319 
00320   RtecEventChannelAdmin::ConsumerQOS c_qos;
00321   this->fill_qos (c_qos);
00322 
00323   Observer_Map copy;
00324   this->create_observer_map (copy);
00325 
00326   Observer_Map_Iterator end = copy.end ();
00327   for (Observer_Map_Iterator i  = copy.begin ();
00328        i != end;
00329        ++i)
00330     {
00331       Observer_Entry& entry = (*i).int_id_;
00332       try
00333         {
00334           entry.observer->update_consumer (c_qos);
00335         }
00336       catch (const CORBA::OBJECT_NOT_EXIST&)
00337         {
00338           // Exception occured while updating observer, so remove it from the
00339           // observer list
00340           this->observer_not_exists (entry);
00341         }
00342       catch (const CORBA::TRANSIENT&)
00343         {
00344           // Exception occured while updating observer, so remove it from the
00345           // observer list
00346           this->observer_not_exists (entry);
00347         }
00348       catch (const CORBA::Exception&)
00349         {
00350           // Ignore all exceptions
00351         }
00352     }
00353 }
00354 
00355 int
00356 TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map)
00357 {
00358   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00359                  RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00360 
00361   Observer_Map_Iterator end = this->observers_.end ();
00362   for (Observer_Map_Iterator i  = this->observers_.begin ();
00363        i != end;
00364        ++i)
00365     {
00366       Observer_Entry& entry = (*i).int_id_;
00367       Observer_Entry copy (entry.handle,
00368                             RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()));
00369       if (map.bind (copy.handle, copy) == -1)
00370       {
00371         map.unbind_all();
00372         return 0;
00373       }
00374     }
00375 
00376   return map.current_size();
00377 }
00378 
00379 void
00380 TAO_EC_Reactive_ObserverStrategy::observer_not_exists (Observer_Entry& observer)
00381 {
00382   try
00383     {
00384       this->remove_observer(observer.handle);
00385     }
00386   catch (const CORBA::Exception&)
00387     {
00388       // Ignore exceptions
00389     }
00390 }
00391 
00392 // ****************************************************************
00393 
00394 void
00395 TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier)
00396 {
00397   const RtecEventChannelAdmin::ConsumerQOS& sub =
00398     supplier->subscriptions ();
00399   if (sub.is_gateway)
00400     return;
00401   for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
00402     {
00403       const RtecEventComm::Event& event =
00404         sub.dependencies[j].event;
00405       RtecEventComm::EventType type = event.header.type;
00406 
00407       if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00408         continue;
00409       this->headers_.insert (event.header, 1);
00410     }
00411 }
00412 
00413 // ****************************************************************
00414 
00415 void
00416 TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer)
00417 {
00418   const RtecEventChannelAdmin::SupplierQOS& pub =
00419     consumer->publications ();
00420   if (pub.is_gateway)
00421     return;
00422   for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00423     {
00424       const RtecEventComm::Event& event =
00425         pub.publications[j].event;
00426       RtecEventComm::EventType type = event.header.type;
00427 
00428       if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00429         continue;
00430       this->headers_.insert (event.header, 1);
00431     }
00432 }
00433 
00434 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:05 2010 for TAO_RTEvent by  doxygen 1.4.7