TAO_EC_Gateway_IIOP Class Reference

Event Channel Gateway using IIOP. More...

#include <EC_Gateway_IIOP.h>

Inheritance diagram for TAO_EC_Gateway_IIOP:

Inheritance graph
[legend]
Collaboration diagram for TAO_EC_Gateway_IIOP:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_EC_Gateway_IIOP (void)
virtual ~TAO_EC_Gateway_IIOP (void)
int init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
void disconnect_push_supplier (void)
 The channel is disconnecting.
void disconnect_push_consumer (void)
 The channel is disconnecting.
void push (const RtecEventComm::EventSet &events)
int shutdown (void)
 Disconnect and shutdown the gateway.
virtual void close (void)
virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub)
virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS &pub)
void reconnect_consumer_ec (void)
CORBA::Boolean consumer_ec_non_existent (CORBA::Boolean_out disconnected)
 Check whether the consumer event channel is non existent or not.
void cleanup_consumer_proxies (void)
int cleanup_consumer_ec (void)
int cleanup_supplier_ec (void)
void suspend_supplier_ec (void)
 Suspend the connection to the supplier ec.
void resume_supplier_ec (void)
 Resume the connection to the supplier ec.

Protected Types

typedef ACE_Map_Manager< RtecEventComm::EventSourceID,
RtecEventChannelAdmin::ProxyPushConsumer_ptr,
ACE_Null_Mutex
Consumer_Map
typedef ACE_Map_Iterator<
RtecEventComm::EventSourceID,
RtecEventChannelAdmin::ProxyPushConsumer_ptr,
ACE_Null_Mutex
Consumer_Map_Iterator

Protected Member Functions

int init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec, RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
 Do the real work in init().

Protected Attributes

TAO_SYNCH_MUTEX lock_
 Lock to synchronize internal changes.
CORBA::ULong busy_count_
int update_posted_
RtecEventChannelAdmin::ConsumerQOS c_qos_
int cleanup_posted_
int supplier_ec_suspended_
RtecEventChannelAdmin::EventChannel_var supplier_ec_
RtecEventChannelAdmin::EventChannel_var consumer_ec_
 The event channel acting as consumer of this gateway.
RtecBase::handle_t supplier_info_
 Our RT_Infos for the event channel that is the supplier.
RtecBase::handle_t consumer_info_
 Our RT_Infos for the event channel that is the consumer.
ACE_PushConsumer_Adapter<
TAO_EC_Gateway_IIOP
consumer_
 Our consumer personality....
bool consumer_is_active_
 If it is true then we must deactivate the consumer.
ACE_PushSupplier_Adapter<
TAO_EC_Gateway_IIOP
supplier_
 Our supplier personality....
bool supplier_is_active_
 If it is true then we must deactivate the supplier.
Consumer_Map consumer_proxy_map_
RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_
TAO_ECG_ConsumerEC_Controlec_control_
TAO_EC_Gateway_IIOP_Factoryfactory_
 The Gateway IIOP Factory for all the settings.
int use_ttl_
 If 1, we use the TTL flags, if 0, we just ignore TTL.
int use_consumer_proxy_map_

Private Member Functions

void close_i (void)
void disconnect_supplier_proxy_i (void)
 Disconnect the supplier proxy.
void disconnect_consumer_proxies_i (void)
 Disconnect all consumer proxies.
void cleanup_consumer_proxies_i (void)
 Remove all consumer proxies without calling disconnect on them.
void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS &sub)
void open_i (const RtecEventChannelAdmin::ConsumerQOS &sub)
 Create all connections to consumer ec and to supplier ec.
CORBA::Boolean is_consumer_ec_connected_i (void) const
 Helper method to see if consumer ec is connected.
void push_to_consumer (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, const RtecEventComm::EventSet &event)
 Push the event to the consumer.
void cleanup_consumer_ec_i (void)
void cleanup_supplier_ec_i (void)

Detailed Description

Event Channel Gateway using IIOP.

This class mediates among two event channels, it connects as a consumer of events with a remote event channel, and as a supplier of events with the local EC. As a consumer it gives a QoS designed to only accept the events in which *local* consumers are interested. Eventually the local EC should create this object and compute its QoS in an automated manner; but this requires some way to filter out the peers registered as consumers, otherwise we will get loops in the QoS graph. It uses exactly the same set of events in the publications list when connected as a supplier.

