ECG_Mcast_Gateway.cpp

Go to the documentation of this file.
00001 // $Id: ECG_Mcast_Gateway.cpp 76589 2007-01-25 18:04:11Z elliott_c $
00002 
00003 #include "orbsvcs/Event/ECG_Mcast_Gateway.h"
00004 
00005 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00006 #include "orbsvcs/Event/ECG_Simple_Address_Server.h"
00007 #include "orbsvcs/Event/ECG_Complex_Address_Server.h"
00008 #include "orbsvcs/Event/ECG_Simple_Mcast_EH.h"
00009 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00010 #include "orbsvcs/Event/ECG_UDP_EH.h"
00011 
00012 #include "orbsvcs/Event_Utilities.h"
00013 
00014 #include "ace/Dynamic_Service.h"
00015 #include "ace/Arg_Shifter.h"
00016 #include "tao/ORB_Core.h"
00017 #include "ace/OS_NS_strings.h"
00018 
00019 #if ! defined (__ACE_INLINE__)
00020 #include "orbsvcs/Event/ECG_Mcast_Gateway.inl"
00021 #endif /* __ACE_INLINE__ */
00022 
00023 ACE_RCSID(Event, ECG_Mcast_Gateway, "$Id: ECG_Mcast_Gateway.cpp 76589 2007-01-25 18:04:11Z elliott_c $")
00024 
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> >
00029 UDP_Sender_Shutdown;
00030 
00031 typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> >
00032 UDP_Receiver_Shutdown;
00033 
00034 
00035 int
00036 TAO_ECG_Mcast_Gateway::init_svcs (void)
00037 {
00038   return ACE_Service_Config::static_svcs ()->
00039     insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
00040 }
00041 
00042 
00043 int
00044 TAO_ECG_Mcast_Gateway::fini (void)
00045 {
00046   return 0;
00047 }
00048 
00049 int
00050 TAO_ECG_Mcast_Gateway::init (int argc, ACE_TCHAR* argv[])
00051 {
00052   int result = 0;
00053 
00054   ACE_Arg_Shifter arg_shifter (argc, argv);
00055 
00056   while (arg_shifter.is_anything_left ())
00057     {
00058       const ACE_TCHAR *arg = arg_shifter.get_current ();
00059 
00060       if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
00061         {
00062           arg_shifter.consume_arg ();
00063 
00064           if (arg_shifter.is_parameter_next ())
00065             {
00066               const ACE_TCHAR* opt = arg_shifter.get_current ();
00067               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
00068                 this->service_type_ = ECG_MCAST_RECEIVER;
00069               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
00070                 this->service_type_ = ECG_MCAST_SENDER;
00071               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
00072                 this->service_type_ = ECG_MCAST_TWO_WAY;
00073               else
00074                 {
00075                   ACE_ERROR ((LM_ERROR,
00076                              ACE_TEXT ("Unsupported <-ECGService> option ")
00077                              ACE_TEXT ("value: <%s>. Ignoring this option ")
00078                              ACE_TEXT ("- using defaults instead.\n"),
00079                              opt));
00080                   result = -1;
00081                 }
00082               arg_shifter.consume_arg ();
00083             }
00084         }
00085 
00086       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
00087         {
00088           arg_shifter.consume_arg ();
00089 
00090           if (arg_shifter.is_parameter_next ())
00091             {
00092               const ACE_TCHAR* opt = arg_shifter.get_current ();
00093               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00094                 this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
00095               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
00096                 this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
00097               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
00098                 this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
00099               else
00100                 {
00101                   ACE_ERROR ((LM_ERROR,
00102                               ACE_TEXT ("Unsupported <-ECGAddressServer> ")
00103                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00104                               ACE_TEXT ("option - using defaults instead.\n"),
00105                               opt));
00106                   result = -1;
00107                 }
00108               arg_shifter.consume_arg ();
00109             }
00110         }
00111 
00112       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
00113         {
00114           arg_shifter.consume_arg ();
00115 
00116           if (arg_shifter.is_parameter_next ())
00117             {
00118               this->address_server_arg_.set (arg_shifter.get_current ());
00119               arg_shifter.consume_arg ();
00120             }
00121         }
00122 
00123 
00124       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
00125         {
00126           arg_shifter.consume_arg ();
00127 
00128           if (arg_shifter.is_parameter_next ())
00129             {
00130               const ACE_TCHAR* opt = arg_shifter.get_current ();
00131               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00132                 this->handler_type_ = ECG_HANDLER_BASIC;
00133               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
00134                 this->handler_type_ = ECG_HANDLER_COMPLEX;
00135               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
00136                 this->handler_type_ = ECG_HANDLER_UDP;
00137               else
00138                 {
00139                   ACE_ERROR ((LM_ERROR,
00140                               ACE_TEXT ("Unsupported <-ECGHandler> ")
00141                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00142                               ACE_TEXT ("option - using defaults instead.\n"),
00143                               opt));
00144                   result = -1;
00145                 }
00146               arg_shifter.consume_arg ();
00147             }
00148         }
00149 
00150       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
00151         {
00152           arg_shifter.consume_arg ();
00153 
00154           if (arg_shifter.is_parameter_next ())
00155             {
00156               const ACE_TCHAR* opt = arg_shifter.get_current ();
00157               unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
00158               this->ttl_value_ = static_cast<u_char> (tmp);
00159               arg_shifter.consume_arg ();
00160             }
00161         }
00162 
00163       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
00164         {
00165           arg_shifter.consume_arg ();
00166 
00167           if (arg_shifter.is_parameter_next ())
00168             {
00169               this->nic_.set (arg_shifter.get_current ());
00170               arg_shifter.consume_arg ();
00171             }
00172         }
00173 
00174       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
00175         {
00176           arg_shifter.consume_arg ();
00177 
00178           if (arg_shifter.is_parameter_next ())
00179             {
00180               this->ip_multicast_loop_ =
00181                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00182               arg_shifter.consume_arg ();
00183             }
00184         }
00185 
00186       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
00187       {
00188           arg_shifter.consume_arg ();
00189 
00190           if (arg_shifter.is_parameter_next ())
00191             {
00192               this->non_blocking_ =
00193                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00194               arg_shifter.consume_arg ();
00195             }
00196         }
00197 
00198       else
00199         {
00200           arg_shifter.ignore_arg ();
00201           ACE_DEBUG ((LM_WARNING,
00202                       ACE_TEXT ("Ignoring <%s> option ")
00203                       ACE_TEXT ("during initialization.\n"),
00204                       arg));
00205           result = -1;
00206         }
00207     }
00208 
00209   if (this->validate_configuration () == -1)
00210     return -1;
00211   else
00212     return result;
00213 }
00214 
00215 int
00216 TAO_ECG_Mcast_Gateway::init (const char * address_server_arg,
00217                              const Attributes & attr)
00218 {
00219   this->address_server_arg_.set (address_server_arg);
00220 
00221   this->address_server_type_ = attr.address_server_type;
00222   this->handler_type_ = attr.handler_type;
00223   this->service_type_ = attr.service_type;
00224   this->ttl_value_ = attr.ttl_value;
00225   this->nic_.set (attr.nic.c_str ());
00226   this->ip_multicast_loop_ = attr.ip_multicast_loop;
00227   this->non_blocking_ = attr.non_blocking;
00228 
00229   return this->validate_configuration ();
00230 }
00231 
00232 int
00233 TAO_ECG_Mcast_Gateway::init (
00234     const RtecEventChannelAdmin::ConsumerQOS & consumer_qos,
00235     const char * address_server_arg,
00236     const Attributes & attributes)
00237 {
00238   this->consumer_qos_ = consumer_qos;
00239   return this->init (address_server_arg,
00240                      attributes);
00241 }
00242 
00243 int
00244 TAO_ECG_Mcast_Gateway::validate_configuration (void)
00245 {
00246   if ((this->handler_type_ == ECG_HANDLER_BASIC
00247        || this->handler_type_ == ECG_HANDLER_UDP)
00248       && this->service_type_ != ECG_MCAST_SENDER
00249       && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
00250     {
00251       ACE_DEBUG ((LM_ERROR,
00252                       "Configurations for mcast handler and "
00253                       "address server do not match.\n"));
00254       return -1;
00255     }
00256 
00257   // Currently all Address Server implementations require an
00258   // initialization string.  If we ever add a new Address Server
00259   // implementation, which does not, we'll have to remove this check.
00260   if (this->address_server_arg_.length () == 0)
00261     {
00262       ACE_DEBUG ((LM_ERROR,
00263                       "Address server initializaton "
00264                       "argument not specified.\n"));
00265       return -1;
00266     }
00267 
00268   if (this->ip_multicast_loop_ != 0
00269       && this->ip_multicast_loop_ != 1)
00270     {
00271       ACE_DEBUG ((LM_ERROR,
00272                   "IP MULTICAST LOOP option must have a boolean value.\n"));
00273       return -1;
00274     }
00275 
00276   if (this->non_blocking_ != 0
00277       && this->non_blocking_ != 1)
00278     {
00279       ACE_DEBUG ((LM_ERROR,
00280                   "NON BLOCKING flag must have a boolean value.\n"));
00281       return -1;
00282     }
00283 
00284   return 0;
00285 }
00286 
00287 TAO_ECG_Refcounted_Endpoint
00288 TAO_ECG_Mcast_Gateway::init_endpoint (void)
00289 {
00290   TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
00291   TAO_ECG_Refcounted_Endpoint refendpoint;
00292 
00293   // Try to allocate a new endpoint from the heap
00294   ACE_NEW_NORETURN (endpoint,
00295                     TAO_ECG_UDP_Out_Endpoint);
00296 
00297   if (endpoint != 0)
00298   {
00299     refendpoint.reset (endpoint);
00300   }
00301   else
00302   {
00303     return TAO_ECG_Refcounted_Endpoint ();
00304   }
00305 
00306   ACE_SOCK_Dgram& dgram = refendpoint->dgram ();
00307 
00308   if (dgram.open (ACE_Addr::sap_any) == -1)
00309     {
00310       ACE_ERROR ((LM_ERROR,
00311                              "Cannot open dgram "
00312                              "for sending mcast messages.\n"));
00313       return TAO_ECG_Refcounted_Endpoint ();
00314     }
00315 
00316   if (this->nic_.length () != 0)
00317     {
00318       dgram.set_nic (this->nic_.c_str ());
00319     }
00320 
00321   if (this->ttl_value_ > 0)
00322     {
00323       if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00324                                       IP_MULTICAST_TTL,
00325                                       &this->ttl_value_,
00326                                       sizeof (this->ttl_value_))
00327           == -1)
00328         {
00329           ACE_ERROR ((LM_ERROR,
00330                       "Error setting TTL option on dgram "
00331                       "for sending mcast messages.\n"));
00332           return TAO_ECG_Refcounted_Endpoint ();
00333         }
00334     }
00335 
00336   if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00337                                   IP_MULTICAST_LOOP,
00338                                   &this->ip_multicast_loop_,
00339                                   sizeof (this->ip_multicast_loop_)) == -1)
00340     {
00341       ACE_ERROR ((LM_ERROR,
00342                   "Error setting MULTICAST_LOOP option "
00343                   "on dgram for sending mcast messages.\n"));
00344       return TAO_ECG_Refcounted_Endpoint ();
00345     }
00346 
00347   if (this->non_blocking_
00348       && dgram.enable(ACE_NONBLOCK) == -1)
00349     {
00350       ACE_ERROR ((LM_ERROR,
00351                   "Error setting NON BLOCKING option.\n"));
00352       return TAO_ECG_Refcounted_Endpoint ();
00353     }
00354 
00355   return refendpoint;
00356 }
00357 
00358 PortableServer::ServantBase *
00359 TAO_ECG_Mcast_Gateway::init_address_server (void)
00360 {
00361   const char * address_server_arg =
00362     (this->address_server_arg_.length ())
00363     ? this->address_server_arg_.c_str () : 0;
00364 
00365   if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
00366     {
00367       TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
00368         TAO_ECG_Simple_Address_Server::create ();
00369       if (!impl.in ())
00370         return 0;
00371 
00372       if (impl->init (address_server_arg) == -1)
00373         {
00374           return 0;
00375         }
00376       return impl._retn ();
00377     }
00378 
00379   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
00380     {
00381       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00382         TAO_ECG_Complex_Address_Server::create (1);
00383       if (!impl.in ())
00384         return 0;
00385 
00386       if (impl->init (address_server_arg) == -1)
00387         {
00388           return 0;
00389         }
00390       return impl._retn ();
00391     }
00392 
00393   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
00394     {
00395       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00396         TAO_ECG_Complex_Address_Server::create (0);
00397       if (!impl.in ())
00398         return 0;
00399 
00400       if (impl->init (address_server_arg) == -1)
00401         {
00402           return 0;
00403         }
00404       return impl._retn ();
00405     }
00406 
00407   else
00408     {
00409       ACE_ERROR ((LM_ERROR,
00410                   "Cannot create address server: "
00411                   "unknown address server type specified.\n"));
00412       return 0;
00413     }
00414 }
00415 
00416 TAO_ECG_Refcounted_Handler
00417 TAO_ECG_Mcast_Gateway::init_handler (TAO_ECG_Dgram_Handler *receiver,
00418                                      RtecEventChannelAdmin::EventChannel_ptr ec,
00419                                      ACE_Reactor *reactor)
00420 {
00421   TAO_ECG_Refcounted_Handler handler;
00422 
00423   const char * nic =
00424     (this->nic_.length ()) ? this->nic_.c_str () : 0;
00425   const char * address_server_arg =
00426     (this->address_server_arg_.length ())
00427     ? this->address_server_arg_.c_str () : 0;
00428 
00429   if (this->handler_type_ == ECG_HANDLER_BASIC)
00430     {
00431       TAO_ECG_Simple_Mcast_EH * h = 0;
00432       ACE_NEW_RETURN (h,
00433                       TAO_ECG_Simple_Mcast_EH (receiver),
00434                       handler);
00435       handler.reset (h);
00436 
00437       h->reactor (reactor);
00438       if (h->open (address_server_arg, nic) != 0)
00439         return TAO_ECG_Refcounted_Handler ();
00440     }
00441 
00442   else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
00443     {
00444       TAO_ECG_Mcast_EH * h = 0;
00445       ACE_NEW_RETURN (h,
00446                       TAO_ECG_Mcast_EH (receiver, nic),
00447                       handler);
00448       handler.reset (h);
00449 
00450       h->reactor (reactor);
00451 
00452       h->open (ec);
00453     }
00454 
00455   else if (this->handler_type_ == ECG_HANDLER_UDP)
00456     {
00457       TAO_ECG_UDP_EH * h = 0;
00458       ACE_NEW_RETURN (h,
00459                       TAO_ECG_UDP_EH (receiver),
00460                       handler);
00461       handler.reset (h);
00462       h->reactor (reactor);
00463 
00464       ACE_INET_Addr ipaddr;
00465       if (ipaddr.set (address_server_arg) != 0)
00466         {
00467           ACE_ERROR ((LM_ERROR,
00468                       "ERROR using address server argument "
00469                       "in ACE_INET_Addr.set ().\n"));
00470           return TAO_ECG_Refcounted_Handler ();
00471         }
00472       if (h->open (ipaddr) != 0)
00473         return TAO_ECG_Refcounted_Handler ();
00474     }
00475 
00476   else
00477     {
00478       ACE_ERROR ((LM_ERROR,
00479                   "Cannot create handler: unknown "
00480                   "handler type specified.\n"));
00481       return handler;
00482     }
00483 
00484   return handler;
00485 }
00486 
00487 TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00488 TAO_ECG_Mcast_Gateway::init_sender (
00489                                RtecEventChannelAdmin::EventChannel_ptr ec,
00490                                RtecUDPAdmin::AddrServer_ptr address_server,
00491                                TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00492 {
00493   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00494     sender (TAO_ECG_UDP_Sender::create ());
00495   if (!sender.in ())
00496     return sender;
00497 
00498   sender->init (ec,
00499                 address_server,
00500                 endpoint_rptr);
00501 
00502   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00503   sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00504 
00505   if (this->consumer_qos_.dependencies.length () > 0)
00506     {
00507       // Client supplied consumer qos.  Use it.
00508       this->consumer_qos_.is_gateway = 1;
00509       sender->connect (this->consumer_qos_);
00510     }
00511   else
00512     {
00513       // Client did not specify anything - subscribe to all events.
00514       ACE_ConsumerQOS_Factory consumer_qos_factory;
00515       consumer_qos_factory.start_disjunction_group (1);
00516       consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00517                                    ACE_ES_EVENT_ANY,
00518                                    0);
00519       RtecEventChannelAdmin::ConsumerQOS & qos =
00520         const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
00521       qos.is_gateway = 1;
00522 
00523       sender->connect (qos);
00524     }
00525 
00526   sender_shutdown.disallow_command ();
00527   return sender;
00528 }
00529 
00530 TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00531 TAO_ECG_Mcast_Gateway::init_receiver (
00532                                RtecEventChannelAdmin::EventChannel_ptr ec,
00533                                RtecUDPAdmin::AddrServer_ptr address_server,
00534                                TAO_ECG_Refcounted_Endpoint endpoint_rptr)
00535 {
00536   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00537     receiver (TAO_ECG_UDP_Receiver::create ());
00538   if (!receiver.in ())
00539     return receiver;
00540 
00541   receiver->init (ec,
00542                   endpoint_rptr,
00543                   address_server);
00544 
00545   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00546   receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00547 
00548   ACE_SupplierQOS_Factory supplier_qos_factory;
00549   supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00550                                ACE_ES_EVENT_ANY,
00551                                0, 1);
00552   RtecEventChannelAdmin::SupplierQOS & qos =
00553     const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
00554   qos.is_gateway = 1;
00555 
00556   receiver->connect (qos);
00557 
00558   receiver_shutdown.disallow_command ();
00559   return receiver;
00560 }
00561 
00562 void
00563 TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb,
00564                                     RtecEventChannelAdmin::EventChannel_ptr ec)
00565 {
00566   if (CORBA::is_nil (ec))
00567     {
00568       ACE_ERROR ((LM_ERROR,
00569                   "Nil event channel argument passed to "
00570                   "TAO_ECG_Mcast_Gateway::run().\n"));
00571       throw CORBA::INTERNAL ();
00572     }
00573   if (CORBA::is_nil (orb))
00574     {
00575       ACE_ERROR ((LM_ERROR,
00576                   "Nil orb argument passed to "
00577                   "TAO_ECG_Mcast_Gateway::run().\n"));
00578       throw CORBA::INTERNAL ();
00579     }
00580 }
00581 
00582 void
00583 TAO_ECG_Mcast_Gateway::run (CORBA::ORB_ptr orb,
00584                             RtecEventChannelAdmin::EventChannel_ptr ec)
00585 {
00586   // Verify args.
00587   this->verify_args (orb, ec);
00588 
00589   // Auto-cleanup objects.
00590   TAO_EC_Object_Deactivator address_server_deactivator;
00591   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00592   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00593 
00594   // Set up address server.
00595   PortableServer::ServantBase_var address_server_servant =
00596     this->init_address_server ();
00597   if (!address_server_servant.in ())
00598     {
00599       ACE_DEBUG ((LM_ERROR,
00600                   "Unable to create address server.\n"));
00601       throw CORBA::INTERNAL ();
00602     }
00603 
00604   RtecUDPAdmin::AddrServer_var address_server;
00605 
00606   PortableServer::POA_var poa =
00607     address_server_servant->_default_POA ();
00608 
00609   activate (address_server,
00610             poa.in (),
00611             address_server_servant.in (),
00612             address_server_deactivator);
00613 
00614   TAO_ECG_Refcounted_Endpoint endpoint_rptr;
00615   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;
00616 
00617   // Set up event sender.
00618   if (this->service_type_ == ECG_MCAST_SENDER
00619       || this->service_type_ == ECG_MCAST_TWO_WAY)
00620     {
00621       endpoint_rptr = this->init_endpoint ();
00622       if (endpoint_rptr.get () == 0)
00623         {
00624           throw CORBA::INTERNAL ();
00625         }
00626 
00627       sender = this->init_sender (ec,
00628                                   address_server.in (),
00629                                   endpoint_rptr);
00630       if (!sender.in ())
00631         {
00632           throw CORBA::INTERNAL ();
00633         }
00634 
00635       sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00636     }
00637 
00638   // Set up event receiver.
00639   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
00640   if (this->service_type_ == ECG_MCAST_RECEIVER
00641       || this->service_type_ == ECG_MCAST_TWO_WAY)
00642     {
00643       ACE_Reactor *reactor = orb->orb_core ()->reactor ();
00644 
00645       receiver = this->init_receiver (ec,
00646                                       address_server.in (),
00647                                       endpoint_rptr);
00648       if (!receiver.in ())
00649         {
00650           throw CORBA::INTERNAL ();
00651         }
00652 
00653       receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00654 
00655       TAO_ECG_Refcounted_Handler
00656         handler_rptr (this->init_handler (receiver.in (),
00657                                           ec,
00658                                           reactor));
00659       if (handler_rptr.get () == 0)
00660         {
00661           throw CORBA::INTERNAL ();
00662         }
00663       receiver->set_handler_shutdown (handler_rptr);
00664     }
00665 
00666   // Everything went ok - disable auto-cleanup.
00667   address_server_deactivator.disallow_deactivation ();
00668   receiver_shutdown.disallow_command ();
00669   sender_shutdown.disallow_command ();
00670 }
00671 
00672 TAO_END_VERSIONED_NAMESPACE_DECL
00673 
00674 // ****************************************************************
00675 
00676 ACE_STATIC_SVC_DEFINE (TAO_ECG_Mcast_Gateway,
00677                        ACE_TEXT ("ECG_Mcast_Gateway"),
00678                        ACE_SVC_OBJ_T,
00679                        &ACE_SVC_NAME (TAO_ECG_Mcast_Gateway),
00680                        ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00681                        0)
00682 ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_ECG_Mcast_Gateway)

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7