EC_Gateway_IIOP.cpp

Go to the documentation of this file.
00001 // EC_Gateway_IIOP.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Defaults.h"
00005 #include "orbsvcs/Event_Utilities.h"
00006 #include "orbsvcs/Time_Utilities.h"
00007 
00008 #include "orbsvcs/Event/EC_Gateway_IIOP_Factory.h"
00009 #include "orbsvcs/Event/ECG_ConsumerEC_Control.h"
00010 
00011 #include "ace/Dynamic_Service.h"
00012 
00013 ACE_RCSID (Event,
00014            EC_Gateway_IIOP,
00015            "EC_Gateway_IIOP.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp")
00016 
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
00020   :  busy_count_ (0),
00021      update_posted_ (0),
00022      cleanup_posted_ (0),
00023      supplier_ec_suspended_ (0),
00024      supplier_info_ (0),
00025      consumer_info_ (0),
00026      consumer_ (this),
00027      consumer_is_active_ (0),
00028      supplier_ (this),
00029      supplier_is_active_ (0),
00030      ec_control_ (0),
00031      factory_ (0),
00032      use_ttl_ (1),
00033      use_consumer_proxy_map_ (1)
00034 {
00035   if (this->factory_ == 0)
00036     {
00037       this->factory_ =
00038              ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039 
00040       if (this->factory_ == 0)
00041         {
00042           TAO_EC_Gateway_IIOP_Factory *f = 0;
00043           ACE_NEW (f,
00044                    TAO_EC_Gateway_IIOP_Factory);
00045           this->factory_ = f;
00046         }
00047     }
00048 
00049   if (this->factory_ != 0)
00050     {
00051       this->use_ttl_ = this->factory_->use_ttl();
00052       this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053     }
00054 }
00055 
00056 TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
00057 {
00058    delete ec_control_;
00059    ec_control_ = 0;
00060 }
00061 
00062 int
00063 TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00064                            RtecEventChannelAdmin::EventChannel_ptr consumer_ec
00065                            ACE_ENV_ARG_DECL)
00066 {
00067   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00068 
00069   return this->init_i (supplier_ec, consumer_ec ACE_ENV_ARG_PARAMETER);
00070 }
00071 
00072 int
00073 TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00074                              RtecEventChannelAdmin::EventChannel_ptr consumer_ec
00075                              ACE_ENV_ARG_DECL_NOT_USED)
00076 {
00077   if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00078   {
00079     this->supplier_ec_ =
00080       RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00081     this->consumer_ec_ =
00082       RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00083 
00084         if (ec_control_ == 0)
00085      {
00086         ec_control_ = factory_->create_consumerec_control(this);
00087         ec_control_->activate();
00088      }
00089 
00090     return 0;
00091   }
00092   else
00093     ACE_ERROR_RETURN ((LM_ERROR,
00094                        "TAO_EC_Gateway_IIOP - init_i "
00095                        "Supplier and consumer event channel reference "
00096                        "should be nil.\n"), -1);
00097 }
00098 
00099 void
00100 TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL)
00101 {
00102   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00103 
00104   this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00105 }
00106 
00107 void
00108 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (ACE_ENV_SINGLE_ARG_DECL)
00109 {
00110   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00111 
00112   // In case we are still pushing, don't cleanup the proxies
00113   if (this->busy_count_ != 0)
00114     {
00115       this->cleanup_posted_ = 1;
00116       return;
00117     }
00118 
00119   this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00120 }
00121 
00122 void
00123 TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL)
00124 {
00125   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
00126   this->disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00127   ACE_CHECK;
00128 
00129   this->disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00130   ACE_CHECK;
00131 }
00132 
00133 void
00134 TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL)
00135 {
00136   if (this->consumer_proxy_map_.current_size () > 0)
00137     {
00138       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00139            j != this->consumer_proxy_map_.end ();
00140            ++j)
00141         {
00142           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00143           if (CORBA::is_nil (consumer))
00144             continue;
00145           ACE_TRY
00146             {
00147               consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00148               ACE_TRY_CHECK;
00149             }
00150           ACE_CATCHANY
00151             {
00152             }
00153           ACE_ENDTRY;
00154           CORBA::release (consumer);
00155         }
00156       // Remove all the elements on the map.  Calling close() does not
00157       // work because the map is left in an inconsistent state.
00158       this->consumer_proxy_map_.open ();
00159     }
00160 
00161   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00162     {
00163       this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00164       ACE_CHECK;
00165 
00166       this->default_consumer_proxy_ =
00167         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00168     }
00169 }
00170 
00171 void
00172 TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (ACE_ENV_SINGLE_ARG_DECL)
00173 {
00174   if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00175     {
00176       this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00177       ACE_CHECK;
00178 
00179       this->supplier_proxy_ =
00180         RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00181     }
00182 }
00183 
00184 void
00185 TAO_EC_Gateway_IIOP::reconnect_consumer_ec(ACE_ENV_SINGLE_ARG_DECL)
00186 {
00187   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00188 
00189   if (this->busy_count_ != 0)
00190     {
00191       this->update_posted_ = 1;
00192       return;
00193     }
00194 
00195   this->update_consumer_i (c_qos_ ACE_ENV_ARG_PARAMETER);
00196 }
00197 
00198 void
00199 TAO_EC_Gateway_IIOP::update_consumer (
00200     const RtecEventChannelAdmin::ConsumerQOS& c_qos
00201     ACE_ENV_ARG_DECL)
00202       ACE_THROW_SPEC ((CORBA::SystemException))
00203 {
00204   if (c_qos.dependencies.length () == 0)
00205     return;
00206 
00207   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00208 
00209   this->c_qos_ = c_qos;
00210 
00211   if (this->busy_count_ != 0)
00212     {
00213       this->update_posted_ = 1;
00214       return;
00215     }
00216 
00217   this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER);
00218 }
00219 
00220 void
00221 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00222 {
00223   if (this->consumer_proxy_map_.current_size () > 0)
00224     {
00225       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00226            j != this->consumer_proxy_map_.end ();
00227            ++j)
00228         {
00229           RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00230           if (CORBA::is_nil (consumer))
00231             continue;
00232 
00233           CORBA::release (consumer);
00234         }
00235       // Remove all the elements on the map.  Calling close() does not
00236       // work because the map is left in an inconsistent state.
00237       this->consumer_proxy_map_.open ();
00238     }
00239 
00240   if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00241     {
00242       this->default_consumer_proxy_ =
00243         RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00244     }
00245 }
00246 
00247 void
00248 TAO_EC_Gateway_IIOP::update_consumer_i (
00249     const RtecEventChannelAdmin::ConsumerQOS& c_qos
00250     ACE_ENV_ARG_DECL)
00251 {
00252   this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00253   ACE_CHECK;
00254 
00255   if (CORBA::is_nil (this->consumer_ec_.in ())
00256       || CORBA::is_nil (this->supplier_ec_.in ()))
00257     return;
00258 
00259   // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
00260 
00261   this->open_i (c_qos ACE_ENV_ARG_PARAMETER);
00262   ACE_CHECK;
00263 }
00264 
00265 void
00266 TAO_EC_Gateway_IIOP::open_i (
00267     const RtecEventChannelAdmin::ConsumerQOS& c_qos
00268     ACE_ENV_ARG_DECL)
00269 {
00270   // = Connect as a supplier to the consumer EC
00271   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00272     this->consumer_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00273   ACE_CHECK;
00274 
00275   RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00276   sub.is_gateway = 1;
00277 
00278   // Change the RT_Info in the consumer QoS.
00279   // On the same loop we discover the subscriptions by event source,
00280   // and fill the consumer proxy map if we have to use this map.
00281   for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00282     {
00283       sub.dependencies[i].rt_info = this->supplier_info_;
00284 
00285       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00286       const RtecEventComm::EventHeader &h =
00287         sub.dependencies[i].event.header;
00288 
00289       RtecEventComm::EventSourceID sid = h.source;
00290 
00291       //ACE_DEBUG ((LM_DEBUG,
00292       //            "ECG (%t)    trying (%d,%d)\n",
00293       //           sid, h.type));
00294 
00295       // Skip all subscriptions that do not require an specific source
00296       // id or skip all subscriptions when we don't need to use the consumer
00297       // proxy map.
00298       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00299         continue;
00300 
00301       // Skip all the magic event types.
00302       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00303         continue;
00304 
00305       if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00306         {
00307           //ACE_DEBUG ((LM_DEBUG,
00308           //            "ECG (%t)    binding source %d\n",
00309           //            sid));
00310           proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00311           ACE_CHECK;
00312           this->consumer_proxy_map_.bind (sid, proxy);
00313         }
00314     }
00315   //ACE_DEBUG ((LM_DEBUG,
00316   //            "ECG (%t)    consumer map computed (%d entries)\n",
00317   //            this->consumer_proxy_map_.current_size ()));
00318 
00319   if (this->consumer_proxy_map_.current_size () > 0)
00320     {
00321       this->supplier_is_active_ = 1;
00322 
00323       // Obtain a reference to our supplier personality...
00324       RtecEventComm::PushSupplier_var supplier_ref =
00325         this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
00326       ACE_CHECK;
00327 
00328       // For each subscription by source build the set of publications
00329       // (they may several, by type, for instance) and connect to the
00330       // consumer proxy.
00331       for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00332            j != this->consumer_proxy_map_.end ();
00333            ++j)
00334         {
00335           RtecEventChannelAdmin::SupplierQOS pub;
00336           pub.publications.length (sub.dependencies.length () + 1);
00337           pub.is_gateway = 1;
00338 
00339           int c = 0;
00340 
00341           RtecEventComm::EventSourceID sid = (*j).ext_id_;
00342           for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00343             {
00344               const RtecEventComm::EventHeader& h =
00345                 sub.dependencies[k].event.header;
00346               if (h.source != sid
00347                   || (ACE_ES_EVENT_ANY < h.type
00348                       && h.type < ACE_ES_EVENT_UNDEFINED))
00349                 continue;
00350               pub.publications[c].event.header = h;
00351               pub.publications[c].dependency_info.dependency_type =
00352                 RtecBase::TWO_WAY_CALL;
00353               pub.publications[c].dependency_info.number_of_calls = 1;
00354               pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00355               c++;
00356             }
00357           //ACE_DEBUG ((LM_DEBUG,
00358           //            "ECG (%t) supplier id %d has %d elements\n",
00359           //            sid, c));
00360           if (c == 0)
00361             continue;
00362 
00363           pub.publications.length (c);
00364 
00365           // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
00366           // ACE_SupplierQOS_Factory::debug (pub);
00367           (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00368                                                pub
00369                                                ACE_ENV_ARG_PARAMETER);
00370           ACE_CHECK;
00371         }
00372     }
00373 
00374   // Also build the subscriptions that are *not* by source when we use the
00375   // consumer proxy map, and all subscriptions when we don't use the map and
00376   // then connect to the default consumer proxy.
00377   RtecEventChannelAdmin::SupplierQOS pub;
00378   pub.publications.length (sub.dependencies.length () + 1);
00379   pub.is_gateway = 1;
00380   int c = 0;
00381   for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00382     {
00383       const RtecEventComm::EventHeader& h =
00384         sub.dependencies[k].event.header;
00385       RtecEventComm::EventSourceID sid = h.source;
00386 
00387       // Skip all subscriptions with a specific source when we use the map
00388       if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00389         continue;
00390 
00391       // Skip all the magic event types.
00392       if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00393         continue;
00394 
00395       pub.publications[c].event.header = h;
00396       pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00397       pub.publications[c].dependency_info.dependency_type =
00398         RtecBase::TWO_WAY_CALL;
00399       pub.publications[c].dependency_info.number_of_calls = 1;
00400       pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00401       c++;
00402     }
00403 
00404   if (c > 0)
00405     {
00406       this->supplier_is_active_ = 1;
00407 
00408       // Obtain a reference to our supplier personality...
00409       RtecEventComm::PushSupplier_var supplier_ref =
00410         this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
00411       ACE_CHECK;
00412 
00413       // Obtain the consumer....
00414       this->default_consumer_proxy_ =
00415         supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00416       ACE_CHECK;
00417 
00418       pub.publications.length (c);
00419       // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
00420       // ACE_SupplierQOS_Factory::debug (pub);
00421       this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00422                                                             pub
00423                                                             ACE_ENV_ARG_PARAMETER);
00424       ACE_CHECK;
00425     }
00426 
00427   RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00428     this->supplier_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
00429   ACE_CHECK;
00430 
00431   this->supplier_proxy_ =
00432     consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00433   ACE_CHECK;
00434 
00435   this->consumer_is_active_ = 1;
00436   RtecEventComm::PushConsumer_var consumer_ref =
00437     this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
00438   ACE_CHECK;
00439 
00440   // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
00441   // ACE_ConsumerQOS_Factory::debug (sub);
00442 
00443   this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00444                                                 sub
00445                                                 ACE_ENV_ARG_PARAMETER);
00446   ACE_CHECK;
00447 }
00448 
00449 void
00450 TAO_EC_Gateway_IIOP::update_supplier (
00451     const RtecEventChannelAdmin::SupplierQOS&
00452     ACE_ENV_ARG_DECL_NOT_USED)
00453       ACE_THROW_SPEC ((CORBA::SystemException))
00454 {
00455   // Do nothing...
00456 }
00457 
00458 void
00459 TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00460 {
00461   // ACE_DEBUG ((LM_DEBUG,
00462   //             "ECG (%t): Supplier-consumer received "
00463   //            "disconnect from channel.\n"));
00464 }
00465 
00466 void
00467 TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00468 {
00469   // ACE_DEBUG ((LM_DEBUG,
00470   //            "ECG (%t): Supplier received "
00471   //            "disconnect from channel.\n"));
00472 }
00473 
00474 void
00475 TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events
00476                            ACE_ENV_ARG_DECL)
00477 {
00478   // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
00479 
00480   if (events.length () == 0)
00481     {
00482       // ACE_DEBUG ((LM_DEBUG, "no events\n"));
00483       return;
00484     }
00485 
00486   {
00487     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00488 
00489     this->busy_count_++;
00490   }
00491 
00492   // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
00493 
00494   // @@ TODO, there is an extra data copy here, we should do the event
00495   // modification without it and only compact the necessary events.
00496   RtecEventComm::EventSet out (1);
00497   out.length (1);
00498   for (CORBA::ULong i = 0; i < events.length (); ++i)
00499     {
00500       if (this->use_ttl_ == 1)
00501         {
00502           if (events[i].header.ttl == 0)
00503             continue;
00504         }
00505 
00506       RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00507       RtecEventComm::EventSourceID sid = events[i].header.source;
00508       if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00509           || this->consumer_proxy_map_.find (sid, proxy) == -1)
00510         {
00511           // If the source is not in our map or we should not use the map then
00512           // use the default consumer proxy.
00513           proxy = this->default_consumer_proxy_.in ();
00514         }
00515 
00516       if (CORBA::is_nil (proxy))
00517         continue;
00518 
00519       out[0] = events[i];
00520 
00521       if (this->use_ttl_ == 1)
00522         out[0].header.ttl--;
00523 
00524       // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
00525       this->push_to_consumer(proxy, out ACE_ENV_ARG_PARAMETER);
00526       ACE_CHECK;
00527     }
00528 
00529   {
00530     ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00531 
00532     this->busy_count_--;
00533 
00534     if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00535       {
00536         this->cleanup_posted_ = 0;
00537         this->cleanup_consumer_proxies_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00538         ACE_CHECK;
00539       }
00540 
00541     if (this->busy_count_ == 0 && this->update_posted_ != 0)
00542       {
00543         this->update_posted_ = 0;
00544         this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER);
00545         ACE_CHECK;
00546       }
00547   }
00548 }
00549 
00550 void
00551 TAO_EC_Gateway_IIOP::push_to_consumer (
00552     RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
00553     const RtecEventComm::EventSet& event
00554     ACE_ENV_ARG_DECL)
00555 {
00556   ACE_TRY
00557     {
00558       consumer->push (event ACE_ENV_ARG_PARAMETER);
00559       ACE_TRY_CHECK;
00560     }
00561   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
00562     {
00563       ec_control_->event_channel_not_exist (this ACE_ENV_ARG_PARAMETER);
00564       ACE_TRY_CHECK;
00565     }
00566   ACE_CATCH (CORBA::SystemException, sysex)
00567     {
00568       ec_control_->system_exception (this,
00569                                      sysex
00570                                      ACE_ENV_ARG_PARAMETER);
00571       ACE_TRY_CHECK;
00572     }
00573   ACE_CATCHANY
00574     {
00575       // Shouldn't happen.
00576     }
00577   ACE_ENDTRY;
00578 }
00579 
00580 int
00581 TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00582 {
00583   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00584 
00585   ec_control_->shutdown();
00586 
00587   this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
00588   ACE_CHECK_RETURN (-1);
00589 
00590   if (this->supplier_is_active_)
00591     {
00592       PortableServer::POA_var poa =
00593         this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00594       ACE_CHECK_RETURN (-1);
00595       PortableServer::ObjectId_var id =
00596         poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
00597       ACE_CHECK_RETURN (-1);
00598       poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
00599       ACE_CHECK_RETURN (-1);
00600       this->supplier_is_active_ = 0;
00601     }
00602 
00603   if (this->consumer_is_active_)
00604     {
00605       PortableServer::POA_var poa =
00606         this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00607       ACE_CHECK_RETURN (-1);
00608       PortableServer::ObjectId_var id =
00609         poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER);
00610       ACE_CHECK_RETURN (-1);
00611       poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
00612       ACE_CHECK_RETURN (-1);
00613       this->consumer_is_active_ = 0;
00614     }
00615 
00616   this->cleanup_consumer_ec_i ();
00617   this->cleanup_supplier_ec_i ();
00618 
00619   return 0;
00620 }
00621 
00622 int
00623 TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
00624 {
00625   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00626 
00627   this->cleanup_consumer_ec_i ();
00628 
00629   return 0;
00630 }
00631 
00632 int
00633 TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
00634 {
00635   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00636 
00637   this->cleanup_supplier_ec_i ();
00638 
00639   return 0;
00640 }
00641 
00642 void
00643 TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void)
00644 {
00645   this->consumer_ec_ =
00646     RtecEventChannelAdmin::EventChannel::_nil ();
00647 }
00648 
00649 void
00650 TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void)
00651 {
00652   this->supplier_ec_ =
00653     RtecEventChannelAdmin::EventChannel::_nil ();
00654 }
00655 
00656 CORBA::Boolean
00657 TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const
00658 {
00659   return !CORBA::is_nil (this->consumer_ec_.in ());
00660 }
00661 
00662 CORBA::Boolean
00663 TAO_EC_Gateway_IIOP::consumer_ec_non_existent (
00664       CORBA::Boolean_out disconnected
00665       ACE_ENV_ARG_DECL)
00666 {
00667   CORBA::Object_var consumer_ec;
00668   {
00669     ACE_GUARD_THROW_EX (
00670         TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00671         CORBA::INTERNAL ());
00672     ACE_CHECK_RETURN (0);
00673 
00674     disconnected = 0;
00675     if (this->is_consumer_ec_connected_i () == 0)
00676       {
00677         disconnected = 1;
00678         return 0;
00679       }
00680 
00681     consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00682   }
00683 
00684 #if (TAO_HAS_MINIMUM_CORBA == 0)
00685   return consumer_ec->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00686 #else
00687   return 0;
00688 #endif /* TAO_HAS_MINIMUM_CORBA */
00689 }
00690 
00691 void
00692 TAO_EC_Gateway_IIOP::suspend_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
00693 {
00694   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00695     {
00696       this->supplier_proxy_->suspend_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
00697       ACE_CHECK;
00698 
00699       supplier_ec_suspended_ = 1;
00700     }
00701 }
00702 
00703 void
00704 TAO_EC_Gateway_IIOP::resume_supplier_ec (ACE_ENV_SINGLE_ARG_DECL)
00705 {
00706   if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00707     {
00708       this->supplier_proxy_->resume_connection (ACE_ENV_SINGLE_ARG_PARAMETER);
00709       ACE_CHECK;
00710 
00711       supplier_ec_suspended_ = 0;
00712     }
00713 }
00714 
00715 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:07 2006 for TAO_RTEvent by doxygen 1.3.6