EC_ObserverStrategy.cpp

Go to the documentation of this file.
00001 // EC_ObserverStrategy.cpp,v 1.36 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_ObserverStrategy.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_ProxySupplier.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00008 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00009 #include "orbsvcs/Event_Service_Constants.h"
00010 #include "ace/Auto_Ptr.h"
00011 
00012 #if ! defined (__ACE_INLINE__)
00013 #include "orbsvcs/Event/EC_ObserverStrategy.i"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID(Event, EC_ObserverStrategy, "EC_ObserverStrategy.cpp,v 1.36 2006/03/14 06:14:25 jtc Exp")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_EC_ObserverStrategy::~TAO_EC_ObserverStrategy (void)
00021 {
00022 }
00023 
00024 // ****************************************************************
00025 
00026 RtecEventChannelAdmin::Observer_Handle
00027 TAO_EC_Null_ObserverStrategy::append_observer (
00028        RtecEventChannelAdmin::Observer_ptr
00029        ACE_ENV_ARG_DECL)
00030     ACE_THROW_SPEC ((
00031         CORBA::SystemException,
00032         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00033         RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
00034 {
00035   ACE_THROW_RETURN (
00036       RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
00037       0);
00038 }
00039 
00040 void
00041 TAO_EC_Null_ObserverStrategy::remove_observer (
00042        RtecEventChannelAdmin::Observer_Handle
00043        ACE_ENV_ARG_DECL)
00044     ACE_THROW_SPEC ((
00045         CORBA::SystemException,
00046         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00047         RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
00048 {
00049   ACE_THROW (RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
00050 }
00051 
00052 void
00053 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*
00054                                          ACE_ENV_ARG_DECL_NOT_USED)
00055 {
00056 }
00057 
00058 void
00059 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*
00060                                             ACE_ENV_ARG_DECL_NOT_USED)
00061 {
00062 }
00063 
00064 void
00065 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*
00066                                          ACE_ENV_ARG_DECL_NOT_USED)
00067 {
00068 }
00069 
00070 void
00071 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*
00072                                             ACE_ENV_ARG_DECL_NOT_USED)
00073 {
00074 }
00075 
00076 // ****************************************************************
00077 
00078 TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void)
00079 {
00080   delete this->lock_;
00081   this->lock_ = 0;
00082 }
00083 
00084 RtecEventChannelAdmin::Observer_Handle
00085 TAO_EC_Basic_ObserverStrategy::append_observer (
00086        RtecEventChannelAdmin::Observer_ptr obs
00087        ACE_ENV_ARG_DECL)
00088     ACE_THROW_SPEC ((
00089         CORBA::SystemException,
00090         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00091         RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
00092 {
00093   {
00094     ACE_GUARD_THROW_EX (
00095         ACE_Lock, ace_mon, *this->lock_,
00096         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00097     ACE_CHECK_RETURN (0);
00098 
00099     this->handle_generator_++;
00100     Observer_Entry entry (this->handle_generator_,
00101                           RtecEventChannelAdmin::Observer::_duplicate (obs));
00102 
00103     if (this->observers_.bind (entry.handle, entry) == -1)
00104       ACE_THROW_RETURN (
00105           RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
00106           0);
00107   }
00108 
00109   RtecEventChannelAdmin::ConsumerQOS c_qos;
00110   this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00111   ACE_CHECK_RETURN (0);
00112   obs->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00113   ACE_CHECK_RETURN (0);
00114 
00115   RtecEventChannelAdmin::SupplierQOS s_qos;
00116   this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00117   ACE_CHECK_RETURN (0);
00118   obs->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00119   ACE_CHECK_RETURN (0);
00120 
00121   return this->handle_generator_;
00122 }
00123 
00124 
00125 void
00126 TAO_EC_Basic_ObserverStrategy::remove_observer (
00127        RtecEventChannelAdmin::Observer_Handle handle
00128        ACE_ENV_ARG_DECL)
00129     ACE_THROW_SPEC ((
00130         CORBA::SystemException,
00131         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00132         RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
00133 {
00134   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00135       RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00136   ACE_CHECK;
00137 
00138   if (this->observers_.unbind (handle) == -1)
00139     ACE_THROW (
00140         RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
00141 }
00142 
00143 int
00144 TAO_EC_Basic_ObserverStrategy::create_observer_list (
00145                                 RtecEventChannelAdmin::Observer_var *&lst
00146                                 ACE_ENV_ARG_DECL)
00147 {
00148   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00149                  RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00150   ACE_CHECK_RETURN (0);
00151 
00152   int size = static_cast<int> (this->observers_.current_size ());
00153   RtecEventChannelAdmin::Observer_var *tmp;
00154   ACE_NEW_RETURN (tmp,
00155                   RtecEventChannelAdmin::Observer_var[size],
00156                   0);
00157   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00158 
00159   Observer_Map_Iterator end = this->observers_.end ();
00160   int j = 0;
00161   for (Observer_Map_Iterator i  = this->observers_.begin ();
00162        i != end;
00163        ++i)
00164     {
00165       Observer_Entry& entry = (*i).int_id_;
00166       copy[j++] =
00167         RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ());
00168     }
00169 
00170   lst = copy.release ();
00171   return size;
00172 }
00173 
00174 void
00175 TAO_EC_Basic_ObserverStrategy::connected (
00176     TAO_EC_ProxyPushConsumer *consumer
00177     ACE_ENV_ARG_DECL)
00178 {
00179   this->supplier_qos_update (consumer ACE_ENV_ARG_PARAMETER);
00180   ACE_CHECK;
00181 }
00182 
00183 void
00184 TAO_EC_Basic_ObserverStrategy::disconnected (
00185     TAO_EC_ProxyPushConsumer* consumer
00186     ACE_ENV_ARG_DECL)
00187 {
00188   this->supplier_qos_update (consumer ACE_ENV_ARG_PARAMETER);
00189   ACE_CHECK;
00190 }
00191 
00192 void
00193 TAO_EC_Basic_ObserverStrategy::supplier_qos_update (
00194                                         TAO_EC_ProxyPushConsumer *consumer
00195                                         ACE_ENV_ARG_DECL)
00196 {
00197   if (consumer->publications ().is_gateway)
00198     return;
00199 
00200   RtecEventChannelAdmin::SupplierQOS s_qos;
00201   this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00202   ACE_CHECK;
00203 
00204   RtecEventChannelAdmin::Observer_var *tmp = 0;
00205   int size = this->create_observer_list (tmp ACE_ENV_ARG_PARAMETER);
00206   ACE_CHECK;
00207   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00208 
00209   for (int i = 0; i != size; ++i)
00210     {
00211       ACE_TRY
00212         {
00213           copy[i]->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00214           ACE_TRY_CHECK;
00215         }
00216       ACE_CATCHANY
00217         {
00218           // Ignore exceptions, we must isolate other observers from
00219           // failures on this one.
00220         }
00221       ACE_ENDTRY;
00222     }
00223 }
00224 
00225 void
00226 TAO_EC_Basic_ObserverStrategy::connected (
00227     TAO_EC_ProxyPushSupplier* supplier
00228     ACE_ENV_ARG_DECL)
00229 {
00230   this->consumer_qos_update (supplier ACE_ENV_ARG_PARAMETER);
00231   ACE_CHECK;
00232 }
00233 
00234 void
00235 TAO_EC_Basic_ObserverStrategy::disconnected (
00236     TAO_EC_ProxyPushSupplier* supplier
00237     ACE_ENV_ARG_DECL)
00238 {
00239   this->consumer_qos_update (supplier ACE_ENV_ARG_PARAMETER);
00240   ACE_CHECK;
00241 }
00242 
00243 void
00244 TAO_EC_Basic_ObserverStrategy::consumer_qos_update (
00245                                         TAO_EC_ProxyPushSupplier *supplier
00246                                         ACE_ENV_ARG_DECL)
00247 {
00248   if (supplier->subscriptions ().is_gateway)
00249     return;
00250 
00251   RtecEventChannelAdmin::ConsumerQOS c_qos;
00252   this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00253   ACE_CHECK;
00254 
00255   RtecEventChannelAdmin::Observer_var *tmp = 0;
00256   int size = this->create_observer_list (tmp ACE_ENV_ARG_PARAMETER);
00257   ACE_CHECK;
00258   ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00259 
00260   for (int i = 0; i != size; ++i)
00261     {
00262       ACE_TRY
00263         {
00264           copy[i]->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00265           ACE_TRY_CHECK;
00266         }
00267       ACE_CATCHANY
00268         {
00269           // Ignore exceptions, we must isolate other observers from
00270           // failures on this one.
00271         }
00272       ACE_ENDTRY;
00273     }
00274 }
00275 
00276 void
00277 TAO_EC_Basic_ObserverStrategy::fill_qos (
00278       RtecEventChannelAdmin::ConsumerQOS &qos
00279       ACE_ENV_ARG_DECL)
00280 {
00281   Headers headers;
00282 
00283   TAO_EC_Accumulate_Supplier_Headers worker (headers);
00284   this->event_channel_->for_each_consumer (&worker
00285                                            ACE_ENV_ARG_PARAMETER);
00286   ACE_CHECK;
00287 
00288   RtecEventChannelAdmin::DependencySet& dep = qos.dependencies;
00289 
00290   dep.length (static_cast<CORBA::ULong> (headers.current_size () + 1));
00291 
00292   dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
00293   dep[0].event.header.source = static_cast<CORBA::ULong> (headers.current_size ());
00294   dep[0].event.header.creation_time = ORBSVCS_Time::zero ();
00295   dep[0].rt_info = 0;
00296 
00297   CORBA::ULong count = 1;
00298   for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00299     {
00300       dep[count++].event.header = (*i).key ();
00301     }
00302 }
00303 
00304 void
00305 TAO_EC_Basic_ObserverStrategy::fill_qos (
00306       RtecEventChannelAdmin::SupplierQOS &qos
00307       ACE_ENV_ARG_DECL)
00308 {
00309   Headers headers;
00310 
00311   TAO_EC_Accumulate_Consumer_Headers worker (headers);
00312   this->event_channel_->for_each_supplier (&worker
00313                                            ACE_ENV_ARG_PARAMETER);
00314   ACE_CHECK;
00315 
00316   qos.publications.length (static_cast<CORBA::ULong> (headers.current_size ()));
00317 
00318   CORBA::ULong count = 0;
00319   for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00320     {
00321       qos.publications[count++].event.header = (*i).key ();
00322     }
00323 }
00324 
00325 
00326 // ****************************************************************
00327 
00328 TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void)
00329 {
00330 }
00331 
00332 void
00333 TAO_EC_Reactive_ObserverStrategy::supplier_qos_update (
00334                                         TAO_EC_ProxyPushConsumer *consumer
00335                                         ACE_ENV_ARG_DECL)
00336 {
00337   if (consumer->publications ().is_gateway)
00338     return;
00339 
00340   RtecEventChannelAdmin::SupplierQOS s_qos;
00341   this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00342   ACE_CHECK;
00343 
00344   Observer_Map copy;
00345   this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
00346   ACE_CHECK;
00347 
00348   Observer_Map_Iterator end = copy.end ();
00349   for (Observer_Map_Iterator i  = copy.begin ();
00350        i != end;
00351        ++i)
00352     {
00353       Observer_Entry& entry = (*i).int_id_;
00354       ACE_TRY
00355         {
00356           entry.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00357           ACE_TRY_CHECK;
00358         }
00359       ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00360         {
00361           // Exception occured while updating observer, so remove it from the
00362           // observer list
00363           this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00364           ACE_TRY_CHECK;
00365         }
00366       ACE_CATCH (CORBA::TRANSIENT, transient)
00367         {
00368           // Exception occured while updating observer, so remove it from the
00369           // observer list
00370           this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00371           ACE_TRY_CHECK;
00372         }
00373       ACE_CATCHANY
00374         {
00375           // Ignore all exceptions
00376         }
00377       ACE_ENDTRY;
00378     }
00379 }
00380 
00381 void
00382 TAO_EC_Reactive_ObserverStrategy::consumer_qos_update (
00383                                         TAO_EC_ProxyPushSupplier *supplier
00384                                         ACE_ENV_ARG_DECL)
00385 {
00386   if (supplier->subscriptions ().is_gateway)
00387     return;
00388 
00389   RtecEventChannelAdmin::ConsumerQOS c_qos;
00390   this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00391   ACE_CHECK;
00392 
00393   Observer_Map copy;
00394   this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
00395   ACE_CHECK;
00396 
00397   Observer_Map_Iterator end = copy.end ();
00398   for (Observer_Map_Iterator i  = copy.begin ();
00399        i != end;
00400        ++i)
00401     {
00402       Observer_Entry& entry = (*i).int_id_;
00403       ACE_TRY
00404         {
00405           entry.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00406           ACE_TRY_CHECK;
00407         }
00408       ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00409         {
00410           // Exception occured while updating observer, so remove it from the
00411           // observer list
00412           this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00413           ACE_TRY_CHECK;
00414         }
00415       ACE_CATCH (CORBA::TRANSIENT, transient)
00416         {
00417           // Exception occured while updating observer, so remove it from the
00418           // observer list
00419           this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00420           ACE_TRY_CHECK;
00421         }
00422       ACE_CATCHANY
00423         {
00424           // Ignore all exceptions
00425         }
00426       ACE_ENDTRY;
00427     }
00428 }
00429 
00430 int
00431 TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map
00432                                                        ACE_ENV_ARG_DECL)
00433 {
00434   ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00435                  RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00436   ACE_CHECK_RETURN (0);
00437 
00438   Observer_Map_Iterator end = this->observers_.end ();
00439   for (Observer_Map_Iterator i  = this->observers_.begin ();
00440        i != end;
00441        ++i)
00442     {
00443       Observer_Entry& entry = (*i).int_id_;
00444       Observer_Entry copy (entry.handle,
00445                             RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()));
00446       if (map.bind (copy.handle, copy) == -1)
00447       {
00448         map.unbind_all();
00449         return 0;
00450       }
00451     }
00452 
00453   return map.current_size();
00454 }
00455 
00456 void
00457 TAO_EC_Reactive_ObserverStrategy::observer_not_exists (Observer_Entry& observer
00458                                                        ACE_ENV_ARG_DECL)
00459 {
00460   ACE_TRY
00461     {
00462       this->remove_observer(observer.handle ACE_ENV_ARG_PARAMETER);
00463       ACE_TRY_CHECK;
00464     }
00465   ACE_CATCHANY
00466     {
00467       // Ignore exceptions
00468     }
00469   ACE_ENDTRY;
00470 }
00471 
00472 // ****************************************************************
00473 
00474 void
00475 TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier
00476                                           ACE_ENV_ARG_DECL_NOT_USED)
00477 {
00478   const RtecEventChannelAdmin::ConsumerQOS& sub =
00479     supplier->subscriptions ();
00480   if (sub.is_gateway)
00481     return;
00482   for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
00483     {
00484       const RtecEventComm::Event& event =
00485         sub.dependencies[j].event;
00486       RtecEventComm::EventType type = event.header.type;
00487 
00488       if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00489         continue;
00490       this->headers_.insert (event.header, 1);
00491     }
00492 }
00493 
00494 // ****************************************************************
00495 
00496 void
00497 TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer
00498                                           ACE_ENV_ARG_DECL_NOT_USED)
00499 {
00500   const RtecEventChannelAdmin::SupplierQOS& pub =
00501     consumer->publications ();
00502   if (pub.is_gateway)
00503     return;
00504   for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00505     {
00506       const RtecEventComm::Event& event =
00507         pub.publications[j].event;
00508       RtecEventComm::EventType type = event.header.type;
00509 
00510       if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00511         continue;
00512       this->headers_.insert (event.header, 1);
00513     }
00514 }
00515 
00516 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:08 2006 for TAO_RTEvent by doxygen 1.3.6