00001
00002
00003 #include "orbsvcs/Event/EC_ObserverStrategy.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_ProxySupplier.h"
00006 #include "orbsvcs/Event/EC_ProxyConsumer.h"
00007 #include "orbsvcs/Event/EC_ConsumerAdmin.h"
00008 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00009 #include "orbsvcs/Event_Service_Constants.h"
00010 #include "ace/Auto_Ptr.h"
00011
00012 #if ! defined (__ACE_INLINE__)
00013 #include "orbsvcs/Event/EC_ObserverStrategy.i"
00014 #endif
00015
00016 ACE_RCSID(Event, EC_ObserverStrategy, "EC_ObserverStrategy.cpp,v 1.36 2006/03/14 06:14:25 jtc Exp")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_EC_ObserverStrategy::~TAO_EC_ObserverStrategy (void)
00021 {
00022 }
00023
00024
00025
00026 RtecEventChannelAdmin::Observer_Handle
00027 TAO_EC_Null_ObserverStrategy::append_observer (
00028 RtecEventChannelAdmin::Observer_ptr
00029 ACE_ENV_ARG_DECL)
00030 ACE_THROW_SPEC ((
00031 CORBA::SystemException,
00032 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00033 RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
00034 {
00035 ACE_THROW_RETURN (
00036 RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
00037 0);
00038 }
00039
00040 void
00041 TAO_EC_Null_ObserverStrategy::remove_observer (
00042 RtecEventChannelAdmin::Observer_Handle
00043 ACE_ENV_ARG_DECL)
00044 ACE_THROW_SPEC ((
00045 CORBA::SystemException,
00046 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00047 RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
00048 {
00049 ACE_THROW (RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
00050 }
00051
00052 void
00053 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*
00054 ACE_ENV_ARG_DECL_NOT_USED)
00055 {
00056 }
00057
00058 void
00059 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*
00060 ACE_ENV_ARG_DECL_NOT_USED)
00061 {
00062 }
00063
00064 void
00065 TAO_EC_Null_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*
00066 ACE_ENV_ARG_DECL_NOT_USED)
00067 {
00068 }
00069
00070 void
00071 TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*
00072 ACE_ENV_ARG_DECL_NOT_USED)
00073 {
00074 }
00075
00076
00077
00078 TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void)
00079 {
00080 delete this->lock_;
00081 this->lock_ = 0;
00082 }
00083
00084 RtecEventChannelAdmin::Observer_Handle
00085 TAO_EC_Basic_ObserverStrategy::append_observer (
00086 RtecEventChannelAdmin::Observer_ptr obs
00087 ACE_ENV_ARG_DECL)
00088 ACE_THROW_SPEC ((
00089 CORBA::SystemException,
00090 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00091 RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
00092 {
00093 {
00094 ACE_GUARD_THROW_EX (
00095 ACE_Lock, ace_mon, *this->lock_,
00096 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00097 ACE_CHECK_RETURN (0);
00098
00099 this->handle_generator_++;
00100 Observer_Entry entry (this->handle_generator_,
00101 RtecEventChannelAdmin::Observer::_duplicate (obs));
00102
00103 if (this->observers_.bind (entry.handle, entry) == -1)
00104 ACE_THROW_RETURN (
00105 RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
00106 0);
00107 }
00108
00109 RtecEventChannelAdmin::ConsumerQOS c_qos;
00110 this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00111 ACE_CHECK_RETURN (0);
00112 obs->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00113 ACE_CHECK_RETURN (0);
00114
00115 RtecEventChannelAdmin::SupplierQOS s_qos;
00116 this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00117 ACE_CHECK_RETURN (0);
00118 obs->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00119 ACE_CHECK_RETURN (0);
00120
00121 return this->handle_generator_;
00122 }
00123
00124
00125 void
00126 TAO_EC_Basic_ObserverStrategy::remove_observer (
00127 RtecEventChannelAdmin::Observer_Handle handle
00128 ACE_ENV_ARG_DECL)
00129 ACE_THROW_SPEC ((
00130 CORBA::SystemException,
00131 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
00132 RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
00133 {
00134 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00135 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00136 ACE_CHECK;
00137
00138 if (this->observers_.unbind (handle) == -1)
00139 ACE_THROW (
00140 RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
00141 }
00142
00143 int
00144 TAO_EC_Basic_ObserverStrategy::create_observer_list (
00145 RtecEventChannelAdmin::Observer_var *&lst
00146 ACE_ENV_ARG_DECL)
00147 {
00148 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00149 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00150 ACE_CHECK_RETURN (0);
00151
00152 int size = static_cast<int> (this->observers_.current_size ());
00153 RtecEventChannelAdmin::Observer_var *tmp;
00154 ACE_NEW_RETURN (tmp,
00155 RtecEventChannelAdmin::Observer_var[size],
00156 0);
00157 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00158
00159 Observer_Map_Iterator end = this->observers_.end ();
00160 int j = 0;
00161 for (Observer_Map_Iterator i = this->observers_.begin ();
00162 i != end;
00163 ++i)
00164 {
00165 Observer_Entry& entry = (*i).int_id_;
00166 copy[j++] =
00167 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ());
00168 }
00169
00170 lst = copy.release ();
00171 return size;
00172 }
00173
00174 void
00175 TAO_EC_Basic_ObserverStrategy::connected (
00176 TAO_EC_ProxyPushConsumer *consumer
00177 ACE_ENV_ARG_DECL)
00178 {
00179 this->supplier_qos_update (consumer ACE_ENV_ARG_PARAMETER);
00180 ACE_CHECK;
00181 }
00182
00183 void
00184 TAO_EC_Basic_ObserverStrategy::disconnected (
00185 TAO_EC_ProxyPushConsumer* consumer
00186 ACE_ENV_ARG_DECL)
00187 {
00188 this->supplier_qos_update (consumer ACE_ENV_ARG_PARAMETER);
00189 ACE_CHECK;
00190 }
00191
00192 void
00193 TAO_EC_Basic_ObserverStrategy::supplier_qos_update (
00194 TAO_EC_ProxyPushConsumer *consumer
00195 ACE_ENV_ARG_DECL)
00196 {
00197 if (consumer->publications ().is_gateway)
00198 return;
00199
00200 RtecEventChannelAdmin::SupplierQOS s_qos;
00201 this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00202 ACE_CHECK;
00203
00204 RtecEventChannelAdmin::Observer_var *tmp = 0;
00205 int size = this->create_observer_list (tmp ACE_ENV_ARG_PARAMETER);
00206 ACE_CHECK;
00207 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00208
00209 for (int i = 0; i != size; ++i)
00210 {
00211 ACE_TRY
00212 {
00213 copy[i]->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00214 ACE_TRY_CHECK;
00215 }
00216 ACE_CATCHANY
00217 {
00218
00219
00220 }
00221 ACE_ENDTRY;
00222 }
00223 }
00224
00225 void
00226 TAO_EC_Basic_ObserverStrategy::connected (
00227 TAO_EC_ProxyPushSupplier* supplier
00228 ACE_ENV_ARG_DECL)
00229 {
00230 this->consumer_qos_update (supplier ACE_ENV_ARG_PARAMETER);
00231 ACE_CHECK;
00232 }
00233
00234 void
00235 TAO_EC_Basic_ObserverStrategy::disconnected (
00236 TAO_EC_ProxyPushSupplier* supplier
00237 ACE_ENV_ARG_DECL)
00238 {
00239 this->consumer_qos_update (supplier ACE_ENV_ARG_PARAMETER);
00240 ACE_CHECK;
00241 }
00242
00243 void
00244 TAO_EC_Basic_ObserverStrategy::consumer_qos_update (
00245 TAO_EC_ProxyPushSupplier *supplier
00246 ACE_ENV_ARG_DECL)
00247 {
00248 if (supplier->subscriptions ().is_gateway)
00249 return;
00250
00251 RtecEventChannelAdmin::ConsumerQOS c_qos;
00252 this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00253 ACE_CHECK;
00254
00255 RtecEventChannelAdmin::Observer_var *tmp = 0;
00256 int size = this->create_observer_list (tmp ACE_ENV_ARG_PARAMETER);
00257 ACE_CHECK;
00258 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp);
00259
00260 for (int i = 0; i != size; ++i)
00261 {
00262 ACE_TRY
00263 {
00264 copy[i]->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00265 ACE_TRY_CHECK;
00266 }
00267 ACE_CATCHANY
00268 {
00269
00270
00271 }
00272 ACE_ENDTRY;
00273 }
00274 }
00275
00276 void
00277 TAO_EC_Basic_ObserverStrategy::fill_qos (
00278 RtecEventChannelAdmin::ConsumerQOS &qos
00279 ACE_ENV_ARG_DECL)
00280 {
00281 Headers headers;
00282
00283 TAO_EC_Accumulate_Supplier_Headers worker (headers);
00284 this->event_channel_->for_each_consumer (&worker
00285 ACE_ENV_ARG_PARAMETER);
00286 ACE_CHECK;
00287
00288 RtecEventChannelAdmin::DependencySet& dep = qos.dependencies;
00289
00290 dep.length (static_cast<CORBA::ULong> (headers.current_size () + 1));
00291
00292 dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
00293 dep[0].event.header.source = static_cast<CORBA::ULong> (headers.current_size ());
00294 dep[0].event.header.creation_time = ORBSVCS_Time::zero ();
00295 dep[0].rt_info = 0;
00296
00297 CORBA::ULong count = 1;
00298 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00299 {
00300 dep[count++].event.header = (*i).key ();
00301 }
00302 }
00303
00304 void
00305 TAO_EC_Basic_ObserverStrategy::fill_qos (
00306 RtecEventChannelAdmin::SupplierQOS &qos
00307 ACE_ENV_ARG_DECL)
00308 {
00309 Headers headers;
00310
00311 TAO_EC_Accumulate_Consumer_Headers worker (headers);
00312 this->event_channel_->for_each_supplier (&worker
00313 ACE_ENV_ARG_PARAMETER);
00314 ACE_CHECK;
00315
00316 qos.publications.length (static_cast<CORBA::ULong> (headers.current_size ()));
00317
00318 CORBA::ULong count = 0;
00319 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i)
00320 {
00321 qos.publications[count++].event.header = (*i).key ();
00322 }
00323 }
00324
00325
00326
00327
00328 TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void)
00329 {
00330 }
00331
00332 void
00333 TAO_EC_Reactive_ObserverStrategy::supplier_qos_update (
00334 TAO_EC_ProxyPushConsumer *consumer
00335 ACE_ENV_ARG_DECL)
00336 {
00337 if (consumer->publications ().is_gateway)
00338 return;
00339
00340 RtecEventChannelAdmin::SupplierQOS s_qos;
00341 this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
00342 ACE_CHECK;
00343
00344 Observer_Map copy;
00345 this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
00346 ACE_CHECK;
00347
00348 Observer_Map_Iterator end = copy.end ();
00349 for (Observer_Map_Iterator i = copy.begin ();
00350 i != end;
00351 ++i)
00352 {
00353 Observer_Entry& entry = (*i).int_id_;
00354 ACE_TRY
00355 {
00356 entry.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
00357 ACE_TRY_CHECK;
00358 }
00359 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00360 {
00361
00362
00363 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00364 ACE_TRY_CHECK;
00365 }
00366 ACE_CATCH (CORBA::TRANSIENT, transient)
00367 {
00368
00369
00370 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00371 ACE_TRY_CHECK;
00372 }
00373 ACE_CATCHANY
00374 {
00375
00376 }
00377 ACE_ENDTRY;
00378 }
00379 }
00380
00381 void
00382 TAO_EC_Reactive_ObserverStrategy::consumer_qos_update (
00383 TAO_EC_ProxyPushSupplier *supplier
00384 ACE_ENV_ARG_DECL)
00385 {
00386 if (supplier->subscriptions ().is_gateway)
00387 return;
00388
00389 RtecEventChannelAdmin::ConsumerQOS c_qos;
00390 this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
00391 ACE_CHECK;
00392
00393 Observer_Map copy;
00394 this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
00395 ACE_CHECK;
00396
00397 Observer_Map_Iterator end = copy.end ();
00398 for (Observer_Map_Iterator i = copy.begin ();
00399 i != end;
00400 ++i)
00401 {
00402 Observer_Entry& entry = (*i).int_id_;
00403 ACE_TRY
00404 {
00405 entry.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
00406 ACE_TRY_CHECK;
00407 }
00408 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00409 {
00410
00411
00412 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00413 ACE_TRY_CHECK;
00414 }
00415 ACE_CATCH (CORBA::TRANSIENT, transient)
00416 {
00417
00418
00419 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER);
00420 ACE_TRY_CHECK;
00421 }
00422 ACE_CATCHANY
00423 {
00424
00425 }
00426 ACE_ENDTRY;
00427 }
00428 }
00429
00430 int
00431 TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map
00432 ACE_ENV_ARG_DECL)
00433 {
00434 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
00435 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
00436 ACE_CHECK_RETURN (0);
00437
00438 Observer_Map_Iterator end = this->observers_.end ();
00439 for (Observer_Map_Iterator i = this->observers_.begin ();
00440 i != end;
00441 ++i)
00442 {
00443 Observer_Entry& entry = (*i).int_id_;
00444 Observer_Entry copy (entry.handle,
00445 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()));
00446 if (map.bind (copy.handle, copy) == -1)
00447 {
00448 map.unbind_all();
00449 return 0;
00450 }
00451 }
00452
00453 return map.current_size();
00454 }
00455
00456 void
00457 TAO_EC_Reactive_ObserverStrategy::observer_not_exists (Observer_Entry& observer
00458 ACE_ENV_ARG_DECL)
00459 {
00460 ACE_TRY
00461 {
00462 this->remove_observer(observer.handle ACE_ENV_ARG_PARAMETER);
00463 ACE_TRY_CHECK;
00464 }
00465 ACE_CATCHANY
00466 {
00467
00468 }
00469 ACE_ENDTRY;
00470 }
00471
00472
00473
00474 void
00475 TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier
00476 ACE_ENV_ARG_DECL_NOT_USED)
00477 {
00478 const RtecEventChannelAdmin::ConsumerQOS& sub =
00479 supplier->subscriptions ();
00480 if (sub.is_gateway)
00481 return;
00482 for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
00483 {
00484 const RtecEventComm::Event& event =
00485 sub.dependencies[j].event;
00486 RtecEventComm::EventType type = event.header.type;
00487
00488 if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00489 continue;
00490 this->headers_.insert (event.header, 1);
00491 }
00492 }
00493
00494
00495
00496 void
00497 TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer
00498 ACE_ENV_ARG_DECL_NOT_USED)
00499 {
00500 const RtecEventChannelAdmin::SupplierQOS& pub =
00501 consumer->publications ();
00502 if (pub.is_gateway)
00503 return;
00504 for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
00505 {
00506 const RtecEventComm::Event& event =
00507 pub.publications[j].event;
00508 RtecEventComm::EventType type = event.header.type;
00509
00510 if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
00511 continue;
00512 this->headers_.insert (event.header, 1);
00513 }
00514 }
00515
00516 TAO_END_VERSIONED_NAMESPACE_DECL