TAO_EC_Gateway_IIOP Class Reference

Event Channel Gateway using IIOP.

#include <EC_Gateway_IIOP.h>

Public Member Functions

 TAO_EC_Gateway_IIOP (void)
virtual ~TAO_EC_Gateway_IIOP (void)
int init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
void disconnect_push_supplier (void)
 The channel is disconnecting.

void disconnect_push_consumer (void)
 The channel is disconnecting.

void push (const RtecEventComm::EventSet &events)
int shutdown (void)
 Disconnect and shutdown the gateway.

virtual void close (void)
virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub)
virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS &pub)
void reconnect_consumer_ec (void)
CORBA::Boolean consumer_ec_non_existent (CORBA::Boolean_out disconnected)
 Check whether the consumer event channel is non existent or not.

void cleanup_consumer_proxies (void)
int cleanup_consumer_ec (void)
int cleanup_supplier_ec (void)
void suspend_supplier_ec (void)
 Suspend the connection to the supplier ec.

void resume_supplier_ec (void)
 Resume the connection to the supplier ec.


Protected Types

typedef ACE_Map_Manager< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > Consumer_Map
typedef ACE_Map_Iterator< RtecEventComm::EventSourceID, RtecEventChannelAdmin::ProxyPushConsumer_ptr, ACE_Null_Mutex > Consumer_Map_Iterator

Protected Member Functions

int init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
 Do the real work in init().


Protected Attributes

TAO_SYNCH_MUTEX lock_
 Lock to synchronize internal changes.

CORBA::ULong busy_count_
int update_posted_
RtecEventChannelAdmin::ConsumerQOS c_qos_
int cleanup_posted_
int supplier_ec_suspended_
RtecEventChannelAdmin::EventChannel_var supplier_ec_
RtecEventChannelAdmin::EventChannel_var consumer_ec_
 The event channel acting as consumer of this gateway.

RtecBase::handle_t supplier_info_
 Our RT_Infos for the event channel that is the supplier.

RtecBase::handle_t consumer_info_
 Our RT_Infos for the event channel that is the consumer.

ACE_PushConsumer_Adapter< TAO_EC_Gateway_IIOP > consumer_
 Our consumer personality....

int consumer_is_active_
 If it is not 0 then we must deactivate the consumer.

ACE_PushSupplier_Adapter< TAO_EC_Gateway_IIOP > supplier_
 Our supplier personality....

int supplier_is_active_
 If it is not 0 then we must deactivate the supplier.

Consumer_Map consumer_proxy_map_
RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_
TAO_ECG_ConsumerEC_Control * ec_control_
TAO_EC_Gateway_IIOP_Factory * factory_
 The Gateway IIOP Factory for all the settings.

int use_ttl_
 If 1, we use the TTL flags; if 0, we ignore TTL.

int use_consumer_proxy_map_

Private Member Functions

void close_i (void)
void disconnect_supplier_proxy_i (void)
 Disconnect the supplier proxy.

void disconnect_consumer_proxies_i (void)
 Disconnect all consumer proxies.

void cleanup_consumer_proxies_i (void)
 Remove all consumer proxies without calling disconnect on them.

void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS &sub)
void open_i (const RtecEventChannelAdmin::ConsumerQOS &sub)
 Create all connections to consumer ec and to supplier ec.

CORBA::Boolean is_consumer_ec_connected_i (void) const
 Helper method to see if consumer ec is connected.

void push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, const RtecEventComm::EventSet &event)
 Push the event to the consumer.

void cleanup_consumer_ec_i (void)
void cleanup_supplier_ec_i (void)

Detailed Description

Event Channel Gateway using IIOP.

This class mediates between two event channels: it connects as a consumer of events to a remote event channel, and as a supplier of events to the local EC. As a consumer, it provides a QoS designed to accept only the events in which *local* consumers are interested. Eventually the local EC should create this object and compute its QoS automatically, but this requires some way to filter out the peers registered as consumers; otherwise we will get loops in the QoS graph. When connected as a supplier, it uses exactly the same set of events in its publications list.

Note:
An alternative implementation would be to register with the remote EC as a supplier and then filter on the remote EC, but one of the objectives is to minimize network traffic. On the other hand, the events will be pushed to remote consumers, even though they will be dropped upon receipt (due to the TTL field); IMHO this is another suggestion that the EC needs to know (somehow) which consumers are truly its peers in disguise.
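
The TTL-based dropping mentioned in the note can be illustrated with a minimal, self-contained sketch. It does not use any TAO types: `Event`, `forward_through_gateway` and `hops_before_drop` are hypothetical names introduced here, and the sketch only assumes that each gateway hop drops events whose TTL is 0 and otherwise forwards a copy with the TTL decremented.

```cpp
#include <cassert>

// Hypothetical stand-in for an event header; only the TTL matters here.
struct Event {
  int source;
  int type;
  unsigned ttl;
};

// Models one gateway hop: an event whose TTL is already 0 is dropped,
// otherwise a copy with the TTL decremented by one is produced in 'out'.
bool forward_through_gateway(const Event& in, Event& out) {
  if (in.ttl == 0)
    return false;
  out = in;
  --out.ttl;
  return true;
}

// Counts how many gateway hops an event survives before it is dropped.
unsigned hops_before_drop(Event e) {
  const unsigned initial_ttl = e.ttl;
  unsigned hops = 0;
  Event next{};
  while (forward_through_gateway(e, next)) {
    e = next;
    ++hops;
  }
  // Each hop consumes exactly one unit of TTL.
  assert(hops == initial_ttl);
  return hops;
}
```

With this model, an event that bounces between two peered channels is forwarded at most as many times as its initial TTL, which bounds any loop but does not avoid the network traffic of the final, dropped push.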

Todo:
The class makes an extra copy of the events; we need to investigate whether closer collaboration with its collocated EC could remove that copy.

Definition at line 65 of file EC_Gateway_IIOP.h.


Member Typedef Documentation

typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map [protected]
 

Definition at line 210 of file EC_Gateway_IIOP.h.

typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map_Iterator [protected]
 

Definition at line 211 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i().


Constructor & Destructor Documentation

TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
 

Definition at line 19 of file EC_Gateway_IIOP.cpp.

References ACE_NEW, ACE_Dynamic_Service< TYPE >::instance(), TAO_EC_Gateway_IIOP_Factory::use_consumer_proxy_map(), use_consumer_proxy_map_, TAO_EC_Gateway_IIOP_Factory::use_ttl(), and use_ttl_.

00020   :  busy_count_ (0),
00021      update_posted_ (0),
00022      cleanup_posted_ (0),
00023      supplier_ec_suspended_ (0),
00024      supplier_info_ (0),
00025      consumer_info_ (0),
00026      consumer_ (this),
00027      consumer_is_active_ (0),
00028      supplier_ (this),
00029      supplier_is_active_ (0),
00030      ec_control_ (0),
00031      factory_ (0),
00032      use_ttl_ (1),
00033      use_consumer_proxy_map_ (1)
00034 {
00035   if (this->factory_ == 0)
00036     {
00037       this->factory_ =
00038              ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039 
00040       if (this->factory_ == 0)
00041         {
00042           TAO_EC_Gateway_IIOP_Factory *f = 0;
00043           ACE_NEW (f,
00044                    TAO_EC_Gateway_IIOP_Factory);
00045           this->factory_ = f;
00046         }
00047     }
00048 
00049   if (this->factory_ != 0)
00050     {
00051       this->use_ttl_ = this->factory_->use_ttl();
00052       this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053     }
00054 }
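
The lookup-with-fallback step in the constructor can be sketched without ACE. In the sketch below, `GatewayFactory`, `NoTtlFactory`, `Registry` and `resolve_factory` are hypothetical names, the registry is a plain map standing in for the dynamic service lookup, and the default return values of the factory are assumptions made for the illustration.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for the gateway factory; the default return
// values are assumptions made for this sketch.
struct GatewayFactory {
  virtual ~GatewayFactory() = default;
  virtual bool use_ttl() const { return true; }
  virtual bool use_consumer_proxy_map() const { return true; }
};

// Example of a configured factory that switches TTL handling off.
struct NoTtlFactory : GatewayFactory {
  bool use_ttl() const override { return false; }
};

// Hypothetical service registry keyed by service name.
using Registry = std::map<std::string, std::shared_ptr<GatewayFactory>>;

// Returns the registered factory, or a default-constructed one if the
// registry has no usable entry under that name.
std::shared_ptr<GatewayFactory> resolve_factory(const Registry& registry,
                                                const std::string& name) {
  auto it = registry.find(name);
  if (it != registry.end() && it->second)
    return it->second;
  return std::make_shared<GatewayFactory>();
}
```

The gateway then copies the settings it needs out of whichever factory it obtained, so a missing configuration entry silently yields the default behavior.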

TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void) [virtual]
 

Definition at line 56 of file EC_Gateway_IIOP.cpp.

References ec_control_.

00057 {
00058    delete ec_control_;
00059    ec_control_ = 0;
00060 }


Member Function Documentation

int TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
 

Clean up the connection to the consumer EC. This does not invoke anything on the EC again; it just sets the object reference to nil.

Definition at line 576 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().

00577 {
00578   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00579 
00580   this->cleanup_consumer_ec_i ();
00581 
00582   return 0;
00583 }

void TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void) [private]
 

Definition at line 596 of file EC_Gateway_IIOP.cpp.

References consumer_ec_.

Referenced by cleanup_consumer_ec(), and shutdown().

00597 {
00598   this->consumer_ec_ =
00599     RtecEventChannelAdmin::EventChannel::_nil ();
00600 }

void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (void)
 

Clean up all the consumer proxies we hold without trying to tell the consumers that we are going to disconnect. This can be used to clean up the consumer proxy administration when we know that the consumers are all unreachable.

Definition at line 106 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reconnect_ConsumerEC_Control::system_exception(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().

00107 {
00108   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00109 
00110   // In case we are still pushing, don't cleanup the proxies
00111   if (this->busy_count_ != 0)
00112     {
00113       this->cleanup_posted_ = 1;
00114       return;
00115     }
00116 
00117   this->cleanup_consumer_proxies_i ();
00118 }
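
The listing above defers the cleanup while a push is in progress. A minimal sketch of that busy-count and posted-flag pattern follows, using only the standard library; `DeferredCleanup` and its member names are hypothetical, and `cleanup_i()` merely counts invocations instead of releasing proxies.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical model of the busy-count / posted-flag pattern.
class DeferredCleanup {
public:
  // Called at the start of a push: marks the object as busy.
  void begin_push() {
    std::lock_guard<std::mutex> guard(lock_);
    ++busy_count_;
  }

  // Called at the end of a push: once the last concurrent push has
  // finished, runs a cleanup that was posted in the meantime.
  void end_push() {
    std::lock_guard<std::mutex> guard(lock_);
    assert(busy_count_ > 0);
    --busy_count_;
    if (busy_count_ == 0 && cleanup_posted_) {
      cleanup_posted_ = false;
      cleanup_i();
    }
  }

  // Requests a cleanup: done at once when idle, otherwise posted.
  void cleanup() {
    std::lock_guard<std::mutex> guard(lock_);
    if (busy_count_ != 0) {
      cleanup_posted_ = true;
      return;
    }
    cleanup_i();
  }

  int cleanups_done() const {
    std::lock_guard<std::mutex> guard(lock_);
    return cleanups_done_;
  }

private:
  // Must be called with lock_ held.
  void cleanup_i() { ++cleanups_done_; }

  mutable std::mutex lock_;
  unsigned busy_count_ = 0;
  bool cleanup_posted_ = false;
  int cleanups_done_ = 0;
};
```

Posting the request rather than blocking keeps the caller from waiting on a push that may itself be stuck in a remote invocation; several requests posted during one push collapse into a single cleanup.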

void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (void) [private]
 

Remove all consumer proxies without calling disconnect on them.

Definition at line 211 of file EC_Gateway_IIOP.cpp.

References ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().

Referenced by cleanup_consumer_proxies(), and push().

00212 {
00213   if (this->consumer_proxy_map_.current_size () > 0)
00214     {
00215       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00216            j != this->consumer_proxy_map_.end ();
00217            ++j)
00218         {
00219           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00220           if (CORBA::is_nil (consumer))
00221             continue;
00222 
00223           CORBA::release (consumer);
00224         }
00225       // Remove all the elements on the map.  Calling close() does not
00226       // work because the map is left in an inconsistent state.
00227       this->consumer_proxy_map_.open ();
00228     }
00229 
00230   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00231     {
00232       this->default_consumer_proxy_ =
00233         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00234     }
00235 }

int TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
 

Clean up the connection to the supplier EC. This does not invoke anything on the EC again; it just sets the object reference to nil.

Definition at line 586 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_supplier_ec_i(), and TAO_SYNCH_MUTEX.

00587 {
00588   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00589 
00590   this->cleanup_supplier_ec_i ();
00591 
00592   return 0;
00593 }

void TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void) [private]
 

Definition at line 603 of file EC_Gateway_IIOP.cpp.

References supplier_ec_.

Referenced by cleanup_supplier_ec(), and shutdown().

00604 {
00605   this->supplier_ec_ =
00606     RtecEventChannelAdmin::EventChannel::_nil ();
00607 }

void TAO_EC_Gateway_IIOP::close (void) [virtual]
 

The gateway must disconnect from all the relevant event channels, or any other communication media (such as multicast groups).

Implements TAO_EC_Gateway.

Definition at line 98 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, close_i(), and TAO_SYNCH_MUTEX.

00099 {
00100   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00101 
00102   this->close_i ();
00103 }

void TAO_EC_Gateway_IIOP::close_i (void) [private]
 

Definition at line 121 of file EC_Gateway_IIOP.cpp.

References disconnect_consumer_proxies_i(), and disconnect_supplier_proxy_i().

Referenced by close(), shutdown(), and update_consumer_i().

00122 {
00123   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
00124   this->disconnect_consumer_proxies_i ();
00125 
00126   this->disconnect_supplier_proxy_i ();
00127 }

CORBA::Boolean TAO_EC_Gateway_IIOP::consumer_ec_non_existent (CORBA::Boolean_out disconnected)
 

Check whether the consumer event channel is non existent or not.

Definition at line 616 of file EC_Gateway_IIOP.cpp.

References CORBA::Object::_duplicate(), ACE_GUARD_THROW_EX, is_consumer_ec_connected_i(), and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel(), TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel(), and TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect().

00618 {
00619   CORBA::Object_var consumer_ec;
00620   {
00621     ACE_GUARD_THROW_EX (
00622         TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00623         CORBA::INTERNAL ());
00624 
00625     disconnected = 0;
00626     if (this->is_consumer_ec_connected_i () == 0)
00627       {
00628         disconnected = 1;
00629         return 0;
00630       }
00631 
00632     consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00633   }
00634 
00635 #if (TAO_HAS_MINIMUM_CORBA == 0)
00636   return consumer_ec->_non_existent ();
00637 #else
00638   return 0;
00639 #endif /* TAO_HAS_MINIMUM_CORBA */
00640 }
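
The listing above takes its own reference to the consumer EC while holding the lock and performs the remote check only after the lock has been released. A minimal sketch of that idea, assuming a `std::shared_ptr` in place of the CORBA object reference, is given below; `RemoteChannel` and `GatewayModel` are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical remote object; non_existent() stands in for a
// potentially slow remote invocation.
struct RemoteChannel {
  bool alive;
  bool non_existent() const { return !alive; }
};

class GatewayModel {
public:
  void connect(std::shared_ptr<RemoteChannel> ec) {
    std::lock_guard<std::mutex> guard(lock_);
    consumer_ec_ = std::move(ec);
  }

  // Returns true if the channel no longer exists. 'disconnected' is set
  // to true when no channel is held at all, in which case false is returned.
  bool consumer_ec_non_existent(bool& disconnected) {
    std::shared_ptr<RemoteChannel> ec;
    {
      std::lock_guard<std::mutex> guard(lock_);
      disconnected = false;
      if (!consumer_ec_) {
        disconnected = true;
        return false;
      }
      ec = consumer_ec_;  // take our own reference while holding the lock
    }
    // The lock is released here, so the slow call cannot block other
    // users of the gateway, and 'ec' stays valid even if the member
    // is reset concurrently.
    assert(ec);
    return ec->non_existent();
  }

private:
  std::mutex lock_;
  std::shared_ptr<RemoteChannel> consumer_ec_;
};
```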

void TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (void) [private]
 

Disconnect all consumer proxies.

Definition at line 130 of file EC_Gateway_IIOP.cpp.

References ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().

Referenced by close_i().

00131 {
00132   if (this->consumer_proxy_map_.current_size () > 0)
00133     {
00134       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00135            j != this->consumer_proxy_map_.end ();
00136            ++j)
00137         {
00138           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00139           if (CORBA::is_nil (consumer))
00140             continue;
00141           try
00142             {
00143               consumer->disconnect_push_consumer ();
00144             }
00145           catch (const CORBA::Exception&)
00146             {
00147             }
00148           CORBA::release (consumer);
00149         }
00150       // Remove all the elements on the map.  Calling close() does not
00151       // work because the map is left in an inconsistent state.
00152       this->consumer_proxy_map_.open ();
00153     }
00154 
00155   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00156     {
00157       this->default_consumer_proxy_->disconnect_push_consumer ();
00158 
00159       this->default_consumer_proxy_ =
00160         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00161     }
00162 }

void TAO_EC_Gateway_IIOP::disconnect_push_consumer (void)
 

The channel is disconnecting.

Definition at line 429 of file EC_Gateway_IIOP.cpp.

00430 {
00431   // ACE_DEBUG ((LM_DEBUG,
00432   //             "ECG (%t): Supplier-consumer received "
00433   //            "disconnect from channel.\n"));
00434 }

void TAO_EC_Gateway_IIOP::disconnect_push_supplier (void)
 

The channel is disconnecting.

Definition at line 437 of file EC_Gateway_IIOP.cpp.

00438 {
00439   // ACE_DEBUG ((LM_DEBUG,
00440   //            "ECG (%t): Supplier received "
00441   //            "disconnect from channel.\n"));
00442 }

void TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (void) [private]
 

Disconnect the supplier proxy.

Definition at line 165 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), and supplier_proxy_.

Referenced by close_i().

00166 {
00167   if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00168     {
00169       this->supplier_proxy_->disconnect_push_supplier ();
00170 
00171       this->supplier_proxy_ =
00172         RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00173     }
00174 }

int TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
 

To do its job, this class needs to know the local and remote ECs it will connect to.

Returns:
0 in case of success, -1 in case of failure

Definition at line 63 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, init_i(), and TAO_SYNCH_MUTEX.

00065 {
00066   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00067 
00068   return this->init_i (supplier_ec, consumer_ec);
00069 }

int TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec) [protected]
 

Do the real work in init().

Definition at line 72 of file EC_Gateway_IIOP.cpp.

References ACE_ERROR_RETURN, TAO_ECG_ConsumerEC_Control::activate(), consumer_ec_, TAO_EC_Gateway_IIOP_Factory::create_consumerec_control(), ec_control_, CORBA::is_nil(), LM_ERROR, and supplier_ec_.

Referenced by TAO_EC_Gateway_Sched::init(), and init().

00074 {
00075   if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00076   {
00077     this->supplier_ec_ =
00078       RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00079     this->consumer_ec_ =
00080       RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00081 
00082         if (ec_control_ == 0)
00083      {
00084         ec_control_ = factory_->create_consumerec_control(this);
00085         ec_control_->activate();
00086      }
00087 
00088     return 0;
00089   }
00090   else
00091     ACE_ERROR_RETURN ((LM_ERROR,
00092                        "TAO_EC_Gateway_IIOP - init_i "
00093                        "Supplier and consumer event channel reference "
00094                        "should be nil.\n"), -1);
00095 }

CORBA::Boolean TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const [private]
 

Helper method to see if consumer ec is connected.

Definition at line 610 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil().

Referenced by consumer_ec_non_existent().

00611 {
00612   return !CORBA::is_nil (this->consumer_ec_.in ());
00613 }

void TAO_EC_Gateway_IIOP::open_i (const RtecEventChannelAdmin::ConsumerQOS &sub) [private]
 

Create all connections to consumer ec and to supplier ec.

Definition at line 253 of file EC_Gateway_IIOP.cpp.

References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_UNDEFINED, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::begin(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::bind(), consumer_ec_, consumer_info_, consumer_is_active_, Consumer_Map_Iterator, consumer_proxy_map_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::current_size(), default_consumer_proxy_, RtecEventChannelAdmin::ConsumerQOS::dependencies, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), RtecEventComm::EventSourceID, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::find(), RtecEventChannelAdmin::SupplierQOS::is_gateway, RtecEventChannelAdmin::ConsumerQOS::is_gateway, RtecEventChannelAdmin::SupplierQOS::publications, RtecEventComm::EventHeader::source, supplier_, supplier_ec_, supplier_info_, supplier_is_active_, supplier_proxy_, RtecEventComm::EventHeader::type, and use_consumer_proxy_map_.

Referenced by update_consumer_i().

00255 {
00256   // = Connect as a supplier to the consumer EC
00257   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00258     this->consumer_ec_->for_suppliers ();
00259 
00260   RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00261   sub.is_gateway = 1;
00262 
00263   // Change the RT_Info in the consumer QoS.
00264   // On the same loop we discover the subscriptions by event source,
00265   // and fill the consumer proxy map if we have to use this map.
00266   for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00267     {
00268       sub.dependencies[i].rt_info = this->supplier_info_;
00269 
00270       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00271       const RtecEventComm::EventHeader &h =
00272         sub.dependencies[i].event.header;
00273 
00274       RtecEventComm::EventSourceID sid = h.source;
00275 
00276       //ACE_DEBUG ((LM_DEBUG,
00277       //            "ECG (%t)    trying (%d,%d)\n",
00278       //           sid, h.type));
00279 
00280       // Skip all subscriptions that do not require a specific source
00281       // id, or skip all subscriptions when we don't need to use the consumer
00282       // proxy map.
00283       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00284         continue;
00285 
00286       // Skip all the magic event types.
00287       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00288         continue;
00289 
00290       if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00291         {
00292           //ACE_DEBUG ((LM_DEBUG,
00293           //            "ECG (%t)    binding source %d\n",
00294           //            sid));
00295           proxy = supplier_admin->obtain_push_consumer ();
00296           this->consumer_proxy_map_.bind (sid, proxy);
00297         }
00298     }
00299   //ACE_DEBUG ((LM_DEBUG,
00300   //            "ECG (%t)    consumer map computed (%d entries)\n",
00301   //            this->consumer_proxy_map_.current_size ()));
00302 
00303   if (this->consumer_proxy_map_.current_size () > 0)
00304     {
00305       this->supplier_is_active_ = 1;
00306 
00307       // Obtain a reference to our supplier personality...
00308       RtecEventComm::PushSupplier_var supplier_ref =
00309         this->supplier_._this ();
00310 
00311       // For each subscription by source build the set of publications
00312       // (there may be several, by type, for instance) and connect to the
00313       // consumer proxy.
00314       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00315            j != this->consumer_proxy_map_.end ();
00316            ++j)
00317         {
00318           RtecEventChannelAdmin::SupplierQOS pub;
00319           pub.publications.length (sub.dependencies.length () + 1);
00320           pub.is_gateway = 1;
00321 
00322           int c = 0;
00323 
00324           RtecEventComm::EventSourceID sid = (*j).ext_id_;
00325           for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00326             {
00327               const RtecEventComm::EventHeader& h =
00328                 sub.dependencies[k].event.header;
00329               if (h.source != sid
00330                   || (ACE_ES_EVENT_ANY < h.type
00331                       && h.type < ACE_ES_EVENT_UNDEFINED))
00332                 continue;
00333               pub.publications[c].event.header = h;
00334               pub.publications[c].dependency_info.dependency_type =
00335                 RtecBase::TWO_WAY_CALL;
00336               pub.publications[c].dependency_info.number_of_calls = 1;
00337               pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00338               c++;
00339             }
00340           //ACE_DEBUG ((LM_DEBUG,
00341           //            "ECG (%t) supplier id %d has %d elements\n",
00342           //            sid, c));
00343           if (c == 0)
00344             continue;
00345 
00346           pub.publications.length (c);
00347 
00348           // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
00349           // ACE_SupplierQOS_Factory::debug (pub);
00350           (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00351                                                pub);
00352         }
00353     }
00354 
00355   // Also build the subscriptions that are *not* by source when we use the
00356   // consumer proxy map, and all subscriptions when we don't use the map and
00357   // then connect to the default consumer proxy.
00358   RtecEventChannelAdmin::SupplierQOS pub;
00359   pub.publications.length (sub.dependencies.length () + 1);
00360   pub.is_gateway = 1;
00361   int c = 0;
00362   for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00363     {
00364       const RtecEventComm::EventHeader& h =
00365         sub.dependencies[k].event.header;
00366       RtecEventComm::EventSourceID sid = h.source;
00367 
00368       // Skip all subscriptions with a specific source when we use the map
00369       if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00370         continue;
00371 
00372       // Skip all the magic event types.
00373       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00374         continue;
00375 
00376       pub.publications[c].event.header = h;
00377       pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00378       pub.publications[c].dependency_info.dependency_type =
00379         RtecBase::TWO_WAY_CALL;
00380       pub.publications[c].dependency_info.number_of_calls = 1;
00381       pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00382       c++;
00383     }
00384 
00385   if (c > 0)
00386     {
00387       this->supplier_is_active_ = 1;
00388 
00389       // Obtain a reference to our supplier personality...
00390       RtecEventComm::PushSupplier_var supplier_ref =
00391         this->supplier_._this ();
00392 
00393       // Obtain the consumer....
00394       this->default_consumer_proxy_ =
00395         supplier_admin->obtain_push_consumer ();
00396 
00397       pub.publications.length (c);
00398       // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
00399       // ACE_SupplierQOS_Factory::debug (pub);
00400       this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00401                                                             pub);
00402     }
00403 
00404   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00405     this->supplier_ec_->for_consumers ();
00406 
00407   this->supplier_proxy_ =
00408     consumer_admin->obtain_push_supplier ();
00409 
00410   this->consumer_is_active_ = 1;
00411   RtecEventComm::PushConsumer_var consumer_ref =
00412     this->consumer_._this ();
00413 
00414   // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
00415   // ACE_ConsumerQOS_Factory::debug (sub);
00416 
00417   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00418                                                 sub);
00419 }
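
The way the listing above splits the subscriptions between per-source proxies and the default proxy can be summarized in a small sketch. It assumes a freshly emptied proxy map and uses plain integers for source ids and event types; `Header`, `Plan`, `plan_publications` and the three constants are hypothetical, and the constant values are placeholders rather than the values defined by the TAO headers.

```cpp
#include <cassert>
#include <map>
#include <vector>

// Placeholder constants for this sketch only.
const int kSourceAny = 0;
const int kEventAny = 0;
const int kEventUndefined = 16;

struct Header {
  int source;
  int type;
};

// "Magic" event types lie strictly between the two sentinel values.
bool is_magic_type(int type) {
  return kEventAny < type && type < kEventUndefined;
}

struct Plan {
  std::map<int, std::vector<Header>> by_source;  // one proxy per source id
  std::vector<Header> default_pubs;              // default consumer proxy
};

// Subscriptions naming a specific source get their own proxy when the
// map is in use; everything else goes to the default proxy. Magic
// event types are never published.
Plan plan_publications(const std::vector<Header>& subs, bool use_map) {
  Plan plan;
  for (const Header& h : subs) {
    if (is_magic_type(h.type))
      continue;
    if (h.source != kSourceAny && use_map)
      plan.by_source[h.source].push_back(h);
    else
      plan.default_pubs.push_back(h);
  }
  return plan;
}
```

A default proxy is only needed when `default_pubs` ends up non-empty, which matches the `c > 0` check in the listing.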

void TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events)
 

This is the consumer-side behavior: it pushes the events to the local event channel.

Definition at line 445 of file EC_Gateway_IIOP.cpp.

References ACE_ES_EVENT_SOURCE_ANY, ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, consumer_proxy_map_, default_consumer_proxy_, RtecEventComm::EventSet, RtecEventComm::EventSourceID, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::find(), CORBA::is_nil(), push_to_consumer(), TAO_SYNCH_MUTEX, update_consumer_i(), update_posted_, use_consumer_proxy_map_, and use_ttl_.

00446 {
00447   // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
00448 
00449   if (events.length () == 0)
00450     {
00451       // ACE_DEBUG ((LM_DEBUG, "no events\n"));
00452       return;
00453     }
00454 
00455   {
00456     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00457 
00458     this->busy_count_++;
00459   }
00460 
00461   // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
00462 
00463   // @@ TODO, there is an extra data copy here, we should do the event
00464   // modification without it and only compact the necessary events.
00465   RtecEventComm::EventSet out (1);
00466   out.length (1);
00467   for (CORBA::ULong i = 0; i < events.length (); ++i)
00468     {
00469       if (this->use_ttl_ == 1)
00470         {
00471           if (events[i].header.ttl == 0)
00472             continue;
00473         }
00474 
00475       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00476       RtecEventComm::EventSourceID sid = events[i].header.source;
00477       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00478           || this->consumer_proxy_map_.find (sid, proxy) == -1)
00479         {
00480           // If the source is not in our map or we should not use the map then
00481           // use the default consumer proxy.
00482           proxy = this->default_consumer_proxy_.in ();
00483         }
00484 
00485       if (CORBA::is_nil (proxy))
00486         continue;
00487 
00488       out[0] = events[i];
00489 
00490       if (this->use_ttl_ == 1)
00491         out[0].header.ttl--;
00492 
00493       // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
00494       this->push_to_consumer(proxy, out);
00495     }
00496 
00497   {
00498     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00499 
00500     this->busy_count_--;
00501 
00502     if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00503       {
00504         this->cleanup_posted_ = 0;
00505         this->cleanup_consumer_proxies_i ();
00506       }
00507 
00508     if (this->busy_count_ == 0 && this->update_posted_ != 0)
00509       {
00510         this->update_posted_ = 0;
00511         this->update_consumer_i (this->c_qos_);
00512       }
00513   }
00514 }
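
The per-event proxy selection in the listing above can be reduced to a small function. The sketch below uses integers as stand-ins for proxy references, with -1 playing the role of a nil reference; `select_proxy`, `kSourceAny` and `kNilProxy` are hypothetical names, and the constant values are placeholders.

```cpp
#include <cassert>
#include <map>

const int kSourceAny = 0;   // placeholder value for this sketch
const int kNilProxy = -1;   // stands in for a nil proxy reference

// Chooses the proxy for an event source: the per-source proxy when the
// map is in use and has an entry, otherwise the default proxy. The
// result may be kNilProxy, in which case the caller skips the event.
int select_proxy(int source, bool use_map,
                 const std::map<int, int>& proxy_map, int default_proxy) {
  if (source != kSourceAny && use_map) {
    std::map<int, int>::const_iterator it = proxy_map.find(source);
    if (it != proxy_map.end())
      return it->second;
  }
  return default_proxy;
}
```

Falling back to the default proxy means an event from an unknown source is still delivered whenever any subscription without a specific source exists.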

void TAO_EC_Gateway_IIOP::push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, const RtecEventComm::EventSet &event) [private]
 

Push the event to the consumer.

Definition at line 517 of file EC_Gateway_IIOP.cpp.

References ec_control_, TAO_ECG_ConsumerEC_Control::event_channel_not_exist(), RtecEventComm::EventSet, and TAO_ECG_ConsumerEC_Control::system_exception().

Referenced by push().

00520 {
00521   try
00522     {
00523       consumer->push (event);
00524     }
00525   catch (const CORBA::OBJECT_NOT_EXIST&)
00526     {
00527       ec_control_->event_channel_not_exist (this);
00528     }
00529   catch (CORBA::SystemException& sysex)
00530     {
00531       ec_control_->system_exception (this,
00532                                      sysex);
00533     }
00534   catch (const CORBA::Exception&)
00535     {
00536       // Shouldn't happen.
00537     }
00538 }
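
The handlers in the listing above are ordered from the most specific exception to the most general one, which is what lets a missing object be treated differently from other system failures. A minimal sketch of that ordering follows; the exception hierarchy (`BaseError`, `SystemError`, `ObjectNotExist`), the `Outcome` enumeration and `push_and_classify` are hypothetical and only mirror the shape of the CORBA exceptions involved.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical three-level exception hierarchy.
struct BaseError : std::runtime_error {
  explicit BaseError(const std::string& what) : std::runtime_error(what) {}
};
struct SystemError : BaseError {
  explicit SystemError(const std::string& what) : BaseError(what) {}
};
struct ObjectNotExist : SystemError {
  explicit ObjectNotExist(const std::string& what) : SystemError(what) {}
};

enum class Outcome { Delivered, ChannelGone, SystemFailure, Ignored };

// Invokes 'push' and reports which handler, if any, dealt with a failure.
// The most derived type must be caught first; otherwise the SystemError
// handler would also swallow ObjectNotExist.
template <typename PushFn>
Outcome push_and_classify(PushFn push) {
  try {
    push();
    return Outcome::Delivered;
  } catch (const ObjectNotExist&) {
    return Outcome::ChannelGone;
  } catch (const SystemError&) {
    return Outcome::SystemFailure;
  } catch (const BaseError&) {
    return Outcome::Ignored;
  }
}
```

In the gateway the first two outcomes are delegated to the consumer EC control object, so the reaction (ignore, clean up, or try to reconnect) depends on which control strategy the factory created.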

void TAO_EC_Gateway_IIOP::reconnect_consumer_ec (void)
 

Definition at line 177 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, c_qos_, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::reconnect().

00178 {
00179   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00180 
00181   if (this->busy_count_ != 0)
00182     {
00183       this->update_posted_ = 1;
00184       return;
00185     }
00186 
00187   this->update_consumer_i (c_qos_);
00188 }

void TAO_EC_Gateway_IIOP::resume_supplier_ec (void)
 

Resume the connection to the supplier ec.

Definition at line 654 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.

00655 {
00656   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00657     {
00658       this->supplier_proxy_->resume_connection ();
00659 
00660       supplier_ec_suspended_ = 0;
00661     }
00662 }

int TAO_EC_Gateway_IIOP::shutdown (void)
 

Disconnect and shutdown the gateway.

Definition at line 541 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), cleanup_supplier_ec_i(), close_i(), consumer_is_active_, ec_control_, TAO_ECG_ConsumerEC_Control::shutdown(), supplier_, supplier_is_active_, and TAO_SYNCH_MUTEX.

00542 {
00543   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00544 
00545   ec_control_->shutdown();
00546 
00547   this->close_i ();
00548 
00549   if (this->supplier_is_active_)
00550     {
00551       PortableServer::POA_var poa =
00552         this->supplier_._default_POA ();
00553       PortableServer::ObjectId_var id =
00554         poa->servant_to_id (&this->supplier_);
00555       poa->deactivate_object (id.in ());
00556       this->supplier_is_active_ = 0;
00557     }
00558 
00559   if (this->consumer_is_active_)
00560     {
00561       PortableServer::POA_var poa =
00562         this->consumer_._default_POA ();
00563       PortableServer::ObjectId_var id =
00564         poa->servant_to_id (&this->consumer_);
00565       poa->deactivate_object (id.in ());
00566       this->consumer_is_active_ = 0;
00567     }
00568 
00569   this->cleanup_consumer_ec_i ();
00570   this->cleanup_supplier_ec_i ();
00571 
00572   return 0;
00573 }

void TAO_EC_Gateway_IIOP::suspend_supplier_ec (void)

Suspend the connection to the supplier ec.

Definition at line 643 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reconnect_ConsumerEC_Control::system_exception().

00644 {
00645   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00646     {
00647       this->supplier_proxy_->suspend_connection ();
00648 
00649       supplier_ec_suspended_ = 1;
00650     }
00651 }
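
suspend_supplier_ec() and resume_supplier_ec() are guarded by supplier_ec_suspended_, so repeated calls in the same direction reach the proxy only once. A minimal sketch of that idempotent toggle follows. SupplierProxyStub and SuspendableLink are hypothetical stand-ins, and a raw pointer replaces the CORBA _var and is_nil() check.

```cpp
#include <cassert>

// Hypothetical stand-in for the supplier proxy; it only counts calls.
struct SupplierProxyStub {
  int suspends = 0;
  int resumes = 0;
  void suspend_connection() { ++suspends; }
  void resume_connection() { ++resumes; }
};

// Models the flag-guarded suspend/resume pair shown above.
class SuspendableLink {
public:
  explicit SuspendableLink(SupplierProxyStub* proxy) : proxy_(proxy) {}

  void suspend() {
    // Act only when a proxy exists and we are not already suspended.
    if (proxy_ != nullptr && !suspended_) {
      proxy_->suspend_connection();
      suspended_ = true;
    }
  }

  void resume() {
    // Act only when a proxy exists and we are currently suspended.
    if (proxy_ != nullptr && suspended_) {
      proxy_->resume_connection();
      suspended_ = false;
    }
  }

  bool suspended() const { return suspended_; }

private:
  SupplierProxyStub* proxy_;
  bool suspended_ = false;
};

// Usage: the second suspend() is a no-op.
void example_suspend_resume() {
  SupplierProxyStub proxy;
  SuspendableLink link(&proxy);
  link.suspend();
  link.suspend();
  assert(proxy.suspends == 1);
  link.resume();
  assert(proxy.resumes == 1 && !link.suspended());
}
```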

void TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) [virtual]

Definition at line 191 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, c_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.

00193 {
00194   if (c_qos.dependencies.length () == 0)
00195     return;
00196 
00197   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00198 
00199   this->c_qos_ = c_qos;
00200 
00201   if (this->busy_count_ != 0)
00202     {
00203       this->update_posted_ = 1;
00204       return;
00205     }
00206 
00207   this->update_consumer_i (c_qos);
00208 }
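
update_consumer() and reconnect_consumer_ec() share one pattern: while busy_count_ is non-zero the change is only recorded in update_posted_, and otherwise it is applied at once. The sketch below models that pattern with standard-library types. DeferredUpdater and its members are hypothetical, a std::string stands in for ConsumerQOS, and the end_push() step (the last push to finish applies the posted update) is an assumption, since push() is not shown in this section.

```cpp
#include <cassert>
#include <mutex>
#include <string>

class DeferredUpdater {
public:
  // Models update_consumer(): remember the request, apply it now only if idle.
  void update(const std::string& qos) {
    if (qos.empty())  // mirrors the early return on an empty dependency list
      return;
    std::lock_guard<std::mutex> guard(lock_);
    pending_ = qos;   // later requests overwrite earlier ones
    if (busy_count_ != 0) {
      update_posted_ = true;
      return;
    }
    apply(qos);
  }

  void begin_push() {
    std::lock_guard<std::mutex> guard(lock_);
    ++busy_count_;
  }

  // Assumption: when the last push finishes, a posted update is applied.
  void end_push() {
    std::lock_guard<std::mutex> guard(lock_);
    --busy_count_;
    if (busy_count_ == 0 && update_posted_) {
      update_posted_ = false;
      apply(pending_);
    }
  }

  // Accessors for the single-threaded demo below; they do not take the lock.
  const std::string& applied() const { return applied_; }
  int apply_count() const { return apply_count_; }

private:
  void apply(const std::string& qos) { applied_ = qos; ++apply_count_; }

  std::mutex lock_;
  unsigned busy_count_ = 0;
  bool update_posted_ = false;
  std::string pending_;
  std::string applied_;
  int apply_count_ = 0;
};

// Usage: two updates arriving during a push collapse into one apply.
void example_deferred_update() {
  DeferredUpdater u;
  u.begin_push();
  u.update("first");
  u.update("second");
  assert(u.apply_count() == 0);
  u.end_push();
  assert(u.applied() == "second" && u.apply_count() == 1);
}
```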

void TAO_EC_Gateway_IIOP::update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS &sub) [private]

Definition at line 238 of file EC_Gateway_IIOP.cpp.

References close_i(), CORBA::is_nil(), and open_i().

Referenced by push(), reconnect_consumer_ec(), and update_consumer().

00240 {
00241   this->close_i ();
00242 
00243   if (CORBA::is_nil (this->consumer_ec_.in ())
00244       || CORBA::is_nil (this->supplier_ec_.in ()))
00245     return;
00246 
00247   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
00248 
00249   this->open_i (c_qos);
00250 }

void TAO_EC_Gateway_IIOP::update_supplier (const RtecEventChannelAdmin::SupplierQOS &pub) [virtual]

Definition at line 422 of file EC_Gateway_IIOP.cpp.

00424 {
00425   // Do nothing...
00426 }


Member Data Documentation

CORBA::ULong TAO_EC_Gateway_IIOP::busy_count_ [protected]
 

The number of threads currently running push(). We cannot make changes until this count reaches 0.

Definition at line 164 of file EC_Gateway_IIOP.h.

RtecEventChannelAdmin::ConsumerQOS TAO_EC_Gateway_IIOP::c_qos_ [protected]
 

Definition at line 172 of file EC_Gateway_IIOP.h.

Referenced by reconnect_consumer_ec(), and update_consumer().

int TAO_EC_Gateway_IIOP::cleanup_posted_ [protected]
 

A cleanup is outstanding. It must wait until all pushes in progress have finished.

Definition at line 178 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies(), and push().

ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::consumer_ [protected]
 

Our consumer personality.

Definition at line 198 of file EC_Gateway_IIOP.h.

RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::consumer_ec_ [protected]
 

The event channel acting as consumer of this gateway.

Definition at line 190 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_ec_i(), init_i(), and open_i().

RtecBase::handle_t TAO_EC_Gateway_IIOP::consumer_info_ [protected]
 

Our RT_Infos for the event channel that is the consumer.

Definition at line 195 of file EC_Gateway_IIOP.h.

Referenced by open_i().

int TAO_EC_Gateway_IIOP::consumer_is_active_ [protected]
 

If it is not 0 then we must deactivate the consumer.

Definition at line 201 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

Consumer_Map TAO_EC_Gateway_IIOP::consumer_proxy_map_ [protected]
 

We talk to the EC (as a supplier) using either a per-supplier proxy or a generic proxy for the type-only subscriptions. We push the events to these proxies.

Definition at line 216 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), open_i(), and push().

RtecEventChannelAdmin::ProxyPushConsumer_var TAO_EC_Gateway_IIOP::default_consumer_proxy_ [protected]
 

Definition at line 217 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), open_i(), and push().

TAO_ECG_ConsumerEC_Control* TAO_EC_Gateway_IIOP::ec_control_ [protected]
 

The consumer EC control, which determines the behaviour when the consumer EC misbehaves.

Definition at line 225 of file EC_Gateway_IIOP.h.

Referenced by init_i(), push_to_consumer(), shutdown(), and ~TAO_EC_Gateway_IIOP().

TAO_EC_Gateway_IIOP_Factory* TAO_EC_Gateway_IIOP::factory_ [protected]
 

The Gateway IIOP Factory for all the settings.

Definition at line 228 of file EC_Gateway_IIOP.h.

TAO_SYNCH_MUTEX TAO_EC_Gateway_IIOP::lock_ [protected]
 

Lock to synchronize internal changes.

Definition at line 160 of file EC_Gateway_IIOP.h.

ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::supplier_ [protected]
 

Our supplier personality.

Definition at line 204 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::supplier_ec_ [protected]
 

The event channel acting as supplier for this gateway so we can reconnect when the list changes.

Definition at line 187 of file EC_Gateway_IIOP.h.

Referenced by cleanup_supplier_ec_i(), init_i(), and open_i().

int TAO_EC_Gateway_IIOP::supplier_ec_suspended_ [protected]
 

Is the supplier ec suspended?

Definition at line 183 of file EC_Gateway_IIOP.h.

Referenced by resume_supplier_ec(), and suspend_supplier_ec().

RtecBase::handle_t TAO_EC_Gateway_IIOP::supplier_info_ [protected]
 

Our RT_Infos for the event channel that is the supplier.

Definition at line 193 of file EC_Gateway_IIOP.h.

Referenced by open_i().

int TAO_EC_Gateway_IIOP::supplier_is_active_ [protected]
 

If it is not 0 then we must deactivate the supplier.

Definition at line 207 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

RtecEventChannelAdmin::ProxyPushSupplier_var TAO_EC_Gateway_IIOP::supplier_proxy_ [protected]
 

We talk to the EC (as a consumer) using this proxy. We receive the events from this proxy.

Definition at line 221 of file EC_Gateway_IIOP.h.

Referenced by disconnect_supplier_proxy_i(), open_i(), resume_supplier_ec(), and suspend_supplier_ec().

int TAO_EC_Gateway_IIOP::update_posted_ [protected]
 

An update_consumer() message arrived *while* we were doing a push(), so the modification is stored. If multiple update_consumer() messages arrive, only the last one is executed.

Definition at line 171 of file EC_Gateway_IIOP.h.

Referenced by push(), reconnect_consumer_ec(), and update_consumer().

int TAO_EC_Gateway_IIOP::use_consumer_proxy_map_ [protected]
 

The flag for using the consumer proxy map. If 1, the consumer proxy map is used, meaning that each unique source id gets its own proxy push consumer. If 0, a single proxy push consumer (the default) is used for all source ids.

Definition at line 237 of file EC_Gateway_IIOP.h.

Referenced by open_i(), push(), and TAO_EC_Gateway_IIOP().
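
The effect of use_consumer_proxy_map_ on proxy selection can be sketched as a map lookup keyed by source id. This is a simplified model, not the gateway's code: ConsumerProxyStub and ProxySelector are hypothetical names, SourceId stands in for RtecEventComm::EventSourceID, and falling back to the default proxy when the map has no entry for a source is an assumption of this sketch.

```cpp
#include <cassert>
#include <map>

using SourceId = unsigned int;  // stand-in for RtecEventComm::EventSourceID

// Hypothetical stand-in for a proxy push consumer; it only counts events.
struct ConsumerProxyStub {
  int received = 0;
  void push() { ++received; }
};

class ProxySelector {
public:
  ProxySelector(bool use_map, ConsumerProxyStub* default_proxy)
    : use_map_(use_map), default_proxy_(default_proxy) {}

  // Registers a dedicated proxy for one source id.
  void bind(SourceId source, ConsumerProxyStub* proxy) { map_[source] = proxy; }

  // Returns the proxy that should receive events from `source`.
  ConsumerProxyStub* select(SourceId source) const {
    if (use_map_) {
      auto it = map_.find(source);
      if (it != map_.end())
        return it->second;
    }
    // Flag off, or (assumed) no dedicated proxy: use the shared default.
    return default_proxy_;
  }

private:
  bool use_map_;
  ConsumerProxyStub* default_proxy_;
  std::map<SourceId, ConsumerProxyStub*> map_;
};

// Usage: with the flag set, source 7 gets its own proxy.
void example_proxy_selection() {
  ConsumerProxyStub shared, dedicated;
  ProxySelector selector(true, &shared);
  selector.bind(7, &dedicated);
  selector.select(7)->push();
  selector.select(8)->push();
  assert(dedicated.received == 1 && shared.received == 1);
}
```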

int TAO_EC_Gateway_IIOP::use_ttl_ [protected]
 

If 1, we use the TTL flags; if 0, we ignore TTL.

Definition at line 231 of file EC_Gateway_IIOP.h.

Referenced by push(), and TAO_EC_Gateway_IIOP().


The documentation for this class was generated from the following files:
EC_Gateway_IIOP.h
EC_Gateway_IIOP.cpp
Generated on Sun Jan 27 13:42:20 2008 for TAO_RTEvent by doxygen 1.3.6