00001
00002
00003 #include "orbsvcs/Event/EC_ObserverStrategy.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_ProxySupplier.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00008 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00009 #include "orbsvcs/Event_Service_Constants.h"
00010 #include "ace/Auto_Ptr.h"
00011
00012 #if ! defined (__ACE_INLINE__)
00013 #include "orbsvcs/Event/EC_ObserverStrategy.inl"
00014 #endif
00015
00016 ACE_RCSID(Event, EC_ObserverStrategy, "$Id: EC_ObserverStrategy.cpp 78710 2007-07-02 07:33:53Z johnnyw $")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_EC_ObserverStrategy::~TAO_EC_ObserverStrategy (void)
00021 {
00022 }
00023
00024
00025
00026 RtecEventChannelAdmin::Observer_Handle
00027 TAO_EC_Null_ObserverStrategy::append_observer (
00028 RtecEventChannelAdmin::Observer_ptr)
00029 {
00030 throw RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER();
00031 }
00032
00033 void
00034 TAO_EC_Null_ObserverStrategy::remove_observer (
00035 RtecEventChannelAdmin::Observer_Handle)
00036 {
00037 throw RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER();
00038 }
00039
00040 void
00041 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*)
00042 {
00043 }
00044
00045 void
00046 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*)
00047 {
00048 }
00049
00050 void
00051 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*)
00052 {
00053 }
00054
00055 void
00056 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*)
00057 {
00058 }
00059
00060
00061
00062 TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void)
00063 {
00064 delete this->lock_;
00065 this->lock_ = 0;
00066 }
00067
00068 RtecEventChannelAdmin::Observer_Handle
00069 TAO_EC_Basic_ObserverStrategy::append_observer (
00070 RtecEventChannelAdmin::Observer_ptr obs)
00071 {
00072 {
00073 ACE_GUARD_THROW_EX (
00074 ACE_Lock, ace_mon, *this->lock_,
00075 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00076
00077 this->handle_generator_++;
00078 Observer_Entry entry (this->handle_generator_,
00079 RtecEventChannelAdmin::Observer::_duplicate (obs));
00080
00081 if (this->observers_.bind (entry.handle, entry) == -1)
00082 throw RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER();
00083 }
00084
00085 RtecEventChannelAdmin::ConsumerQOS c_qos;
00086 this->fill_qos (c_qos);
00087 obs->update_consumer (c_qos);
00088
00089 RtecEventChannelAdmin::SupplierQOS s_qos;
00090 this->fill_qos (s_qos);
00091 obs->update_supplier (s_qos);
00092
00093 return this->handle_generator_;
00094 }
00095
00096
00097 void
00098 TAO_EC_Basic_ObserverStrategy::remove_observer (
00099 RtecEventChannelAdmin::Observer_Handle handle)
00100 {
00101 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00102 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00103
00104 if (this->observers_.unbind (handle) == -1)
00105 throw RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER();
00106 }
00107
00108 int
00109 TAO_EC_Basic_ObserverStrategy::create_observer_list (
00110 RtecEventChannelAdmin::Observer_var *&lst)
00111 {
00112 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00113 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00114
00115 int size = static_cast<int> (this->observers_.current_size ());
00116 RtecEventChannelAdmin::Observer_var *tmp = 0;
00117 ACE_NEW_RETURN (tmp,
00118 RtecEventChannelAdmin::Observer_var[size],
00119 0);
00120 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00121
00122 Observer_Map_Iterator end = this->observers_.end ();
00123 int j = 0;
00124 for (Observer_Map_Iterator i = this->observers_.begin ();
00125 i != end;
00126 ++i)
00127 {
00128 Observer_Entry& entry = (*i).int_id_;
00129 copy[j++] =
00130 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ());
00131 }
00132
00133 lst = copy.release ();
00134 return size;
00135 }
00136
00137 void
00138 TAO_EC_Basic_ObserverStrategy::connected (
00139 TAO_EC_ProxyPushConsumer *consumer)
00140 {
00141 this->supplier_qos_update (consumer);
00142 }
00143
00144 void
00145 TAO_EC_Basic_ObserverStrategy::disconnected (
00146 TAO_EC_ProxyPushConsumer* consumer)
00147 {
00148 this->supplier_qos_update (consumer);
00149 }
00150
00151 void
00152 TAO_EC_Basic_ObserverStrategy::supplier_qos_update (
00153 TAO_EC_ProxyPushConsumer *consumer)
00154 {
00155 if (consumer->publications ().is_gateway)
00156 return;
00157
00158 RtecEventChannelAdmin::SupplierQOS s_qos;
00159 this->fill_qos (s_qos);
00160
00161 RtecEventChannelAdmin::Observer_var *tmp = 0;
00162 int size = this->create_observer_list (tmp);
00163 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00164
00165 for (int i = 0; i != size; ++i)
00166 {
00167 try
00168 {
00169 copy[i]->update_supplier (s_qos);
00170 }
00171 catch (const CORBA::Exception&)
00172 {
00173
00174
00175 }
00176 }
00177 }
00178
00179 void
00180 TAO_EC_Basic_ObserverStrategy::connected (
00181 TAO_EC_ProxyPushSupplier* supplier)
00182 {
00183 this->consumer_qos_update (supplier);
00184 }
00185
00186 void
00187 TAO_EC_Basic_ObserverStrategy::disconnected (
00188 TAO_EC_ProxyPushSupplier* supplier)
00189 {
00190 this->consumer_qos_update (supplier);
00191 }
00192
00193 void
00194 TAO_EC_Basic_ObserverStrategy::consumer_qos_update (
00195 TAO_EC_ProxyPushSupplier *supplier)
00196 {
00197 if (supplier->subscriptions ().is_gateway)
00198 return;
00199
00200 RtecEventChannelAdmin::ConsumerQOS c_qos;
00201 this->fill_qos (c_qos);
00202
00203 RtecEventChannelAdmin::Observer_var *tmp = 0;
00204 int size = this->create_observer_list (tmp);
00205 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00206
00207 for (int i = 0; i != size; ++i)
00208 {
00209 try
00210 {
00211 copy[i]->update_consumer (c_qos);
00212 }
00213 catch (const CORBA::Exception&)
00214 {
00215
00216
00217 }
00218 }
00219 }
00220
00221 void
00222 TAO_EC_Basic_ObserverStrategy::fill_qos (
00223 RtecEventChannelAdmin::ConsumerQOS &qos)
00224 {
00225 Headers headers;
00226
00227 TAO_EC_Accumulate_Supplier_Headers worker (headers);
00228 this->event_channel_->for_each_consumer (&worker);
00229
00230 RtecEventChannelAdmin::DependencySet& dep = qos.dependencies;
00231
00232 dep.length (static_cast<CORBA::ULong> (headers.current_size () + 1));
00233
00234 dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
00235 dep[0].event.header.source = static_cast<CORBA::ULong> (headers.current_size ());
00236 dep[0].event.header.creation_time = ORBSVCS_Time::zero ();
00237 dep[0].rt_info = 0;
00238
00239 CORBA::ULong count = 1;
00240 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00241 {
00242 dep[count++].event.header = (*i).key ();
00243 }
00244 }
00245
00246 void
00247 TAO_EC_Basic_ObserverStrategy::fill_qos (
00248 RtecEventChannelAdmin::SupplierQOS &qos)
00249 {
00250 Headers headers;
00251
00252 TAO_EC_Accumulate_Consumer_Headers worker (headers);
00253 this->event_channel_->for_each_supplier (&worker);
00254
00255 qos.publications.length (static_cast<CORBA::ULong> (headers.current_size ()));
00256
00257 CORBA::ULong count = 0;
00258 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00259 {
00260 qos.publications[count++].event.header = (*i).key ();
00261 }
00262 }
00263
00264
00265
00266
00267 TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void)
00268 {
00269 }
00270
00271 void
00272 TAO_EC_Reactive_ObserverStrategy::supplier_qos_update (
00273 TAO_EC_ProxyPushConsumer *consumer)
00274 {
00275 if (consumer->publications ().is_gateway)
00276 return;
00277
00278 RtecEventChannelAdmin::SupplierQOS s_qos;
00279 this->fill_qos (s_qos);
00280
00281 Observer_Map copy;
00282 this->create_observer_map (copy);
00283
00284 Observer_Map_Iterator end = copy.end ();
00285 for (Observer_Map_Iterator i = copy.begin ();
00286 i != end;
00287 ++i)
00288 {
00289 Observer_Entry& entry = (*i).int_id_;
00290 try
00291 {
00292 entry.observer->update_supplier (s_qos);
00293 }
00294 catch (const CORBA::OBJECT_NOT_EXIST&)
00295 {
00296
00297
00298 this->observer_not_exists (entry);
00299 }
00300 catch (const CORBA::TRANSIENT&)
00301 {
00302
00303
00304 this->observer_not_exists (entry);
00305 }
00306 catch (const CORBA::Exception&)
00307 {
00308
00309 }
00310 }
00311 }
00312
00313 void
00314 TAO_EC_Reactive_ObserverStrategy::consumer_qos_update (
00315 TAO_EC_ProxyPushSupplier *supplier)
00316 {
00317 if (supplier->subscriptions ().is_gateway)
00318 return;
00319
00320 RtecEventChannelAdmin::ConsumerQOS c_qos;
00321 this->fill_qos (c_qos);
00322
00323 Observer_Map copy;
00324 this->create_observer_map (copy);
00325
00326 Observer_Map_Iterator end = copy.end ();
00327 for (Observer_Map_Iterator i = copy.begin ();
00328 i != end;
00329 ++i)
00330 {
00331 Observer_Entry& entry = (*i).int_id_;
00332 try
00333 {
00334 entry.observer->update_consumer (c_qos);
00335 }
00336 catch (const CORBA::OBJECT_NOT_EXIST&)
00337 {
00338
00339
00340 this->observer_not_exists (entry);
00341 }
00342 catch (const CORBA::TRANSIENT&)
00343 {
00344
00345
00346 this->observer_not_exists (entry);
00347 }
00348 catch (const CORBA::Exception&)
00349 {
00350
00351 }
00352 }
00353 }
00354
00355 int
00356 TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map)
00357 {
00358 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00359 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00360
00361 Observer_Map_Iterator end = this->observers_.end ();
00362 for (Observer_Map_Iterator i = this->observers_.begin ();
00363 i != end;
00364 ++i)
00365 {
00366 Observer_Entry& entry = (*i).int_id_;
00367 Observer_Entry copy (entry.handle,
00368 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()));
00369 if (map.bind (copy.handle, copy) == -1)
00370 {
00371 map.unbind_all();
00372 return 0;
00373 }
00374 }
00375
00376 return map.current_size();
00377 }
00378
00379 void
00380 TAO_EC_Reactive_ObserverStrategy::observer_not_exists (Observer_Entry& observer)
00381 {
00382 try
00383 {
00384 this->remove_observer(observer.handle);
00385 }
00386 catch (const CORBA::Exception&)
00387 {
00388
00389 }
00390 }
00391
00392
00393
00394 void
00395 TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier)
00396 {
00397 const RtecEventChannelAdmin::ConsumerQOS& sub =
00398 supplier->subscriptions ();
00399 if (sub.is_gateway)
00400 return;
00401 for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
00402 {
00403 const RtecEventComm::Event& event =
00404 sub.dependencies[j].event;
00405 RtecEventComm::EventType type = event.header.type;
00406
00407 if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00408 continue;
00409 this->headers_.insert (event.header, 1);
00410 }
00411 }
00412
00413
00414
00415 void
00416 TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer)
00417 {
00418 const RtecEventChannelAdmin::SupplierQOS& pub =
00419 consumer->publications ();
00420 if (pub.is_gateway)
00421 return;
00422 for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00423 {
00424 const RtecEventComm::Event& event =
00425 pub.publications[j].event;
00426 RtecEventComm::EventType type = event.header.type;
00427
00428 if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00429 continue;
00430 this->headers_.insert (event.header, 1);
00431 }
00432 }
00433
00434 TAO_END_VERSIONED_NAMESPACE_DECL