00001
00002
00003
00004
00005 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00006 #include "orbsvcs/CosEvent/CEC_Dispatching.h"
00007 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00008 #include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
00009 #include "orbsvcs/CosEvent/CEC_ConsumerControl.h"
00010 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00011 #include "tao/debug.h"
00012 #include "tao/ORB_Core.h"
00013 #include "ace/Dynamic_Service.h"
00014 #include "ace/Reactor.h"
00015
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.inl"
00018 #endif
00019
00020
00021 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023
00024 TAO_CEC_TypedEventChannel::
00025 TAO_CEC_TypedEventChannel (const TAO_CEC_TypedEventChannel_Attributes& attr,
00026 TAO_CEC_Factory* factory,
00027 int own_factory)
00028 : typed_supplier_poa_ (PortableServer::POA::_duplicate (attr.typed_supplier_poa)),
00029 typed_consumer_poa_ (PortableServer::POA::_duplicate (attr.typed_consumer_poa)),
00030 orb_ (CORBA::ORB::_duplicate (attr.orb)),
00031 interface_repository_ (CORBA::Repository::_duplicate (attr.interface_repository)),
00032 factory_ (factory),
00033 own_factory_ (own_factory),
00034 consumer_reconnect_ (attr.consumer_reconnect),
00035 supplier_reconnect_ (attr.supplier_reconnect),
00036 disconnect_callbacks_ (attr.disconnect_callbacks),
00037 destroy_on_shutdown_ (attr.destroy_on_shutdown),
00038 destroyed_ (0)
00039 {
00040 if (this->factory_ == 0)
00041 {
00042 this->factory_ =
00043 ACE_Dynamic_Service<TAO_CEC_Factory>::instance ("CEC_Factory");
00044 this->own_factory_ = 0;
00045 ACE_ASSERT (this->factory_ != 0);
00046 }
00047
00048 this->dispatching_ =
00049 this->factory_->create_dispatching (this);
00050 this->typed_consumer_admin_ =
00051 this->factory_->create_consumer_admin (this);
00052 this->typed_supplier_admin_ =
00053 this->factory_->create_supplier_admin (this);
00054 this->consumer_control_ =
00055 this->factory_->create_consumer_control (this);
00056 this->supplier_control_ =
00057 this->factory_->create_supplier_control (this);
00058 }
00059
00060
00061 TAO_CEC_TypedEventChannel::~TAO_CEC_TypedEventChannel (void)
00062 {
00063 this->clear_ifr_cache ();
00064 this->interface_description_.close ();
00065
00066 this->factory_->destroy_dispatching (this->dispatching_);
00067 this->dispatching_ = 0;
00068
00069 this->factory_->destroy_consumer_admin (this->typed_consumer_admin_);
00070 this->typed_consumer_admin_ = 0;
00071 this->factory_->destroy_supplier_admin (this->typed_supplier_admin_);
00072 this->typed_supplier_admin_ = 0;
00073
00074 if (this->own_factory_)
00075 delete this->factory_;
00076 }
00077
00078 void
00079 TAO_CEC_TypedEventChannel::activate (void)
00080 {
00081 this->dispatching_->activate ();
00082 this->consumer_control_->activate ();
00083 this->supplier_control_->activate ();
00084 }
00085
00086 namespace
00087 {
00088 struct ShutdownHandler : ACE_Event_Handler
00089 {
00090 ShutdownHandler (CORBA::ORB_ptr orb)
00091 : orb_ (CORBA::ORB::_duplicate (orb)) {}
00092 CORBA::ORB_var orb_;
00093
00094 virtual int handle_timeout (const ACE_Time_Value&, const void*)
00095 {
00096 orb_->shutdown (1);
00097 return 0;
00098 }
00099
00100 };
00101 }
00102
00103 void
00104 TAO_CEC_TypedEventChannel::shutdown (void)
00105 {
00106 this->dispatching_->shutdown ();
00107 this->supplier_control_->shutdown ();
00108 this->consumer_control_->shutdown ();
00109
00110 PortableServer::POA_var typed_consumer_poa =
00111 this->typed_consumer_admin_->_default_POA ();
00112 PortableServer::ObjectId_var typed_consumer_id =
00113 typed_consumer_poa->servant_to_id (this->typed_consumer_admin_);
00114 typed_consumer_poa->deactivate_object (typed_consumer_id.in ());
00115
00116 PortableServer::POA_var typed_supplier_poa =
00117 this->typed_supplier_admin_->_default_POA ();
00118 PortableServer::ObjectId_var typed_supplier_id =
00119 typed_supplier_poa->servant_to_id (this->typed_supplier_admin_);
00120 typed_supplier_poa->deactivate_object (typed_supplier_id.in ());
00121
00122 this->typed_supplier_admin_->shutdown ();
00123
00124 this->typed_consumer_admin_->shutdown ();
00125
00126 if (destroy_on_shutdown_)
00127 {
00128
00129 PortableServer::POA_var t_poa =
00130 this->_default_POA ();
00131
00132 PortableServer::ObjectId_var t_id =
00133 t_poa->servant_to_id (this);
00134
00135 t_poa->deactivate_object (t_id.in ());
00136
00137 ACE_Event_Handler *timer;
00138 ACE_NEW (timer, ShutdownHandler (this->orb_.in ()));
00139 ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
00140 reactor->schedule_timer (timer, 0, ACE_Time_Value (1));
00141 }
00142 }
00143
00144 void
00145 TAO_CEC_TypedEventChannel::connected (TAO_CEC_TypedProxyPushConsumer* consumer)
00146 {
00147 this->typed_supplier_admin_->connected (consumer);
00148 }
00149
00150 void
00151 TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_TypedProxyPushConsumer* consumer)
00152 {
00153 this->typed_supplier_admin_->reconnected (consumer);
00154 }
00155
00156 void
00157 TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_TypedProxyPushConsumer* consumer)
00158 {
00159 this->typed_supplier_admin_->disconnected (consumer);
00160 }
00161
00162 void
00163 TAO_CEC_TypedEventChannel::connected (TAO_CEC_ProxyPushSupplier* supplier)
00164 {
00165 this->typed_consumer_admin_->connected (supplier);
00166 }
00167
00168 void
00169 TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_ProxyPushSupplier* supplier)
00170 {
00171 this->typed_consumer_admin_->reconnected (supplier);
00172 }
00173
00174 void
00175 TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_ProxyPushSupplier* supplier)
00176 {
00177 this->typed_consumer_admin_->disconnected (supplier);
00178 }
00179
00180
00181 TAO_CEC_Operation_Params *
00182 TAO_CEC_TypedEventChannel::find_from_ifr_cache (const char *operation)
00183 {
00184 TAO_CEC_Operation_Params *found = 0;
00185
00186 this->interface_description_.find (operation, found);
00187
00188 return found;
00189 }
00190
00191
00192 int
00193 TAO_CEC_TypedEventChannel::insert_into_ifr_cache (const char *operation_,
00194 TAO_CEC_Operation_Params *parameters_)
00195 {
00196
00197
00198 if (operation_ == 0 || parameters_ == 0)
00199 {
00200 errno = EINVAL;
00201 return -1;
00202 };
00203
00204 CORBA::String_var operation = CORBA::string_dup (operation_);
00205
00206 int result = this->interface_description_.bind (operation.in (), parameters_);
00207
00208 if (result == 0)
00209 {
00210
00211 (void) operation._retn ();
00212 }
00213
00214 return result;
00215 }
00216
00217
00218 int
00219 TAO_CEC_TypedEventChannel::clear_ifr_cache (void)
00220 {
00221 for (Iterator i = this->interface_description_.begin ();
00222 i != this->interface_description_.end ();
00223 ++i)
00224 {
00225 if (TAO_debug_level >= 10)
00226 {
00227 ACE_DEBUG ((LM_DEBUG,
00228 ACE_TEXT ("***** Destroying operation %s from ifr cache *****\n"),
00229 const_cast<char *> ((*i).ext_id_)));
00230 }
00231
00232
00233 CORBA::string_free (const_cast<char *> ((*i).ext_id_));
00234
00235
00236 delete ((*i).int_id_);
00237 }
00238
00239 int result = this->interface_description_.unbind_all ();
00240
00241 return result;
00242 }
00243
00244
00245
00246
00247
00248
00249
00250 int
00251 TAO_CEC_TypedEventChannel::cache_interface_description (const char *interface_)
00252 {
00253 try
00254 {
00255
00256 CORBA::Contained_var contained =
00257 this->interface_repository_->lookup_id (interface_);
00258
00259
00260 CORBA::InterfaceDef_var intface =
00261 CORBA::InterfaceDef::_narrow (contained.in ());
00262
00263 if (CORBA::is_nil (intface.in () ))
00264 {
00265 if (TAO_debug_level >= 10)
00266 {
00267 ACE_DEBUG ((LM_DEBUG,
00268 ACE_TEXT ("***** CORBA::InterfaceDef::_narrow failed for interface %s *****\n"),
00269 interface_));
00270 }
00271 return -1;
00272 }
00273 else
00274 {
00275
00276 CORBA::InterfaceDef::FullInterfaceDescription_var fid =
00277 intface->describe_interface ();
00278
00279
00280 this->base_interfaces_ = fid->base_interfaces;
00281 if (TAO_debug_level >= 10)
00282 {
00283 for (CORBA::ULong base=0; base<fid->base_interfaces.length(); base++)
00284 {
00285 ACE_DEBUG ((LM_DEBUG,
00286 ACE_TEXT ("***** Base interface %s found on interface %s *****\n"),
00287 static_cast<char const*>(fid->base_interfaces[base]),
00288 interface_ ));
00289 }
00290 }
00291
00292
00293 for (CORBA::ULong oper=0; oper<fid->operations.length(); oper++)
00294 {
00295 if (TAO_debug_level >= 10)
00296 {
00297 ACE_DEBUG ((LM_DEBUG,
00298 ACE_TEXT ("***** Operation %s found on interface %s, num params %d *****\n"),
00299 fid->operations[oper].name.in(),
00300 interface_,
00301 fid->operations[oper].parameters.length() ));
00302 }
00303
00304
00305 CORBA::ULong num_params = fid->operations[oper].parameters.length();
00306 TAO_CEC_Operation_Params *oper_params = new TAO_CEC_Operation_Params (num_params);
00307
00308 for (CORBA::ULong param=0; param<num_params; param++)
00309 {
00310 oper_params->parameters_[param].name_ = fid->operations[oper].parameters[param].name.in();
00311 oper_params->parameters_[param].type_ = fid->operations[oper].parameters[param].type;
00312 switch (fid->operations[oper].parameters[param].mode)
00313 {
00314 case CORBA::PARAM_IN:
00315 oper_params->parameters_[param].direction_ = CORBA::ARG_IN;
00316 break;
00317 case CORBA::PARAM_OUT:
00318 oper_params->parameters_[param].direction_ = CORBA::ARG_OUT;
00319 break;
00320 case CORBA::PARAM_INOUT:
00321 oper_params->parameters_[param].direction_ = CORBA::ARG_INOUT;
00322 break;
00323 }
00324
00325 if (TAO_debug_level >= 10)
00326 {
00327 ACE_DEBUG ((LM_DEBUG,
00328 ACE_TEXT ("***** Parameter %s found on operation %s *****\n"),
00329 oper_params->parameters_[param].name_.in(),
00330 fid->operations[oper].name.in() ));
00331 }
00332 }
00333
00334 if (TAO_debug_level >= 10)
00335 {
00336 ACE_DEBUG ((LM_DEBUG,
00337 ACE_TEXT ("***** Adding operation %s with %d parameters to the IFR cache *****\n"),
00338 fid->operations[oper].name.in(),
00339 oper_params->num_params_ ));
00340 }
00341
00342 int result = insert_into_ifr_cache (fid->operations[oper].name.in(), oper_params);
00343 if (result != 0)
00344 {
00345 if (TAO_debug_level >= 10)
00346 {
00347 ACE_DEBUG ((LM_DEBUG,
00348 ACE_TEXT ("***** Adding operation to IFR cache failed *****\n")));
00349 }
00350 }
00351 }
00352 }
00353 }
00354 catch (const CORBA::SystemException& sysex)
00355 {
00356 if (TAO_debug_level >= 4)
00357 {
00358 sysex._tao_print_exception (
00359 "during TAO_CEC_TypedEventChannel::cache_interface_description");
00360 }
00361 return -1;
00362 }
00363 catch (const CORBA::Exception& ex)
00364 {
00365 if (TAO_debug_level >= 4)
00366 {
00367 ex._tao_print_exception (
00368 "ex raised during TAO_CEC_TypedEventChannel::cache_interface_description");
00369 }
00370 return -1;
00371 }
00372 return 0;
00373 }
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383 int
00384 TAO_CEC_TypedEventChannel::consumer_register_uses_interace (const char *uses_interface)
00385 {
00386
00387 if (this->uses_interface_.length() > 0)
00388 {
00389
00390 if (this->uses_interface_ == ACE_CString (uses_interface))
00391 {
00392 return 0;
00393 }
00394 else
00395 {
00396 if (TAO_debug_level >= 10)
00397 {
00398 ACE_DEBUG ((LM_DEBUG,
00399 ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
00400 }
00401 return -1;
00402 }
00403 }
00404
00405
00406 if (this->supported_interface_.length() > 0)
00407 {
00408
00409 if (this->supported_interface_ == ACE_CString (uses_interface))
00410 {
00411 this->uses_interface_ = uses_interface;
00412 return 0;
00413 }
00414 else
00415 {
00416 if (TAO_debug_level >= 10)
00417 {
00418 ACE_DEBUG ((LM_DEBUG,
00419 ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
00420 }
00421 return -1;
00422 }
00423 }
00424 else
00425 {
00426
00427 int result = cache_interface_description (uses_interface);
00428
00429 if (result == 0)
00430 {
00431 this->uses_interface_ = uses_interface;
00432 }
00433 return result;
00434 }
00435 }
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445 int
00446 TAO_CEC_TypedEventChannel::supplier_register_supported_interface (const char *supported_interface)
00447 {
00448
00449 if (this->supported_interface_.length() > 0)
00450 {
00451
00452 if (this->supported_interface_ == ACE_CString (supported_interface))
00453 {
00454 return 0;
00455 }
00456 else
00457 {
00458 if (TAO_debug_level >= 10)
00459 {
00460 ACE_DEBUG ((LM_DEBUG,
00461 ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
00462 }
00463 return -1;
00464 }
00465 }
00466
00467
00468 if (this->uses_interface_.length() > 0)
00469 {
00470
00471 if (this->uses_interface_ == ACE_CString (supported_interface))
00472 {
00473 this->supported_interface_ = supported_interface;
00474 return 0;
00475 }
00476 else
00477 {
00478 if (TAO_debug_level >= 10)
00479 {
00480 ACE_DEBUG ((LM_DEBUG,
00481 ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
00482 }
00483 return -1;
00484 }
00485 }
00486 else
00487 {
00488
00489 int result = cache_interface_description (supported_interface);
00490
00491 if (result == 0)
00492 {
00493 this->supported_interface_ = supported_interface;
00494 }
00495 return result;
00496 }
00497 }
00498
00499
00500 void
00501 TAO_CEC_TypedEventChannel::create_operation_list (TAO_CEC_Operation_Params *oper_params,
00502 CORBA::NVList_out new_list)
00503 {
00504 this->orb_->create_list (0, new_list);
00505
00506 for (CORBA::ULong param=0; param<oper_params->num_params_; param++)
00507 {
00508
00509 CORBA::Any any_1;
00510 any_1._tao_set_typecode(oper_params->parameters_[param].type_.in ());
00511
00512 new_list->add_value (oper_params->parameters_[param].name_. in (),
00513 any_1,
00514 oper_params->parameters_[param].direction_);
00515 }
00516 }
00517
00518
00519 void
00520 TAO_CEC_TypedEventChannel::create_list (CORBA::Long count,
00521 CORBA::NVList_out new_list)
00522 {
00523 this->orb_->create_list (count, new_list);
00524 }
00525
00526
00527 CosTypedEventChannelAdmin::TypedConsumerAdmin_ptr
00528 TAO_CEC_TypedEventChannel::for_consumers (void)
00529 {
00530 return this->typed_consumer_admin_->_this ();
00531 }
00532
00533 CosTypedEventChannelAdmin::TypedSupplierAdmin_ptr
00534 TAO_CEC_TypedEventChannel::for_suppliers (void)
00535 {
00536 return this->typed_supplier_admin_->_this ();
00537 }
00538
00539 void
00540 TAO_CEC_TypedEventChannel::destroy (void)
00541 {
00542 if (!destroyed_)
00543 {
00544 destroyed_ = 1;
00545 this->shutdown ();
00546 }
00547 }
00548
00549 CORBA::Policy_ptr
00550 TAO_CEC_TypedEventChannel::create_roundtrip_timeout_policy
00551 (const ACE_Time_Value &timeout)
00552 {
00553 return this->factory_->create_roundtrip_timeout_policy (timeout);
00554 }
00555
00556 TAO_END_VERSIONED_NAMESPACE_DECL