Note:
An alternative implementation would be to register with the remote EC as a supplier, and then filter on the remote EC, but one of the objectives is to minimize network traffic. On the other hand the events will be pushed to remote consumers, event though they will be dropped upon receipt (due to the TTL field); IMHO this is another suggestion that the EC needs to know (somehow) which consumers are truly its peers in disguise.
Todo:
: The class makes an extra copy of the events, we need to investigate if closer collaboration with its collocated EC could be used to remove that copy.

Definition at line 65 of file EC_Gateway_IIOP.h.


Member Typedef Documentation

typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map [protected]

Definition at line 210 of file EC_Gateway_IIOP.h.

typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> TAO_EC_Gateway_IIOP::Consumer_Map_Iterator [protected]

Definition at line 211 of file EC_Gateway_IIOP.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP ( void   ) 

Definition at line 19 of file EC_Gateway_IIOP.cpp.

References ACE_NEW, ACE_Dynamic_Service< TYPE >::instance(), and TAO_EC_Gateway_IIOP_Factory::use_ttl().

00020   :  busy_count_ (0),
00021      update_posted_ (0),
00022      cleanup_posted_ (0),
00023      supplier_ec_suspended_ (0),
00024      supplier_info_ (0),
00025      consumer_info_ (0),
00026      consumer_ (this),
00027      consumer_is_active_ (false),
00028      supplier_ (this),
00029      supplier_is_active_ (false),
00030      ec_control_ (0),
00031      factory_ (0),
00032      use_ttl_ (1),
00033      use_consumer_proxy_map_ (1)
00034 {
00035   if (this->factory_ == 0)
00036     {
00037       this->factory_ =
00038              ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039 
00040       if (this->factory_ == 0)
00041         {
00042           TAO_EC_Gateway_IIOP_Factory *f = 0;
00043           ACE_NEW (f,
00044                    TAO_EC_Gateway_IIOP_Factory);
00045           this->factory_ = f;
00046         }
00047     }
00048 
00049   if (this->factory_ != 0)
00050     {
00051       this->use_ttl_ = this->factory_->use_ttl();
00052       this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053     }
00054 }

TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP ( void   )  [virtual]

Definition at line 56 of file EC_Gateway_IIOP.cpp.

References ec_control_.

00057 {
00058    delete ec_control_;
00059    ec_control_ = 0;
00060 }


Member Function Documentation

int TAO_EC_Gateway_IIOP::cleanup_consumer_ec ( void   ) 

Cleanup the connection to the consumer ec. Doesn't call anything on the ec again, just set the object to nil

Definition at line 575 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().

00576 {
00577   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00578 
00579   this->cleanup_consumer_ec_i ();
00580 
00581   return 0;
00582 }

void TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i ( void   )  [private]

Definition at line 595 of file EC_Gateway_IIOP.cpp.

References consumer_ec_.

Referenced by cleanup_consumer_ec(), and shutdown().

00596 {
00597   this->consumer_ec_ =
00598     RtecEventChannelAdmin::EventChannel::_nil ();
00599 }

void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies ( void   ) 

Cleanup all consumer proxies we have without trying to tell the consumer that we are going to disconnect. This can be used to cleanup the consumer proxy administration in case we know that the consumers are all unreachable.

Definition at line 106 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, cleanup_consumer_proxies_i(), cleanup_posted_, and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reactive_ConsumerEC_Control::event_channel_not_exist(), TAO_ECG_Reconnect_ConsumerEC_Control::system_exception(), and TAO_ECG_Reactive_ConsumerEC_Control::system_exception().

00107 {
00108   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00109 
00110   // In case we are still pushing, don't cleanup the proxies
00111   if (this->busy_count_ != 0)
00112     {
00113       this->cleanup_posted_ = 1;
00114       return;
00115     }
00116 
00117   this->cleanup_consumer_proxies_i ();
00118 }

void TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i ( void   )  [private]

Remove all consumer proxies without calling disconnect on them.

Definition at line 211 of file EC_Gateway_IIOP.cpp.

References consumer_proxy_map_, default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().

Referenced by cleanup_consumer_proxies(), and push().

00212 {
00213   if (this->consumer_proxy_map_.current_size () > 0)
00214     {
00215       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00216            j != this->consumer_proxy_map_.end ();
00217            ++j)
00218         {
00219           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00220           if (CORBA::is_nil (consumer))
00221             continue;
00222 
00223           CORBA::release (consumer);
00224         }
00225       // Remove all the elements on the map.  Calling close() does not
00226       // work because the map is left in an inconsistent state.
00227       this->consumer_proxy_map_.open ();
00228     }
00229 
00230   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00231     {
00232       this->default_consumer_proxy_ =
00233         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00234     }
00235 }

