#include <EC_Gateway_IIOP.h>
Public Member Functions | |
TAO_EC_Gateway_IIOP (void) | |
virtual | ~TAO_EC_Gateway_IIOP (void) |
int | init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec) |
void | disconnect_push_supplier (void) |
The channel is disconnecting. | |
void | disconnect_push_consumer (void) |
The channel is disconnecting. | |
void | push (const RtecEventComm::EventSet &events) |
int | shutdown (void) |
Disconnect and shutdown the gateway. | |
virtual void | close (void) |
virtual void | update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) |
virtual void | update_supplier (const RtecEventChannelAdmin::SupplierQOS &pub) |
void | reconnect_consumer_ec (void) |
CORBA::Boolean | consumer_ec_non_existent (CORBA::Boolean_out disconnected) |
Check whether the consumer event channel is non existent or not. | |
void | cleanup_consumer_proxies (void) |
int | cleanup_consumer_ec (void) |
int | cleanup_supplier_ec (void) |
void | suspend_supplier_ec (void) |
Suspend the connection to the supplier ec. | |
void | resume_supplier_ec (void) |
Resume the connection to the supplier ec. | |
Protected Types | |
typedef ACE_Map_Manager< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > | Consumer_Map |
typedef ACE_Map_Iterator< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > | Consumer_Map_Iterator |
Protected Member Functions | |
int | init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec) |
Do the real work in init(). | |
Protected Attributes | |
TAO_SYNCH_MUTEX | lock_ |
Lock to synchronize internal changes. | |
CORBA::ULong | busy_count_ |
int | update_posted_ |
RtecEventChannelAdmin::ConsumerQOS | c_qos_ |
int | cleanup_posted_ |
int | supplier_ec_suspended_ |
RtecEventChannelAdmin::EventChannel_var | supplier_ec_ |
RtecEventChannelAdmin::EventChannel_var | consumer_ec_ |
The event channel acting as consumer of this gateway. | |
RtecBase::handle_t | supplier_info_ |
Our RT_Infos for the event channel that is the supplier. | |
RtecBase::handle_t | consumer_info_ |
Our RT_Infos for the event channel that is the consumer. | |
ACE_PushConsumer_Adapter< TAO_EC_Gateway_IIOP > | consumer_ |
Our consumer personality.... | |
bool | consumer_is_active_ |
If it is true then we must deactivate the consumer. | |
ACE_PushSupplier_Adapter< TAO_EC_Gateway_IIOP > | supplier_ |
Our supplier personality.... | |
bool | supplier_is_active_ |
If it is true then we must deactivate the supplier. | |
Consumer_Map | consumer_proxy_map_ |
RtecEventChannelAdmin::ProxyPushConsumer_var | default_consumer_proxy_ |
RtecEventChannelAdmin::ProxyPushSupplier_var | supplier_proxy_ |
TAO_ECG_ConsumerEC_Control * | ec_control_ |
TAO_EC_Gateway_IIOP_Factory * | factory_ |
The Gateway IIOP Factory for all the settings. | |
int | use_ttl_ |
If 1, we use the TTL flags, if 0, we just ignore TTL. | |
int | use_consumer_proxy_map_ |
Private Member Functions | |
void | close_i (void) |
void | disconnect_supplier_proxy_i (void) |
Disconnect the supplier proxy. | |
void | disconnect_consumer_proxies_i (void) |
Disconnect all consumer proxies. | |
void | cleanup_consumer_proxies_i (void) |
Remove all consumer proxies without calling disconnect on them. | |
void | update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS &sub) |
void | open_i (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Create all connections to consumer ec and to supplier ec. | |
CORBA::Boolean | is_consumer_ec_connected_i (void) const |
Helper method to see if consumer ec is connected. | |
void | push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, const RtecEventComm::EventSet &event) |
Push the event to the consumer. | |
void | cleanup_consumer_ec_i (void) |
void | cleanup_supplier_ec_i (void) |
This class mediates between two event channels: it connects as a consumer of events to a remote event channel and as a supplier of events to the local EC. As a consumer, it provides a QoS designed to accept only the events in which *local* consumers are interested. Eventually the local EC should create this object and compute its QoS automatically, but that requires some way to filter out the peers registered as consumers; otherwise we would get loops in the QoS graph. When connected as a supplier, it uses exactly the same set of events in its publications list.
Definition at line 65 of file EC_Gateway_IIOP.h.
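To make the mediation role concrete, here is a minimal, self-contained sketch in plain C++ that uses neither TAO nor CORBA. All names (`Event`, `LocalChannel`, `ToyGateway`, `subscribe`, `on_remote_event`) are hypothetical. The filtering is done inside the toy gateway purely for illustration; in the real class the interest is expressed as a ConsumerQOS handed to the event channel it consumes from.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical, simplified event: only a source id and a type.
struct Event {
  std::uint32_t source;
  std::uint32_t type;
};

// Stand-in for the local event channel: it only records what it receives.
struct LocalChannel {
  std::vector<Event> received;
  void push(const Event& e) { received.push_back(e); }
};

// Toy gateway: consumer of the remote channel, supplier to the local one.
class ToyGateway {
public:
  explicit ToyGateway(LocalChannel& local) : local_(local) {}

  // Plays the role of the consumer QoS: the event types local consumers want.
  void subscribe(std::uint32_t type) { wanted_types_.insert(type); }

  // Called by the (imaginary) remote channel for every event it delivers.
  void on_remote_event(const Event& e) {
    if (wanted_types_.count(e.type) != 0)
      local_.push(e);  // forward as a supplier of the local channel
  }

private:
  LocalChannel& local_;
  std::set<std::uint32_t> wanted_types_;
};
```

Subscribing to one type and then delivering two events of different types leaves only the subscribed one in `LocalChannel::received`.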
typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map [protected] |
Definition at line 210 of file EC_Gateway_IIOP.h.
typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map_Iterator [protected] |
Definition at line 211 of file EC_Gateway_IIOP.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP | ( | void | ) |
Definition at line 19 of file EC_Gateway_IIOP.cpp.
References ACE_NEW, ACE_Dynamic_Service< TYPE >::instance(), and TAO_EC_Gateway_IIOP_Factory::use_ttl().
00020   : busy_count_ (0),
00021     update_posted_ (0),
00022     cleanup_posted_ (0),
00023     supplier_ec_suspended_ (0),
00024     supplier_info_ (0),
00025     consumer_info_ (0),
00026     consumer_ (this),
00027     consumer_is_active_ (false),
00028     supplier_ (this),
00029     supplier_is_active_ (false),
00030     ec_control_ (0),
00031     factory_ (0),
00032     use_ttl_ (1),
00033     use_consumer_proxy_map_ (1)
00034 {
00035   if (this->factory_ == 0)
00036     {
00037       this->factory_ =
00038         ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039
00040       if (this->factory_ == 0)
00041         {
00042           TAO_EC_Gateway_IIOP_Factory *f = 0;
00043           ACE_NEW (f,
00044                    TAO_EC_Gateway_IIOP_Factory);
00045           this->factory_ = f;
00046         }
00047     }
00048
00049   if (this->factory_ != 0)
00050     {
00051       this->use_ttl_ = this->factory_->use_ttl();
00052       this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053     }
00054 }
TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP | ( | void | ) | [virtual] |
Definition at line 56 of file EC_Gateway_IIOP.cpp.
References ec_control_.
00057 {
00058   delete ec_control_;
00059   ec_control_ = 0;
00060 }
int TAO_EC_Gateway_IIOP::cleanup_consumer_ec | ( | void | ) |
Clean up the connection to the consumer ec. Does not call anything on the ec again; it just sets the object reference to nil.
Definition at line 575 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), and TAO_SYNCH_MUTEX.
Referenced by TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().
00576 {
00577   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00578
00579   this->cleanup_consumer_ec_i ();
00580
00581   return 0;
00582 }
void TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i | ( | void | ) | [private] |
Definition at line 595 of file EC_Gateway_IIOP.cpp.
References consumer_ec_.
Referenced by cleanup_consumer_ec(), and shutdown().
00596 {
00597   this->consumer_ec_ =
00598     RtecEventChannelAdmin::EventChannel::_nil ();
00599 }
void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies | ( | void | ) |
Clean up all consumer proxies we have without trying to tell the consumer that we are going to disconnect. This can be used to clean up the consumer proxy administration when we know that the consumers are all unreachable.
Definition at line 106 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, and TAO_SYNCH_MUTEX.
Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reconnect_ConsumerEC_Control::system_exception(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().
00107 {
00108   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00109
00110   // In case we are still pushing, don't cleanup the proxies
00111   if (this->busy_count_ != 0)
00112     {
00113       this->cleanup_posted_ = 1;
00114       return;
00115     }
00116
00117   this->cleanup_consumer_proxies_i ();
00118 }
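The listing above posts the cleanup instead of running it while pushes are in flight; push() later runs the posted request once the busy count drops to zero. The following is a minimal sketch of that idiom in standard C++, with `std::mutex` standing in for TAO_SYNCH_MUTEX. The class `DeferredCleanup` and its methods are hypothetical names, and the "cleanup" is reduced to a counter so the behavior can be observed.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical model of "post the request while pushes are in flight".
class DeferredCleanup {
public:
  // Called when a push starts.
  void begin_push() {
    std::lock_guard<std::mutex> guard(lock_);
    ++busy_count_;
  }

  // Called when a push ends; runs a posted cleanup once nobody is pushing.
  void end_push() {
    std::lock_guard<std::mutex> guard(lock_);
    --busy_count_;
    if (busy_count_ == 0 && cleanup_posted_) {
      cleanup_posted_ = false;
      cleanup_i();
    }
  }

  // Cleans up immediately, or posts the request if pushes are running.
  void cleanup() {
    std::lock_guard<std::mutex> guard(lock_);
    if (busy_count_ != 0) {
      cleanup_posted_ = true;
      return;
    }
    cleanup_i();
  }

  // For single-threaded inspection in examples only (reads without the lock).
  int cleanups_done() const { return cleanups_done_; }

private:
  // Must be called with lock_ held, mirroring the *_i naming convention.
  void cleanup_i() { ++cleanups_done_; }

  std::mutex lock_;
  unsigned busy_count_ = 0;
  bool cleanup_posted_ = false;
  int cleanups_done_ = 0;
};
```

A cleanup requested between `begin_push()` and `end_push()` is only counted after `end_push()` returns, while a cleanup requested when idle is counted immediately.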
void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i | ( | void | ) | [private] |
Remove all consumer proxies without calling disconnect on them.
Definition at line 211 of file EC_Gateway_IIOP.cpp.
References consumer_proxy_map_, default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().
Referenced by cleanup_consumer_proxies(), and push().
00212 {
00213   if (this->consumer_proxy_map_.current_size () > 0)
00214     {
00215       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00216            j != this->consumer_proxy_map_.end ();
00217            ++j)
00218         {
00219           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00220           if (CORBA::is_nil (consumer))
00221             continue;
00222
00223           CORBA::release (consumer);
00224         }
00225       // Remove all the elements on the map. Calling close() does not
00226       // work because the map is left in an inconsistent state.
00227       this->consumer_proxy_map_.open ();
00228     }
00229
00230   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00231     {
00232       this->default_consumer_proxy_ =
00233         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00234     }
00235 }
int TAO_EC_Gateway_IIOP::cleanup_supplier_ec | ( | void | ) |
Clean up the connection to the supplier ec. Does not call anything on the ec again; it just sets the object reference to nil.
Definition at line 585 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD_RETURN, cleanup_supplier_ec_i(), and TAO_SYNCH_MUTEX.
00586 {
00587   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00588
00589   this->cleanup_supplier_ec_i ();
00590
00591   return 0;
00592 }
void TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i | ( | void | ) | [private] |
Definition at line 602 of file EC_Gateway_IIOP.cpp.
References supplier_ec_.
Referenced by cleanup_supplier_ec(), and shutdown().
00603 {
00604   this->supplier_ec_ =
00605     RtecEventChannelAdmin::EventChannel::_nil ();
00606 }
void TAO_EC_Gateway_IIOP::close | ( | void | ) | [virtual] |
The gateway must disconnect from all the relevant event channels, or any other communication media (such as multicast groups).
Implements TAO_EC_Gateway.
Definition at line 98 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD, close_i(), and TAO_SYNCH_MUTEX.
00099 {
00100   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00101
00102   this->close_i ();
00103 }
void TAO_EC_Gateway_IIOP::close_i | ( | void | ) | [private] |
Definition at line 121 of file EC_Gateway_IIOP.cpp.
References disconnect_consumer_proxies_i(), and disconnect_supplier_proxy_i().
Referenced by close(), shutdown(), and update_consumer_i().
00122 {
00123   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
00124   this->disconnect_consumer_proxies_i ();
00125
00126   this->disconnect_supplier_proxy_i ();
00127 }
CORBA::Boolean TAO_EC_Gateway_IIOP::consumer_ec_non_existent | ( | CORBA::Boolean_out | disconnected | ) |
Check whether the consumer event channel is non existent or not.
Definition at line 615 of file EC_Gateway_IIOP.cpp.
References CORBA::Object::_duplicate(), ACE_GUARD_THROW_EX, and TAO_SYNCH_MUTEX.
Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel(), TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel(), and TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect().
00617 {
00618   CORBA::Object_var consumer_ec;
00619   {
00620     ACE_GUARD_THROW_EX (
00621         TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00622         CORBA::INTERNAL ());
00623
00624     disconnected = 0;
00625     if (this->is_consumer_ec_connected_i () == 0)
00626       {
00627         disconnected = 1;
00628         return 0;
00629       }
00630
00631     consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00632   }
00633
00634 #if (TAO_HAS_MINIMUM_CORBA == 0)
00635   return consumer_ec->_non_existent ();
00636 #else
00637   return 0;
00638 #endif /* TAO_HAS_MINIMUM_CORBA */
00639 }
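The listing duplicates the channel reference inside the guarded scope and makes the potentially slow `_non_existent()` call only after the guard has been released. A minimal sketch of the same "copy under lock, call outside the lock" idiom in standard C++ follows. `RemoteChannel`, `Holder`, `connect` and `channel_non_existent` are hypothetical names, and `std::shared_ptr` is assumed here as a stand-in for a reference-counted CORBA object reference.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a remote object reference.
struct RemoteChannel {
  bool gone = false;
  // Imagine this is a slow, blocking remote call.
  bool non_existent() const { return gone; }
};

class Holder {
public:
  void connect(std::shared_ptr<RemoteChannel> ec) {
    std::lock_guard<std::mutex> guard(lock_);
    ec_ = std::move(ec);
  }

  // Returns true if the channel reports that it no longer exists.
  // Sets `disconnected` when there is no channel to ask at all.
  bool channel_non_existent(bool& disconnected) {
    std::shared_ptr<RemoteChannel> copy;
    {
      std::lock_guard<std::mutex> guard(lock_);
      disconnected = false;
      if (!ec_) {
        disconnected = true;
        return false;
      }
      copy = ec_;  // take our own reference while holding the lock
    }
    // The lock is released here, so the slow call cannot block other threads.
    return copy->non_existent();
  }

private:
  std::mutex lock_;
  std::shared_ptr<RemoteChannel> ec_;
};
```

Because the copy keeps the target alive, another thread may reset `ec_` concurrently without invalidating the call in progress.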
void TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i | ( | void | ) | [private] |
Disconnect all consumer proxies.
Definition at line 130 of file EC_Gateway_IIOP.cpp.
References consumer_proxy_map_, default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().
Referenced by close_i().
00131 {
00132   if (this->consumer_proxy_map_.current_size () > 0)
00133     {
00134       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00135            j != this->consumer_proxy_map_.end ();
00136            ++j)
00137         {
00138           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00139           if (CORBA::is_nil (consumer))
00140             continue;
00141           try
00142             {
00143               consumer->disconnect_push_consumer ();
00144             }
00145           catch (const CORBA::Exception&)
00146             {
00147             }
00148           CORBA::release (consumer);
00149         }
00150       // Remove all the elements on the map. Calling close() does not
00151       // work because the map is left in an inconsistent state.
00152       this->consumer_proxy_map_.open ();
00153     }
00154
00155   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00156     {
00157       this->default_consumer_proxy_->disconnect_push_consumer ();
00158
00159       this->default_consumer_proxy_ =
00160         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00161     }
00162 }
void TAO_EC_Gateway_IIOP::disconnect_push_consumer | ( | void | ) |
The channel is disconnecting.
Definition at line 428 of file EC_Gateway_IIOP.cpp.
00429 {
00430   // ACE_DEBUG ((LM_DEBUG,
00431   //             "ECG (%t): Supplier-consumer received "
00432   //             "disconnect from channel.\n"));
00433 }
void TAO_EC_Gateway_IIOP::disconnect_push_supplier | ( | void | ) |
The channel is disconnecting.
Definition at line 436 of file EC_Gateway_IIOP.cpp.
00437 {
00438   // ACE_DEBUG ((LM_DEBUG,
00439   //             "ECG (%t): Supplier received "
00440   //             "disconnect from channel.\n"));
00441 }
void TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i | ( | void | ) | [private] |
Disconnect the supplier proxy.
Definition at line 165 of file EC_Gateway_IIOP.cpp.
References CORBA::is_nil(), and supplier_proxy_.
Referenced by close_i().
00166 {
00167   if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00168     {
00169       this->supplier_proxy_->disconnect_push_supplier ();
00170
00171       this->supplier_proxy_ =
00172         RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00173     }
00174 }
int TAO_EC_Gateway_IIOP::init | ( | RtecEventChannelAdmin::EventChannel_ptr | supplier_ec, | |
RtecEventChannelAdmin::EventChannel_ptr | consumer_ec | |||
) |
To do its job, this class needs to know the local and remote ECs it will connect to.
Definition at line 63 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD_RETURN, init_i(), and TAO_SYNCH_MUTEX.
00065 {
00066   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00067
00068   return this->init_i (supplier_ec, consumer_ec);
00069 }
int TAO_EC_Gateway_IIOP::init_i | ( | RtecEventChannelAdmin::EventChannel_ptr | supplier_ec, | |
RtecEventChannelAdmin::EventChannel_ptr | consumer_ec | |||
) | [protected] |
Do the real work in init().
Definition at line 72 of file EC_Gateway_IIOP.cpp.
References ACE_ERROR_RETURN, TAO_ECG_ConsumerEC_Control::activate(), consumer_ec_, TAO_EC_Gateway_IIOP_Factory::create_consumerec_control(), ec_control_, factory_, CORBA::is_nil(), LM_ERROR, and supplier_ec_.
Referenced by TAO_EC_Gateway_Sched::init(), and init().
00074 {
00075   if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00076     {
00077       this->supplier_ec_ =
00078         RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00079       this->consumer_ec_ =
00080         RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00081
00082       if (ec_control_ == 0)
00083         {
00084           ec_control_ = factory_->create_consumerec_control(this);
00085           ec_control_->activate();
00086         }
00087
00088       return 0;
00089     }
00090   else
00091     ACE_ERROR_RETURN ((LM_ERROR,
00092                        "TAO_EC_Gateway_IIOP - init_i "
00093                        "Supplier and consumer event channel reference "
00094                        "should be nil.\n"), -1);
00095 }
CORBA::Boolean TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i | ( | void | ) | const [private] |
Helper method to see if consumer ec is connected.
Definition at line 609 of file EC_Gateway_IIOP.cpp.
References CORBA::is_nil().
00610 {
00611   return !CORBA::is_nil (this->consumer_ec_.in ());
00612 }
void TAO_EC_Gateway_IIOP::open_i | ( | const RtecEventChannelAdmin::ConsumerQOS & | sub | ) | [private] |
Create all connections to consumer ec and to supplier ec.
Definition at line 253 of file EC_Gateway_IIOP.cpp.
References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_UNDEFINED, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::bind(), consumer_, consumer_ec_, consumer_info_, consumer_is_active_, consumer_proxy_map_, default_consumer_proxy_, RtecEventChannelAdmin::ConsumerQOS::dependencies, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), RtecEventChannelAdmin::SupplierQOS::is_gateway, RtecEventChannelAdmin::ConsumerQOS::is_gateway, RtecEventChannelAdmin::SupplierQOS::publications, supplier_, supplier_ec_, supplier_is_active_, supplier_proxy_, and RtecBase::TWO_WAY_CALL.
Referenced by update_consumer_i().
00255 {
00256   // = Connect as a supplier to the consumer EC
00257   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00258     this->consumer_ec_->for_suppliers ();
00259
00260   RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00261   sub.is_gateway = 1;
00262
00263   // Change the RT_Info in the consumer QoS.
00264   // On the same loop we discover the subscriptions by event source,
00265   // and fill the consumer proxy map if we have to use this map.
00266   for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00267     {
00268       sub.dependencies[i].rt_info = this->supplier_info_;
00269
00270       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00271       const RtecEventComm::EventHeader &h =
00272         sub.dependencies[i].event.header;
00273
00274       RtecEventComm::EventSourceID sid = h.source;
00275
00276       //ACE_DEBUG ((LM_DEBUG,
00277       //            "ECG (%t) trying (%d,%d)\n",
00278       //            sid, h.type));
00279
00280       // Skip all subscriptions that do not require an specific source
00281       // id or skip all subscriptions when we don't need to use the consumer
00282       // proxy map.
00283       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00284         continue;
00285
00286       // Skip all the magic event types.
00287       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00288         continue;
00289
00290       if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00291         {
00292           //ACE_DEBUG ((LM_DEBUG,
00293           //            "ECG (%t) binding source %d\n",
00294           //            sid));
00295           proxy = supplier_admin->obtain_push_consumer ();
00296           this->consumer_proxy_map_.bind (sid, proxy);
00297         }
00298     }
00299   //ACE_DEBUG ((LM_DEBUG,
00300   //            "ECG (%t) consumer map computed (%d entries)\n",
00301   //            this->consumer_proxy_map_.current_size ()));
00302
00303   if (this->consumer_proxy_map_.current_size () > 0)
00304     {
00305       this->supplier_is_active_ = true;
00306
00307       // Obtain a reference to our supplier personality...
00308       RtecEventComm::PushSupplier_var supplier_ref = this->supplier_._this ();
00309
00310       // For each subscription by source build the set of publications
00311       // (they may several, by type, for instance) and connect to the
00312       // consumer proxy.
00313       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00314            j != this->consumer_proxy_map_.end ();
00315            ++j)
00316         {
00317           RtecEventChannelAdmin::SupplierQOS pub;
00318           pub.publications.length (sub.dependencies.length () + 1);
00319           pub.is_gateway = 1;
00320
00321           int c = 0;
00322
00323           RtecEventComm::EventSourceID sid = (*j).ext_id_;
00324           for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00325             {
00326               const RtecEventComm::EventHeader& h =
00327                 sub.dependencies[k].event.header;
00328               if (h.source != sid
00329                   || (ACE_ES_EVENT_ANY < h.type
00330                       && h.type < ACE_ES_EVENT_UNDEFINED))
00331                 continue;
00332               pub.publications[c].event.header = h;
00333               pub.publications[c].dependency_info.dependency_type =
00334                 RtecBase::TWO_WAY_CALL;
00335               pub.publications[c].dependency_info.number_of_calls = 1;
00336               pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00337               c++;
00338             }
00339           //ACE_DEBUG ((LM_DEBUG,
00340           //            "ECG (%t) supplier id %d has %d elements\n",
00341           //            sid, c));
00342           if (c == 0)
00343             continue;
00344
00345           pub.publications.length (c);
00346
00347           // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
00348           // ACE_SupplierQOS_Factory::debug (pub);
00349           (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00350                                                pub);
00351         }
00352     }
00353
00354   // Also build the subscriptions that are *not* by source when we use the
00355   // consumer proxy map, and all subscriptions when we don't use the map and
00356   // then connect to the default consumer proxy.
00357   RtecEventChannelAdmin::SupplierQOS pub;
00358   pub.publications.length (sub.dependencies.length () + 1);
00359   pub.is_gateway = 1;
00360   int c = 0;
00361   for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00362     {
00363       const RtecEventComm::EventHeader& h =
00364         sub.dependencies[k].event.header;
00365       RtecEventComm::EventSourceID sid = h.source;
00366
00367       // Skip all subscriptions with a specific source when we use the map
00368       if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00369         continue;
00370
00371       // Skip all the magic event types.
00372       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00373         continue;
00374
00375       pub.publications[c].event.header = h;
00376       pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00377       pub.publications[c].dependency_info.dependency_type =
00378         RtecBase::TWO_WAY_CALL;
00379       pub.publications[c].dependency_info.number_of_calls = 1;
00380       pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00381       c++;
00382     }
00383
00384   if (c > 0)
00385     {
00386       this->supplier_is_active_ = true;
00387
00388       // Obtain a reference to our supplier personality...
00389       RtecEventComm::PushSupplier_var supplier_ref =
00390         this->supplier_._this ();
00391
00392       // Obtain the consumer....
00393       this->default_consumer_proxy_ =
00394         supplier_admin->obtain_push_consumer ();
00395
00396       pub.publications.length (c);
00397       // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
00398       // ACE_SupplierQOS_Factory::debug (pub);
00399       this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00400                                                             pub);
00401     }
00402
00403   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00404     this->supplier_ec_->for_consumers ();
00405
00406   this->supplier_proxy_ =
00407     consumer_admin->obtain_push_supplier ();
00408
00409   this->consumer_is_active_ = true;
00410   RtecEventComm::PushConsumer_var consumer_ref =
00411     this->consumer_._this ();
00412
00413   // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
00414   // ACE_ConsumerQOS_Factory::debug (sub);
00415
00416   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00417                                                 sub);
00418 }
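The core of open_i() is a partition of the subscription list: entries with a specific source go to a per-source proxy (when the proxy map is enabled), everything else goes to the default proxy, and entries whose type falls in the reserved "magic" range are dropped. The sketch below models only that partition in standard C++. `Header`, `PublicationPlan`, `is_reserved_type` and `plan_publications` are hypothetical names, and the three constants are placeholder values chosen for this sketch rather than values taken from the ACE headers.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Placeholder values for this sketch only; the real code uses
// ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY and ACE_ES_EVENT_UNDEFINED.
constexpr std::uint32_t kSourceAny = 0;
constexpr std::uint32_t kEventAny = 0;
constexpr std::uint32_t kEventUndefined = 16;

// Hypothetical, reduced event header.
struct Header {
  std::uint32_t source;
  std::uint32_t type;
};

// Result of the partition: one publication list per specific source,
// plus one list for the default proxy.
struct PublicationPlan {
  std::map<std::uint32_t, std::vector<Header>> per_source;
  std::vector<Header> fallback;
};

// True for types strictly between the two reserved bounds.
bool is_reserved_type(std::uint32_t type) {
  return kEventAny < type && type < kEventUndefined;
}

PublicationPlan plan_publications(const std::vector<Header>& deps,
                                  bool use_proxy_map) {
  PublicationPlan plan;
  for (const Header& h : deps) {
    if (is_reserved_type(h.type))
      continue;  // never published, on either path
    if (h.source != kSourceAny && use_proxy_map)
      plan.per_source[h.source].push_back(h);  // dedicated proxy per source
    else
      plan.fallback.push_back(h);  // default proxy
  }
  return plan;
}
```

With the map disabled, every non-reserved entry lands in `fallback`, which corresponds to the single default proxy case described for use_consumer_proxy_map_.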
void TAO_EC_Gateway_IIOP::push | ( | const RtecEventComm::EventSet & | events | ) |
This is the consumer-side behavior: it pushes the events to the local event channel.
Definition at line 444 of file EC_Gateway_IIOP.cpp.
References ACE_ES_EVENT_SOURCE_ANY, ACE_GUARD, busy_count_, cleanup_consumer_proxies_i(), cleanup_posted_, CORBA::is_nil(), push_to_consumer(), TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.
00445 {
00446   // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
00447
00448   if (events.length () == 0)
00449     {
00450       // ACE_DEBUG ((LM_DEBUG, "no events\n"));
00451       return;
00452     }
00453
00454   {
00455     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00456
00457     this->busy_count_++;
00458   }
00459
00460   // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
00461
00462   // @@ TODO, there is an extra data copy here, we should do the event
00463   // modification without it and only compact the necessary events.
00464   RtecEventComm::EventSet out (1);
00465   out.length (1);
00466   for (CORBA::ULong i = 0; i < events.length (); ++i)
00467     {
00468       if (this->use_ttl_ == 1)
00469         {
00470           if (events[i].header.ttl == 0)
00471             continue;
00472         }
00473
00474       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00475       RtecEventComm::EventSourceID sid = events[i].header.source;
00476       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00477           || this->consumer_proxy_map_.find (sid, proxy) == -1)
00478         {
00479           // If the source is not in our map or we should not use the map then
00480           // use the default consumer proxy.
00481           proxy = this->default_consumer_proxy_.in ();
00482         }
00483
00484       if (CORBA::is_nil (proxy))
00485         continue;
00486
00487       out[0] = events[i];
00488
00489       if (this->use_ttl_ == 1)
00490         out[0].header.ttl--;
00491
00492       // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
00493       this->push_to_consumer(proxy, out);
00494     }
00495
00496   {
00497     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00498
00499     this->busy_count_--;
00500
00501     if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00502       {
00503         this->cleanup_posted_ = 0;
00504         this->cleanup_consumer_proxies_i ();
00505       }
00506
00507     if (this->busy_count_ == 0 && this->update_posted_ != 0)
00508       {
00509         this->update_posted_ = 0;
00510         this->update_consumer_i (this->c_qos_);
00511       }
00512   }
00513 }
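The forwarding loop in push() combines two decisions per event: drop it if TTL handling is on and its TTL is already 0, and pick the per-source proxy if one is registered, otherwise the default proxy. The following sketch isolates that logic in standard C++. `Event`, `Proxy`, `Forwarder` and `kSourceAny` are hypothetical stand-ins, and `kSourceAny = 0` is a placeholder chosen for this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Placeholder for ACE_ES_EVENT_SOURCE_ANY; the value is an assumption.
constexpr std::uint32_t kSourceAny = 0;

// Hypothetical, reduced event.
struct Event {
  std::uint32_t source;
  std::uint32_t type;
  std::uint32_t ttl;
};

// Hypothetical proxy that records what it is asked to forward.
struct Proxy {
  std::vector<Event> received;
  void push(const Event& e) { received.push_back(e); }
};

struct Forwarder {
  bool use_ttl = true;
  bool use_proxy_map = true;
  std::map<std::uint32_t, Proxy*> proxy_map;  // per-source proxies (non-owning)
  Proxy* default_proxy = nullptr;             // used when no per-source proxy fits

  void push(const std::vector<Event>& events) {
    for (const Event& in : events) {
      if (use_ttl && in.ttl == 0)
        continue;  // expired: do not forward

      Proxy* proxy = nullptr;
      if (in.source != kSourceAny && use_proxy_map) {
        auto it = proxy_map.find(in.source);
        if (it != proxy_map.end())
          proxy = it->second;
      }
      if (proxy == nullptr)
        proxy = default_proxy;
      if (proxy == nullptr)
        continue;  // nowhere to send this event

      Event out = in;  // copy, so the caller's event is left untouched
      if (use_ttl)
        --out.ttl;     // one hop consumed
      proxy->push(out);
    }
  }
};
```

Decrementing the TTL on every hop is what bounds how far an event can travel when several gateways connect channels to each other.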
void TAO_EC_Gateway_IIOP::push_to_consumer | ( | RtecEventChannelAdmin::ProxyPushConsumer_ptr | consumer, | |
const RtecEventComm::EventSet & | event | |||
) | [private] |
Push the event to the consumer.
Definition at line 516 of file EC_Gateway_IIOP.cpp.
References ec_control_, TAO_ECG_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_ConsumerEC_Control::system_exception().
Referenced by push().
00519 {
00520   try
00521     {
00522       consumer->push (event);
00523     }
00524   catch (const CORBA::OBJECT_NOT_EXIST&)
00525     {
00526       ec_control_->event_channel_not_exist (this);
00527     }
00528   catch (CORBA::SystemException& sysex)
00529     {
00530       ec_control_->system_exception (this,
00531                                      sysex);
00532     }
00533   catch (const CORBA::Exception&)
00534     {
00535       // Shouldn't happen.
00536     }
00537 }
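The handlers in push_to_consumer() are ordered from most specific to most general, so an OBJECT_NOT_EXIST is reported through its dedicated callback even though it is also a system exception. The sketch below reproduces that ordering with a small hypothetical hierarchy (`SystemError`, `ObjectNotExist`) built on `std::runtime_error`; `Control` and `guarded_push` are likewise illustrative names and not part of TAO.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical hierarchy mirroring "specific error derives from general error".
struct SystemError : std::runtime_error {
  explicit SystemError(const std::string& what) : std::runtime_error(what) {}
};
struct ObjectNotExist : SystemError {
  explicit ObjectNotExist(const std::string& what) : SystemError(what) {}
};

// Hypothetical control object that only counts the notifications it gets.
struct Control {
  int not_exist_calls = 0;
  int system_error_calls = 0;
};

// Runs the push and routes failures to the matching callback.
void guarded_push(const std::function<void()>& remote_push, Control& control) {
  try {
    remote_push();
  } catch (const ObjectNotExist&) {
    ++control.not_exist_calls;     // most specific handler first
  } catch (const SystemError&) {
    ++control.system_error_calls;  // any other system-level failure
  } catch (const std::exception&) {
    // Anything else is swallowed, as in the listing above.
  }
}
```

If the `SystemError` handler were listed first, it would also catch `ObjectNotExist`, and the dedicated callback would never run.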
void TAO_EC_Gateway_IIOP::reconnect_consumer_ec | ( | void | ) |
Definition at line 177 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD, c_qos_, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.
Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::reconnect().
00178 {
00179   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00180
00181   if (this->busy_count_ != 0)
00182     {
00183       this->update_posted_ = 1;
00184       return;
00185     }
00186
00187   this->update_consumer_i (c_qos_);
00188 }
void TAO_EC_Gateway_IIOP::resume_supplier_ec | ( | void | ) |
Resume the connection to the supplier ec.
Definition at line 653 of file EC_Gateway_IIOP.cpp.
References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.
00654 {
00655   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00656     {
00657       this->supplier_proxy_->resume_connection ();
00658
00659       supplier_ec_suspended_ = 0;
00660     }
00661 }
int TAO_EC_Gateway_IIOP::shutdown | ( | void | ) |
Disconnect and shutdown the gateway.
Definition at line 540 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), cleanup_supplier_ec_i(), close_i(), consumer_, consumer_is_active_, ec_control_, TAO_ECG_ConsumerEC_Control::shutdown(), supplier_, supplier_is_active_, and TAO_SYNCH_MUTEX.
00541 {
00542   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00543
00544   ec_control_->shutdown();
00545
00546   this->close_i ();
00547
00548   if (this->supplier_is_active_)
00549     {
00550       PortableServer::POA_var poa =
00551         this->supplier_._default_POA ();
00552       PortableServer::ObjectId_var id =
00553         poa->servant_to_id (&this->supplier_);
00554       poa->deactivate_object (id.in ());
00555       this->supplier_is_active_ = false;
00556     }
00557
00558   if (this->consumer_is_active_)
00559     {
00560       PortableServer::POA_var poa =
00561         this->consumer_._default_POA ();
00562       PortableServer::ObjectId_var id =
00563         poa->servant_to_id (&this->consumer_);
00564       poa->deactivate_object (id.in ());
00565       this->consumer_is_active_ = false;
00566     }
00567
00568   this->cleanup_consumer_ec_i ();
00569   this->cleanup_supplier_ec_i ();
00570
00571   return 0;
00572 }
void TAO_EC_Gateway_IIOP::suspend_supplier_ec | ( | void | ) |
Suspend the connection to the supplier ec.
Definition at line 642 of file EC_Gateway_IIOP.cpp.
References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.
Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reconnect_ConsumerEC_Control::system_exception().
00643 {
00644   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00645     {
00646       this->supplier_proxy_->suspend_connection ();
00647
00648       supplier_ec_suspended_ = 1;
00649     }
00650 }
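suspend_supplier_ec() and resume_supplier_ec() are both guarded by the supplier_ec_suspended_ flag, so repeated calls in the same direction reach the proxy only once. A minimal sketch of that idempotent toggle in standard C++ is shown below; `CountingProxy` and `SuspendableLink` are hypothetical names introduced only for this illustration.

```cpp
#include <cassert>

// Hypothetical stand-in for the supplier proxy; it only counts the calls.
struct CountingProxy {
  int suspend_calls = 0;
  int resume_calls = 0;
  void suspend_connection() { ++suspend_calls; }
  void resume_connection() { ++resume_calls; }
};

// Forwards suspend/resume to the proxy only on an actual state change.
class SuspendableLink {
public:
  explicit SuspendableLink(CountingProxy* proxy) : proxy_(proxy) {}

  void suspend() {
    if (proxy_ != nullptr && !suspended_) {
      proxy_->suspend_connection();
      suspended_ = true;
    }
  }

  void resume() {
    if (proxy_ != nullptr && suspended_) {
      proxy_->resume_connection();
      suspended_ = false;
    }
  }

private:
  CountingProxy* proxy_;  // non-owning; may be null when not connected
  bool suspended_ = false;
};
```

Calling `resume()` before any `suspend()`, or `suspend()` twice in a row, leaves the proxy counters unchanged beyond the first effective call.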
void TAO_EC_Gateway_IIOP::update_consumer | ( | const RtecEventChannelAdmin::ConsumerQOS & | sub | ) | [virtual] |
Definition at line 191 of file EC_Gateway_IIOP.cpp.
References ACE_GUARD, c_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.
00193 {
00194   if (c_qos.dependencies.length () == 0)
00195     return;
00196
00197   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00198
00199   this->c_qos_ = c_qos;
00200
00201   if (this->busy_count_ != 0)
00202     {
00203       this->update_posted_ = 1;
00204       return;
00205     }
00206
00207   this->update_consumer_i (c_qos);
00208 }
void TAO_EC_Gateway_IIOP::update_consumer_i | ( | const RtecEventChannelAdmin::ConsumerQOS & | sub | ) | [private] |
Definition at line 238 of file EC_Gateway_IIOP.cpp.
References close_i(), CORBA::is_nil(), and open_i().
Referenced by push(), reconnect_consumer_ec(), and update_consumer().
00240 {
00241   this->close_i ();
00242
00243   if (CORBA::is_nil (this->consumer_ec_.in ())
00244       || CORBA::is_nil (this->supplier_ec_.in ()))
00245     return;
00246
00247   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
00248
00249   this->open_i (c_qos);
00250 }
void TAO_EC_Gateway_IIOP::update_supplier | ( | const RtecEventChannelAdmin::SupplierQOS & | pub | ) | [virtual] |
CORBA::ULong TAO_EC_Gateway_IIOP::busy_count_ [protected] |
The number of threads currently running push(). We cannot make changes until this count reaches 0.
Definition at line 164 of file EC_Gateway_IIOP.h.
Referenced by push().
RtecEventChannelAdmin::ConsumerQOS TAO_EC_Gateway_IIOP::c_qos_ [protected] |
Definition at line 172 of file EC_Gateway_IIOP.h.
Referenced by reconnect_consumer_ec(), and update_consumer().
int TAO_EC_Gateway_IIOP::cleanup_posted_ [protected] |
A cleanup is outstanding; it must wait until all pushes have finished.
Definition at line 178 of file EC_Gateway_IIOP.h.
Referenced by cleanup_consumer_proxies(), and push().
ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::consumer_ [protected] |
Our consumer personality....
Definition at line 198 of file EC_Gateway_IIOP.h.
Referenced by open_i(), and shutdown().
RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::consumer_ec_ [protected] |
The event channel acting as consumer of this gateway.
Definition at line 190 of file EC_Gateway_IIOP.h.
Referenced by cleanup_consumer_ec_i(), init_i(), and open_i().
RtecBase::handle_t TAO_EC_Gateway_IIOP::consumer_info_ [protected] |
Our RT_Infos for the event channel that is the consumer.
Definition at line 195 of file EC_Gateway_IIOP.h.
Referenced by TAO_EC_Gateway_Sched::init(), and open_i().
bool TAO_EC_Gateway_IIOP::consumer_is_active_ [protected] |
If it is true then we must deactivate the consumer.
Definition at line 201 of file EC_Gateway_IIOP.h.
Referenced by open_i(), and shutdown().
Consumer_Map TAO_EC_Gateway_IIOP::consumer_proxy_map_ [protected] |
We talk to the EC (as a supplier) using either a per-supplier proxy or a generic proxy for the type-only subscriptions. We push the events to these proxies.
Definition at line 216 of file EC_Gateway_IIOP.h.
Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i().
RtecEventChannelAdmin::ProxyPushConsumer_var TAO_EC_Gateway_IIOP::default_consumer_proxy_ [protected] |
Definition at line 217 of file EC_Gateway_IIOP.h.
Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i().
TAO_ECG_ConsumerEC_Control* TAO_EC_Gateway_IIOP::ec_control_ [protected] |
The consumer ec control, which controls the behaviour in case of a misbehaving consumer ec.
Definition at line 225 of file EC_Gateway_IIOP.h.
Referenced by init_i(), push_to_consumer(), shutdown(), and ~TAO_EC_Gateway_IIOP().
TAO_EC_Gateway_IIOP_Factory* TAO_EC_Gateway_IIOP::factory_ [protected] |
The Gateway IIOP Factory for all the settings.
Definition at line 228 of file EC_Gateway_IIOP.h.
Referenced by init_i().
TAO_SYNCH_MUTEX TAO_EC_Gateway_IIOP::lock_ [protected] |
Lock to synchronize internal changes.
ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::supplier_ [protected] |
Our supplier personality....
Definition at line 204 of file EC_Gateway_IIOP.h.
Referenced by open_i(), and shutdown().
RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::supplier_ec_ [protected] |
The event channel acting as supplier for this gateway so we can reconnect when the list changes.
Definition at line 187 of file EC_Gateway_IIOP.h.
Referenced by cleanup_supplier_ec_i(), init_i(), and open_i().
int TAO_EC_Gateway_IIOP::supplier_ec_suspended_ [protected] |
Is the supplier ec suspended?
Definition at line 183 of file EC_Gateway_IIOP.h.
Referenced by resume_supplier_ec(), and suspend_supplier_ec().
RtecBase::handle_t TAO_EC_Gateway_IIOP::supplier_info_ [protected] |
Our RT_Infos for the event channel that is the supplier.
Definition at line 193 of file EC_Gateway_IIOP.h.
Referenced by TAO_EC_Gateway_Sched::init().
bool TAO_EC_Gateway_IIOP::supplier_is_active_ [protected] |
If it is true then we must deactivate the supplier.
Definition at line 207 of file EC_Gateway_IIOP.h.
Referenced by open_i(), and shutdown().
RtecEventChannelAdmin::ProxyPushSupplier_var TAO_EC_Gateway_IIOP::supplier_proxy_ [protected] |
We talk to the EC (as a consumer) using this proxy. We receive the events from this proxy.
Definition at line 221 of file EC_Gateway_IIOP.h.
Referenced by disconnect_supplier_proxy_i(), open_i(), resume_supplier_ec(), and suspend_supplier_ec().
int TAO_EC_Gateway_IIOP::update_posted_ [protected] |
An update_consumer() message arrived *while* we were doing a push(), so the modification is stored. If multiple update_consumer() messages arrive, only the last one is executed.
Definition at line 171 of file EC_Gateway_IIOP.h.
Referenced by push(), reconnect_consumer_ec(), and update_consumer().
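The "only the last one is executed" behavior follows from keeping a single stored QoS plus a flag: each update overwrites the stored value, and the flag only records that something is pending. A minimal sketch in standard C++ is given below. `CoalescingUpdater` and its methods are hypothetical names, a `std::string` stands in for the ConsumerQOS, and `std::mutex` stands in for TAO_SYNCH_MUTEX.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical model: updates arriving during a push are coalesced.
class CoalescingUpdater {
public:
  void begin_push() {
    std::lock_guard<std::mutex> guard(lock_);
    ++busy_count_;
  }

  // When the last push finishes, apply the most recent pending update.
  void end_push() {
    std::lock_guard<std::mutex> guard(lock_);
    --busy_count_;
    if (busy_count_ == 0 && update_posted_) {
      update_posted_ = false;
      apply_i(latest_);
    }
  }

  // Always remember the newest configuration; apply it now only if idle.
  void update(const std::string& config) {
    std::lock_guard<std::mutex> guard(lock_);
    latest_ = config;
    if (busy_count_ != 0) {
      update_posted_ = true;
      return;
    }
    apply_i(config);
  }

  // For single-threaded inspection in examples only (reads without the lock).
  const std::vector<std::string>& applied() const { return applied_; }

private:
  // Must be called with lock_ held.
  void apply_i(const std::string& config) { applied_.push_back(config); }

  std::mutex lock_;
  unsigned busy_count_ = 0;
  bool update_posted_ = false;
  std::string latest_;
  std::vector<std::string> applied_;
};
```

Two updates issued during one push result in a single applied entry holding the second value, which avoids reconnecting once per intermediate update.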
int TAO_EC_Gateway_IIOP::use_consumer_proxy_map_ [protected] |
The flag for using the consumer proxy map. If 1, the consumer proxy map is used, meaning that each unique source id gets its own proxy push consumer; if 0, a single proxy push consumer (the default) is used for all source ids.
Definition at line 237 of file EC_Gateway_IIOP.h.
int TAO_EC_Gateway_IIOP::use_ttl_ [protected] |
If 1, we use the TTL flags, if 0, we just ignore TTL.
Definition at line 231 of file EC_Gateway_IIOP.h.