CEC_TypedEventChannel.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 //
00003 // $Id: CEC_TypedEventChannel.cpp 81229 2008-04-03 15:12:37Z sma $
00004 
00005 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00006 #include "orbsvcs/CosEvent/CEC_Dispatching.h"
00007 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00008 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ConsumerControl.h"
00010 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00011 #include "tao/debug.h"
00012 #include "tao/ORB_Core.h"
00013 #include "ace/Dynamic_Service.h"
00014 #include "ace/Reactor.h"
00015 
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 
00021 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 // Implementation skeleton constructor
00024 TAO_CEC_TypedEventChannel::
00025 TAO_CEC_TypedEventChannel (const TAO_CEC_TypedEventChannel_Attributes& attr,
00026                            TAO_CEC_Factory* factory,
00027                            int own_factory)
00028   : typed_supplier_poa_ (PortableServer::POA::_duplicate (attr.typed_supplier_poa)),
00029     typed_consumer_poa_ (PortableServer::POA::_duplicate (attr.typed_consumer_poa)),
00030     orb_ (CORBA::ORB::_duplicate (attr.orb)),
00031     interface_repository_ (CORBA::Repository::_duplicate (attr.interface_repository)),
00032     factory_ (factory),
00033     own_factory_ (own_factory),
00034     consumer_reconnect_ (attr.consumer_reconnect),
00035     supplier_reconnect_ (attr.supplier_reconnect),
00036     disconnect_callbacks_ (attr.disconnect_callbacks),
00037     destroy_on_shutdown_ (attr.destroy_on_shutdown),
00038     destroyed_ (0)
00039 {
00040   if (this->factory_ == 0)
00041     {
00042       this->factory_ =
00043         ACE_Dynamic_Service<TAO_CEC_Factory>::instance ("CEC_Factory");
00044       this->own_factory_ = 0;
00045       ACE_ASSERT (this->factory_ != 0);
00046     }
00047 
00048   this->dispatching_ =
00049     this->factory_->create_dispatching (this);
00050   this->typed_consumer_admin_ =
00051     this->factory_->create_consumer_admin (this);
00052   this->typed_supplier_admin_ =
00053     this->factory_->create_supplier_admin (this);
00054   this->consumer_control_ =
00055     this->factory_->create_consumer_control (this);
00056   this->supplier_control_ =
00057     this->factory_->create_supplier_control (this);
00058 }
00059 
00060 // Implementation skeleton destructor
00061 TAO_CEC_TypedEventChannel::~TAO_CEC_TypedEventChannel (void)
00062 {
00063   this->clear_ifr_cache ();
00064   this->interface_description_.close ();
00065 
00066   this->factory_->destroy_dispatching (this->dispatching_);
00067   this->dispatching_ = 0;
00068 
00069   this->factory_->destroy_consumer_admin (this->typed_consumer_admin_);
00070   this->typed_consumer_admin_ = 0;
00071   this->factory_->destroy_supplier_admin (this->typed_supplier_admin_);
00072   this->typed_supplier_admin_ = 0;
00073 
00074   if (this->own_factory_)
00075     delete this->factory_;
00076 }
00077 
00078 void
00079 TAO_CEC_TypedEventChannel::activate (void)
00080 {
00081   this->dispatching_->activate ();
00082   this->consumer_control_->activate ();
00083   this->supplier_control_->activate ();
00084 }
00085 
00086 namespace
00087 {
00088   struct ShutdownHandler : ACE_Event_Handler
00089   {
00090     ShutdownHandler (CORBA::ORB_ptr orb)
00091       : orb_ (CORBA::ORB::_duplicate (orb)) {}
00092     CORBA::ORB_var orb_;
00093 
00094     virtual int handle_timeout (const ACE_Time_Value&, const void*)
00095     {
00096       orb_->shutdown (1);
00097       return 0;
00098     }
00099 
00100   };
00101 }
00102 
00103 void
00104 TAO_CEC_TypedEventChannel::shutdown (void)
00105 {
00106   this->dispatching_->shutdown ();
00107   this->supplier_control_->shutdown ();
00108   this->consumer_control_->shutdown ();
00109 
00110   PortableServer::POA_var typed_consumer_poa =
00111     this->typed_consumer_admin_->_default_POA ();
00112   PortableServer::ObjectId_var typed_consumer_id =
00113     typed_consumer_poa->servant_to_id (this->typed_consumer_admin_);
00114   typed_consumer_poa->deactivate_object (typed_consumer_id.in ());
00115 
00116   PortableServer::POA_var typed_supplier_poa =
00117     this->typed_supplier_admin_->_default_POA ();
00118   PortableServer::ObjectId_var typed_supplier_id =
00119     typed_supplier_poa->servant_to_id (this->typed_supplier_admin_);
00120   typed_supplier_poa->deactivate_object (typed_supplier_id.in ());
00121 
00122   this->typed_supplier_admin_->shutdown ();
00123 
00124   this->typed_consumer_admin_->shutdown ();
00125 
00126   if (destroy_on_shutdown_)
00127     {
00128       // Deactivate the Typed EC
00129       PortableServer::POA_var t_poa =
00130         this->_default_POA ();
00131 
00132       PortableServer::ObjectId_var t_id =
00133         t_poa->servant_to_id (this);
00134 
00135       t_poa->deactivate_object (t_id.in ());
00136 
00137       ACE_Event_Handler *timer;
00138       ACE_NEW (timer, ShutdownHandler (this->orb_.in ()));
00139       ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
00140       reactor->schedule_timer (timer, 0, ACE_Time_Value (1));
00141     }
00142 }
00143 
00144 void
00145 TAO_CEC_TypedEventChannel::connected (TAO_CEC_TypedProxyPushConsumer* consumer)
00146 {
00147   this->typed_supplier_admin_->connected (consumer);
00148 }
00149 
00150 void
00151 TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_TypedProxyPushConsumer* consumer)
00152 {
00153   this->typed_supplier_admin_->reconnected (consumer);
00154 }
00155 
00156 void
00157 TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_TypedProxyPushConsumer* consumer)
00158 {
00159   this->typed_supplier_admin_->disconnected (consumer);
00160 }
00161 
00162 void
00163 TAO_CEC_TypedEventChannel::connected (TAO_CEC_ProxyPushSupplier* supplier)
00164 {
00165   this->typed_consumer_admin_->connected (supplier);
00166 }
00167 
00168 void
00169 TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_ProxyPushSupplier* supplier)
00170 {
00171   this->typed_consumer_admin_->reconnected (supplier);
00172 }
00173 
00174 void
00175 TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_ProxyPushSupplier* supplier)
00176 {
00177   this->typed_consumer_admin_->disconnected (supplier);
00178 }
00179 
00180 // Find from the ifr cache the operation and return the parameter array pointer.
00181 TAO_CEC_Operation_Params *
00182 TAO_CEC_TypedEventChannel::find_from_ifr_cache (const char *operation)
00183 {
00184   TAO_CEC_Operation_Params *found = 0;
00185 
00186   this->interface_description_.find (operation, found);
00187 
00188   return found;
00189 }
00190 
00191 // Insert the operation and its parameters into the ifr cache.
00192 int
00193 TAO_CEC_TypedEventChannel::insert_into_ifr_cache (const char *operation_,
00194                                                   TAO_CEC_Operation_Params *parameters_)
00195 {
00196   // Make sure that the supplied Object reference is valid,
00197   // i.e. not nil.
00198   if (operation_ == 0 || parameters_ == 0)
00199     {
00200       errno = EINVAL;
00201       return -1;
00202     };
00203 
00204   CORBA::String_var operation = CORBA::string_dup (operation_);
00205 
00206   int result = this->interface_description_.bind (operation.in (), parameters_);
00207 
00208   if (result == 0)
00209     {
00210       // Transfer ownership to the Object InterfaceDescription map.
00211       (void) operation._retn ();
00212     }
00213 
00214   return result;
00215 }
00216 
00217 // Clear the ifr cache, freeing up all its contents.
00218 int
00219 TAO_CEC_TypedEventChannel::clear_ifr_cache (void)
00220 {
00221   for (Iterator i = this->interface_description_.begin ();
00222        i != this->interface_description_.end ();
00223        ++i)
00224     {
00225       if (TAO_debug_level >= 10)
00226         {
00227           ACE_DEBUG ((LM_DEBUG,
00228                       ACE_TEXT ("***** Destroying operation %s from ifr cache *****\n"),
00229                       const_cast<char *> ((*i).ext_id_)));
00230         }
00231 
00232       // Deallocate the operation
00233       CORBA::string_free (const_cast<char *> ((*i).ext_id_));
00234 
00235       // Destroy the parameter
00236       delete ((*i).int_id_);
00237     }
00238 
00239   int result = this->interface_description_.unbind_all ();
00240 
00241   return result;
00242 }
00243 
00244 
00245 // The function performs a lookup_id of the passed interface in the IFR,
00246 // and then obtains the FullInterfaceDescription.
00247 // The base interfaces for the interface are stored on this class.
00248 // All the operations and their parameters are then inserted in the ifr cache.
00249 // Function returns 0 if successful or -1 on a failure.
00250 int
00251 TAO_CEC_TypedEventChannel::cache_interface_description (const char *interface_)
00252 {
00253   try
00254     {
00255       // Lookup the Interface Name in the IFR
00256       CORBA::Contained_var contained =
00257         this->interface_repository_->lookup_id (interface_);
00258 
00259       // Narrow the interface
00260       CORBA::InterfaceDef_var intface =
00261         CORBA::InterfaceDef::_narrow (contained.in ());
00262 
00263       if (CORBA::is_nil (intface.in () ))
00264         {
00265           if (TAO_debug_level >= 10)
00266             {
00267               ACE_DEBUG ((LM_DEBUG,
00268                           ACE_TEXT ("***** CORBA::InterfaceDef::_narrow failed for interface %s *****\n"),
00269                           interface_));
00270             }
00271           return -1;
00272         }
00273       else
00274         {
00275           // Obtain the full interface description
00276           CORBA::InterfaceDef::FullInterfaceDescription_var fid =
00277             intface->describe_interface ();
00278 
00279           // Obtain the base interfaces
00280           this->base_interfaces_ = fid->base_interfaces;
00281           if (TAO_debug_level >= 10)
00282             {
00283               for (CORBA::ULong base=0; base<fid->base_interfaces.length(); base++)
00284                 {
00285                   ACE_DEBUG ((LM_DEBUG,
00286                               ACE_TEXT ("***** Base interface %s found on interface %s *****\n"),
00287                               static_cast<char const*>(fid->base_interfaces[base]),
00288                               interface_ ));
00289                 }
00290             }
00291 
00292           // Obtain the operations
00293           for (CORBA::ULong oper=0; oper<fid->operations.length(); oper++)
00294             {
00295               if (TAO_debug_level >= 10)
00296                 {
00297                   ACE_DEBUG ((LM_DEBUG,
00298                               ACE_TEXT ("***** Operation %s found on interface %s, num params %d *****\n"),
00299                               fid->operations[oper].name.in(),
00300                               interface_,
00301                               fid->operations[oper].parameters.length() ));
00302                 }
00303 
00304               // Obtain the parameters
00305               CORBA::ULong num_params = fid->operations[oper].parameters.length();
00306               TAO_CEC_Operation_Params *oper_params = new TAO_CEC_Operation_Params (num_params);
00307 
00308               for (CORBA::ULong param=0; param<num_params; param++)
00309                 {
00310                   oper_params->parameters_[param].name_ = fid->operations[oper].parameters[param].name.in();
00311                   oper_params->parameters_[param].type_ = fid->operations[oper].parameters[param].type;
00312                   switch (fid->operations[oper].parameters[param].mode)
00313                     {
00314                     case CORBA::PARAM_IN:
00315                       oper_params->parameters_[param].direction_ = CORBA::ARG_IN;
00316                       break;
00317                     case CORBA::PARAM_OUT:
00318                       oper_params->parameters_[param].direction_ = CORBA::ARG_OUT;
00319                       break;
00320                     case CORBA::PARAM_INOUT:
00321                       oper_params->parameters_[param].direction_ = CORBA::ARG_INOUT;
00322                       break;
00323                     }
00324 
00325                   if (TAO_debug_level >= 10)
00326                     {
00327                       ACE_DEBUG ((LM_DEBUG,
00328                                   ACE_TEXT ("***** Parameter %s found on operation %s *****\n"),
00329                                   oper_params->parameters_[param].name_.in(),
00330                                   fid->operations[oper].name.in() ));
00331                     }
00332                 }
00333 
00334               if (TAO_debug_level >= 10)
00335                 {
00336                   ACE_DEBUG ((LM_DEBUG,
00337                               ACE_TEXT ("***** Adding operation %s with %d parameters to the IFR cache *****\n"),
00338                               fid->operations[oper].name.in(),
00339                               oper_params->num_params_ ));
00340                 }
00341 
00342               int result = insert_into_ifr_cache (fid->operations[oper].name.in(), oper_params);
00343               if (result != 0)
00344                 {
00345                   if (TAO_debug_level >= 10)
00346                     {
00347                       ACE_DEBUG ((LM_DEBUG,
00348                                   ACE_TEXT ("***** Adding operation to IFR cache failed *****\n")));
00349                     }
00350                 }
00351             }
00352         }
00353     }
00354   catch (const CORBA::SystemException& sysex)
00355     {
00356       if (TAO_debug_level >= 4)
00357         {
00358           sysex._tao_print_exception (
00359             "during TAO_CEC_TypedEventChannel::cache_interface_description");
00360         }
00361       return -1;
00362     }
00363   catch (const CORBA::Exception& ex)
00364     {
00365       if (TAO_debug_level >= 4)
00366         {
00367           ex._tao_print_exception (
00368             "ex raised during TAO_CEC_TypedEventChannel::cache_interface_description");
00369         }
00370       return -1;
00371     }
00372   return 0;
00373 }
00374 
00375 // A consumer is attempting to register its uses_interface.
00376 // Note only a single interface can be registered with this version of the EC.
00377 // For users that require more than one interface, start another EC.
00378 // If the passed uses_interface is the same as a registered interface the function returns 0.
00379 // If an attempt is made to register a second interface, this function will return -1
00380 // and the TypedConsumerAdmin will throw CosTypedEventChannelAdmin::NoSuchImplementation.
00381 // If neither a consumer nor a supplier has registered an interface,
00382 // the function calls cache_interface_description and returns 0 if successful.
00383 int
00384 TAO_CEC_TypedEventChannel::consumer_register_uses_interace (const char *uses_interface)
00385 {
00386   // Check if a consumer has already registered an interface with the typed EC
00387   if (this->uses_interface_.length() > 0)
00388     {
00389       // Check if the registered uses_interface_ == the new uses_interface
00390       if (this->uses_interface_ == ACE_CString (uses_interface))
00391         {
00392           return 0;
00393         }
00394       else
00395         {
00396           if (TAO_debug_level >= 10)
00397             {
00398               ACE_DEBUG ((LM_DEBUG,
00399                           ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
00400             }
00401           return -1;
00402         }
00403     }
00404 
00405   // Check if a supplier has already registered an inerface with the typed EC
00406   if (this->supported_interface_.length() > 0)
00407     {
00408       // Check if the registered supported_interface_ == the new uses_interface
00409       if (this->supported_interface_ == ACE_CString (uses_interface))
00410         {
00411           this->uses_interface_ = uses_interface;
00412           return 0;
00413         }
00414       else
00415         {
00416           if (TAO_debug_level >= 10)
00417             {
00418               ACE_DEBUG ((LM_DEBUG,
00419                           ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
00420             }
00421           return -1;
00422         }
00423     }
00424   else
00425     {
00426       // Neither a consumer nor a supplier has connected yet
00427       int result = cache_interface_description (uses_interface);
00428 
00429       if (result == 0)
00430         {
00431           this->uses_interface_ = uses_interface;
00432         }
00433       return result;
00434     }
00435 }
00436 
00437 // A supplier is attempting to register its supported_interface.
00438 // Note only a single interface can be registered with this version of the EC.
00439 // For users that require more than one interface, start another EC.
00440 // If the passed supported_interface is the same as a registered interface the function returns 0.
00441 // If an attempt is made to register a second interface, this function will return -1
00442 // and the TypedSupplierAdmin will throw CosTypedEventChannelAdmin::InterfaceNotSupported.
00443 // If neither a consumer nor a supplier has registered an interface,
00444 // the function calls cache_interface_description and returns 0 if successful.
00445 int
00446 TAO_CEC_TypedEventChannel::supplier_register_supported_interface (const char *supported_interface)
00447 {
00448   // Check if a supplier has already registered an interface with the typed EC
00449   if (this->supported_interface_.length() > 0)
00450     {
00451       // Check if the registered interface == the new supported_interface
00452       if (this->supported_interface_ == ACE_CString (supported_interface))
00453         {
00454           return 0;
00455         }
00456       else
00457         {
00458           if (TAO_debug_level >= 10)
00459             {
00460               ACE_DEBUG ((LM_DEBUG,
00461                           ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
00462             }
00463           return -1;
00464         }
00465     }
00466 
00467   // Check if a consumer has already registered an inerface with the typed EC
00468   if (this->uses_interface_.length() > 0)
00469     {
00470       // Check if the registered uses_interface_ == the new supported_interface
00471       if (this->uses_interface_ == ACE_CString (supported_interface))
00472         {
00473           this->supported_interface_ = supported_interface;
00474           return 0;
00475         }
00476       else
00477         {
00478           if (TAO_debug_level >= 10)
00479             {
00480               ACE_DEBUG ((LM_DEBUG,
00481                           ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
00482             }
00483           return -1;
00484         }
00485     }
00486   else
00487     {
00488       // Neither a consumer nor a supplier has connected yet
00489       int result = cache_interface_description (supported_interface);
00490 
00491       if (result == 0)
00492         {
00493           this->supported_interface_ = supported_interface;
00494         }
00495       return result;
00496     }
00497 }
00498 
00499 // Function creates a NVList and populates it from the parameter information.
00500 void
00501 TAO_CEC_TypedEventChannel::create_operation_list (TAO_CEC_Operation_Params *oper_params,
00502                                                   CORBA::NVList_out new_list)
00503 {
00504   this->orb_->create_list (0, new_list);
00505 
00506   for (CORBA::ULong param=0; param<oper_params->num_params_; param++)
00507     {
00508 
00509       CORBA::Any any_1;
00510       any_1._tao_set_typecode(oper_params->parameters_[param].type_.in ());
00511 
00512       new_list->add_value (oper_params->parameters_[param].name_. in (),
00513                            any_1,
00514                            oper_params->parameters_[param].direction_);
00515     }
00516 }
00517 
00518 // Function creates an empty NVList.
00519 void
00520 TAO_CEC_TypedEventChannel::create_list (CORBA::Long count,
00521                                         CORBA::NVList_out new_list)
00522 {
00523   this->orb_->create_list (count, new_list);
00524 }
00525 
00526 // The CosTypedEventChannelAdmin::TypedEventChannel methods...
00527 CosTypedEventChannelAdmin::TypedConsumerAdmin_ptr
00528 TAO_CEC_TypedEventChannel::for_consumers (void)
00529 {
00530   return this->typed_consumer_admin_->_this ();
00531 }
00532 
00533 CosTypedEventChannelAdmin::TypedSupplierAdmin_ptr
00534 TAO_CEC_TypedEventChannel::for_suppliers (void)
00535 {
00536   return this->typed_supplier_admin_->_this ();
00537 }
00538 
00539 void
00540 TAO_CEC_TypedEventChannel::destroy (void)
00541 {
00542   if (!destroyed_)
00543     {
00544       destroyed_ = 1;
00545       this->shutdown ();
00546     }
00547 }
00548 
00549 CORBA::Policy_ptr
00550 TAO_CEC_TypedEventChannel::create_roundtrip_timeout_policy
00551 (const ACE_Time_Value &timeout)
00552 {
00553   return this->factory_->create_roundtrip_timeout_policy (timeout);
00554 }
00555 
00556 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7