int TAO_EC_Gateway_IIOP::cleanup_supplier_ec ( void   ) 

Cleanup the connection to the supplier ec. Doesn't call anything on the ec again, just set the object to nil

Definition at line 585 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_supplier_ec_i(), and TAO_SYNCH_MUTEX.

00586 {
00587   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00588 
00589   this->cleanup_supplier_ec_i ();
00590 
00591   return 0;
00592 }

void TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i ( void   )  [private]

Definition at line 602 of file EC_Gateway_IIOP.cpp.

References supplier_ec_.

Referenced by cleanup_supplier_ec(), and shutdown().

00603 {
00604   this->supplier_ec_ =
00605     RtecEventChannelAdmin::EventChannel::_nil ();
00606 }

void TAO_EC_Gateway_IIOP::close ( void   )  [virtual]

The gateway must disconnect from all the relevant event channels, or any other communication media (such as multicast groups).

Implements TAO_EC_Gateway.

Definition at line 98 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, close_i(), and TAO_SYNCH_MUTEX.

00099 {
00100   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00101 
00102   this->close_i ();
00103 }

void TAO_EC_Gateway_IIOP::close_i ( void   )  [private]

Definition at line 121 of file EC_Gateway_IIOP.cpp.

References disconnect_consumer_proxies_i(), and disconnect_supplier_proxy_i().

Referenced by close(), shutdown(), and update_consumer_i().

00122 {
00123   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
00124   this->disconnect_consumer_proxies_i ();
00125 
00126   this->disconnect_supplier_proxy_i ();
00127 }

CORBA::Boolean TAO_EC_Gateway_IIOP::consumer_ec_non_existent ( CORBA::Boolean_out  disconnected  ) 

Check whether the consumer event channel is non existent or not.

Definition at line 615 of file EC_Gateway_IIOP.cpp.

References CORBA::Object::_duplicate(), ACE_GUARD_THROW_EX, and TAO_SYNCH_MUTEX.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::query_eventchannel(), TAO_ECG_Reactive_ConsumerEC_Control::query_eventchannel(), and TAO_ECG_Reconnect_ConsumerEC_Control::try_reconnect().

00617 {
00618   CORBA::Object_var consumer_ec;
00619   {
00620     ACE_GUARD_THROW_EX (
00621         TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00622         CORBA::INTERNAL ());
00623 
00624     disconnected = 0;
00625     if (this->is_consumer_ec_connected_i () == 0)
00626       {
00627         disconnected = 1;
00628         return 0;
00629       }
00630 
00631     consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00632   }
00633 
00634 #if (TAO_HAS_MINIMUM_CORBA == 0)
00635   return consumer_ec->_non_existent ();
00636 #else
00637   return 0;
00638 #endif /* TAO_HAS_MINIMUM_CORBA */
00639 }

void TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i ( void   )  [private]

Disconnect all consumer proxies.

Definition at line 130 of file EC_Gateway_IIOP.cpp.

References consumer_proxy_map_, default_consumer_proxy_, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), CORBA::is_nil(), ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::open(), and CORBA::release().

Referenced by close_i().

00131 {
00132   if (this->consumer_proxy_map_.current_size () > 0)
00133     {
00134       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00135            j != this->consumer_proxy_map_.end ();
00136            ++j)
00137         {
00138           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00139           if (CORBA::is_nil (consumer))
00140             continue;
00141           try
00142             {
00143               consumer->disconnect_push_consumer ();
00144             }
00145           catch (const CORBA::Exception&)
00146             {
00147             }
00148           CORBA::release (consumer);
00149         }
00150       // Remove all the elements on the map.  Calling close() does not
00151       // work because the map is left in an inconsistent state.
00152       this->consumer_proxy_map_.open ();
00153     }
00154 
00155   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00156     {
00157       this->default_consumer_proxy_->disconnect_push_consumer ();
00158 
00159       this->default_consumer_proxy_ =
00160         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00161     }
00162 }

void TAO_EC_Gateway_IIOP::disconnect_push_consumer ( void   ) 

The channel is disconnecting.

Definition at line 428 of file EC_Gateway_IIOP.cpp.

00429 {
00430   // ACE_DEBUG ((LM_DEBUG,
00431   //             "ECG (%t): Supplier-consumer received "
00432   //            "disconnect from channel.\n"));
00433 }

