#include <EC_ObserverStrategy.h>
Inheritance diagram for TAO_EC_Basic_ObserverStrategy:
This class simply keeps the information about the current list of observers. Whenever the list of consumers and/or suppliers changes, it queries the EC, computes the global subscription and/or publication list, and sends the update message to all the observers.
It assumes ownership of the lock, but not of the Event_Channel.
Definition at line 128 of file EC_ObserverStrategy.h.
typedef ACE_RB_Tree<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> TAO_EC_Basic_ObserverStrategy::Headers |
Definition at line 184 of file EC_ObserverStrategy.h.
typedef ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> TAO_EC_Basic_ObserverStrategy::HeadersIterator |
Definition at line 185 of file EC_ObserverStrategy.h.
typedef ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle,Observer_Entry,ACE_Null_Mutex> TAO_EC_Basic_ObserverStrategy::Observer_Map |
Definition at line 180 of file EC_ObserverStrategy.h.
typedef ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle,Observer_Entry,ACE_Null_Mutex> TAO_EC_Basic_ObserverStrategy::Observer_Map_Iterator |
Definition at line 181 of file EC_ObserverStrategy.h.
ACE_INLINE TAO_EC_Basic_ObserverStrategy::TAO_EC_Basic_ObserverStrategy | ( | TAO_EC_Event_Channel_Base * | ec, | |
ACE_Lock * | lock | |||
) |
Constructor.
Definition at line 45 of file EC_ObserverStrategy.inl.
00047 : event_channel_ (ec), 00048 lock_ (lock), 00049 handle_generator_ (1) 00050 { 00051 }
TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy | ( | void | ) | [virtual] |
RtecEventChannelAdmin::Observer_Handle TAO_EC_Basic_ObserverStrategy::append_observer | ( | RtecEventChannelAdmin::Observer_ptr | ) | [virtual] |
The basic methods to support the EC strategies.
Implements TAO_EC_ObserverStrategy.
Definition at line 69 of file EC_ObserverStrategy.cpp.
References ACE_GUARD_THROW_EX, fill_qos(), TAO_EC_Basic_ObserverStrategy::Observer_Entry::handle, and handle_generator_.
00071 { 00072 { 00073 ACE_GUARD_THROW_EX ( 00074 ACE_Lock, ace_mon, *this->lock_, 00075 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); 00076 00077 this->handle_generator_++; 00078 Observer_Entry entry (this->handle_generator_, 00079 RtecEventChannelAdmin::Observer::_duplicate (obs)); 00080 00081 if (this->observers_.bind (entry.handle, entry) == -1) 00082 throw RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(); 00083 } 00084 00085 RtecEventChannelAdmin::ConsumerQOS c_qos; 00086 this->fill_qos (c_qos); 00087 obs->update_consumer (c_qos); 00088 00089 RtecEventChannelAdmin::SupplierQOS s_qos; 00090 this->fill_qos (s_qos); 00091 obs->update_supplier (s_qos); 00092 00093 return this->handle_generator_; 00094 }
void TAO_EC_Basic_ObserverStrategy::connected | ( | TAO_EC_ProxyPushSupplier * | ) | [virtual] |
Used by the EC to inform the ObserverStrategy that a Supplier has connected to or disconnected from it.
Implements TAO_EC_ObserverStrategy.
Definition at line 180 of file EC_ObserverStrategy.cpp.
References consumer_qos_update().
00182 { 00183 this->consumer_qos_update (supplier); 00184 }
void TAO_EC_Basic_ObserverStrategy::connected | ( | TAO_EC_ProxyPushConsumer * | ) | [virtual] |
Used by the EC to inform the ObserverStrategy that a Consumer has connected to or disconnected from it.
Implements TAO_EC_ObserverStrategy.
Definition at line 138 of file EC_ObserverStrategy.cpp.
References supplier_qos_update().
00140 { 00141 this->supplier_qos_update (consumer); 00142 }
void TAO_EC_Basic_ObserverStrategy::consumer_qos_update | ( | TAO_EC_ProxyPushSupplier * | supplier | ) | [protected, virtual] |
Recompute EC consumer subscriptions and send them out to all observers.
Reimplemented in TAO_EC_Reactive_ObserverStrategy.
Definition at line 194 of file EC_ObserverStrategy.cpp.
References create_observer_list(), fill_qos(), RtecEventChannelAdmin::ConsumerQOS::is_gateway, and TAO_EC_ProxyPushSupplier::subscriptions().
Referenced by connected(), and disconnected().
00196 { 00197 if (supplier->subscriptions ().is_gateway) 00198 return; 00199 00200 RtecEventChannelAdmin::ConsumerQOS c_qos; 00201 this->fill_qos (c_qos); 00202 00203 RtecEventChannelAdmin::Observer_var *tmp = 0; 00204 int size = this->create_observer_list (tmp); 00205 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp); 00206 00207 for (int i = 0; i != size; ++i) 00208 { 00209 try 00210 { 00211 copy[i]->update_consumer (c_qos); 00212 } 00213 catch (const CORBA::Exception&) 00214 { 00215 // Ignore exceptions, we must isolate other observers from 00216 // failures on this one. 00217 } 00218 } 00219 }
int TAO_EC_Basic_ObserverStrategy::create_observer_list | ( | RtecEventChannelAdmin::Observer_var *& | lst | ) | [protected] |
Copies all current observers into an array and passes it back to the caller through lst. Returns the size of the array.
Definition at line 109 of file EC_ObserverStrategy.cpp.
References ACE_GUARD_THROW_EX, ACE_NEW_RETURN, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), observers_, and ACE_Auto_Basic_Array_Ptr< X >::release().
Referenced by consumer_qos_update(), and supplier_qos_update().
00111 { 00112 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, 00113 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); 00114 00115 int size = static_cast<int> (this->observers_.current_size ()); 00116 RtecEventChannelAdmin::Observer_var *tmp = 0; 00117 ACE_NEW_RETURN (tmp, 00118 RtecEventChannelAdmin::Observer_var[size], 00119 0); 00120 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp); 00121 00122 Observer_Map_Iterator end = this->observers_.end (); 00123 int j = 0; 00124 for (Observer_Map_Iterator i = this->observers_.begin (); 00125 i != end; 00126 ++i) 00127 { 00128 Observer_Entry& entry = (*i).int_id_; 00129 copy[j++] = 00130 RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()); 00131 } 00132 00133 lst = copy.release (); 00134 return size; 00135 }
void TAO_EC_Basic_ObserverStrategy::disconnected | ( | TAO_EC_ProxyPushSupplier * | ) | [virtual] |
Implements TAO_EC_ObserverStrategy.
Definition at line 187 of file EC_ObserverStrategy.cpp.
References consumer_qos_update().
00189 { 00190 this->consumer_qos_update (supplier); 00191 }
void TAO_EC_Basic_ObserverStrategy::disconnected | ( | TAO_EC_ProxyPushConsumer * | ) | [virtual] |
Implements TAO_EC_ObserverStrategy.
Definition at line 145 of file EC_ObserverStrategy.cpp.
References supplier_qos_update().
00147 { 00148 this->supplier_qos_update (consumer); 00149 }
void TAO_EC_Basic_ObserverStrategy::fill_qos | ( | RtecEventChannelAdmin::SupplierQOS & | qos | ) | [protected] |
Compute supplier QOS.
Definition at line 247 of file EC_ObserverStrategy.cpp.
References ACE_RB_Tree< EXT_ID, INT_ID, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_RB_Tree< EXT_ID, INT_ID, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_RB_Tree< EXT_ID, INT_ID, COMPARE_KEYS, ACE_LOCK >::end(), event_channel_, TAO_EC_Event_Channel_Base::for_each_supplier(), and RtecEventChannelAdmin::SupplierQOS::publications.
00249 { 00250 Headers headers; 00251 00252 TAO_EC_Accumulate_Consumer_Headers worker (headers); 00253 this->event_channel_->for_each_supplier (&worker); 00254 00255 qos.publications.length (static_cast<CORBA::ULong> (headers.current_size ())); 00256 00257 CORBA::ULong count = 0; 00258 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i) 00259 { 00260 qos.publications[count++].event.header = (*i).key (); 00261 } 00262 }
void TAO_EC_Basic_ObserverStrategy::fill_qos | ( | RtecEventChannelAdmin::ConsumerQOS & | qos | ) | [protected] |
Compute consumer QOS.
Definition at line 222 of file EC_ObserverStrategy.cpp.
References ACE_ES_DISJUNCTION_DESIGNATOR, event_channel_, and TAO_EC_Event_Channel_Base::for_each_consumer().
Referenced by append_observer(), TAO_EC_Reactive_ObserverStrategy::consumer_qos_update(), consumer_qos_update(), TAO_EC_Reactive_ObserverStrategy::supplier_qos_update(), and supplier_qos_update().
00224 { 00225 Headers headers; 00226 00227 TAO_EC_Accumulate_Supplier_Headers worker (headers); 00228 this->event_channel_->for_each_consumer (&worker); 00229 00230 RtecEventChannelAdmin::DependencySet& dep = qos.dependencies; 00231 00232 dep.length (static_cast<CORBA::ULong> (headers.current_size () + 1)); 00233 00234 dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR; 00235 dep[0].event.header.source = static_cast<CORBA::ULong> (headers.current_size ()); 00236 dep[0].event.header.creation_time = ORBSVCS_Time::zero (); 00237 dep[0].rt_info = 0; 00238 00239 CORBA::ULong count = 1; 00240 for (HeadersIterator i = headers.begin (); i != headers.end (); ++i) 00241 { 00242 dep[count++].event.header = (*i).key (); 00243 } 00244 }
void TAO_EC_Basic_ObserverStrategy::remove_observer | ( | RtecEventChannelAdmin::Observer_Handle | ) | [virtual] |
Implements TAO_EC_ObserverStrategy.
Definition at line 98 of file EC_ObserverStrategy.cpp.
References ACE_GUARD_THROW_EX.
Referenced by TAO_EC_Reactive_ObserverStrategy::observer_not_exists().
00100 { 00101 ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, 00102 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); 00103 00104 if (this->observers_.unbind (handle) == -1) 00105 throw RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER(); 00106 }
void TAO_EC_Basic_ObserverStrategy::supplier_qos_update | ( | TAO_EC_ProxyPushConsumer * | consumer | ) | [protected, virtual] |
Recompute EC supplier publications and send them out to all observers.
Reimplemented in TAO_EC_Reactive_ObserverStrategy.
Definition at line 152 of file EC_ObserverStrategy.cpp.
References create_observer_list(), fill_qos(), RtecEventChannelAdmin::SupplierQOS::is_gateway, and TAO_EC_ProxyPushConsumer::publications().
Referenced by connected(), and disconnected().
00154 { 00155 if (consumer->publications ().is_gateway) 00156 return; 00157 00158 RtecEventChannelAdmin::SupplierQOS s_qos; 00159 this->fill_qos (s_qos); 00160 00161 RtecEventChannelAdmin::Observer_var *tmp = 0; 00162 int size = this->create_observer_list (tmp); 00163 ACE_Auto_Basic_Array_Ptr<RtecEventChannelAdmin::Observer_var> copy (tmp); 00164 00165 for (int i = 0; i != size; ++i) 00166 { 00167 try 00168 { 00169 copy[i]->update_supplier (s_qos); 00170 } 00171 catch (const CORBA::Exception&) 00172 { 00173 // Ignore exceptions, we must isolate other observers from 00174 // failures on this one. 00175 } 00176 } 00177 }
handle_generator_: The handles are generated in sequential order, but are opaque to the client.
Definition at line 215 of file EC_ObserverStrategy.h.
Referenced by append_observer().
ACE_Lock* TAO_EC_Basic_ObserverStrategy::lock_ [protected] |
The lock.
Definition at line 211 of file EC_ObserverStrategy.h.
Referenced by ~TAO_EC_Basic_ObserverStrategy().
observers_: Keep the set of Observers.
Definition at line 218 of file EC_ObserverStrategy.h.
Referenced by create_observer_list(), and TAO_EC_Reactive_ObserverStrategy::create_observer_map().