#include <ECG_Mcast_Gateway.h>
This class simplifies creation of federated Event Channels by presenting a simple unified interface for creating and configuring all components needed to federate an Event Channel. Configuration options are described below.
NOTE: This class does not own any of the components it creates and its lifetime is independent of theirs. This class acts purely as a wrapper facade for creating and wiring appropriate components together.
Service config file options:
-ECGService
    Valid values: sender, receiver, two_way
    Specifies whether this gateway should act as a multicast sender of events, a multicast receiver, or both.

-ECGAddressServer
    Valid values: basic, source, type
    Specifies which implementation of the address server the gateway should use.
      basic - the same multicast address is returned for all event headers.
      source - multicast addresses are returned based on the event source, according to the mapping provided at initialization.
      type - multicast addresses are returned based on the event type, according to the mapping provided at initialization.

-ECGAddressServerArg
    Valid value: a string whose format requirements are specific to the address server implementation in use.
    The argument is not interpreted by the gateway; it is simply passed during initialization to the address server selected by -ECGAddressServer.
    THIS OPTION MUST ALWAYS BE SPECIFIED BY THE USER (there is no default value for it).

-ECGHandler
    Valid values: basic, complex, udp
    Specifies which implementation of the event handler should be used if the gateway is acting as an event receiver.
      basic - a simple event handler listening on a single mcast address.
      complex - an event handler listening on multiple mcast addresses, based on the events of interest to consumers.
      udp - similar to the basic handler, except that it listens on a udp address rather than a multicast group.

-ECGTTL
    Valid values: a number > 0
    IP_Multicast time to live value that should be set on the sending socket. This option matters only if the gateway is acting as a sender of mcast messages.

-ECGNIC
    Valid values: name of the network interface
    This interface is used for sending and/or receiving multicast messages.

-ECGNonBlocking
    Valid values: an integer; 0 selects blocking mode, any nonzero value selects non-blocking mode.
    Boolean flag that configures whether the socket is in blocking or non-blocking mode. The default is non-blocking.
    NOTE: Certain device drivers block the process if the physical link fails.
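The option values above are matched case-insensitively (the init() listing further down uses ACE_OS::strcasecmp). The following is a minimal, standalone sketch of that matching for -ECGService. It does not use ACE; ServiceType, to_lower and parse_service are hypothetical names chosen for illustration, not part of the TAO API.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Hypothetical stand-in for the gateway's service type codes.
enum class ServiceType { Sender, Receiver, TwoWay };

// Return a lower-cased copy so that comparisons ignore case.
static std::string to_lower(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) {
    return static_cast<char>(std::tolower(c));
  });
  return s;
}

// Map an -ECGService value to a code. Returns false, leaving 'out'
// untouched, when the value is not one of the supported strings.
bool parse_service(const std::string& value, ServiceType& out) {
  const std::string v = to_lower(value);
  if (v == "sender")   { out = ServiceType::Sender;   return true; }
  if (v == "receiver") { out = ServiceType::Receiver; return true; }
  if (v == "two_way")  { out = ServiceType::TwoWay;   return true; }
  return false;
}
```

Leaving the output untouched on failure mirrors the documented behavior of ignoring an unsupported value and keeping the default.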
2) Create an instance of TAO_ECG_Mcast_Gateway in your code, on the stack or dynamically, and use the init() method to configure it. No configuration files are involved. See the service config options above for a description of the configurable options, and the init() method below for how to specify them.
Default configuration values (for either use case) can be found in ECG_Defaults.h.
Definition at line 128 of file ECG_Mcast_Gateway.h.
Definition at line 152 of file ECG_Mcast_Gateway.h.

    {ECG_ADDRESS_SERVER_BASIC,
     ECG_ADDRESS_SERVER_SOURCE,
     ECG_ADDRESS_SERVER_TYPE};

Definition at line 156 of file ECG_Mcast_Gateway.h.

    {ECG_HANDLER_BASIC,
     ECG_HANDLER_COMPLEX,
     ECG_HANDLER_UDP};

Definition at line 148 of file ECG_Mcast_Gateway.h.

    {ECG_MCAST_SENDER,
     ECG_MCAST_RECEIVER,
     ECG_MCAST_TWO_WAY};
Constructor.
Definition at line 8 of file ECG_Mcast_Gateway.i. References consumer_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_ECG_DEFAULT_ADDRESS_SERVER, TAO_ECG_DEFAULT_ADDRESS_SERVER_ARG, TAO_ECG_DEFAULT_HANDLER, TAO_ECG_DEFAULT_IP_MULTICAST_LOOP, TAO_ECG_DEFAULT_NIC, TAO_ECG_DEFAULT_NON_BLOCKING, TAO_ECG_DEFAULT_SERVICE, and TAO_ECG_DEFAULT_TTL.

      : service_type_ (TAO_ECG_DEFAULT_SERVICE)
      , handler_type_ (TAO_ECG_DEFAULT_HANDLER)
      , address_server_type_ (TAO_ECG_DEFAULT_ADDRESS_SERVER)
      , address_server_arg_ ((const char *) TAO_ECG_DEFAULT_ADDRESS_SERVER_ARG)
      , ttl_value_ (TAO_ECG_DEFAULT_TTL)
      , nic_ (static_cast<const char *> (TAO_ECG_DEFAULT_NIC))
      , ip_multicast_loop_ (TAO_ECG_DEFAULT_IP_MULTICAST_LOOP)
      , non_blocking_ (TAO_ECG_DEFAULT_NON_BLOCKING)
      , consumer_qos_ ()
    {
      this->consumer_qos_.dependencies.length (0);
    }

Reimplemented from ACE_Shared_Object. Definition at line 44 of file ECG_Mcast_Gateway.cpp.

    {
      return 0;
    }

Same as the init() overload that takes an address server argument and attributes, but also gives the client an opportunity to specify consumer QoS, i.e., which EC traffic should be multicast. Definition at line 233 of file ECG_Mcast_Gateway.cpp. References consumer_qos_, and init().

    {
      this->consumer_qos_ = consumer_qos;
      return this->init (address_server_arg,
                         attributes);
    }

Configure TAO_ECG_Mcast_Gateway programmatically. This method should be used when NOT using the service configurator to obtain/configure TAO_ECG_Mcast_Gateway. See the class documentation above for more info. Definition at line 216 of file ECG_Mcast_Gateway.cpp. References address_server_arg_, TAO_ECG_Mcast_Gateway::Attributes::address_server_type, address_server_type_, TAO_ECG_Mcast_Gateway::Attributes::handler_type, handler_type_, TAO_ECG_Mcast_Gateway::Attributes::ip_multicast_loop, ip_multicast_loop_, TAO_ECG_Mcast_Gateway::Attributes::nic, nic_, TAO_ECG_Mcast_Gateway::Attributes::non_blocking, non_blocking_, TAO_ECG_Mcast_Gateway::Attributes::service_type, TAO_ECG_Mcast_Gateway::Attributes::ttl_value, ttl_value_, and validate_configuration().

    {
      this->address_server_arg_.set (address_server_arg);

      this->address_server_type_ = attr.address_server_type;
      this->handler_type_ = attr.handler_type;
      this->service_type_ = attr.service_type;
      this->ttl_value_ = attr.ttl_value;
      this->nic_.set (attr.nic.c_str ());
      this->ip_multicast_loop_ = attr.ip_multicast_loop;
      this->non_blocking_ = attr.non_blocking;

      return this->validate_configuration ();
    }

Reimplemented from ACE_Shared_Object. Definition at line 50 of file ECG_Mcast_Gateway.cpp. References ACE_Arg_Shifter, ACE_DEBUG, ACE_ERROR, ACE_TCHAR, ACE_TEXT(), address_server_arg_, address_server_type_, ACE_OS::atoi(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, handler_type_, ip_multicast_loop_, LM_ERROR, LM_WARNING, nic_, non_blocking_, ACE_OS::strcasecmp(), ACE_OS::strtoul(), ttl_value_, and validate_configuration(). Referenced by init().

    {
      int result = 0;

      ACE_Arg_Shifter arg_shifter (argc, argv);

      while (arg_shifter.is_anything_left ())
        {
          const ACE_TCHAR *arg = arg_shifter.get_current ();

          if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  const ACE_TCHAR* opt = arg_shifter.get_current ();
                  if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
                    this->service_type_ = ECG_MCAST_RECEIVER;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
                    this->service_type_ = ECG_MCAST_SENDER;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
                    this->service_type_ = ECG_MCAST_TWO_WAY;
                  else
                    {
                      ACE_ERROR ((LM_ERROR,
                                  ACE_TEXT ("Unsupported <-ECGService> option ")
                                  ACE_TEXT ("value: <%s>. Ignoring this option ")
                                  ACE_TEXT ("- using defaults instead.\n"),
                                  opt));
                      result = -1;
                    }
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  const ACE_TCHAR* opt = arg_shifter.get_current ();
                  if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
                    this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
                    this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
                    this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
                  else
                    {
                      ACE_ERROR ((LM_ERROR,
                                  ACE_TEXT ("Unsupported <-ECGAddressServer> ")
                                  ACE_TEXT ("option value: <%s>. Ignoring this ")
                                  ACE_TEXT ("option - using defaults instead.\n"),
                                  opt));
                      result = -1;
                    }
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  this->address_server_arg_.set (arg_shifter.get_current ());
                  arg_shifter.consume_arg ();
                }
            }


          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  const ACE_TCHAR* opt = arg_shifter.get_current ();
                  if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
                    this->handler_type_ = ECG_HANDLER_BASIC;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
                    this->handler_type_ = ECG_HANDLER_COMPLEX;
                  else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
                    this->handler_type_ = ECG_HANDLER_UDP;
                  else
                    {
                      ACE_ERROR ((LM_ERROR,
                                  ACE_TEXT ("Unsupported <-ECGHandler> ")
                                  ACE_TEXT ("option value: <%s>. Ignoring this ")
                                  ACE_TEXT ("option - using defaults instead.\n"),
                                  opt));
                      result = -1;
                    }
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  const ACE_TCHAR* opt = arg_shifter.get_current ();
                  unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
                  this->ttl_value_ = static_cast<u_char> (tmp);
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  this->nic_.set (arg_shifter.get_current ());
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  this->ip_multicast_loop_ =
                    (ACE_OS::atoi(arg_shifter.get_current()) != 0);
                  arg_shifter.consume_arg ();
                }
            }

          else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
            {
              arg_shifter.consume_arg ();

              if (arg_shifter.is_parameter_next ())
                {
                  this->non_blocking_ =
                    (ACE_OS::atoi(arg_shifter.get_current()) != 0);
                  arg_shifter.consume_arg ();
                }
            }

          else
            {
              arg_shifter.ignore_arg ();
              ACE_DEBUG ((LM_WARNING,
                          ACE_TEXT ("Ignoring <%s> option ")
                          ACE_TEXT ("during initialization.\n"),
                          arg));
              result = -1;
            }
        }

      if (this->validate_configuration () == -1)
        return -1;
      else
        return result;
    }

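One detail of the -ECGTTL handling in the listing above is easy to miss: the value is parsed with base auto-detection and then masked to its low 8 bits, so out-of-range values wrap silently. The sketch below reproduces just that arithmetic with the standard library's std::strtoul in place of ACE_OS::strtoul; parse_ttl is a hypothetical helper name, not a TAO function.

```cpp
#include <cassert>
#include <cstdlib>

// Parse a TTL option string the way the listing does: base 0 lets
// strtoul accept decimal, octal (leading 0) and hex (leading 0x),
// and the mask keeps only the low 8 bits of the result.
unsigned char parse_ttl(const char* text) {
  assert(text != nullptr);  // the caller must supply a parameter
  const unsigned long tmp = std::strtoul(text, nullptr, 0) & 0xff;
  return static_cast<unsigned char>(tmp);
}
```

With this arithmetic, "256" yields 0. Since init_endpoint() only sets IP_MULTICAST_TTL when ttl_value_ > 0, such a value would leave the socket's TTL at whatever the platform default is.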
Definition at line 359 of file ECG_Mcast_Gateway.cpp. References TAO_EC_Servant_Var< T >::_retn(), ACE_ERROR, address_server_arg_, address_server_type_, TAO_ECG_Complex_Address_Server::create(), TAO_ECG_Simple_Address_Server::create(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, TAO_EC_Servant_Var< T >::in(), and LM_ERROR. Referenced by run().

    {
      const char * address_server_arg =
        (this->address_server_arg_.length ())
        ? this->address_server_arg_.c_str () : 0;

      if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
        {
          TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
            TAO_ECG_Simple_Address_Server::create ();
          if (!impl.in ())
            return 0;

          if (impl->init (address_server_arg) == -1)
            {
              return 0;
            }
          return impl._retn ();
        }

      else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
        {
          TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
            TAO_ECG_Complex_Address_Server::create (1);
          if (!impl.in ())
            return 0;

          if (impl->init (address_server_arg) == -1)
            {
              return 0;
            }
          return impl._retn ();
        }

      else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
        {
          TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
            TAO_ECG_Complex_Address_Server::create (0);
          if (!impl.in ())
            return 0;

          if (impl->init (address_server_arg) == -1)
            {
              return 0;
            }
          return impl._retn ();
        }

      else
        {
          ACE_ERROR ((LM_ERROR,
                      "Cannot create address server: "
                      "unknown address server type specified.\n"));
          return 0;
        }
    }

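To make the difference between the basic, source and type address servers concrete, here is a small standalone model of the lookup each one performs. It is only a sketch: AddressServerKind, EventHeader and AddressServer are hypothetical types, the addresses are made-up examples, and falling back to a default address for unmapped keys is an assumption rather than documented behavior of the TAO address servers.

```cpp
#include <map>
#include <string>

// Hypothetical stand-ins for the three address server flavors.
enum class AddressServerKind { Basic, Source, Type };

// Only the two header fields that the lookup depends on.
struct EventHeader {
  long source;
  long type;
};

struct AddressServer {
  AddressServerKind kind;
  std::string default_address;          // used by Basic, and as a fallback
  std::map<long, std::string> mapping;  // keyed by source or by type

  // Pick the multicast address for one event header.
  std::string get_address(const EventHeader& h) const {
    if (kind == AddressServerKind::Basic)
      return default_address;  // same address for every header
    const long key = (kind == AddressServerKind::Source) ? h.source : h.type;
    const auto it = mapping.find(key);
    return it != mapping.end() ? it->second : default_address;
  }
};
```

The same mapping table gives different answers depending on which header field is used as the key, which is the only difference between the source and type variants in this model.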
Definition at line 288 of file ECG_Mcast_Gateway.cpp. References ACE_ERROR, ACE_NEW_NORETURN, ACE_NONBLOCK, ACE_IPC_SAP::enable(), IP_MULTICAST_LOOP, IP_MULTICAST_TTL, LM_ERROR, nic_, non_blocking_, ACE_SOCK_Dgram::open(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_SOCK_Dgram::set_nic(), TAO_ECG_Refcounted_Endpoint, and ttl_value_. Referenced by run().

    {
      TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
      TAO_ECG_Refcounted_Endpoint refendpoint;

      // Try to allocate a new endpoint from the heap
      ACE_NEW_NORETURN (endpoint,
                        TAO_ECG_UDP_Out_Endpoint);

      if (endpoint != 0)
        {
          refendpoint.reset (endpoint);
        }
      else
        {
          return TAO_ECG_Refcounted_Endpoint ();
        }

      ACE_SOCK_Dgram& dgram = refendpoint->dgram ();

      if (dgram.open (ACE_Addr::sap_any) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "Cannot open dgram "
                      "for sending mcast messages.\n"));
          return TAO_ECG_Refcounted_Endpoint ();
        }

      if (this->nic_.length () != 0)
        {
          dgram.set_nic (this->nic_.c_str ());
        }

      if (this->ttl_value_ > 0)
        {
          if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
                                          IP_MULTICAST_TTL,
                                          &this->ttl_value_,
                                          sizeof (this->ttl_value_))
              == -1)
            {
              ACE_ERROR ((LM_ERROR,
                          "Error setting TTL option on dgram "
                          "for sending mcast messages.\n"));
              return TAO_ECG_Refcounted_Endpoint ();
            }
        }

      if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
                                      IP_MULTICAST_LOOP,
                                      &this->ip_multicast_loop_,
                                      sizeof (this->ip_multicast_loop_)) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "Error setting MULTICAST_LOOP option "
                      "on dgram for sending mcast messages.\n"));
          return TAO_ECG_Refcounted_Endpoint ();
        }

      if (this->non_blocking_
          && dgram.enable(ACE_NONBLOCK) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "Error setting NON BLOCKING option.\n"));
          return TAO_ECG_Refcounted_Endpoint ();
        }

      return refendpoint;
    }

Definition at line 417 of file ECG_Mcast_Gateway.cpp. References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_NEW_RETURN, address_server_arg_, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, handler_type_, LM_ERROR, nic_, TAO_ECG_UDP_EH::open(), TAO_ECG_Mcast_EH::open(), TAO_ECG_Simple_Mcast_EH::open(), ACE_Event_Handler::reactor(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_INET_Addr::set(), and TAO_ECG_Refcounted_Handler.

    {
      TAO_ECG_Refcounted_Handler handler;

      const char * nic =
        (this->nic_.length ()) ? this->nic_.c_str () : 0;
      const char * address_server_arg =
        (this->address_server_arg_.length ())
        ? this->address_server_arg_.c_str () : 0;

      if (this->handler_type_ == ECG_HANDLER_BASIC)
        {
          TAO_ECG_Simple_Mcast_EH * h = 0;
          ACE_NEW_RETURN (h,
                          TAO_ECG_Simple_Mcast_EH (receiver),
                          handler);
          handler.reset (h);

          h->reactor (reactor);
          if (h->open (address_server_arg, nic) != 0)
            return TAO_ECG_Refcounted_Handler ();
        }

      else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
        {
          TAO_ECG_Mcast_EH * h = 0;
          ACE_NEW_RETURN (h,
                          TAO_ECG_Mcast_EH (receiver, nic),
                          handler);
          handler.reset (h);

          h->reactor (reactor);

          h->open (ec ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (TAO_ECG_Refcounted_Handler ());
        }

      else if (this->handler_type_ == ECG_HANDLER_UDP)
        {
          TAO_ECG_UDP_EH * h = 0;
          ACE_NEW_RETURN (h,
                          TAO_ECG_UDP_EH (receiver),
                          handler);
          handler.reset (h);
          h->reactor (reactor);

          ACE_INET_Addr ipaddr;
          if (ipaddr.set (address_server_arg) != 0)
            {
              ACE_ERROR ((LM_ERROR,
                          "ERROR using address server argument "
                          "in ACE_INET_Addr.set ().\n"));
              return TAO_ECG_Refcounted_Handler ();
            }
          if (h->open (ipaddr) != 0)
            return TAO_ECG_Refcounted_Handler ();
        }

      else
        {
          ACE_ERROR ((LM_ERROR,
                      "Cannot create handler: unknown "
                      "handler type specified.\n"));
          return handler;
        }

      return handler;
    }

Definition at line 538 of file ECG_Mcast_Gateway.cpp. References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_EC_Auto_Command< T >::disallow_command(), ACE_SupplierQOS_Factory::get_SupplierQOS(), TAO_EC_Servant_Var< T >::in(), ACE_SupplierQOS_Factory::insert(), RtecEventChannelAdmin::SupplierQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), TAO_ECG_Refcounted_Endpoint, and UDP_Receiver_Shutdown. Referenced by run().

    {
      TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
        receiver (TAO_ECG_UDP_Receiver::create ());
      if (!receiver.in ())
        return receiver;

      receiver->init (ec,
                      endpoint_rptr,
                      address_server
                      ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());

      TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
      receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));

      ACE_SupplierQOS_Factory supplier_qos_factory;
      supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
                                   ACE_ES_EVENT_ANY,
                                   0, 1);
      RtecEventChannelAdmin::SupplierQOS & qos =
        const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
      qos.is_gateway = 1;

      receiver->connect (qos ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());

      receiver_shutdown.disallow_command ();
      return receiver;
    }

Definition at line 490 of file ECG_Mcast_Gateway.cpp. References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, consumer_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_EC_Auto_Command< T >::disallow_command(), ACE_ConsumerQOS_Factory::get_ConsumerQOS(), TAO_EC_Servant_Var< T >::in(), ACE_ConsumerQOS_Factory::insert(), RtecEventChannelAdmin::ConsumerQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), ACE_ConsumerQOS_Factory::start_disjunction_group(), TAO_ECG_Refcounted_Endpoint, and UDP_Sender_Shutdown. Referenced by run().

    {
      TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
        sender (TAO_ECG_UDP_Sender::create ());
      if (!sender.in ())
        return sender;

      sender->init (ec,
                    address_server,
                    endpoint_rptr
                    ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());

      TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
      sender_shutdown.set_command (UDP_Sender_Shutdown (sender));

      if (this->consumer_qos_.dependencies.length () > 0)
        {
          // Client supplied consumer qos. Use it.
          this->consumer_qos_.is_gateway = 1;
          sender->connect (this->consumer_qos_ ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
        }
      else
        {
          // Client did not specify anything - subscribe to all events.
          ACE_ConsumerQOS_Factory consumer_qos_factory;
          consumer_qos_factory.start_disjunction_group (1);
          consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
                                       ACE_ES_EVENT_ANY,
                                       0);
          RtecEventChannelAdmin::ConsumerQOS & qos =
            const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
          qos.is_gateway = 1;

          sender->connect (qos ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
        }

      sender_shutdown.disallow_command ();
      return sender;
    }

Helper function to register the Gateway into the service configurator. Definition at line 36 of file ECG_Mcast_Gateway.cpp. References ACE_Service_Config::static_svcs().

    {
      return ACE_Service_Config::static_svcs ()->
        insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
    }

The main method - create, configure and run federation components according to the specified configuration. Definition at line 595 of file ECG_Mcast_Gateway.cpp. References ACE_CHECK, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_THROW, activate(), TAO_EC_Auto_Command< T >::disallow_command(), TAO_EC_Object_Deactivator::disallow_deactivation(), ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), TAO_EC_Servant_Var< T >::in(), init_address_server(), init_endpoint(), init_receiver(), init_sender(), LM_ERROR, TAO_EC_Auto_Command< T >::set_command(), TAO_ECG_Refcounted_Endpoint, TAO_ECG_Refcounted_Handler, UDP_Receiver_Shutdown, UDP_Sender_Shutdown, and verify_args().

    {
      // Verify args.
      this->verify_args (orb, ec ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // Auto-cleanup objects.
      TAO_EC_Object_Deactivator address_server_deactivator;
      TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
      TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;

      // Set up address server.
      PortableServer::ServantBase_var address_server_servant =
        this->init_address_server ();
      if (!address_server_servant.in ())
        {
          ACE_DEBUG ((LM_ERROR,
                      "Unable to create address server.\n"));
          ACE_THROW (CORBA::INTERNAL ());
        }

      RtecUDPAdmin::AddrServer_var address_server;

      PortableServer::POA_var poa =
        address_server_servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      activate (address_server,
                poa.in (),
                address_server_servant.in (),
                address_server_deactivator
                ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      TAO_ECG_Refcounted_Endpoint endpoint_rptr;
      TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;

      // Set up event sender.
      if (this->service_type_ == ECG_MCAST_SENDER
          || this->service_type_ == ECG_MCAST_TWO_WAY)
        {
          endpoint_rptr = this->init_endpoint ();
          if (endpoint_rptr.get () == 0)
            {
              ACE_THROW (CORBA::INTERNAL ());
            }

          sender = this->init_sender (ec,
                                      address_server.in (),
                                      endpoint_rptr
                                      ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
          if (!sender.in ())
            {
              ACE_THROW (CORBA::INTERNAL ());
            }

          sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
        }

      // Set up event receiver.
      TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
      if (this->service_type_ == ECG_MCAST_RECEIVER
          || this->service_type_ == ECG_MCAST_TWO_WAY)
        {
          ACE_Reactor *reactor = orb->orb_core ()->reactor ();

          receiver = this->init_receiver (ec,
                                          address_server.in (),
                                          endpoint_rptr
                                          ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
          if (!receiver.in ())
            {
              ACE_THROW (CORBA::INTERNAL ());
            }

          receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));

          TAO_ECG_Refcounted_Handler
            handler_rptr (this->init_handler (receiver.in (),
                                              ec,
                                              reactor
                                              ACE_ENV_ARG_PARAMETER));
          ACE_CHECK;
          if (handler_rptr.get () == 0)
            {
              ACE_THROW (CORBA::INTERNAL ());
            }
          receiver->set_handler_shutdown (handler_rptr);
        }

      // Everything went ok - disable auto-cleanup.
      address_server_deactivator.disallow_deactivation ();
      receiver_shutdown.disallow_command ();
      sender_shutdown.disallow_command ();
    }

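run() relies on auto-cleanup objects that undo partial setup if anything fails, and that are disarmed once every step has succeeded. The sketch below shows that guard idiom in isolation. AutoCommand and setup are hypothetical, simplified stand-ins written for this illustration; they are not the TAO_EC_Auto_Command template, only the same set_command / disallow_command shape.

```cpp
#include <functional>
#include <utility>

// Minimal guard: runs the stored command on destruction unless disarmed.
class AutoCommand {
public:
  AutoCommand() = default;
  AutoCommand(const AutoCommand&) = delete;
  AutoCommand& operator=(const AutoCommand&) = delete;

  ~AutoCommand() {
    if (armed_ && command_) command_();
  }

  // Store the cleanup action and arm the guard.
  void set_command(std::function<void()> command) {
    command_ = std::move(command);
    armed_ = true;
  }

  // Disarm: the stored command will not run on destruction.
  void disallow_command() { armed_ = false; }

private:
  std::function<void()> command_;
  bool armed_ = false;
};

// Simulated setup step. 'shutdowns' counts how often cleanup ran.
bool setup(bool fail_midway, int& shutdowns) {
  AutoCommand sender_shutdown;
  sender_shutdown.set_command([&shutdowns] { ++shutdowns; });

  if (fail_midway)
    return false;  // the guard fires while unwinding this scope

  sender_shutdown.disallow_command();  // everything went ok
  return true;
}
```

The benefit of the idiom is that every early exit, including an exception, triggers the cleanup without a dedicated error path for each step.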
Verifies configuration values specified through init() make sense.
Definition at line 244 of file ECG_Mcast_Gateway.cpp. References ACE_DEBUG, address_server_arg_, address_server_type_, ECG_ADDRESS_SERVER_BASIC, ECG_HANDLER_BASIC, ECG_HANDLER_UDP, ECG_MCAST_SENDER, handler_type_, ip_multicast_loop_, LM_ERROR, and non_blocking_. Referenced by init().

    {
      if ((this->handler_type_ == ECG_HANDLER_BASIC
           || this->handler_type_ == ECG_HANDLER_UDP)
          && this->service_type_ != ECG_MCAST_SENDER
          && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
        {
          ACE_DEBUG ((LM_ERROR,
                      "Configurations for mcast handler and "
                      "address server do not match.\n"));
          return -1;
        }

      // Currently all Address Server implementations require an
      // initialization string.  If we ever add a new Address Server
      // implementation, which does not, we'll have to remove this check.
      if (this->address_server_arg_.length () == 0)
        {
          ACE_DEBUG ((LM_ERROR,
                      "Address server initializaton "
                      "argument not specified.\n"));
          return -1;
        }

      if (this->ip_multicast_loop_ != 0
          && this->ip_multicast_loop_ != 1)
        {
          ACE_DEBUG ((LM_ERROR,
                      "IP MULTICAST LOOP option must have a boolean value.\n"));
          return -1;
        }

      if (this->non_blocking_ != 0
          && this->non_blocking_ != 1)
        {
          ACE_DEBUG ((LM_ERROR,
                      "NON BLOCKING flag must have a boolean value.\n"));
          return -1;
        }

      return 0;
    }

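The first two checks in the listing above decide which option combinations are accepted: a basic or udp handler on a receiving gateway requires the basic address server, and the address server argument must never be empty. The following standalone model restates just those two rules so they can be tried out without TAO. All type and function names here are hypothetical, and the address string in the usage note is a made-up example.

```cpp
#include <string>

// Hypothetical stand-ins for the gateway's enum codes.
enum ServiceType { SENDER, RECEIVER, TWO_WAY };
enum HandlerType { HANDLER_BASIC, HANDLER_COMPLEX, HANDLER_UDP };
enum AddressServerType { ADDR_BASIC, ADDR_SOURCE, ADDR_TYPE };

struct GatewayConfig {
  ServiceType service = SENDER;
  HandlerType handler = HANDLER_BASIC;
  AddressServerType address_server = ADDR_BASIC;
  std::string address_server_arg;  // no default, must be supplied
};

// Returns 0 if the configuration is consistent, -1 otherwise.
int validate(const GatewayConfig& c) {
  // Handlers that listen on one address cannot serve an address
  // server that hands out different addresses per source or type.
  const bool single_address_handler =
      c.handler == HANDLER_BASIC || c.handler == HANDLER_UDP;
  if (single_address_handler
      && c.service != SENDER
      && c.address_server != ADDR_BASIC)
    return -1;

  // Every address server implementation needs an init string.
  if (c.address_server_arg.empty())
    return -1;

  return 0;
}
```

For example, a receiver with the source address server and an argument such as "230.100.0.7:12700" is rejected with the basic handler but accepted with the complex handler. A pure sender skips the handler check entirely, since the handler is only used on the receiving side.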
Check that arguments to run() are not nil.
Definition at line 574 of file ECG_Mcast_Gateway.cpp. References ACE_ERROR, ACE_THROW, CORBA::is_nil(), and LM_ERROR. Referenced by run().

    {
      if (CORBA::is_nil (ec))
        {
          ACE_ERROR ((LM_ERROR,
                      "Nil event channel argument passed to "
                      "TAO_ECG_Mcast_Gateway::run().\n"));
          ACE_THROW (CORBA::INTERNAL ());
        }
      if (CORBA::is_nil (orb))
        {
          ACE_ERROR ((LM_ERROR,
                      "Nil orb argument passed to "
                      "TAO_ECG_Mcast_Gateway::run().\n"));
          ACE_THROW (CORBA::INTERNAL ());
        }
    }

Definition at line 270 of file ECG_Mcast_Gateway.h. Referenced by init(), init_address_server(), init_handler(), and validate_configuration().

Definition at line 269 of file ECG_Mcast_Gateway.h. Referenced by init(), init_address_server(), and validate_configuration().

Definition at line 276 of file ECG_Mcast_Gateway.h. Referenced by init(), init_sender(), and TAO_ECG_Mcast_Gateway().

Definition at line 268 of file ECG_Mcast_Gateway.h. Referenced by init(), init_handler(), and validate_configuration().

Definition at line 273 of file ECG_Mcast_Gateway.h. Referenced by init(), and validate_configuration().

Definition at line 272 of file ECG_Mcast_Gateway.h. Referenced by init(), init_endpoint(), and init_handler().

Definition at line 274 of file ECG_Mcast_Gateway.h. Referenced by init(), init_endpoint(), and validate_configuration().

Definition at line 267 of file ECG_Mcast_Gateway.h.

Definition at line 271 of file ECG_Mcast_Gateway.h. Referenced by init(), and init_endpoint().