void TAO_EC_Gateway_IIOP::disconnect_push_supplier ( void   ) 

The channel is disconnecting.

Definition at line 436 of file EC_Gateway_IIOP.cpp.

00437 {
00438   // ACE_DEBUG ((LM_DEBUG,
00439   //            "ECG (%t): Supplier received "
00440   //            "disconnect from channel.\n"));
00441 }

void TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i ( void   )  [private]

Disconnect the supplier proxy.

Definition at line 165 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), and supplier_proxy_.

Referenced by close_i().

00166 {
00167   if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00168     {
00169       this->supplier_proxy_->disconnect_push_supplier ();
00170 
00171       this->supplier_proxy_ =
00172         RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00173     }
00174 }

int TAO_EC_Gateway_IIOP::init ( RtecEventChannelAdmin::EventChannel_ptr  supplier_ec,
RtecEventChannelAdmin::EventChannel_ptr  consumer_ec 
)

To do its job this class requires to know the local and remote ECs it will connect to.

Returns:
0 in case of success, -1 in case of failure

Definition at line 63 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, init_i(), and TAO_SYNCH_MUTEX.

00065 {
00066   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00067 
00068   return this->init_i (supplier_ec, consumer_ec);
00069 }

int TAO_EC_Gateway_IIOP::init_i ( RtecEventChannelAdmin::EventChannel_ptr  supplier_ec,
RtecEventChannelAdmin::EventChannel_ptr  consumer_ec 
) [protected]

Do the real work in init().

Definition at line 72 of file EC_Gateway_IIOP.cpp.

References ACE_ERROR_RETURN, TAO_ECG_ConsumerEC_Control::activate(), consumer_ec_, TAO_EC_Gateway_IIOP_Factory::create_consumerec_control(), ec_control_, factory_, CORBA::is_nil(), LM_ERROR, and supplier_ec_.

Referenced by TAO_EC_Gateway_Sched::init(), and init().

00074 {
00075   if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00076   {
00077     this->supplier_ec_ =
00078       RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00079     this->consumer_ec_ =
00080       RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00081 
00082     if (ec_control_ == 0)
00083      {
00084         ec_control_ = factory_->create_consumerec_control(this);
00085         ec_control_->activate();
00086      }
00087 
00088     return 0;
00089   }
00090   else
00091     ACE_ERROR_RETURN ((LM_ERROR,
00092                        "TAO_EC_Gateway_IIOP - init_i "
00093                        "Supplier and consumer event channel reference "
00094                        "should be nil.\n"), -1);
00095 }

CORBA::Boolean TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i ( void   )  const [private]

Helper method to see if consumer ec is connected.

Definition at line 609 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil().

00610 {
00611   return !CORBA::is_nil (this->consumer_ec_.in ());
00612 }

void TAO_EC_Gateway_IIOP::open_i ( const RtecEventChannelAdmin::ConsumerQOS sub  )  [private]

Create all connections to consumer ec and to supplier ec.

Definition at line 253 of file EC_Gateway_IIOP.cpp.

References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_UNDEFINED, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::bind(), consumer_, consumer_ec_, consumer_info_, consumer_is_active_, consumer_proxy_map_, default_consumer_proxy_, RtecEventChannelAdmin::ConsumerQOS::dependencies, ACE_Map_Manager< EXT_ID, INT_ID, ACE_LOCK >::end(), RtecEventChannelAdmin::SupplierQOS::is_gateway, RtecEventChannelAdmin::ConsumerQOS::is_gateway, RtecEventChannelAdmin::SupplierQOS::publications, supplier_, supplier_ec_, supplier_is_active_, supplier_proxy_, and RtecBase::TWO_WAY_CALL.

Referenced by update_consumer_i().

