00001
00002
00003 #include "orbsvcs/Event/ECG_Mcast_Gateway.h"
00004
00005 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00006 #include "orbsvcs/Event/ECG_Simple_Address_Server.h"
00007 #include "orbsvcs/Event/ECG_Complex_Address_Server.h"
00008 #include "orbsvcs/Event/ECG_Simple_Mcast_EH.h"
00009 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00010 #include "orbsvcs/Event/ECG_UDP_EH.h"
00011
00012 #include "orbsvcs/Event_Utilities.h"
00013
00014 #include "ace/Dynamic_Service.h"
00015 #include "ace/Arg_Shifter.h"
00016 #include "tao/ORB_Core.h"
00017 #include "ace/OS_NS_strings.h"
00018
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/Event/ECG_Mcast_Gateway.i"
00021 #endif
00022
00023 ACE_RCSID(Event, ECG_Mcast_Gateway, "ECG_Mcast_Gateway.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp")
00024
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> >
00029 UDP_Sender_Shutdown;
00030
00031 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> >
00032 UDP_Receiver_Shutdown;
00033
00034
00035 int
00036 TAO_ECG_Mcast_Gateway::init_svcs (void)
00037 {
00038 return ACE_Service_Config::static_svcs ()->
00039 insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
00040 }
00041
00042
00043 int
00044 TAO_ECG_Mcast_Gateway::fini (void)
00045 {
00046 return 0;
00047 }
00048
00049 int
00050 TAO_ECG_Mcast_Gateway::init (int argc, ACE_TCHAR* argv[])
00051 {
00052 int result = 0;
00053
00054 ACE_Arg_Shifter arg_shifter (argc, argv);
00055
00056 while (arg_shifter.is_anything_left ())
00057 {
00058 const ACE_TCHAR *arg = arg_shifter.get_current ();
00059
00060 if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
00061 {
00062 arg_shifter.consume_arg ();
00063
00064 if (arg_shifter.is_parameter_next ())
00065 {
00066 const ACE_TCHAR* opt = arg_shifter.get_current ();
00067 if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
00068 this->service_type_ = ECG_MCAST_RECEIVER;
00069 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
00070 this->service_type_ = ECG_MCAST_SENDER;
00071 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
00072 this->service_type_ = ECG_MCAST_TWO_WAY;
00073 else
00074 {
00075 ACE_ERROR ((LM_ERROR,
00076 ACE_TEXT ("Unsupported <-ECGService> option ")
00077 ACE_TEXT ("value: <%s>. Ignoring this option ")
00078 ACE_TEXT ("- using defaults instead.\n"),
00079 opt));
00080 result = -1;
00081 }
00082 arg_shifter.consume_arg ();
00083 }
00084 }
00085
00086 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
00087 {
00088 arg_shifter.consume_arg ();
00089
00090 if (arg_shifter.is_parameter_next ())
00091 {
00092 const ACE_TCHAR* opt = arg_shifter.get_current ();
00093 if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00094 this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
00095 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
00096 this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
00097 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
00098 this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
00099 else
00100 {
00101 ACE_ERROR ((LM_ERROR,
00102 ACE_TEXT ("Unsupported <-ECGAddressServer> ")
00103 ACE_TEXT ("option value: <%s>. Ignoring this ")
00104 ACE_TEXT ("option - using defaults instead.\n"),
00105 opt));
00106 result = -1;
00107 }
00108 arg_shifter.consume_arg ();
00109 }
00110 }
00111
00112 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
00113 {
00114 arg_shifter.consume_arg ();
00115
00116 if (arg_shifter.is_parameter_next ())
00117 {
00118 this->address_server_arg_.set (arg_shifter.get_current ());
00119 arg_shifter.consume_arg ();
00120 }
00121 }
00122
00123
00124 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
00125 {
00126 arg_shifter.consume_arg ();
00127
00128 if (arg_shifter.is_parameter_next ())
00129 {
00130 const ACE_TCHAR* opt = arg_shifter.get_current ();
00131 if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00132 this->handler_type_ = ECG_HANDLER_BASIC;
00133 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
00134 this->handler_type_ = ECG_HANDLER_COMPLEX;
00135 else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
00136 this->handler_type_ = ECG_HANDLER_UDP;
00137 else
00138 {
00139 ACE_ERROR ((LM_ERROR,
00140 ACE_TEXT ("Unsupported <-ECGHandler> ")
00141 ACE_TEXT ("option value: <%s>. Ignoring this ")
00142 ACE_TEXT ("option - using defaults instead.\n"),
00143 opt));
00144 result = -1;
00145 }
00146 arg_shifter.consume_arg ();
00147 }
00148 }
00149
00150 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
00151 {
00152 arg_shifter.consume_arg ();
00153
00154 if (arg_shifter.is_parameter_next ())
00155 {
00156 const ACE_TCHAR* opt = arg_shifter.get_current ();
00157 unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
00158 this->ttl_value_ = static_cast<u_char> (tmp);
00159 arg_shifter.consume_arg ();
00160 }
00161 }
00162
00163 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
00164 {
00165 arg_shifter.consume_arg ();
00166
00167 if (arg_shifter.is_parameter_next ())
00168 {
00169 this->nic_.set (arg_shifter.get_current ());
00170 arg_shifter.consume_arg ();
00171 }
00172 }
00173
00174 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
00175 {
00176 arg_shifter.consume_arg ();
00177
00178 if (arg_shifter.is_parameter_next ())
00179 {
00180 this->ip_multicast_loop_ =
00181 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00182 arg_shifter.consume_arg ();
00183 }
00184 }
00185
00186 else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
00187 {
00188 arg_shifter.consume_arg ();
00189
00190 if (arg_shifter.is_parameter_next ())
00191 {
00192 this->non_blocking_ =
00193 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00194 arg_shifter.consume_arg ();
00195 }
00196 }
00197
00198 else
00199 {
00200 arg_shifter.ignore_arg ();
00201 ACE_DEBUG ((LM_WARNING,
00202 ACE_TEXT ("Ignoring <%s> option ")
00203 ACE_TEXT ("during initialization.\n"),
00204 arg));
00205 result = -1;
00206 }
00207 }
00208
00209 if (this->validate_configuration () == -1)
00210 return -1;
00211 else
00212 return result;
00213 }
00214
00215 int
00216 TAO_ECG_Mcast_Gateway::init (const char * address_server_arg,
00217 const Attributes & attr)
00218 {
00219 this->address_server_arg_.set (address_server_arg);
00220
00221 this->address_server_type_ = attr.address_server_type;
00222 this->handler_type_ = attr.handler_type;
00223 this->service_type_ = attr.service_type;
00224 this->ttl_value_ = attr.ttl_value;
00225 this->nic_.set (attr.nic.c_str ());
00226 this->ip_multicast_loop_ = attr.ip_multicast_loop;
00227 this->non_blocking_ = attr.non_blocking;
00228
00229 return this->validate_configuration ();
00230 }
00231
00232 int
00233 TAO_ECG_Mcast_Gateway::init (
00234 const RtecEventChannelAdmin::ConsumerQOS & consumer_qos,
00235 const char * address_server_arg,
00236 const Attributes & attributes)
00237 {
00238 this->consumer_qos_ = consumer_qos;
00239 return this->init (address_server_arg,
00240 attributes);
00241 }
00242
00243 int
00244 TAO_ECG_Mcast_Gateway::validate_configuration (void)
00245 {
00246 if ((this->handler_type_ == ECG_HANDLER_BASIC
00247 || this->handler_type_ == ECG_HANDLER_UDP)
00248 && this->service_type_ != ECG_MCAST_SENDER
00249 && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
00250 {
00251 ACE_DEBUG ((LM_ERROR,
00252 "Configurations for mcast handler and "
00253 "address server do not match.\n"));
00254 return -1;
00255 }
00256
00257
00258
00259
00260 if (this->address_server_arg_.length () == 0)
00261 {
00262 ACE_DEBUG ((LM_ERROR,
00263 "Address server initializaton "
00264 "argument not specified.\n"));
00265 return -1;
00266 }
00267
00268 if (this->ip_multicast_loop_ != 0
00269 && this->ip_multicast_loop_ != 1)
00270 {
00271 ACE_DEBUG ((LM_ERROR,
00272 "IP MULTICAST LOOP option must have a boolean value.\n"));
00273 return -1;
00274 }
00275
00276 if (this->non_blocking_ != 0
00277 && this->non_blocking_ != 1)
00278 {
00279 ACE_DEBUG ((LM_ERROR,
00280 "NON BLOCKING flag must have a boolean value.\n"));
00281 return -1;
00282 }
00283
00284 return 0;
00285 }
00286
00287 TAO_ECG_Refcounted_Endpoint
00288 TAO_ECG_Mcast_Gateway::init_endpoint (void)
00289 {
00290 TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
00291 TAO_ECG_Refcounted_Endpoint refendpoint;
00292
00293
00294 ACE_NEW_NORETURN (endpoint,
00295 TAO_ECG_UDP_Out_Endpoint);
00296
00297 if (endpoint != 0)
00298 {
00299 refendpoint.reset (endpoint);
00300 }
00301 else
00302 {
00303 return TAO_ECG_Refcounted_Endpoint ();
00304 }
00305
00306 ACE_SOCK_Dgram& dgram = refendpoint->dgram ();
00307
00308 if (dgram.open (ACE_Addr::sap_any) == -1)
00309 {
00310 ACE_ERROR ((LM_ERROR,
00311 "Cannot open dgram "
00312 "for sending mcast messages.\n"));
00313 return TAO_ECG_Refcounted_Endpoint ();
00314 }
00315
00316 if (this->nic_.length () != 0)
00317 {
00318 dgram.set_nic (this->nic_.c_str ());
00319 }
00320
00321 if (this->ttl_value_ > 0)
00322 {
00323 if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00324 IP_MULTICAST_TTL,
00325 &this->ttl_value_,
00326 sizeof (this->ttl_value_))
00327 == -1)
00328 {
00329 ACE_ERROR ((LM_ERROR,
00330 "Error setting TTL option on dgram "
00331 "for sending mcast messages.\n"));
00332 return TAO_ECG_Refcounted_Endpoint ();
00333 }
00334 }
00335
00336 if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00337 IP_MULTICAST_LOOP,
00338 &this->ip_multicast_loop_,
00339 sizeof (this->ip_multicast_loop_)) == -1)
00340 {
00341 ACE_ERROR ((LM_ERROR,
00342 "Error setting MULTICAST_LOOP option "
00343 "on dgram for sending mcast messages.\n"));
00344 return TAO_ECG_Refcounted_Endpoint ();
00345 }
00346
00347 if (this->non_blocking_
00348 && dgram.enable(ACE_NONBLOCK) == -1)
00349 {
00350 ACE_ERROR ((LM_ERROR,
00351 "Error setting NON BLOCKING option.\n"));
00352 return TAO_ECG_Refcounted_Endpoint ();
00353 }
00354
00355 return refendpoint;
00356 }
00357
00358 PortableServer::ServantBase *
00359 TAO_ECG_Mcast_Gateway::init_address_server (void)
00360 {
00361 const char * address_server_arg =
00362 (this->address_server_arg_.length ())
00363 ? this->address_server_arg_.c_str () : 0;
00364
00365 if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
00366 {
00367 TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
00368 TAO_ECG_Simple_Address_Server::create ();
00369 if (!impl.in ())
00370 return 0;
00371
00372 if (impl->init (address_server_arg) == -1)
00373 {
00374 return 0;
00375 }
00376 return impl._retn ();
00377 }
00378
00379 else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
00380 {
00381 TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00382 TAO_ECG_Complex_Address_Server::create (1);
00383 if (!impl.in ())
00384 return 0;
00385
00386 if (impl->init (address_server_arg) == -1)
00387 {
00388 return 0;
00389 }
00390 return impl._retn ();
00391 }
00392
00393 else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
00394 {
00395 TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00396 TAO_ECG_Complex_Address_Server::create (0);
00397 if (!impl.in ())
00398 return 0;
00399
00400 if (impl->init (address_server_arg) == -1)
00401 {
00402 return 0;
00403 }
00404 return impl._retn ();
00405 }
00406
00407 else
00408 {
00409 ACE_ERROR ((LM_ERROR,
00410 "Cannot create address server: "
00411 "unknown address server type specified.\n"));
00412 return 0;
00413 }
00414 }
00415
00416 TAO_ECG_Refcounted_Handler
00417 TAO_ECG_Mcast_Gateway::init_handler (TAO_ECG_Dgram_Handler *receiver,
00418 RtecEventChannelAdmin::EventChannel_ptr ec,
00419 ACE_Reactor *reactor
00420 ACE_ENV_ARG_DECL)
00421 {
00422 TAO_ECG_Refcounted_Handler handler;
00423
00424 const char * nic =
00425 (this->nic_.length ()) ? this->nic_.c_str () : 0;
00426 const char * address_server_arg =
00427 (this->address_server_arg_.length ())
00428 ? this->address_server_arg_.c_str () : 0;
00429
00430 if (this->handler_type_ == ECG_HANDLER_BASIC)
00431 {
00432 TAO_ECG_Simple_Mcast_EH * h = 0;
00433 ACE_NEW_RETURN (h,
00434 TAO_ECG_Simple_Mcast_EH (receiver),
00435 handler);
00436 handler.reset (h);
00437
00438 h->reactor (reactor);
00439 if (h->open (address_server_arg, nic) != 0)
00440 return TAO_ECG_Refcounted_Handler ();
00441 }
00442
00443 else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
00444 {
00445 TAO_ECG_Mcast_EH * h = 0;
00446 ACE_NEW_RETURN (h,
00447 TAO_ECG_Mcast_EH (receiver, nic),
00448 handler);
00449 handler.reset (h);
00450
00451 h->reactor (reactor);
00452
00453 h->open (ec ACE_ENV_ARG_PARAMETER);
00454 ACE_CHECK_RETURN (TAO_ECG_Refcounted_Handler ());
00455 }
00456
00457 else if (this->handler_type_ == ECG_HANDLER_UDP)
00458 {
00459 TAO_ECG_UDP_EH * h = 0;
00460 ACE_NEW_RETURN (h,
00461 TAO_ECG_UDP_EH (receiver),
00462 handler);
00463 handler.reset (h);
00464 h->reactor (reactor);
00465
00466 ACE_INET_Addr ipaddr;
00467 if (ipaddr.set (address_server_arg) != 0)
00468 {
00469 ACE_ERROR ((LM_ERROR,
00470 "ERROR using address server argument "
00471 "in ACE_INET_Addr.set ().\n"));
00472 return TAO_ECG_Refcounted_Handler ();
00473 }
00474 if (h->open (ipaddr) != 0)
00475 return TAO_ECG_Refcounted_Handler ();
00476 }
00477
00478 else
00479 {
00480 ACE_ERROR ((LM_ERROR,
00481 "Cannot create handler: unknown "
00482 "handler type specified.\n"));
00483 return handler;
00484 }
00485
00486 return handler;
00487 }
00488
00489 TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00490 TAO_ECG_Mcast_Gateway::init_sender (
00491 RtecEventChannelAdmin::EventChannel_ptr ec,
00492 RtecUDPAdmin::AddrServer_ptr address_server,
00493 TAO_ECG_Refcounted_Endpoint endpoint_rptr
00494 ACE_ENV_ARG_DECL)
00495 {
00496 TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00497 sender (TAO_ECG_UDP_Sender::create ());
00498 if (!sender.in ())
00499 return sender;
00500
00501 sender->init (ec,
00502 address_server,
00503 endpoint_rptr
00504 ACE_ENV_ARG_PARAMETER);
00505 ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00506
00507 TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00508 sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00509
00510 if (this->consumer_qos_.dependencies.length () > 0)
00511 {
00512
00513 this->consumer_qos_.is_gateway = 1;
00514 sender->connect (this->consumer_qos_ ACE_ENV_ARG_PARAMETER);
00515 ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00516 }
00517 else
00518 {
00519
00520 ACE_ConsumerQOS_Factory consumer_qos_factory;
00521 consumer_qos_factory.start_disjunction_group (1);
00522 consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00523 ACE_ES_EVENT_ANY,
00524 0);
00525 RtecEventChannelAdmin::ConsumerQOS & qos =
00526 const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
00527 qos.is_gateway = 1;
00528
00529 sender->connect (qos ACE_ENV_ARG_PARAMETER);
00530 ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00531 }
00532
00533 sender_shutdown.disallow_command ();
00534 return sender;
00535 }
00536
00537 TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00538 TAO_ECG_Mcast_Gateway::init_receiver (
00539 RtecEventChannelAdmin::EventChannel_ptr ec,
00540 RtecUDPAdmin::AddrServer_ptr address_server,
00541 TAO_ECG_Refcounted_Endpoint endpoint_rptr
00542 ACE_ENV_ARG_DECL)
00543 {
00544 TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00545 receiver (TAO_ECG_UDP_Receiver::create ());
00546 if (!receiver.in ())
00547 return receiver;
00548
00549 receiver->init (ec,
00550 endpoint_rptr,
00551 address_server
00552 ACE_ENV_ARG_PARAMETER);
00553 ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00554
00555 TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00556 receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00557
00558 ACE_SupplierQOS_Factory supplier_qos_factory;
00559 supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00560 ACE_ES_EVENT_ANY,
00561 0, 1);
00562 RtecEventChannelAdmin::SupplierQOS & qos =
00563 const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
00564 qos.is_gateway = 1;
00565
00566 receiver->connect (qos ACE_ENV_ARG_PARAMETER);
00567 ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00568
00569 receiver_shutdown.disallow_command ();
00570 return receiver;
00571 }
00572
00573 void
00574 TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb,
00575 RtecEventChannelAdmin::EventChannel_ptr ec
00576 ACE_ENV_ARG_DECL)
00577 {
00578 if (CORBA::is_nil (ec))
00579 {
00580 ACE_ERROR ((LM_ERROR,
00581 "Nil event channel argument passed to "
00582 "TAO_ECG_Mcast_Gateway::run().\n"));
00583 ACE_THROW (CORBA::INTERNAL ());
00584 }
00585 if (CORBA::is_nil (orb))
00586 {
00587 ACE_ERROR ((LM_ERROR,
00588 "Nil orb argument passed to "
00589 "TAO_ECG_Mcast_Gateway::run().\n"));
00590 ACE_THROW (CORBA::INTERNAL ());
00591 }
00592 }
00593
00594 void
00595 TAO_ECG_Mcast_Gateway::run (CORBA::ORB_ptr orb,
00596 RtecEventChannelAdmin::EventChannel_ptr ec
00597 ACE_ENV_ARG_DECL)
00598 {
00599
00600 this->verify_args (orb, ec ACE_ENV_ARG_PARAMETER);
00601 ACE_CHECK;
00602
00603
00604 TAO_EC_Object_Deactivator address_server_deactivator;
00605 TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00606 TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00607
00608
00609 PortableServer::ServantBase_var address_server_servant =
00610 this->init_address_server ();
00611 if (!address_server_servant.in ())
00612 {
00613 ACE_DEBUG ((LM_ERROR,
00614 "Unable to create address server.\n"));
00615 ACE_THROW (CORBA::INTERNAL ());
00616 }
00617
00618 RtecUDPAdmin::AddrServer_var address_server;
00619
00620 PortableServer::POA_var poa =
00621 address_server_servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00622 ACE_CHECK;
00623
00624 activate (address_server,
00625 poa.in (),
00626 address_server_servant.in (),
00627 address_server_deactivator
00628 ACE_ENV_ARG_PARAMETER);
00629 ACE_CHECK;
00630
00631 TAO_ECG_Refcounted_Endpoint endpoint_rptr;
00632 TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;
00633
00634
00635 if (this->service_type_ == ECG_MCAST_SENDER
00636 || this->service_type_ == ECG_MCAST_TWO_WAY)
00637 {
00638 endpoint_rptr = this->init_endpoint ();
00639 if (endpoint_rptr.get () == 0)
00640 {
00641 ACE_THROW (CORBA::INTERNAL ());
00642 }
00643
00644 sender = this->init_sender (ec,
00645 address_server.in (),
00646 endpoint_rptr
00647 ACE_ENV_ARG_PARAMETER);
00648 ACE_CHECK;
00649 if (!sender.in ())
00650 {
00651 ACE_THROW (CORBA::INTERNAL ());
00652 }
00653
00654 sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00655 }
00656
00657
00658 TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
00659 if (this->service_type_ == ECG_MCAST_RECEIVER
00660 || this->service_type_ == ECG_MCAST_TWO_WAY)
00661 {
00662 ACE_Reactor *reactor = orb->orb_core ()->reactor ();
00663
00664 receiver = this->init_receiver (ec,
00665 address_server.in (),
00666 endpoint_rptr
00667 ACE_ENV_ARG_PARAMETER);
00668 ACE_CHECK;
00669 if (!receiver.in ())
00670 {
00671 ACE_THROW (CORBA::INTERNAL ());
00672 }
00673
00674 receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00675
00676 TAO_ECG_Refcounted_Handler
00677 handler_rptr (this->init_handler (receiver.in (),
00678 ec,
00679 reactor
00680 ACE_ENV_ARG_PARAMETER));
00681 ACE_CHECK;
00682 if (handler_rptr.get () == 0)
00683 {
00684 ACE_THROW (CORBA::INTERNAL ());
00685 }
00686 receiver->set_handler_shutdown (handler_rptr);
00687 }
00688
00689
00690 address_server_deactivator.disallow_deactivation ();
00691 receiver_shutdown.disallow_command ();
00692 sender_shutdown.disallow_command ();
00693 }
00694
00695 TAO_END_VERSIONED_NAMESPACE_DECL
00696
00697
00698
00699 ACE_STATIC_SVC_DEFINE (TAO_ECG_Mcast_Gateway,
00700 ACE_TEXT ("ECG_Mcast_Gateway"),
00701 ACE_SVC_OBJ_T,
00702 &ACE_SVC_NAME (TAO_ECG_Mcast_Gateway),
00703 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00704 0)
00705 ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_ECG_Mcast_Gateway)