ECG_Mcast_Gateway.cpp

Go to the documentation of this file.
00001 // ECG_Mcast_Gateway.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/ECG_Mcast_Gateway.h"
00004 
00005 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00006 #include "orbsvcs/Event/ECG_Simple_Address_Server.h"
00007 #include "orbsvcs/Event/ECG_Complex_Address_Server.h"
00008 #include "orbsvcs/Event/ECG_Simple_Mcast_EH.h"
00009 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00010 #include "orbsvcs/Event/ECG_UDP_EH.h"
00011 
00012 #include "orbsvcs/Event_Utilities.h"
00013 
00014 #include "ace/Dynamic_Service.h"
00015 #include "ace/Arg_Shifter.h"
00016 #include "tao/ORB_Core.h"
00017 #include "ace/OS_NS_strings.h"
00018 
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/Event/ECG_Mcast_Gateway.i"
00021 #endif /* __ACE_INLINE__ */
00022 
00023 ACE_RCSID(Event, ECG_Mcast_Gateway, "ECG_Mcast_Gateway.cpp,v 1.19 2006/03/14 06:14:25 jtc Exp")
00024 
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> >
00029 UDP_Sender_Shutdown;
00030 
00031 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> >
00032 UDP_Receiver_Shutdown;
00033 
00034 
00035 int
00036 TAO_ECG_Mcast_Gateway::init_svcs (void)
00037 {
00038   return ACE_Service_Config::static_svcs ()->
00039     insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
00040 }
00041 
00042 
00043 int
00044 TAO_ECG_Mcast_Gateway::fini (void)
00045 {
00046   return 0;
00047 }
00048 
00049 int
00050 TAO_ECG_Mcast_Gateway::init (int argc, ACE_TCHAR* argv[])
00051 {
00052   int result = 0;
00053 
00054   ACE_Arg_Shifter arg_shifter (argc, argv);
00055 
00056   while (arg_shifter.is_anything_left ())
00057     {
00058       const ACE_TCHAR *arg = arg_shifter.get_current ();
00059 
00060       if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
00061         {
00062           arg_shifter.consume_arg ();
00063 
00064           if (arg_shifter.is_parameter_next ())
00065             {
00066               const ACE_TCHAR* opt = arg_shifter.get_current ();
00067               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
00068                 this->service_type_ = ECG_MCAST_RECEIVER;
00069               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
00070                 this->service_type_ = ECG_MCAST_SENDER;
00071               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
00072                 this->service_type_ = ECG_MCAST_TWO_WAY;
00073               else
00074                 {
00075                   ACE_ERROR ((LM_ERROR,
00076                              ACE_TEXT ("Unsupported <-ECGService> option ")
00077                              ACE_TEXT ("value: <%s>. Ignoring this option ")
00078                              ACE_TEXT ("- using defaults instead.\n"),
00079                              opt));
00080                   result = -1;
00081                 }
00082               arg_shifter.consume_arg ();
00083             }
00084         }
00085 
00086       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
00087         {
00088           arg_shifter.consume_arg ();
00089 
00090           if (arg_shifter.is_parameter_next ())
00091             {
00092               const ACE_TCHAR* opt = arg_shifter.get_current ();
00093               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00094                 this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
00095               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
00096                 this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
00097               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
00098                 this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
00099               else
00100                 {
00101                   ACE_ERROR ((LM_ERROR,
00102                               ACE_TEXT ("Unsupported <-ECGAddressServer> ")
00103                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00104                               ACE_TEXT ("option - using defaults instead.\n"),
00105                               opt));
00106                   result = -1;
00107                 }
00108               arg_shifter.consume_arg ();
00109             }
00110         }
00111 
00112       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
00113         {
00114           arg_shifter.consume_arg ();
00115 
00116           if (arg_shifter.is_parameter_next ())
00117             {
00118               this->address_server_arg_.set (arg_shifter.get_current ());
00119               arg_shifter.consume_arg ();
00120             }
00121         }
00122 
00123 
00124       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
00125         {
00126           arg_shifter.consume_arg ();
00127 
00128           if (arg_shifter.is_parameter_next ())
00129             {
00130               const ACE_TCHAR* opt = arg_shifter.get_current ();
00131               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00132                 this->handler_type_ = ECG_HANDLER_BASIC;
00133               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
00134                 this->handler_type_ = ECG_HANDLER_COMPLEX;
00135               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
00136                 this->handler_type_ = ECG_HANDLER_UDP;
00137               else
00138                 {
00139                   ACE_ERROR ((LM_ERROR,
00140                               ACE_TEXT ("Unsupported <-ECGHandler> ")
00141                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00142                               ACE_TEXT ("option - using defaults instead.\n"),
00143                               opt));
00144                   result = -1;
00145                 }
00146               arg_shifter.consume_arg ();
00147             }
00148         }
00149 
00150       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
00151         {
00152           arg_shifter.consume_arg ();
00153 
00154           if (arg_shifter.is_parameter_next ())
00155             {
00156               const ACE_TCHAR* opt = arg_shifter.get_current ();
00157               unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
00158               this->ttl_value_ = static_cast<u_char> (tmp);
00159               arg_shifter.consume_arg ();
00160             }
00161         }
00162 
00163       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
00164         {
00165           arg_shifter.consume_arg ();
00166 
00167           if (arg_shifter.is_parameter_next ())
00168             {
00169               this->nic_.set (arg_shifter.get_current ());
00170               arg_shifter.consume_arg ();
00171             }
00172         }
00173 
00174       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
00175         {
00176           arg_shifter.consume_arg ();
00177 
00178           if (arg_shifter.is_parameter_next ())
00179             {
00180               this->ip_multicast_loop_ =
00181                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00182               arg_shifter.consume_arg ();
00183             }
00184         }
00185 
00186       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
00187       {
00188           arg_shifter.consume_arg ();
00189 
00190           if (arg_shifter.is_parameter_next ())
00191             {
00192               this->non_blocking_ =
00193                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00194               arg_shifter.consume_arg ();
00195             }
00196         }
00197 
00198       else
00199         {
00200           arg_shifter.ignore_arg ();
00201           ACE_DEBUG ((LM_WARNING,
00202                       ACE_TEXT ("Ignoring <%s> option ")
00203                       ACE_TEXT ("during initialization.\n"),
00204                       arg));
00205           result = -1;
00206         }
00207     }
00208 
00209   if (this->validate_configuration () == -1)
00210     return -1;
00211   else
00212     return result;
00213 }
00214 
00215 int
00216 TAO_ECG_Mcast_Gateway::init (const char * address_server_arg,
00217                              const Attributes & attr)
00218 {
00219   this->address_server_arg_.set (address_server_arg);
00220 
00221   this->address_server_type_ = attr.address_server_type;
00222   this->handler_type_ = attr.handler_type;
00223   this->service_type_ = attr.service_type;
00224   this->ttl_value_ = attr.ttl_value;
00225   this->nic_.set (attr.nic.c_str ());
00226   this->ip_multicast_loop_ = attr.ip_multicast_loop;
00227   this->non_blocking_ = attr.non_blocking;
00228 
00229   return this->validate_configuration ();
00230 }
00231 
00232 int
00233 TAO_ECG_Mcast_Gateway::init (
00234     const RtecEventChannelAdmin::ConsumerQOS & consumer_qos,
00235     const char * address_server_arg,
00236     const Attributes & attributes)
00237 {
00238   this->consumer_qos_ = consumer_qos;
00239   return this->init (address_server_arg,
00240                      attributes);
00241 }
00242 
00243 int
00244 TAO_ECG_Mcast_Gateway::validate_configuration (void)
00245 {
00246   if ((this->handler_type_ == ECG_HANDLER_BASIC
00247        || this->handler_type_ == ECG_HANDLER_UDP)
00248       && this->service_type_ != ECG_MCAST_SENDER
00249       && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
00250     {
00251       ACE_DEBUG ((LM_ERROR,
00252                       "Configurations for mcast handler and "
00253                       "address server do not match.\n"));
00254       return -1;
00255     }
00256 
00257   // Currently all Address Server implementations require an
00258   // initialization string.  If we ever add a new Address Server
00259   // implementation, which does not, we'll have to remove this check.
00260   if (this->address_server_arg_.length () == 0)
00261     {
00262       ACE_DEBUG ((LM_ERROR,
00263                       "Address server initializaton "
00264                       "argument not specified.\n"));
00265       return -1;
00266     }
00267 
00268   if (this->ip_multicast_loop_ != 0
00269       && this->ip_multicast_loop_ != 1)
00270     {
00271       ACE_DEBUG ((LM_ERROR,
00272                   "IP MULTICAST LOOP option must have a boolean value.\n"));
00273       return -1;
00274     }
00275 
00276   if (this->non_blocking_ != 0
00277       && this->non_blocking_ != 1)
00278     {
00279       ACE_DEBUG ((LM_ERROR,
00280                   "NON BLOCKING flag must have a boolean value.\n"));
00281       return -1;
00282     }
00283 
00284   return 0;
00285 }
00286 
00287 TAO_ECG_Refcounted_Endpoint
00288 TAO_ECG_Mcast_Gateway::init_endpoint (void)
00289 {
00290   TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
00291   TAO_ECG_Refcounted_Endpoint refendpoint;
00292 
00293   // Try to allocate a new endpoint from the heap
00294   ACE_NEW_NORETURN (endpoint,
00295                     TAO_ECG_UDP_Out_Endpoint);
00296 
00297   if (endpoint != 0)
00298   {
00299     refendpoint.reset (endpoint);
00300   }
00301   else
00302   {
00303     return TAO_ECG_Refcounted_Endpoint ();
00304   }
00305 
00306   ACE_SOCK_Dgram& dgram = refendpoint->dgram ();
00307 
00308   if (dgram.open (ACE_Addr::sap_any) == -1)
00309     {
00310       ACE_ERROR ((LM_ERROR,
00311                              "Cannot open dgram "
00312                              "for sending mcast messages.\n"));
00313       return TAO_ECG_Refcounted_Endpoint ();
00314     }
00315 
00316   if (this->nic_.length () != 0)
00317     {
00318       dgram.set_nic (this->nic_.c_str ());
00319     }
00320 
00321   if (this->ttl_value_ > 0)
00322     {
00323       if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00324                                       IP_MULTICAST_TTL,
00325                                       &this->ttl_value_,
00326                                       sizeof (this->ttl_value_))
00327           == -1)
00328         {
00329           ACE_ERROR ((LM_ERROR,
00330                       "Error setting TTL option on dgram "
00331                       "for sending mcast messages.\n"));
00332           return TAO_ECG_Refcounted_Endpoint ();
00333         }
00334     }
00335 
00336   if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00337                                   IP_MULTICAST_LOOP,
00338                                   &this->ip_multicast_loop_,
00339                                   sizeof (this->ip_multicast_loop_)) == -1)
00340     {
00341       ACE_ERROR ((LM_ERROR,
00342                   "Error setting MULTICAST_LOOP option "
00343                   "on dgram for sending mcast messages.\n"));
00344       return TAO_ECG_Refcounted_Endpoint ();
00345     }
00346 
00347   if (this->non_blocking_
00348       && dgram.enable(ACE_NONBLOCK) == -1)
00349     {
00350       ACE_ERROR ((LM_ERROR,
00351                   "Error setting NON BLOCKING option.\n"));
00352       return TAO_ECG_Refcounted_Endpoint ();
00353     }
00354 
00355   return refendpoint;
00356 }
00357 
00358 PortableServer::ServantBase *
00359 TAO_ECG_Mcast_Gateway::init_address_server (void)
00360 {
00361   const char * address_server_arg =
00362     (this->address_server_arg_.length ())
00363     ? this->address_server_arg_.c_str () : 0;
00364 
00365   if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
00366     {
00367       TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
00368         TAO_ECG_Simple_Address_Server::create ();
00369       if (!impl.in ())
00370         return 0;
00371 
00372       if (impl->init (address_server_arg) == -1)
00373         {
00374           return 0;
00375         }
00376       return impl._retn ();
00377     }
00378 
00379   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
00380     {
00381       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00382         TAO_ECG_Complex_Address_Server::create (1);
00383       if (!impl.in ())
00384         return 0;
00385 
00386       if (impl->init (address_server_arg) == -1)
00387         {
00388           return 0;
00389         }
00390       return impl._retn ();
00391     }
00392 
00393   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
00394     {
00395       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00396         TAO_ECG_Complex_Address_Server::create (0);
00397       if (!impl.in ())
00398         return 0;
00399 
00400       if (impl->init (address_server_arg) == -1)
00401         {
00402           return 0;
00403         }
00404       return impl._retn ();
00405     }
00406 
00407   else
00408     {
00409       ACE_ERROR ((LM_ERROR,
00410                   "Cannot create address server: "
00411                   "unknown address server type specified.\n"));
00412       return 0;
00413     }
00414 }
00415 
00416 TAO_ECG_Refcounted_Handler
00417 TAO_ECG_Mcast_Gateway::init_handler (TAO_ECG_Dgram_Handler *receiver,
00418                                      RtecEventChannelAdmin::EventChannel_ptr ec,
00419                                      ACE_Reactor *reactor
00420                                      ACE_ENV_ARG_DECL)
00421 {
00422   TAO_ECG_Refcounted_Handler handler;
00423 
00424   const char * nic =
00425     (this->nic_.length ()) ? this->nic_.c_str () : 0;
00426   const char * address_server_arg =
00427     (this->address_server_arg_.length ())
00428     ? this->address_server_arg_.c_str () : 0;
00429 
00430   if (this->handler_type_ == ECG_HANDLER_BASIC)
00431     {
00432       TAO_ECG_Simple_Mcast_EH * h = 0;
00433       ACE_NEW_RETURN (h,
00434                       TAO_ECG_Simple_Mcast_EH (receiver),
00435                       handler);
00436       handler.reset (h);
00437 
00438       h->reactor (reactor);
00439       if (h->open (address_server_arg, nic) != 0)
00440         return TAO_ECG_Refcounted_Handler ();
00441     }
00442 
00443   else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
00444     {
00445       TAO_ECG_Mcast_EH * h = 0;
00446       ACE_NEW_RETURN (h,
00447                       TAO_ECG_Mcast_EH (receiver, nic),
00448                       handler);
00449       handler.reset (h);
00450 
00451       h->reactor (reactor);
00452 
00453       h->open (ec ACE_ENV_ARG_PARAMETER);
00454       ACE_CHECK_RETURN (TAO_ECG_Refcounted_Handler ());
00455     }
00456 
00457   else if (this->handler_type_ == ECG_HANDLER_UDP)
00458     {
00459       TAO_ECG_UDP_EH * h = 0;
00460       ACE_NEW_RETURN (h,
00461                       TAO_ECG_UDP_EH (receiver),
00462                       handler);
00463       handler.reset (h);
00464       h->reactor (reactor);
00465 
00466       ACE_INET_Addr ipaddr;
00467       if (ipaddr.set (address_server_arg) != 0)
00468         {
00469           ACE_ERROR ((LM_ERROR,
00470                       "ERROR using address server argument "
00471                       "in ACE_INET_Addr.set ().\n"));
00472           return TAO_ECG_Refcounted_Handler ();
00473         }
00474       if (h->open (ipaddr) != 0)
00475         return TAO_ECG_Refcounted_Handler ();
00476     }
00477 
00478   else
00479     {
00480       ACE_ERROR ((LM_ERROR,
00481                   "Cannot create handler: unknown "
00482                   "handler type specified.\n"));
00483       return handler;
00484     }
00485 
00486   return handler;
00487 }
00488 
00489 TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00490 TAO_ECG_Mcast_Gateway::init_sender (
00491                                RtecEventChannelAdmin::EventChannel_ptr ec,
00492                                RtecUDPAdmin::AddrServer_ptr address_server,
00493                                TAO_ECG_Refcounted_Endpoint endpoint_rptr
00494                                ACE_ENV_ARG_DECL)
00495 {
00496   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00497     sender (TAO_ECG_UDP_Sender::create ());
00498   if (!sender.in ())
00499     return sender;
00500 
00501   sender->init (ec,
00502                 address_server,
00503                 endpoint_rptr
00504                 ACE_ENV_ARG_PARAMETER);
00505   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00506 
00507   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00508   sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00509 
00510   if (this->consumer_qos_.dependencies.length () > 0)
00511     {
00512       // Client supplied consumer qos.  Use it.
00513       this->consumer_qos_.is_gateway = 1;
00514       sender->connect (this->consumer_qos_ ACE_ENV_ARG_PARAMETER);
00515       ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00516     }
00517   else
00518     {
00519       // Client did not specify anything - subscribe to all events.
00520       ACE_ConsumerQOS_Factory consumer_qos_factory;
00521       consumer_qos_factory.start_disjunction_group (1);
00522       consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00523                                    ACE_ES_EVENT_ANY,
00524                                    0);
00525       RtecEventChannelAdmin::ConsumerQOS & qos =
00526         const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
00527       qos.is_gateway = 1;
00528 
00529       sender->connect (qos ACE_ENV_ARG_PARAMETER);
00530       ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00531     }
00532 
00533   sender_shutdown.disallow_command ();
00534   return sender;
00535 }
00536 
00537 TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00538 TAO_ECG_Mcast_Gateway::init_receiver (
00539                                RtecEventChannelAdmin::EventChannel_ptr ec,
00540                                RtecUDPAdmin::AddrServer_ptr address_server,
00541                                TAO_ECG_Refcounted_Endpoint endpoint_rptr
00542                                ACE_ENV_ARG_DECL)
00543 {
00544   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00545     receiver (TAO_ECG_UDP_Receiver::create ());
00546   if (!receiver.in ())
00547     return receiver;
00548 
00549   receiver->init (ec,
00550                   endpoint_rptr,
00551                   address_server
00552                   ACE_ENV_ARG_PARAMETER);
00553   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00554 
00555   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00556   receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00557 
00558   ACE_SupplierQOS_Factory supplier_qos_factory;
00559   supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00560                                ACE_ES_EVENT_ANY,
00561                                0, 1);
00562   RtecEventChannelAdmin::SupplierQOS & qos =
00563     const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
00564   qos.is_gateway = 1;
00565 
00566   receiver->connect (qos ACE_ENV_ARG_PARAMETER);
00567   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00568 
00569   receiver_shutdown.disallow_command ();
00570   return receiver;
00571 }
00572 
00573 void
00574 TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb,
00575                                     RtecEventChannelAdmin::EventChannel_ptr ec
00576                                     ACE_ENV_ARG_DECL)
00577 {
00578   if (CORBA::is_nil (ec))
00579     {
00580       ACE_ERROR ((LM_ERROR,
00581                   "Nil event channel argument passed to "
00582                   "TAO_ECG_Mcast_Gateway::run().\n"));
00583       ACE_THROW (CORBA::INTERNAL ());
00584     }
00585   if (CORBA::is_nil (orb))
00586     {
00587       ACE_ERROR ((LM_ERROR,
00588                   "Nil orb argument passed to "
00589                   "TAO_ECG_Mcast_Gateway::run().\n"));
00590       ACE_THROW (CORBA::INTERNAL ());
00591     }
00592 }
00593 
00594 void
00595 TAO_ECG_Mcast_Gateway::run (CORBA::ORB_ptr orb,
00596                             RtecEventChannelAdmin::EventChannel_ptr ec
00597                             ACE_ENV_ARG_DECL)
00598 {
00599   // Verify args.
00600   this->verify_args (orb, ec ACE_ENV_ARG_PARAMETER);
00601   ACE_CHECK;
00602 
00603   // Auto-cleanup objects.
00604   TAO_EC_Object_Deactivator address_server_deactivator;
00605   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00606   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00607 
00608   // Set up address server.
00609   PortableServer::ServantBase_var address_server_servant =
00610     this->init_address_server ();
00611   if (!address_server_servant.in ())
00612     {
00613       ACE_DEBUG ((LM_ERROR,
00614                   "Unable to create address server.\n"));
00615       ACE_THROW (CORBA::INTERNAL ());
00616     }
00617 
00618   RtecUDPAdmin::AddrServer_var address_server;
00619 
00620   PortableServer::POA_var poa =
00621     address_server_servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00622   ACE_CHECK;
00623 
00624   activate (address_server,
00625             poa.in (),
00626             address_server_servant.in (),
00627             address_server_deactivator
00628             ACE_ENV_ARG_PARAMETER);
00629   ACE_CHECK;
00630 
00631   TAO_ECG_Refcounted_Endpoint endpoint_rptr;
00632   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;
00633 
00634   // Set up event sender.
00635   if (this->service_type_ == ECG_MCAST_SENDER
00636       || this->service_type_ == ECG_MCAST_TWO_WAY)
00637     {
00638       endpoint_rptr = this->init_endpoint ();
00639       if (endpoint_rptr.get () == 0)
00640         {
00641           ACE_THROW (CORBA::INTERNAL ());
00642         }
00643 
00644       sender = this->init_sender (ec,
00645                                   address_server.in (),
00646                                   endpoint_rptr
00647                                   ACE_ENV_ARG_PARAMETER);
00648       ACE_CHECK;
00649       if (!sender.in ())
00650         {
00651           ACE_THROW (CORBA::INTERNAL ());
00652         }
00653 
00654       sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00655     }
00656 
00657   // Set up event receiver.
00658   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
00659   if (this->service_type_ == ECG_MCAST_RECEIVER
00660       || this->service_type_ == ECG_MCAST_TWO_WAY)
00661     {
00662       ACE_Reactor *reactor = orb->orb_core ()->reactor ();
00663 
00664       receiver = this->init_receiver (ec,
00665                                       address_server.in (),
00666                                       endpoint_rptr
00667                                       ACE_ENV_ARG_PARAMETER);
00668       ACE_CHECK;
00669       if (!receiver.in ())
00670         {
00671           ACE_THROW (CORBA::INTERNAL ());
00672         }
00673 
00674       receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00675 
00676       TAO_ECG_Refcounted_Handler
00677         handler_rptr (this->init_handler (receiver.in (),
00678                                           ec,
00679                                           reactor
00680                                           ACE_ENV_ARG_PARAMETER));
00681       ACE_CHECK;
00682       if (handler_rptr.get () == 0)
00683         {
00684           ACE_THROW (CORBA::INTERNAL ());
00685         }
00686       receiver->set_handler_shutdown (handler_rptr);
00687     }
00688 
00689   // Everything went ok - disable auto-cleanup.
00690   address_server_deactivator.disallow_deactivation ();
00691   receiver_shutdown.disallow_command ();
00692   sender_shutdown.disallow_command ();
00693 }
00694 
00695 TAO_END_VERSIONED_NAMESPACE_DECL
00696 
00697 // ****************************************************************
00698 
00699 ACE_STATIC_SVC_DEFINE (TAO_ECG_Mcast_Gateway,
00700                        ACE_TEXT ("ECG_Mcast_Gateway"),
00701                        ACE_SVC_OBJ_T,
00702                        &ACE_SVC_NAME (TAO_ECG_Mcast_Gateway),
00703                        ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00704                        0)
00705 ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_ECG_Mcast_Gateway)

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6