00255 {
00256   // = Connect as a supplier to the consumer EC
00257   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00258     this->consumer_ec_->for_suppliers ();
00259 
00260   RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00261   sub.is_gateway = 1;
00262 
00263   // Change the RT_Info in the consumer QoS.
00264   // On the same loop we discover the subscriptions by event source,
00265   // and fill the consumer proxy map if we have to use this map.
00266   for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00267     {
00268       sub.dependencies[i].rt_info = this->supplier_info_;
00269 
00270       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00271       const RtecEventComm::EventHeader &h =
00272         sub.dependencies[i].event.header;
00273 
00274       RtecEventComm::EventSourceID sid = h.source;
00275 
00276       //ACE_DEBUG ((LM_DEBUG,
00277       //            "ECG (%t)    trying (%d,%d)\n",
00278       //           sid, h.type));
00279 
00280       // Skip all subscriptions that do not require an specific source
00281       // id or skip all subscriptions when we don't need to use the consumer
00282       // proxy map.
00283       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00284         continue;
00285 
00286       // Skip all the magic event types.
00287       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00288         continue;
00289 
00290       if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00291         {
00292           //ACE_DEBUG ((LM_DEBUG,
00293           //            "ECG (%t)    binding source %d\n",
00294           //            sid));
00295           proxy = supplier_admin->obtain_push_consumer ();
00296           this->consumer_proxy_map_.bind (sid, proxy);
00297         }
00298     }
00299   //ACE_DEBUG ((LM_DEBUG,
00300   //            "ECG (%t)    consumer map computed (%d entries)\n",
00301   //            this->consumer_proxy_map_.current_size ()));
00302 
00303   if (this->consumer_proxy_map_.current_size () > 0)
00304     {
00305       this->supplier_is_active_ = true;
00306 
00307       // Obtain a reference to our supplier personality...
00308       RtecEventComm::PushSupplier_var supplier_ref = this->supplier_._this ();
00309 
00310       // For each subscription by source build the set of publications
00311       // (they may several, by type, for instance) and connect to the
00312       // consumer proxy.
00313       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00314            j != this->consumer_proxy_map_.end ();
00315            ++j)
00316         {
00317           RtecEventChannelAdmin::SupplierQOS pub;
00318           pub.publications.length (sub.dependencies.length () + 1);
00319           pub.is_gateway = 1;
00320 
00321           int c = 0;
00322 
00323           RtecEventComm::EventSourceID sid = (*j).ext_id_;
00324           for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00325             {
00326               const RtecEventComm::EventHeader& h =
00327                 sub.dependencies[k].event.header;
00328               if (h.source != sid
00329                   || (ACE_ES_EVENT_ANY < h.type
00330                       && h.type < ACE_ES_EVENT_UNDEFINED))
00331                 continue;
00332               pub.publications[c].event.header = h;
00333               pub.publications[c].dependency_info.dependency_type =
00334                 RtecBase::TWO_WAY_CALL;
00335               pub.publications[c].dependency_info.number_of_calls = 1;
00336               pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00337               c++;
00338             }
00339           //ACE_DEBUG ((LM_DEBUG,
00340           //            "ECG (%t) supplier id %d has %d elements\n",
00341           //            sid, c));
00342           if (c == 0)
00343             continue;
00344 
00345           pub.publications.length (c);
00346 
00347           // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
00348           // ACE_SupplierQOS_Factory::debug (pub);
00349           (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00350                                                pub);
00351         }
00352     }
00353 
00354   // Also build the subscriptions that are *not* by source when we use the
00355   // consumer proxy map, and all subscriptions when we don't use the map and
00356   // then connect to the default consumer proxy.
00357   RtecEventChannelAdmin::SupplierQOS pub;
00358   pub.publications.length (sub.dependencies.length () + 1);
00359   pub.is_gateway = 1;
00360   int c = 0;
00361   for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00362     {
00363       const RtecEventComm::EventHeader& h =
00364         sub.dependencies[k].event.header;
00365       RtecEventComm::EventSourceID sid = h.source;
00366 
00367       // Skip all subscriptions with a specific source when we use the map
00368       if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00369         continue;
00370 
00371       // Skip all the magic event types.
00372       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00373         continue;
00374 
00375       pub.publications[c].event.header = h;
00376       pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00377       pub.publications[c].dependency_info.dependency_type =
00378         RtecBase::TWO_WAY_CALL;
00379       pub.publications[c].dependency_info.number_of_calls = 1;
00380       pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00381       c++;
00382     }
00383 
00384   if (c > 0)
00385     {
00386       this->supplier_is_active_ = true;
00387 
00388       // Obtain a reference to our supplier personality...
00389       RtecEventComm::PushSupplier_var supplier_ref =
00390         this->supplier_._this ();
00391 
00392       // Obtain the consumer....
00393       this->default_consumer_proxy_ =
00394         supplier_admin->obtain_push_consumer ();
00395 
00396       pub.publications.length (c);
00397       // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
00398       // ACE_SupplierQOS_Factory::debug (pub);
00399       this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00400                                                             pub);
00401     }
00402 
00403   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00404     this->supplier_ec_->for_consumers ();
00405 
00406   this->supplier_proxy_ =
00407     consumer_admin->obtain_push_supplier ();
00408 
00409   this->consumer_is_active_ = true;
00410   RtecEventComm::PushConsumer_var consumer_ref =
00411     this->consumer_._this ();
00412 
00413   // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
00414   // ACE_ConsumerQOS_Factory::debug (sub);
00415 
00416   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00417                                                 sub);
00418 }

