#include <EC_ObserverStrategy.h>
Inheritance diagram for TAO_EC_Reactive_ObserverStrategy:
Public Member Functions | |
TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base *ec, ACE_Lock *lock) | |
Constructor. | |
virtual | ~TAO_EC_Reactive_ObserverStrategy (void) |
Destructor. | |
Protected Member Functions | |
virtual void | consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier) |
Recompute EC consumer subscriptions and send them out to all observers. | |
virtual void | supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer) |
Recompute EC supplier publications and send them out to all observers. | |
int | create_observer_map (Observer_Map &map) |
void | observer_not_exists (Observer_Entry &observer) |
The observer doesn't exist anymore. |
This class simply keeps the information about the current list of observers, whenever the list of consumers and/or suppliers changes in queries the EC, computes the global subscription and/or publication list and sends the update message to all the observers. When an observer isn't reachable it is removed from the observer list.
It assumes ownership of the , but not of the Event_Channel.
Definition at line 286 of file EC_ObserverStrategy.h.
|
Constructor.
Definition at line 57 of file EC_ObserverStrategy.i.
00059 : TAO_EC_Basic_ObserverStrategy (ec, lock) 00060 { 00061 } |
|
Destructor.
Definition at line 328 of file EC_ObserverStrategy.cpp.
00329 { 00330 } |
|
Recompute EC consumer subscriptions and send them out to all observers.
Reimplemented from TAO_EC_Basic_ObserverStrategy. Definition at line 382 of file EC_ObserverStrategy.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_CHECK, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, create_observer_map(), TAO_EC_Basic_ObserverStrategy::fill_qos(), and observer_not_exists().
00385 { 00386 if (supplier->subscriptions ().is_gateway) 00387 return; 00388 00389 RtecEventChannelAdmin::ConsumerQOS c_qos; 00390 this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER); 00391 ACE_CHECK; 00392 00393 Observer_Map copy; 00394 this->create_observer_map (copy ACE_ENV_ARG_PARAMETER); 00395 ACE_CHECK; 00396 00397 Observer_Map_Iterator end = copy.end (); 00398 for (Observer_Map_Iterator i = copy.begin (); 00399 i != end; 00400 ++i) 00401 { 00402 Observer_Entry& entry = (*i).int_id_; 00403 ACE_TRY 00404 { 00405 entry.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER); 00406 ACE_TRY_CHECK; 00407 } 00408 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00409 { 00410 // Exception occured while updating observer, so remove it from the 00411 // observer list 00412 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER); 00413 ACE_TRY_CHECK; 00414 } 00415 ACE_CATCH (CORBA::TRANSIENT, transient) 00416 { 00417 // Exception occured while updating observer, so remove it from the 00418 // observer list 00419 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER); 00420 ACE_TRY_CHECK; 00421 } 00422 ACE_CATCHANY 00423 { 00424 // Ignore all exceptions 00425 } 00426 ACE_ENDTRY; 00427 } 00428 } |
|
Copies all current observers into a map and passes it back to the caller through map.
Definition at line 431 of file EC_ObserverStrategy.cpp. References ACE_CHECK_RETURN, and ACE_GUARD_THROW_EX. Referenced by consumer_qos_update(), and supplier_qos_update().
00433 { 00434 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, 00435 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); 00436 ACE_CHECK_RETURN (0); 00437 00438 Observer_Map_Iterator end = this->observers_.end (); 00439 for (Observer_Map_Iterator i = this->observers_.begin (); 00440 i != end; 00441 ++i) 00442 { 00443 Observer_Entry& entry = (*i).int_id_; 00444 Observer_Entry copy (entry.handle, 00445 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ())); 00446 if (map.bind (copy.handle, copy) == -1) 00447 { 00448 map.unbind_all(); 00449 return 0; 00450 } 00451 } 00452 00453 return map.current_size(); 00454 } |
|
The observer doesn't exist anymore.
Definition at line 457 of file EC_ObserverStrategy.cpp. References ACE_CATCHANY, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_EC_Basic_ObserverStrategy::remove_observer(). Referenced by consumer_qos_update(), and supplier_qos_update().
00459 { 00460 ACE_TRY 00461 { 00462 this->remove_observer(observer.handle ACE_ENV_ARG_PARAMETER); 00463 ACE_TRY_CHECK; 00464 } 00465 ACE_CATCHANY 00466 { 00467 // Ignore exceptions 00468 } 00469 ACE_ENDTRY; 00470 } |
|
Recompute EC supplier publications and send them out to all observers.
Reimplemented from TAO_EC_Basic_ObserverStrategy. Definition at line 333 of file EC_ObserverStrategy.cpp. References ACE_CATCH, ACE_CATCHANY, ACE_CHECK, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, create_observer_map(), TAO_EC_Basic_ObserverStrategy::fill_qos(), and observer_not_exists().
00336 { 00337 if (consumer->publications ().is_gateway) 00338 return; 00339 00340 RtecEventChannelAdmin::SupplierQOS s_qos; 00341 this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER); 00342 ACE_CHECK; 00343 00344 Observer_Map copy; 00345 this->create_observer_map (copy ACE_ENV_ARG_PARAMETER); 00346 ACE_CHECK; 00347 00348 Observer_Map_Iterator end = copy.end (); 00349 for (Observer_Map_Iterator i = copy.begin (); 00350 i != end; 00351 ++i) 00352 { 00353 Observer_Entry& entry = (*i).int_id_; 00354 ACE_TRY 00355 { 00356 entry.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER); 00357 ACE_TRY_CHECK; 00358 } 00359 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) 00360 { 00361 // Exception occured while updating observer, so remove it from the 00362 // observer list 00363 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER); 00364 ACE_TRY_CHECK; 00365 } 00366 ACE_CATCH (CORBA::TRANSIENT, transient) 00367 { 00368 // Exception occured while updating observer, so remove it from the 00369 // observer list 00370 this->observer_not_exists (entry ACE_ENV_ARG_PARAMETER); 00371 ACE_TRY_CHECK; 00372 } 00373 ACE_CATCHANY 00374 { 00375 // Ignore all exceptions 00376 } 00377 ACE_ENDTRY; 00378 } 00379 } |