#include <EC_Gateway_IIOP.h>
Inheritance diagram for TAO_EC_Gateway_IIOP:
Public Member Functions | |
TAO_EC_Gateway_IIOP (void) | |
virtual | ~TAO_EC_Gateway_IIOP (void) |
int | init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec) |
void | disconnect_push_supplier (void) |
The channel is disconnecting. | |
void | disconnect_push_consumer (void) |
The channel is disconnecting. | |
void | push (const RtecEventComm::EventSet &events) |
int | shutdown (void) |
Disconnect and shutdown the gateway. | |
virtual void | close (void) |
virtual void | update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) |
virtual void | update_supplier (const RtecEventChannelAdmin::SupplierQOS &pub) |
void | reconnect_consumer_ec (void) |
CORBA::Boolean | consumer_ec_non_existent (CORBA::Boolean_out disconnected) |
Check whether the consumer event channel is non existent or not. | |
void | cleanup_consumer_proxies (void) |
int | cleanup_consumer_ec (void) |
int | cleanup_supplier_ec (void) |
void | suspend_supplier_ec (void) |
Suspend the connection to the supplier ec. | |
void | resume_supplier_ec (void) |
Resume the connection to the supplier ec. | |
Protected Types | |
typedef ACE_Map_Manager< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > | Consumer_Map |
typedef ACE_Map_Iterator< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > | Consumer_Map_Iterator |
Protected Member Functions | |
int | init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec) |
Do the real work in init(). | |
Protected Attributes | |
TAO_SYNCH_MUTEX | lock_ |
Lock to synchronize internal changes. | |
CORBA::ULong | busy_count_ |
int | update_posted_ |
RtecEventChannelAdmin::ConsumerQOS | c_qos_ |
int | cleanup_posted_ |
int | supplier_ec_suspended_ |
RtecEventChannelAdmin::EventChannel_var | supplier_ec_ |
RtecEventChannelAdmin::EventChannel_var | consumer_ec_ |
The event channel acting as consumer of this gateway. | |
RtecBase::handle_t | supplier_info_ |
Our RT_Infos for the event channel that is the supplier. | |
RtecBase::handle_t | consumer_info_ |
Our RT_Infos for the event channel that is the consumer. | |
ACE_PushConsumer_Adapter< TAO_EC_Gateway_IIOP > | consumer_ |
Our consumer personality.... | |
int | consumer_is_active_ |
If it is not 0 then we must deactivate the consumer. | |
ACE_PushSupplier_Adapter< TAO_EC_Gateway_IIOP > | supplier_ |
Our supplier personality.... | |
int | supplier_is_active_ |
If it is not 0 then we must deactivate the supplier. | |
Consumer_Map | consumer_proxy_map_ |
RtecEventChannelAdmin::ProxyPushConsumer_var | default_consumer_proxy_ |
RtecEventChannelAdmin::ProxyPushSupplier_var | supplier_proxy_ |
TAO_ECG_ConsumerEC_Control * | ec_control_ |
TAO_EC_Gateway_IIOP_Factory * | factory_ |
The Gateway IIOP Factory for all the settings. | |
int | use_ttl_ |
If 1, we use the TTL flags, if 0, we just ignore TTL. | |
int | use_consumer_proxy_map_ |
Private Member Functions | |
void | close_i (void) |
void | disconnect_supplier_proxy_i (void) |
Disconnect the supplier proxy. | |
void | disconnect_consumer_proxies_i (void) |
Disconnect all consumer proxies. | |
void | cleanup_consumer_proxies_i (void) |
Remove all consumer proxies without calling disconnect on them. | |
void | update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS &sub) |
void | open_i (const RtecEventChannelAdmin::ConsumerQOS &sub) |
Create all connections to consumer ec and to supplier ec. | |
CORBA::Boolean | is_consumer_ec_connected_i (void) const |
Helper method to see if consumer ec is connected. | |
void | push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, const RtecEventComm::EventSet &event) |
Push the event to the consumer. | |
void | cleanup_consumer_ec_i (void) |
void | cleanup_supplier_ec_i (void) |
This class mediates among two event channels, it connects as a consumer of events with a remote event channel, and as a supplier of events with the local EC. As a consumer it gives a QoS designed to only accept the events in which *local* consumers are interested. Eventually the local EC should create this object and compute its QoS in an automated manner; but this requires some way to filter out the peers registered as consumers, otherwise we will get loops in the QoS graph. It uses exactly the same set of events in the publications list when connected as a supplier.
Definition at line 65 of file EC_Gateway_IIOP.h.
|
Definition at line 210 of file EC_Gateway_IIOP.h. |
|
Definition at line 211 of file EC_Gateway_IIOP.h. Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i(). |
|
|
Definition at line 56 of file EC_Gateway_IIOP.cpp. References ec_control_.
00057 { 00058 delete ec_control_; 00059 ec_control_ = 0; 00060 } |
|
Cleanup the connection to the consumer ec. Doesn't call anything on the ec again, just set the object to nil Definition at line 576 of file EC_Gateway_IIOP.cpp. References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), and TAO_SYNCH_MUTEX. Referenced by TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().
00577 { 00578 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00579 00580 this->cleanup_consumer_ec_i (); 00581 00582 return 0; 00583 } |
|
Definition at line 596 of file EC_Gateway_IIOP.cpp. References consumer_ec_. Referenced by cleanup_consumer_ec(), and shutdown().
00597 { 00598 this->consumer_ec_ = 00599 RtecEventChannelAdmin::EventChannel::_nil (); 00600 } |
|
Cleanup all consumer proxies we have without trying to tell the consumer that we are going to disconnect. This can be used to cleanup the consumer proxy administration in case we know that the consumers are all unreachable. Definition at line 106 of file EC_Gateway_IIOP.cpp. References ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, and TAO_SYNCH_MUTEX. Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reconnect_ConsumerEC_Control::system_exception(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().
00107 { 00108 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); 00109 00110 // In case we are still pushing, don't cleanup the proxies 00111 if (this->busy_count_ != 0) 00112 { 00113 this->cleanup_posted_ = 1; 00114 return; 00115 } 00116 00117 this->cleanup_consumer_proxies_i (); 00118 } |
|
Remove all consumer proxies without calling disconnect on them.
Definition at line 211 of file EC_Gateway_IIOP.cpp. References ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release(). Referenced by cleanup_consumer_proxies(), and push().
00212 { 00213 if (this->consumer_proxy_map_.current_size () > 0) 00214 { 00215 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); 00216 j != this->consumer_proxy_map_.end (); 00217 ++j) 00218 { 00219 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; 00220 if (CORBA::is_nil (consumer)) 00221 continue; 00222 00223 CORBA::release (consumer); 00224 } 00225 // Remove all the elements on the map. Calling close() does not 00226 // work because the map is left in an inconsistent state. 00227 this->consumer_proxy_map_.open (); 00228 } 00229 00230 if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) 00231 { 00232 this->default_consumer_proxy_ = 00233 RtecEventChannelAdmin::ProxyPushConsumer::_nil (); 00234 } 00235 } |
|
Cleanup the connection to the supplier ec. Doesn't call anything on the ec again, just set the object to nil Definition at line 586 of file EC_Gateway_IIOP.cpp. References ACE_GUARD_RETURN, cleanup_supplier_ec_i(), and TAO_SYNCH_MUTEX.
00587 { 00588 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00589 00590 this->cleanup_supplier_ec_i (); 00591 00592 return 0; 00593 } |
|
Definition at line 603 of file EC_Gateway_IIOP.cpp. References supplier_ec_. Referenced by cleanup_supplier_ec(), and shutdown().
00604 { 00605 this->supplier_ec_ = 00606 RtecEventChannelAdmin::EventChannel::_nil (); 00607 } |
|
The gateway must disconnect from all the relevant event channels, or any other communication media (such as multicast groups). Implements TAO_EC_Gateway. Definition at line 98 of file EC_Gateway_IIOP.cpp. References ACE_GUARD, close_i(), and TAO_SYNCH_MUTEX.
|
|
Definition at line 121 of file EC_Gateway_IIOP.cpp. References disconnect_consumer_proxies_i(), and disconnect_supplier_proxy_i(). Referenced by close(), shutdown(), and update_consumer_i().
00122 { 00123 // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); 00124 this->disconnect_consumer_proxies_i (); 00125 00126 this->disconnect_supplier_proxy_i (); 00127 } |
|
Check whether the consumer event channel is non existent or not.
Definition at line 616 of file EC_Gateway_IIOP.cpp. References CORBA::Object::_duplicate(), ACE_GUARD_THROW_EX, is_consumer_ec_connected_i(), and TAO_SYNCH_MUTEX. Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel(), TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel(), and TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect().
00618 { 00619 CORBA::Object_var consumer_ec; 00620 { 00621 ACE_GUARD_THROW_EX ( 00622 TAO_SYNCH_MUTEX, ace_mon, this->lock_, 00623 CORBA::INTERNAL ()); 00624 00625 disconnected = 0; 00626 if (this->is_consumer_ec_connected_i () == 0) 00627 { 00628 disconnected = 1; 00629 return 0; 00630 } 00631 00632 consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ()); 00633 } 00634 00635 #if (TAO_HAS_MINIMUM_CORBA == 0) 00636 return consumer_ec->_non_existent (); 00637 #else 00638 return 0; 00639 #endif /* TAO_HAS_MINIMUM_CORBA */ 00640 } |
|
Disconnect all consumer proxies.
Definition at line 130 of file EC_Gateway_IIOP.cpp. References ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release(). Referenced by close_i().
00131 { 00132 if (this->consumer_proxy_map_.current_size () > 0) 00133 { 00134 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); 00135 j != this->consumer_proxy_map_.end (); 00136 ++j) 00137 { 00138 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; 00139 if (CORBA::is_nil (consumer)) 00140 continue; 00141 try 00142 { 00143 consumer->disconnect_push_consumer (); 00144 } 00145 catch (const CORBA::Exception&) 00146 { 00147 } 00148 CORBA::release (consumer); 00149 } 00150 // Remove all the elements on the map. Calling close() does not 00151 // work because the map is left in an inconsistent state. 00152 this->consumer_proxy_map_.open (); 00153 } 00154 00155 if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) 00156 { 00157 this->default_consumer_proxy_->disconnect_push_consumer (); 00158 00159 this->default_consumer_proxy_ = 00160 RtecEventChannelAdmin::ProxyPushConsumer::_nil (); 00161 } 00162 } |
|
The channel is disconnecting.
Definition at line 429 of file EC_Gateway_IIOP.cpp.
00430 { 00431 // ACE_DEBUG ((LM_DEBUG, 00432 // "ECG (%t): Supplier-consumer received " 00433 // "disconnect from channel.\n")); 00434 } |
|
The channel is disconnecting.
Definition at line 437 of file EC_Gateway_IIOP.cpp.
00438 { 00439 // ACE_DEBUG ((LM_DEBUG, 00440 // "ECG (%t): Supplier received " 00441 // "disconnect from channel.\n")); 00442 } |
|
Disconnect the supplier proxy.
Definition at line 165 of file EC_Gateway_IIOP.cpp. References CORBA::is_nil(), and supplier_proxy_. Referenced by close_i().
00166 { 00167 if (!CORBA::is_nil (this->supplier_proxy_.in ())) 00168 { 00169 this->supplier_proxy_->disconnect_push_supplier (); 00170 00171 this->supplier_proxy_ = 00172 RtecEventChannelAdmin::ProxyPushSupplier::_nil (); 00173 } 00174 } |
|
To do its job this class requires to know the local and remote ECs it will connect to.
Definition at line 63 of file EC_Gateway_IIOP.cpp. References ACE_GUARD_RETURN, init_i(), and TAO_SYNCH_MUTEX.
00065 { 00066 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00067 00068 return this->init_i (supplier_ec, consumer_ec); 00069 } |
|
Do the real work in init().
Definition at line 72 of file EC_Gateway_IIOP.cpp. References ACE_ERROR_RETURN, TAO_ECG_ConsumerEC_Control::activate(), consumer_ec_, TAO_EC_Gateway_IIOP_Factory::create_consumerec_control(), ec_control_, CORBA::is_nil(), LM_ERROR, and supplier_ec_. Referenced by TAO_EC_Gateway_Sched::init(), and init().
00074 { 00075 if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ())) 00076 { 00077 this->supplier_ec_ = 00078 RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec); 00079 this->consumer_ec_ = 00080 RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec); 00081 00082 if (ec_control_ == 0) 00083 { 00084 ec_control_ = factory_->create_consumerec_control(this); 00085 ec_control_->activate(); 00086 } 00087 00088 return 0; 00089 } 00090 else 00091 ACE_ERROR_RETURN ((LM_ERROR, 00092 "TAO_EC_Gateway_IIOP - init_i " 00093 "Supplier and consumer event channel reference " 00094 "should be nil.\n"), -1); 00095 } |
|
Helper method to see if consumer ec is connected.
Definition at line 610 of file EC_Gateway_IIOP.cpp. References CORBA::is_nil(). Referenced by consumer_ec_non_existent().
00611 { 00612 return !CORBA::is_nil (this->consumer_ec_.in ()); 00613 } |
|
Create all connections to consumer ec and to supplier ec.
Definition at line 253 of file EC_Gateway_IIOP.cpp. References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_UNDEFINED, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::bind(), consumer_ec_, consumer_info_, consumer_is_active_, Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, RtecEventChannelAdmin::ConsumerQOS::dependencies, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), RtecEventComm::EventSourceID, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::find(), RtecEventChannelAdmin::SupplierQOS::is_gateway, RtecEventChannelAdmin::ConsumerQOS::is_gateway, RtecEventChannelAdmin::SupplierQOS::publications, RtecEventComm::EventHeader::source, supplier_, supplier_ec_, supplier_info_, supplier_is_active_, supplier_proxy_, RtecEventComm::EventHeader::type, and use_consumer_proxy_map_. Referenced by update_consumer_i().
00255 { 00256 // = Connect as a supplier to the consumer EC 00257 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = 00258 this->consumer_ec_->for_suppliers (); 00259 00260 RtecEventChannelAdmin::ConsumerQOS sub = c_qos; 00261 sub.is_gateway = 1; 00262 00263 // Change the RT_Info in the consumer QoS. 00264 // On the same loop we discover the subscriptions by event source, 00265 // and fill the consumer proxy map if we have to use this map. 00266 for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) 00267 { 00268 sub.dependencies[i].rt_info = this->supplier_info_; 00269 00270 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; 00271 const RtecEventComm::EventHeader &h = 00272 sub.dependencies[i].event.header; 00273 00274 RtecEventComm::EventSourceID sid = h.source; 00275 00276 //ACE_DEBUG ((LM_DEBUG, 00277 // "ECG (%t) trying (%d,%d)\n", 00278 // sid, h.type)); 00279 00280 // Skip all subscriptions that do not require an specific source 00281 // id or skip all subscriptions when we don't need to use the consumer 00282 // proxy map. 00283 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0) 00284 continue; 00285 00286 // Skip all the magic event types. 00287 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED) 00288 continue; 00289 00290 if (this->consumer_proxy_map_.find (sid, proxy) == -1) 00291 { 00292 //ACE_DEBUG ((LM_DEBUG, 00293 // "ECG (%t) binding source %d\n", 00294 // sid)); 00295 proxy = supplier_admin->obtain_push_consumer (); 00296 this->consumer_proxy_map_.bind (sid, proxy); 00297 } 00298 } 00299 //ACE_DEBUG ((LM_DEBUG, 00300 // "ECG (%t) consumer map computed (%d entries)\n", 00301 // this->consumer_proxy_map_.current_size ())); 00302 00303 if (this->consumer_proxy_map_.current_size () > 0) 00304 { 00305 this->supplier_is_active_ = 1; 00306 00307 // Obtain a reference to our supplier personality... 00308 RtecEventComm::PushSupplier_var supplier_ref = 00309 this->supplier_._this (); 00310 00311 // For each subscription by source build the set of publications 00312 // (they may several, by type, for instance) and connect to the 00313 // consumer proxy. 00314 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); 00315 j != this->consumer_proxy_map_.end (); 00316 ++j) 00317 { 00318 RtecEventChannelAdmin::SupplierQOS pub; 00319 pub.publications.length (sub.dependencies.length () + 1); 00320 pub.is_gateway = 1; 00321 00322 int c = 0; 00323 00324 RtecEventComm::EventSourceID sid = (*j).ext_id_; 00325 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) 00326 { 00327 const RtecEventComm::EventHeader& h = 00328 sub.dependencies[k].event.header; 00329 if (h.source != sid 00330 || (ACE_ES_EVENT_ANY < h.type 00331 && h.type < ACE_ES_EVENT_UNDEFINED)) 00332 continue; 00333 pub.publications[c].event.header = h; 00334 pub.publications[c].dependency_info.dependency_type = 00335 RtecBase::TWO_WAY_CALL; 00336 pub.publications[c].dependency_info.number_of_calls = 1; 00337 pub.publications[c].dependency_info.rt_info = this->consumer_info_; 00338 c++; 00339 } 00340 //ACE_DEBUG ((LM_DEBUG, 00341 // "ECG (%t) supplier id %d has %d elements\n", 00342 // sid, c)); 00343 if (c == 0) 00344 continue; 00345 00346 pub.publications.length (c); 00347 00348 // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier ")); 00349 // ACE_SupplierQOS_Factory::debug (pub); 00350 (*j).int_id_->connect_push_supplier (supplier_ref.in (), 00351 pub); 00352 } 00353 } 00354 00355 // Also build the subscriptions that are *not* by source when we use the 00356 // consumer proxy map, and all subscriptions when we don't use the map and 00357 // then connect to the default consumer proxy. 00358 RtecEventChannelAdmin::SupplierQOS pub; 00359 pub.publications.length (sub.dependencies.length () + 1); 00360 pub.is_gateway = 1; 00361 int c = 0; 00362 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) 00363 { 00364 const RtecEventComm::EventHeader& h = 00365 sub.dependencies[k].event.header; 00366 RtecEventComm::EventSourceID sid = h.source; 00367 00368 // Skip all subscriptions with a specific source when we use the map 00369 if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1) 00370 continue; 00371 00372 // Skip all the magic event types. 00373 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED) 00374 continue; 00375 00376 pub.publications[c].event.header = h; 00377 pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero (); 00378 pub.publications[c].dependency_info.dependency_type = 00379 RtecBase::TWO_WAY_CALL; 00380 pub.publications[c].dependency_info.number_of_calls = 1; 00381 pub.publications[c].dependency_info.rt_info = this->consumer_info_; 00382 c++; 00383 } 00384 00385 if (c > 0) 00386 { 00387 this->supplier_is_active_ = 1; 00388 00389 // Obtain a reference to our supplier personality... 00390 RtecEventComm::PushSupplier_var supplier_ref = 00391 this->supplier_._this (); 00392 00393 // Obtain the consumer.... 00394 this->default_consumer_proxy_ = 00395 supplier_admin->obtain_push_consumer (); 00396 00397 pub.publications.length (c); 00398 // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); 00399 // ACE_SupplierQOS_Factory::debug (pub); 00400 this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (), 00401 pub); 00402 } 00403 00404 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = 00405 this->supplier_ec_->for_consumers (); 00406 00407 this->supplier_proxy_ = 00408 consumer_admin->obtain_push_supplier (); 00409 00410 this->consumer_is_active_ = 1; 00411 RtecEventComm::PushConsumer_var consumer_ref = 00412 this->consumer_._this (); 00413 00414 // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer ")); 00415 // ACE_ConsumerQOS_Factory::debug (sub); 00416 00417 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), 00418 sub); 00419 } |
|
This is the consumer side behavior, it pushes the events to the local event channel. Definition at line 445 of file EC_Gateway_IIOP.cpp. References ACE_ES_EVENT_SOURCE_ANY, ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, consumer_proxy_map_, default_consumer_proxy_, RtecEventComm::EventSet, RtecEventComm::EventSourceID, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::find(), CORBA::is_nil(), push_to_consumer(), TAO_SYNCH_MUTEX, update_consumer_i(), update_posted_, use_consumer_proxy_map_, and use_ttl_.
00446 { 00447 // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n")); 00448 00449 if (events.length () == 0) 00450 { 00451 // ACE_DEBUG ((LM_DEBUG, "no events\n")); 00452 return; 00453 } 00454 00455 { 00456 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); 00457 00458 this->busy_count_++; 00459 } 00460 00461 // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ())); 00462 00463 // @@ TODO, there is an extra data copy here, we should do the event 00464 // modification without it and only compact the necessary events. 00465 RtecEventComm::EventSet out (1); 00466 out.length (1); 00467 for (CORBA::ULong i = 0; i < events.length (); ++i) 00468 { 00469 if (this->use_ttl_ == 1) 00470 { 00471 if (events[i].header.ttl == 0) 00472 continue; 00473 } 00474 00475 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; 00476 RtecEventComm::EventSourceID sid = events[i].header.source; 00477 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0 00478 || this->consumer_proxy_map_.find (sid, proxy) == -1) 00479 { 00480 // If the source is not in our map or we should not use the map then 00481 // use the default consumer proxy. 00482 proxy = this->default_consumer_proxy_.in (); 00483 } 00484 00485 if (CORBA::is_nil (proxy)) 00486 continue; 00487 00488 out[0] = events[i]; 00489 00490 if (this->use_ttl_ == 1) 00491 out[0].header.ttl--; 00492 00493 // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n")); 00494 this->push_to_consumer(proxy, out); 00495 } 00496 00497 { 00498 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); 00499 00500 this->busy_count_--; 00501 00502 if (this->busy_count_ == 0 && this->cleanup_posted_ != 0) 00503 { 00504 this->cleanup_posted_ = 0; 00505 this->cleanup_consumer_proxies_i (); 00506 } 00507 00508 if (this->busy_count_ == 0 && this->update_posted_ != 0) 00509 { 00510 this->update_posted_ = 0; 00511 this->update_consumer_i (this->c_qos_); 00512 } 00513 } 00514 } |
|
Push the event to the consumer.
Definition at line 517 of file EC_Gateway_IIOP.cpp. References ec_control_, TAO_ECG_ConsumerEC_Control::event_channel_not_exist(), RtecEventComm::EventSet, and TAO_ECG_ConsumerEC_Control::system_exception(). Referenced by push().
00520 { 00521 try 00522 { 00523 consumer->push (event); 00524 } 00525 catch (const CORBA::OBJECT_NOT_EXIST&) 00526 { 00527 ec_control_->event_channel_not_exist (this); 00528 } 00529 catch (CORBA::SystemException& sysex) 00530 { 00531 ec_control_->system_exception (this, 00532 sysex); 00533 } 00534 catch (const CORBA::Exception&) 00535 { 00536 // Shouldn't happen. 00537 } 00538 } |
|
Definition at line 177 of file EC_Gateway_IIOP.cpp. References ACE_GUARD, c_qos_, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_. Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::reconnect().
00178 { 00179 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); 00180 00181 if (this->busy_count_ != 0) 00182 { 00183 this->update_posted_ = 1; 00184 return; 00185 } 00186 00187 this->update_consumer_i (c_qos_); 00188 } |
|
Resume the connection to the supplier ec.
Definition at line 654 of file EC_Gateway_IIOP.cpp. References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.
00655 { 00656 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1) 00657 { 00658 this->supplier_proxy_->resume_connection (); 00659 00660 supplier_ec_suspended_ = 0; 00661 } 00662 } |
|
Disconnect and shutdown the gateway.
Definition at line 541 of file EC_Gateway_IIOP.cpp. References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), cleanup_supplier_ec_i(), close_i(), consumer_is_active_, ec_control_, TAO_ECG_ConsumerEC_Control::shutdown(), supplier_, supplier_is_active_, and TAO_SYNCH_MUTEX.
00542 { 00543 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00544 00545 ec_control_->shutdown(); 00546 00547 this->close_i (); 00548 00549 if (this->supplier_is_active_) 00550 { 00551 PortableServer::POA_var poa = 00552 this->supplier_._default_POA (); 00553 PortableServer::ObjectId_var id = 00554 poa->servant_to_id (&this->supplier_); 00555 poa->deactivate_object (id.in ()); 00556 this->supplier_is_active_ = 0; 00557 } 00558 00559 if (this->consumer_is_active_) 00560 { 00561 PortableServer::POA_var poa = 00562 this->consumer_._default_POA (); 00563 PortableServer::ObjectId_var id = 00564 poa->servant_to_id (&this->consumer_); 00565 poa->deactivate_object (id.in ()); 00566 this->consumer_is_active_ = 0; 00567 } 00568 00569 this->cleanup_consumer_ec_i (); 00570 this->cleanup_supplier_ec_i (); 00571 00572 return 0; 00573 } |
|
Suspend the connection to the supplier ec.
Definition at line 643 of file EC_Gateway_IIOP.cpp. References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_. Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reconnect_ConsumerEC_Control::system_exception().
00644 { 00645 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0) 00646 { 00647 this->supplier_proxy_->suspend_connection (); 00648 00649 supplier_ec_suspended_ = 1; 00650 } 00651 } |
|
Definition at line 191 of file EC_Gateway_IIOP.cpp. References ACE_GUARD, c_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.
00193 { 00194 if (c_qos.dependencies.length () == 0) 00195 return; 00196 00197 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); 00198 00199 this->c_qos_ = c_qos; 00200 00201 if (this->busy_count_ != 0) 00202 { 00203 this->update_posted_ = 1; 00204 return; 00205 } 00206 00207 this->update_consumer_i (c_qos); 00208 } |
|
Definition at line 238 of file EC_Gateway_IIOP.cpp. References close_i(), CORBA::is_nil(), and open_i(). Referenced by push(), reconnect_consumer_ec(), and update_consumer().
00240 { 00241 this->close_i (); 00242 00243 if (CORBA::is_nil (this->consumer_ec_.in ()) 00244 || CORBA::is_nil (this->supplier_ec_.in ())) 00245 return; 00246 00247 // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n")); 00248 00249 this->open_i (c_qos); 00250 } |
|
Definition at line 422 of file EC_Gateway_IIOP.cpp.
00424 {
00425 // Do nothing...
00426 }
|
|
How many threads are running push() we cannot make changes until that reaches 0 Definition at line 164 of file EC_Gateway_IIOP.h. |
|
Definition at line 172 of file EC_Gateway_IIOP.h. Referenced by reconnect_consumer_ec(), and update_consumer(). |
|
We have a cleanup outstanding and must wait doing cleanup until all pushes are ready. Definition at line 178 of file EC_Gateway_IIOP.h. Referenced by cleanup_consumer_proxies(), and push(). |
|
Our consumer personality....
Definition at line 198 of file EC_Gateway_IIOP.h. |
|
The event channel acting as consumer of this gateway.
Definition at line 190 of file EC_Gateway_IIOP.h. Referenced by cleanup_consumer_ec_i(), init_i(), and open_i(). |
|
Our RT_Infos for the event channel that is the consumer.
Definition at line 195 of file EC_Gateway_IIOP.h. Referenced by open_i(). |
|
If it is not 0 then we must deactivate the consumer.
Definition at line 201 of file EC_Gateway_IIOP.h. Referenced by open_i(), and shutdown(). |
|
We talk to the EC (as a supplier) using either an per-supplier proxy or a generic proxy for the type only subscriptions. We push the events to these proxies Definition at line 216 of file EC_Gateway_IIOP.h. Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), open_i(), and push(). |
|
Definition at line 217 of file EC_Gateway_IIOP.h. Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), open_i(), and push(). |
|
The consumer ec control which controls the behaviour in case of a misbehaving consumer ec Definition at line 225 of file EC_Gateway_IIOP.h. Referenced by init_i(), push_to_consumer(), shutdown(), and ~TAO_EC_Gateway_IIOP(). |
|
The Gateway IIOP Factory for all the settings.
Definition at line 228 of file EC_Gateway_IIOP.h. |
|
Lock to synchronize internal changes.
Definition at line 160 of file EC_Gateway_IIOP.h. |
|
Our supplier personality....
Definition at line 204 of file EC_Gateway_IIOP.h. Referenced by open_i(), and shutdown(). |
|
The event channel acting as supplier for this gateway so we can reconnect when the list changes. Definition at line 187 of file EC_Gateway_IIOP.h. Referenced by cleanup_supplier_ec_i(), init_i(), and open_i(). |
|
Is the supplier ec suspended? Definition at line 183 of file EC_Gateway_IIOP.h. Referenced by resume_supplier_ec(), and suspend_supplier_ec(). |
|
Our RT_Infos for the event channel that is the supplier.
Definition at line 193 of file EC_Gateway_IIOP.h. Referenced by open_i(). |
|
If it is not 0 then we must deactivate the supplier.
Definition at line 207 of file EC_Gateway_IIOP.h. Referenced by open_i(), and shutdown(). |
|
We talk to the EC (as a consumer) using this proxy. We receive the events from these proxy Definition at line 221 of file EC_Gateway_IIOP.h. Referenced by disconnect_supplier_proxy_i(), open_i(), resume_supplier_ec(), and suspend_supplier_ec(). |
|
An update_consumer() message arrived *while* we were doing a push() the modification is stored, if multiple update_consumer messages arrive only the last one is executed. Definition at line 171 of file EC_Gateway_IIOP.h. Referenced by push(), reconnect_consumer_ec(), and update_consumer(). |
|
The flag for using the consumer proxy map. With 1 the consumer proxy map is used, meaning that for each unique source id we use a different proxy push consumer, if 0, we only use one proxy push consumer (the default) for all source ids. Definition at line 237 of file EC_Gateway_IIOP.h. Referenced by open_i(), push(), and TAO_EC_Gateway_IIOP(). |
|
If 1, we use the TTL flags, if 0, we just ignore TTL.
Definition at line 231 of file EC_Gateway_IIOP.h. Referenced by push(), and TAO_EC_Gateway_IIOP(). |