void TAO_EC_Gateway_IIOP::push ( const RtecEventComm::EventSet events  ) 

This is the consumer side behavior, it pushes the events to the local event channel.

Definition at line 444 of file EC_Gateway_IIOP.cpp.

References ACE_ES_EVENT_SOURCE_ANY, ACE_GUARD, busy_count_, cleanup_consumer_proxies_i(), cleanup_posted_, CORBA::is_nil(), push_to_consumer(), TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.

00445 {
00446   // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
00447 
00448   if (events.length () == 0)
00449     {
00450       // ACE_DEBUG ((LM_DEBUG, "no events\n"));
00451       return;
00452     }
00453 
00454   {
00455     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00456 
00457     this->busy_count_++;
00458   }
00459 
00460   // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
00461 
00462   // @@ TODO, there is an extra data copy here, we should do the event
00463   // modification without it and only compact the necessary events.
00464   RtecEventComm::EventSet out (1);
00465   out.length (1);
00466   for (CORBA::ULong i = 0; i < events.length (); ++i)
00467     {
00468       if (this->use_ttl_ == 1)
00469         {
00470           if (events[i].header.ttl == 0)
00471             continue;
00472         }
00473 
00474       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00475       RtecEventComm::EventSourceID sid = events[i].header.source;
00476       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00477           || this->consumer_proxy_map_.find (sid, proxy) == -1)
00478         {
00479           // If the source is not in our map or we should not use the map then
00480           // use the default consumer proxy.
00481           proxy = this->default_consumer_proxy_.in ();
00482         }
00483 
00484       if (CORBA::is_nil (proxy))
00485         continue;
00486 
00487       out[0] = events[i];
00488 
00489       if (this->use_ttl_ == 1)
00490         out[0].header.ttl--;
00491 
00492       // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
00493       this->push_to_consumer(proxy, out);
00494     }
00495 
00496   {
00497     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00498 
00499     this->busy_count_--;
00500 
00501     if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00502       {
00503         this->cleanup_posted_ = 0;
00504         this->cleanup_consumer_proxies_i ();
00505       }
00506 
00507     if (this->busy_count_ == 0 && this->update_posted_ != 0)
00508       {
00509         this->update_posted_ = 0;
00510         this->update_consumer_i (this->c_qos_);
00511       }
00512   }
00513 }

void TAO_EC_Gateway_IIOP::push_to_consumer ( RtecEventChannelAdmin::ProxyPushConsumer_ptr  consumer,
const RtecEventComm::EventSet event 
) [private]

Push the event to the consumer.

Definition at line 516 of file EC_Gateway_IIOP.cpp.

References ec_control_, TAO_ECG_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_ConsumerEC_Control::system_exception().

Referenced by push().

00519 {
00520   try
00521     {
00522       consumer->push (event);
00523     }
00524   catch (const CORBA::OBJECT_NOT_EXIST&)
00525     {
00526       ec_control_->event_channel_not_exist (this);
00527     }
00528   catch (CORBA::SystemException& sysex)
00529     {
00530       ec_control_->system_exception (this,
00531                                      sysex);
00532     }
00533   catch (const CORBA::Exception&)
00534     {
00535       // Shouldn't happen.
00536     }
00537 }

void TAO_EC_Gateway_IIOP::reconnect_consumer_ec ( void   ) 

Definition at line 177 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, c_qos_, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::reconnect().

00178 {
00179   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00180 
00181   if (this->busy_count_ != 0)
00182     {
00183       this->update_posted_ = 1;
00184       return;
00185     }
00186 
00187   this->update_consumer_i (c_qos_);
00188 }

void TAO_EC_Gateway_IIOP::resume_supplier_ec ( void   ) 

Resume the connection to the supplier ec.

Definition at line 653 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.

00654 {
00655   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00656     {
00657       this->supplier_proxy_->resume_connection ();
00658 
00659       supplier_ec_suspended_ = 0;
00660     }
00661 }

int TAO_EC_Gateway_IIOP::shutdown ( void   ) 

Disconnect and shutdown the gateway.

Definition at line 540 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD_RETURN, cleanup_consumer_ec_i(), cleanup_supplier_ec_i(), close_i(), consumer_, consumer_is_active_, ec_control_, TAO_ECG_ConsumerEC_Control::shutdown(), supplier_, supplier_is_active_, and TAO_SYNCH_MUTEX.

00541 {
00542   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00543 
00544   ec_control_->shutdown();
00545 
00546   this->close_i ();
00547 
00548   if (this->supplier_is_active_)
00549     {
00550       PortableServer::POA_var poa =
00551         this->supplier_._default_POA ();
00552       PortableServer::ObjectId_var id =
00553         poa->servant_to_id (&this->supplier_);
00554       poa->deactivate_object (id.in ());
00555       this->supplier_is_active_ = false;
00556     }
00557 
00558   if (this->consumer_is_active_)
00559     {
00560       PortableServer::POA_var poa =
00561         this->consumer_._default_POA ();
00562       PortableServer::ObjectId_var id =
00563         poa->servant_to_id (&this->consumer_);
00564       poa->deactivate_object (id.in ());
00565       this->consumer_is_active_ = false;
00566     }
00567 
00568   this->cleanup_consumer_ec_i ();
00569   this->cleanup_supplier_ec_i ();
00570 
00571   return 0;
00572 }

void TAO_EC_Gateway_IIOP::suspend_supplier_ec ( void   ) 

Suspend the connection to the supplier ec.

Definition at line 642 of file EC_Gateway_IIOP.cpp.

References CORBA::is_nil(), supplier_ec_suspended_, and supplier_proxy_.

Referenced by TAO_ECG_Reconnect_ConsumerEC_Control::event_channel_not_exist(), and TAO_ECG_Reconnect_ConsumerEC_Control::system_exception().

00643 {
00644   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00645     {
00646       this->supplier_proxy_->suspend_connection ();
00647 
00648       supplier_ec_suspended_ = 1;
00649     }
00650 }

void TAO_EC_Gateway_IIOP::update_consumer ( const RtecEventChannelAdmin::ConsumerQOS sub  )  [virtual]

Definition at line 191 of file EC_Gateway_IIOP.cpp.

References ACE_GUARD, c_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_SYNCH_MUTEX, update_consumer_i(), and update_posted_.

00193 {
00194   if (c_qos.dependencies.length () == 0)
00195     return;
00196 
00197   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00198 
00199   this->c_qos_ = c_qos;
00200 
00201   if (this->busy_count_ != 0)
00202     {
00203       this->update_posted_ = 1;
00204       return;
00205     }
00206 
00207   this->update_consumer_i (c_qos);
00208 }

void TAO_EC_Gateway_IIOP::update_consumer_i ( const RtecEventChannelAdmin::ConsumerQOS sub  )  [private]

Definition at line 238 of file EC_Gateway_IIOP.cpp.

References close_i(), CORBA::is_nil(), and open_i().

Referenced by push(), reconnect_consumer_ec(), and update_consumer().

00240 {
00241   this->close_i ();
00242 
00243   if (CORBA::is_nil (this->consumer_ec_.in ())
00244       || CORBA::is_nil (this->supplier_ec_.in ()))
00245     return;
00246 
00247   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
00248 
00249   this->open_i (c_qos);
00250 }

void TAO_EC_Gateway_IIOP::update_supplier ( const RtecEventChannelAdmin::SupplierQOS pub  )  [virtual]

Definition at line 421 of file EC_Gateway_IIOP.cpp.

00423 {
00424   // Do nothing...
00425 }


Member Data Documentation

CORBA::ULong TAO_EC_Gateway_IIOP::busy_count_ [protected]

How many threads are running push() we cannot make changes until that reaches 0

Definition at line 164 of file EC_Gateway_IIOP.h.

Referenced by push().

RtecEventChannelAdmin::ConsumerQOS TAO_EC_Gateway_IIOP::c_qos_ [protected]

Definition at line 172 of file EC_Gateway_IIOP.h.

Referenced by reconnect_consumer_ec(), and update_consumer().

int TAO_EC_Gateway_IIOP::cleanup_posted_ [protected]

We have a cleanup outstanding and must wait doing cleanup until all pushes are ready.

Definition at line 178 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies(), and push().

ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::consumer_ [protected]

Our consumer personality....

Definition at line 198 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::consumer_ec_ [protected]

The event channel acting as consumer of this gateway.

Definition at line 190 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_ec_i(), init_i(), and open_i().

RtecBase::handle_t TAO_EC_Gateway_IIOP::consumer_info_ [protected]

Our RT_Infos for the event channel that is the consumer.

Definition at line 195 of file EC_Gateway_IIOP.h.

Referenced by TAO_EC_Gateway_Sched::init(), and open_i().

bool TAO_EC_Gateway_IIOP::consumer_is_active_ [protected]

If it is true then we must deactivate the consumer.

Definition at line 201 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

Consumer_Map TAO_EC_Gateway_IIOP::consumer_proxy_map_ [protected]

We talk to the EC (as a supplier) using either an per-supplier proxy or a generic proxy for the type only subscriptions. We push the events to these proxies

Definition at line 216 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i().

RtecEventChannelAdmin::ProxyPushConsumer_var TAO_EC_Gateway_IIOP::default_consumer_proxy_ [protected]

Definition at line 217 of file EC_Gateway_IIOP.h.

Referenced by cleanup_consumer_proxies_i(), disconnect_consumer_proxies_i(), and open_i().

TAO_ECG_ConsumerEC_Control* TAO_EC_Gateway_IIOP::ec_control_ [protected]

The consumer ec control which controls the behaviour in case of a misbehaving consumer ec

Definition at line 225 of file EC_Gateway_IIOP.h.

Referenced by init_i(), push_to_consumer(), shutdown(), and ~TAO_EC_Gateway_IIOP().

TAO_EC_Gateway_IIOP_Factory* TAO_EC_Gateway_IIOP::factory_ [protected]

The Gateway IIOP Factory for all the settings.

Definition at line 228 of file EC_Gateway_IIOP.h.

Referenced by init_i().

TAO_SYNCH_MUTEX TAO_EC_Gateway_IIOP::lock_ [protected]

Lock to synchronize internal changes.

Definition at line 160 of file EC_Gateway_IIOP.h.

ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> TAO_EC_Gateway_IIOP::supplier_ [protected]

Our supplier personality....

Definition at line 204 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

RtecEventChannelAdmin::EventChannel_var TAO_EC_Gateway_IIOP::supplier_ec_ [protected]

The event channel acting as supplier for this gateway so we can reconnect when the list changes.

Definition at line 187 of file EC_Gateway_IIOP.h.

Referenced by cleanup_supplier_ec_i(), init_i(), and open_i().

int TAO_EC_Gateway_IIOP::supplier_ec_suspended_ [protected]

Is the supplier ec suspended?

Definition at line 183 of file EC_Gateway_IIOP.h.

Referenced by resume_supplier_ec(), and suspend_supplier_ec().

RtecBase::handle_t TAO_EC_Gateway_IIOP::supplier_info_ [protected]

Our RT_Infos for the event channel that is the supplier.

Definition at line 193 of file EC_Gateway_IIOP.h.

Referenced by TAO_EC_Gateway_Sched::init().

bool TAO_EC_Gateway_IIOP::supplier_is_active_ [protected]

If it is true then we must deactivate the supplier.

Definition at line 207 of file EC_Gateway_IIOP.h.

Referenced by open_i(), and shutdown().

RtecEventChannelAdmin::ProxyPushSupplier_var TAO_EC_Gateway_IIOP::supplier_proxy_ [protected]

We talk to the EC (as a consumer) using this proxy. We receive the events from these proxy

Definition at line 221 of file EC_Gateway_IIOP.h.

Referenced by disconnect_supplier_proxy_i(), open_i(), resume_supplier_ec(), and suspend_supplier_ec().

int TAO_EC_Gateway_IIOP::update_posted_ [protected]

An update_consumer() message arrived *while* we were doing a push() the modification is stored, if multiple update_consumer messages arrive only the last one is executed.

Definition at line 171 of file EC_Gateway_IIOP.h.

Referenced by push(), reconnect_consumer_ec(), and update_consumer().

int TAO_EC_Gateway_IIOP::use_consumer_proxy_map_ [protected]

The flag for using the consumer proxy map. With 1 the consumer proxy map is used, meaning that for each unique source id we use a different proxy push consumer, if 0, we only use one proxy push consumer (the default) for all source ids.

Definition at line 237 of file EC_Gateway_IIOP.h.

int TAO_EC_Gateway_IIOP::use_ttl_ [protected]

If 1, we use the TTL flags, if 0, we just ignore TTL.

Definition at line 231 of file EC_Gateway_IIOP.h.


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:44:29 2010 for TAO_RTEvent by  doxygen 1.4.7