#include <ECG_Mcast_Gateway.h>
Public Types | |
enum | Service_Type { ECG_MCAST_SENDER, ECG_MCAST_RECEIVER, ECG_MCAST_TWO_WAY } |
Values for some configuration parameters to init (). | |
enum | Address_Server_Type { ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE } |
Values for some configuration parameters to init (). | |
enum | Handler_Type { ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP } |
Values for some configuration parameters to init (). | |
Public Member Functions | |
TAO_ECG_Mcast_Gateway (void) | |
Constructor. | |
int | init (const char *address_server_arg, const Attributes &attributes=Attributes()) |
int | init (const RtecEventChannelAdmin::ConsumerQOS &consumer_qos, const char *address_server_arg, const Attributes &attributes=Attributes()) |
void | run (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec) |
virtual int | init (int argc, ACE_TCHAR *argv[]) |
The Service_Object entry points. | |
virtual int | fini (void) |
The Service_Object entry points. | |
Static Public Member Functions | |
static int | init_svcs (void) |
Private Member Functions | |
void | verify_args (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec) |
Check that arguments to run() are not nil. | |
int | validate_configuration (void) |
Verifies that the configuration values specified through init() make sense. | |
PortableServer::ServantBase * | init_address_server (void) |
Allocate and initialize appropriate objects. | |
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > | init_sender (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
Allocate and initialize appropriate objects. | |
TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > | init_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr) |
Allocate and initialize appropriate objects. | |
TAO_ECG_Refcounted_Endpoint | init_endpoint (void) |
Allocate and initialize appropriate objects. | |
TAO_ECG_Refcounted_Handler | init_handler (TAO_ECG_Dgram_Handler *recv, RtecEventChannelAdmin::EventChannel_ptr ec, ACE_Reactor *reactor) |
Allocate and initialize appropriate objects. | |
Private Attributes | |
Service_Type | service_type_ |
Flags controlling configuration. | |
Handler_Type | handler_type_ |
Flags controlling configuration. | |
Address_Server_Type | address_server_type_ |
Flags controlling configuration. | |
ACE_CString | address_server_arg_ |
Flags controlling configuration. | |
u_char | ttl_value_ |
Flags controlling configuration. | |
ACE_CString | nic_ |
Flags controlling configuration. | |
int | ip_multicast_loop_ |
Flags controlling configuration. | |
int | non_blocking_ |
Flags controlling configuration. | |
RtecEventChannelAdmin::ConsumerQOS | consumer_qos_ |
Flags controlling configuration. | |
Classes | |
struct | Attributes |
Helper class to initialize a TAO_ECG_Mcast_Gateway. |
This class simplifies creation of federated Event Channels by presenting a simple unified interface for creating and configuring all components needed to federate an Event Channel. Configuration options are described below.
NOTE: This class does not own any of the components it creates and its lifetime is independent of theirs. This class acts purely as a wrapper facade for creating and wiring appropriate components together.
This class can be used in two ways:
1) Specify the configuration options in a service config file and use the service configurator to obtain a configured TAO_ECG_Mcast_Gateway. Service config file options:
-ECGService <service>. Valid values: sender, receiver, two_way. Specifies whether this gateway should act as a multicast sender of events, a multicast receiver, or both.
-ECGAddressServer <server_type>. Valid values: basic, source, type. Specifies which implementation of the address server the gateway should use. basic: the same multicast address is returned for all event headers. source: multicast addresses are returned based on the event source, according to the mapping provided at initialization. type: multicast addresses are returned based on the event type, according to the mapping provided at initialization.
-ECGAddressServerArg <arg>. Valid value: a string whose format requirements are specific to the address server implementation in use. The gateway does not interpret arg; it simply passes it to the address server selected by -ECGAddressServer during initialization. THIS OPTION MUST ALWAYS BE SPECIFIED BY THE USER (there is no default value for it).
-ECGHandler <handler_type>. Valid values: basic, complex, udp. Specifies which event handler implementation should be used when the gateway acts as an event receiver. basic: a simple event handler listening on a single mcast address. complex: an event handler listening on multiple mcast addresses, based on the events of interest to consumers. udp: similar to the basic handler, except that it listens on a UDP address rather than a multicast group.
-ECGTTL <ttl>. Valid values: a number > 0. IP multicast time-to-live value to set on the sending socket. This option matters only if the gateway is acting as a sender of mcast messages.
-ECGNIC <nic>. Valid values: the name of a network interface. This interface is used for sending and/or receiving multicast messages.
-ECGNonBlocking <flag>. Boolean flag that configures whether the socket is in blocking or non-blocking mode. As parsed by init(), the option takes a numeric argument, and a nonzero value selects non-blocking mode. The default is non-blocking. NOTE: Certain device drivers block the process if the physical link fails.
-ECGIPMulticastLoop <flag>. Numeric argument, as parsed by init(); a nonzero value enables IP multicast loopback (IP_MULTICAST_LOOP) on the sending socket.
2) Create an instance of TAO_ECG_Mcast_Gateway in your code, on the stack or dynamically, and use the init() method to configure it. No configuration files are involved. See the service config options above for a description of the configurable options, and the init() method below for how to specify them.
Default configuration values (for either use case) can be found in ECG_Defaults.h.
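The enumerated option values above (for -ECGService, -ECGAddressServer and -ECGHandler) are compared with ACE_OS::strcasecmp in init(), so they are matched without regard to case, and an unsupported value leaves the current setting untouched. The following is a minimal standalone sketch of that mapping for -ECGService only. It does not use ACE or TAO; ServiceType, to_lower and parse_service_type are hypothetical names introduced for illustration.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Hypothetical stand-in for TAO_ECG_Mcast_Gateway::Service_Type.
enum class ServiceType { Sender, Receiver, TwoWay };

// Return a lower-cased copy so that comparisons ignore case.
inline std::string to_lower(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) {
    return static_cast<char>(std::tolower(c));
  });
  return s;
}

// Map an -ECGService value to an enumerator.
// On success, stores the result in `out` and returns true.
// On an unsupported value, leaves `out` unchanged and returns false,
// which models "ignoring this option - using defaults instead".
inline bool parse_service_type(const std::string& value, ServiceType& out) {
  const std::string v = to_lower(value);
  if (v == "sender")   { out = ServiceType::Sender;   return true; }
  if (v == "receiver") { out = ServiceType::Receiver; return true; }
  if (v == "two_way")  { out = ServiceType::TwoWay;   return true; }
  return false;
}
```

In the real init(), an unsupported value additionally logs an error and causes the call to return -1 once the remaining arguments have been processed.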
Definition at line 128 of file ECG_Mcast_Gateway.h.
enum TAO_ECG_Mcast_Gateway::Address_Server_Type
Values for some configuration parameters to init ().
Definition at line 152 of file ECG_Mcast_Gateway.h.
{ECG_ADDRESS_SERVER_BASIC,
 ECG_ADDRESS_SERVER_SOURCE,
 ECG_ADDRESS_SERVER_TYPE};
enum TAO_ECG_Mcast_Gateway::Handler_Type
Values for some configuration parameters to init ().
Definition at line 156 of file ECG_Mcast_Gateway.h.
{ECG_HANDLER_BASIC,
 ECG_HANDLER_COMPLEX,
 ECG_HANDLER_UDP};
enum TAO_ECG_Mcast_Gateway::Service_Type
Values for some configuration parameters to init ().
Definition at line 148 of file ECG_Mcast_Gateway.h.
{ECG_MCAST_SENDER,
 ECG_MCAST_RECEIVER,
 ECG_MCAST_TWO_WAY};
TAO_ECG_Mcast_Gateway::TAO_ECG_Mcast_Gateway | ( | void | ) |
Constructor.
int TAO_ECG_Mcast_Gateway::fini | ( | void | ) | [virtual] |
The Service_Object entry points.
Reimplemented from ACE_Shared_Object.
Definition at line 44 of file ECG_Mcast_Gateway.cpp.
int TAO_ECG_Mcast_Gateway::init | ( | const RtecEventChannelAdmin::ConsumerQOS & | consumer_qos, | |
const char * | address_server_arg, | |||
const Attributes & | attributes = Attributes() | |||
) |
Same as the init() overload that takes only address_server_arg and attributes, but also gives the client an opportunity to specify the consumer QoS, i.e., which EC traffic should be multicast.
Definition at line 233 of file ECG_Mcast_Gateway.cpp.
References consumer_qos_, and init().
{
  this->consumer_qos_ = consumer_qos;
  return this->init (address_server_arg,
                     attributes);
}
int TAO_ECG_Mcast_Gateway::init | ( | const char * | address_server_arg, | |
const Attributes & | attributes = Attributes() | |||
) |
Configure TAO_ECG_Mcast_Gateway programmatically. This method should be used when NOT using the service configurator to obtain/configure TAO_ECG_Mcast_Gateway. See the class documentation above for more info.
Definition at line 216 of file ECG_Mcast_Gateway.cpp.
References address_server_arg_, TAO_ECG_Mcast_Gateway::Attributes::address_server_type, address_server_type_, ACE_String_Base< CHAR >::c_str(), TAO_ECG_Mcast_Gateway::Attributes::handler_type, handler_type_, TAO_ECG_Mcast_Gateway::Attributes::ip_multicast_loop, ip_multicast_loop_, TAO_ECG_Mcast_Gateway::Attributes::nic, nic_, TAO_ECG_Mcast_Gateway::Attributes::non_blocking, non_blocking_, TAO_ECG_Mcast_Gateway::Attributes::service_type, service_type_, ACE_String_Base< CHAR >::set(), TAO_ECG_Mcast_Gateway::Attributes::ttl_value, ttl_value_, and validate_configuration().
{
  // attr is the Attributes argument (named attributes in the signature above).
  this->address_server_arg_.set (address_server_arg);

  this->address_server_type_ = attr.address_server_type;
  this->handler_type_ = attr.handler_type;
  this->service_type_ = attr.service_type;
  this->ttl_value_ = attr.ttl_value;
  this->nic_.set (attr.nic.c_str ());
  this->ip_multicast_loop_ = attr.ip_multicast_loop;
  this->non_blocking_ = attr.non_blocking;

  return this->validate_configuration ();
}
int TAO_ECG_Mcast_Gateway::init | ( | int | argc, | |
ACE_TCHAR * | argv[] | |||
) | [virtual] |
The Service_Object entry points.
Reimplemented from ACE_Shared_Object.
Definition at line 50 of file ECG_Mcast_Gateway.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), address_server_type_, ACE_OS::atoi(), ACE_Arg_Shifter_T< CHAR_TYPE >::consume_arg(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Arg_Shifter_T< CHAR_TYPE >::get_current(), handler_type_, ACE_Arg_Shifter_T< CHAR_TYPE >::ignore_arg(), ACE_Arg_Shifter_T< CHAR_TYPE >::is_anything_left(), ACE_Arg_Shifter_T< CHAR_TYPE >::is_parameter_next(), LM_ERROR, LM_WARNING, service_type_, ACE_OS::strcasecmp(), ACE_OS::strtoul(), and ttl_value_.
Referenced by init().
{
  int result = 0;

  ACE_Arg_Shifter arg_shifter (argc, argv);

  while (arg_shifter.is_anything_left ())
    {
      const ACE_TCHAR *arg = arg_shifter.get_current ();

      if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              const ACE_TCHAR* opt = arg_shifter.get_current ();
              if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
                this->service_type_ = ECG_MCAST_RECEIVER;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
                this->service_type_ = ECG_MCAST_SENDER;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
                this->service_type_ = ECG_MCAST_TWO_WAY;
              else
                {
                  ACE_ERROR ((LM_ERROR,
                              ACE_TEXT ("Unsupported <-ECGService> option ")
                              ACE_TEXT ("value: <%s>. Ignoring this option ")
                              ACE_TEXT ("- using defaults instead.\n"),
                              opt));
                  result = -1;
                }
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              const ACE_TCHAR* opt = arg_shifter.get_current ();
              if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
                this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
                this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
                this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
              else
                {
                  ACE_ERROR ((LM_ERROR,
                              ACE_TEXT ("Unsupported <-ECGAddressServer> ")
                              ACE_TEXT ("option value: <%s>. Ignoring this ")
                              ACE_TEXT ("option - using defaults instead.\n"),
                              opt));
                  result = -1;
                }
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              this->address_server_arg_.set (arg_shifter.get_current ());
              arg_shifter.consume_arg ();
            }
        }


      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              const ACE_TCHAR* opt = arg_shifter.get_current ();
              if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
                this->handler_type_ = ECG_HANDLER_BASIC;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
                this->handler_type_ = ECG_HANDLER_COMPLEX;
              else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
                this->handler_type_ = ECG_HANDLER_UDP;
              else
                {
                  ACE_ERROR ((LM_ERROR,
                              ACE_TEXT ("Unsupported <-ECGHandler> ")
                              ACE_TEXT ("option value: <%s>. Ignoring this ")
                              ACE_TEXT ("option - using defaults instead.\n"),
                              opt));
                  result = -1;
                }
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              const ACE_TCHAR* opt = arg_shifter.get_current ();
              unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
              this->ttl_value_ = static_cast<u_char> (tmp);
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              this->nic_.set (arg_shifter.get_current ());
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              this->ip_multicast_loop_ =
                (ACE_OS::atoi(arg_shifter.get_current()) != 0);
              arg_shifter.consume_arg ();
            }
        }

      else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
        {
          arg_shifter.consume_arg ();

          if (arg_shifter.is_parameter_next ())
            {
              this->non_blocking_ =
                (ACE_OS::atoi(arg_shifter.get_current()) != 0);
              arg_shifter.consume_arg ();
            }
        }

      else
        {
          arg_shifter.ignore_arg ();
          ACE_DEBUG ((LM_WARNING,
                      ACE_TEXT ("Ignoring <%s> option ")
                      ACE_TEXT ("during initialization.\n"),
                      arg));
          result = -1;
        }
    }

  if (this->validate_configuration () == -1)
    return -1;
  else
    return result;
}
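The -ECGTTL branch of the listing above converts its argument with strtoul in base 0 and then keeps only the low 8 bits. A small standalone sketch of that conversion makes the consequences easier to see: hexadecimal and octal prefixes are accepted, and values above 255 wrap rather than being rejected. parse_ttl is a hypothetical helper name, and the sketch uses the standard library in place of ACE_OS.

```cpp
#include <cstdlib>

// Mirrors the -ECGTTL conversion: parse with automatic base detection
// (base 0 accepts decimal, 0x-prefixed hex and 0-prefixed octal),
// then mask the result down to a single byte.
inline unsigned char parse_ttl(const char* text) {
  const unsigned long parsed = std::strtoul(text, nullptr, 0) & 0xffUL;
  return static_cast<unsigned char>(parsed);
}
```

For instance, under this conversion "256" yields 0 and "300" yields 44. Because init_endpoint() only sets IP_MULTICAST_TTL when ttl_value_ is greater than zero, a value that wraps to 0 leaves the socket's TTL unchanged.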
PortableServer::ServantBase * TAO_ECG_Mcast_Gateway::init_address_server | ( | void | ) | [private] |
Allocate and initialize appropriate objects.
Definition at line 359 of file ECG_Mcast_Gateway.cpp.
References TAO_EC_Servant_Var< T >::_retn(), ACE_ERROR, address_server_arg_, TAO_ECG_Complex_Address_Server::create(), TAO_ECG_Simple_Address_Server::create(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, TAO_EC_Servant_Var< T >::in(), ACE_String_Base< CHAR >::length(), and LM_ERROR.
Referenced by run().
{
  const char * address_server_arg =
    (this->address_server_arg_.length ())
    ? this->address_server_arg_.c_str () : 0;

  if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
    {
      TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
        TAO_ECG_Simple_Address_Server::create ();
      if (!impl.in ())
        return 0;

      if (impl->init (address_server_arg) == -1)
        {
          return 0;
        }
      return impl._retn ();
    }

  else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
    {
      TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
        TAO_ECG_Complex_Address_Server::create (1);
      if (!impl.in ())
        return 0;

      if (impl->init (address_server_arg) == -1)
        {
          return 0;
        }
      return impl._retn ();
    }

  else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
    {
      TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
        TAO_ECG_Complex_Address_Server::create (0);
      if (!impl.in ())
        return 0;

      if (impl->init (address_server_arg) == -1)
        {
          return 0;
        }
      return impl._retn ();
    }

  else
    {
      ACE_ERROR ((LM_ERROR,
                  "Cannot create address server: "
                  "unknown address server type specified.\n"));
      return 0;
    }
}
TAO_ECG_Refcounted_Endpoint TAO_ECG_Mcast_Gateway::init_endpoint | ( | void | ) | [private] |
Allocate and initialize appropriate objects.
Definition at line 288 of file ECG_Mcast_Gateway.cpp.
References ACE_ERROR, ACE_NEW_NORETURN, ACE_NONBLOCK, ACE_IPC_SAP::enable(), IP_MULTICAST_LOOP, IP_MULTICAST_TTL, IPPROTO_IP, LM_ERROR, ACE_SOCK_Dgram::open(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_Addr::sap_any, and ACE_SOCK_Dgram::set_nic().
Referenced by run().
{
  TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
  TAO_ECG_Refcounted_Endpoint refendpoint;

  // Try to allocate a new endpoint from the heap
  ACE_NEW_NORETURN (endpoint,
                    TAO_ECG_UDP_Out_Endpoint);

  if (endpoint != 0)
    {
      refendpoint.reset (endpoint);
    }
  else
    {
      return TAO_ECG_Refcounted_Endpoint ();
    }

  ACE_SOCK_Dgram& dgram = refendpoint->dgram ();

  if (dgram.open (ACE_Addr::sap_any) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "Cannot open dgram "
                  "for sending mcast messages.\n"));
      return TAO_ECG_Refcounted_Endpoint ();
    }

  if (this->nic_.length () != 0)
    {
      dgram.set_nic (this->nic_.c_str ());
    }

  if (this->ttl_value_ > 0)
    {
      if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
                                      IP_MULTICAST_TTL,
                                      &this->ttl_value_,
                                      sizeof (this->ttl_value_))
          == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "Error setting TTL option on dgram "
                      "for sending mcast messages.\n"));
          return TAO_ECG_Refcounted_Endpoint ();
        }
    }

  if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
                                  IP_MULTICAST_LOOP,
                                  &this->ip_multicast_loop_,
                                  sizeof (this->ip_multicast_loop_)) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "Error setting MULTICAST_LOOP option "
                  "on dgram for sending mcast messages.\n"));
      return TAO_ECG_Refcounted_Endpoint ();
    }

  if (this->non_blocking_
      && dgram.enable(ACE_NONBLOCK) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "Error setting NON BLOCKING option.\n"));
      return TAO_ECG_Refcounted_Endpoint ();
    }

  return refendpoint;
}
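For readers less familiar with the ACE socket wrappers, the three socket settings applied by init_endpoint() can be sketched with plain POSIX calls. This is an illustration only, assuming a POSIX system such as Linux; it is not how ACE implements set_option() or enable(), it omits the NIC selection and the bind to any address, and open_mcast_send_socket is a hypothetical name.

```cpp
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Open a UDP socket and apply the same three settings as init_endpoint():
// multicast TTL (only when > 0), multicast loopback, and optional O_NONBLOCK.
// Returns the descriptor, or -1 on failure (the socket is closed first).
inline int open_mcast_send_socket(unsigned char ttl, int loopback,
                                  bool non_blocking) {
  const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) return -1;

  // A TTL of 0 means "leave the system default", as in the listing above.
  if (ttl > 0 && ::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL,
                              &ttl, sizeof ttl) == -1) {
    ::close(fd);
    return -1;
  }

  // Loopback is always set explicitly, whether enabled or disabled.
  if (::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP,
                   &loopback, sizeof loopback) == -1) {
    ::close(fd);
    return -1;
  }

  if (non_blocking) {
    const int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
      ::close(fd);
      return -1;
    }
  }
  return fd;
}
```

The caller owns the returned descriptor and is responsible for closing it. In the gateway, that role is played by the reference-counted TAO_ECG_Refcounted_Endpoint.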
TAO_ECG_Refcounted_Handler TAO_ECG_Mcast_Gateway::init_handler | ( | TAO_ECG_Dgram_Handler * | recv, | |
RtecEventChannelAdmin::EventChannel_ptr | ec, | |||
ACE_Reactor * | reactor | |||
) | [private] |
Allocate and initialize appropriate objects.
Definition at line 417 of file ECG_Mcast_Gateway.cpp.
References ACE_ERROR, ACE_NEW_RETURN, address_server_arg_, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ACE_String_Base< CHAR >::length(), LM_ERROR, nic_, TAO_ECG_UDP_EH::open(), TAO_ECG_Mcast_EH::open(), TAO_ECG_Simple_Mcast_EH::open(), ACE_Event_Handler::reactor(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), and ACE_INET_Addr::set().
{
  // receiver is the TAO_ECG_Dgram_Handler argument (shown as recv in the signature above).
  TAO_ECG_Refcounted_Handler handler;

  const char * nic =
    (this->nic_.length ()) ? this->nic_.c_str () : 0;
  const char * address_server_arg =
    (this->address_server_arg_.length ())
    ? this->address_server_arg_.c_str () : 0;

  if (this->handler_type_ == ECG_HANDLER_BASIC)
    {
      TAO_ECG_Simple_Mcast_EH * h = 0;
      ACE_NEW_RETURN (h,
                      TAO_ECG_Simple_Mcast_EH (receiver),
                      handler);
      handler.reset (h);

      h->reactor (reactor);
      if (h->open (address_server_arg, nic) != 0)
        return TAO_ECG_Refcounted_Handler ();
    }

  else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
    {
      TAO_ECG_Mcast_EH * h = 0;
      ACE_NEW_RETURN (h,
                      TAO_ECG_Mcast_EH (receiver, nic),
                      handler);
      handler.reset (h);

      h->reactor (reactor);

      h->open (ec);
    }

  else if (this->handler_type_ == ECG_HANDLER_UDP)
    {
      TAO_ECG_UDP_EH * h = 0;
      ACE_NEW_RETURN (h,
                      TAO_ECG_UDP_EH (receiver),
                      handler);
      handler.reset (h);
      h->reactor (reactor);

      ACE_INET_Addr ipaddr;
      if (ipaddr.set (address_server_arg) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "ERROR using address server argument "
                      "in ACE_INET_Addr.set ().\n"));
          return TAO_ECG_Refcounted_Handler ();
        }
      if (h->open (ipaddr) != 0)
        return TAO_ECG_Refcounted_Handler ();
    }

  else
    {
      ACE_ERROR ((LM_ERROR,
                  "Cannot create handler: unknown "
                  "handler type specified.\n"));
      return handler;
    }

  return handler;
}
TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > TAO_ECG_Mcast_Gateway::init_receiver | ( | RtecEventChannelAdmin::EventChannel_ptr | ec, | |
RtecUDPAdmin::AddrServer_ptr | address_server, | |||
TAO_ECG_Refcounted_Endpoint | endpoint_rptr | |||
) | [private] |
Allocate and initialize appropriate objects.
Definition at line 531 of file ECG_Mcast_Gateway.cpp.
References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_ECG_UDP_Receiver::create(), TAO_EC_Auto_Command< T >::disallow_command(), ACE_SupplierQOS_Factory::get_SupplierQOS(), TAO_EC_Servant_Var< T >::in(), ACE_SupplierQOS_Factory::insert(), RtecEventChannelAdmin::SupplierQOS::is_gateway, and TAO_EC_Auto_Command< T >::set_command().
Referenced by run().
{
  TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
    receiver (TAO_ECG_UDP_Receiver::create ());
  if (!receiver.in ())
    return receiver;

  receiver->init (ec,
                  endpoint_rptr,
                  address_server);

  TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
  receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));

  ACE_SupplierQOS_Factory supplier_qos_factory;
  supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
                               ACE_ES_EVENT_ANY,
                               0, 1);
  RtecEventChannelAdmin::SupplierQOS & qos =
    const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
  qos.is_gateway = 1;

  receiver->connect (qos);

  receiver_shutdown.disallow_command ();
  return receiver;
}
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > TAO_ECG_Mcast_Gateway::init_sender | ( | RtecEventChannelAdmin::EventChannel_ptr | ec, | |
RtecUDPAdmin::AddrServer_ptr | address_server, | |||
TAO_ECG_Refcounted_Endpoint | endpoint_rptr | |||
) | [private] |
Allocate and initialize appropriate objects.
Definition at line 488 of file ECG_Mcast_Gateway.cpp.
References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_ECG_UDP_Sender::create(), TAO_EC_Auto_Command< T >::disallow_command(), ACE_ConsumerQOS_Factory::get_ConsumerQOS(), TAO_EC_Servant_Var< T >::in(), ACE_ConsumerQOS_Factory::insert(), RtecEventChannelAdmin::ConsumerQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), ACE_ConsumerQOS_Factory::start_disjunction_group(), and UDP_Sender_Shutdown.
Referenced by run().
{
  TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
    sender (TAO_ECG_UDP_Sender::create ());
  if (!sender.in ())
    return sender;

  sender->init (ec,
                address_server,
                endpoint_rptr);

  TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
  sender_shutdown.set_command (UDP_Sender_Shutdown (sender));

  if (this->consumer_qos_.dependencies.length () > 0)
    {
      // Client supplied consumer qos. Use it.
      this->consumer_qos_.is_gateway = 1;
      sender->connect (this->consumer_qos_);
    }
  else
    {
      // Client did not specify anything - subscribe to all events.
      ACE_ConsumerQOS_Factory consumer_qos_factory;
      consumer_qos_factory.start_disjunction_group (1);
      consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
                                   ACE_ES_EVENT_ANY,
                                   0);
      RtecEventChannelAdmin::ConsumerQOS & qos =
        const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
      qos.is_gateway = 1;

      sender->connect (qos);
    }

  sender_shutdown.disallow_command ();
  return sender;
}
int TAO_ECG_Mcast_Gateway::init_svcs | ( | void | ) | [static] |
Helper function to register the Gateway into the service configurator.
Definition at line 36 of file ECG_Mcast_Gateway.cpp.
References ACE_Service_Config::static_svcs().
{
  return ACE_Service_Config::static_svcs ()->
    insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
}
void TAO_ECG_Mcast_Gateway::run | ( | CORBA::ORB_ptr | orb, | |
RtecEventChannelAdmin::EventChannel_ptr | ec | |||
) |
The main method - create, configure and run federation components according to the specified configuration.
Definition at line 583 of file ECG_Mcast_Gateway.cpp.
References ACE_DEBUG, activate(), TAO_EC_Auto_Command< T >::disallow_command(), TAO_EC_Object_Deactivator::disallow_deactivation(), ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), TAO_EC_Servant_Var< T >::in(), TAO_Objref_Var_T< T >::in(), PortableServer::Servant_var< T >::in(), init_address_server(), init_endpoint(), init_receiver(), init_sender(), LM_ERROR, CORBA::ORB::orb_core(), TAO_ORB_Core::reactor(), ACE_Event_Handler::reactor(), TAO_EC_Auto_Command< T >::set_command(), UDP_Sender_Shutdown, and verify_args().
{
  // Verify args.
  this->verify_args (orb, ec);

  // Auto-cleanup objects.
  TAO_EC_Object_Deactivator address_server_deactivator;
  TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
  TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;

  // Set up address server.
  PortableServer::ServantBase_var address_server_servant =
    this->init_address_server ();
  if (!address_server_servant.in ())
    {
      ACE_DEBUG ((LM_ERROR,
                  "Unable to create address server.\n"));
      throw CORBA::INTERNAL ();
    }

  RtecUDPAdmin::AddrServer_var address_server;

  PortableServer::POA_var poa =
    address_server_servant->_default_POA ();

  activate (address_server,
            poa.in (),
            address_server_servant.in (),
            address_server_deactivator);

  TAO_ECG_Refcounted_Endpoint endpoint_rptr;
  TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;

  // Set up event sender.
  if (this->service_type_ == ECG_MCAST_SENDER
      || this->service_type_ == ECG_MCAST_TWO_WAY)
    {
      endpoint_rptr = this->init_endpoint ();
      if (endpoint_rptr.get () == 0)
        {
          throw CORBA::INTERNAL ();
        }

      sender = this->init_sender (ec,
                                  address_server.in (),
                                  endpoint_rptr);
      if (!sender.in ())
        {
          throw CORBA::INTERNAL ();
        }

      sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
    }

  // Set up event receiver.
  TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
  if (this->service_type_ == ECG_MCAST_RECEIVER
      || this->service_type_ == ECG_MCAST_TWO_WAY)
    {
      ACE_Reactor *reactor = orb->orb_core ()->reactor ();

      receiver = this->init_receiver (ec,
                                      address_server.in (),
                                      endpoint_rptr);
      if (!receiver.in ())
        {
          throw CORBA::INTERNAL ();
        }

      receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));

      TAO_ECG_Refcounted_Handler
        handler_rptr (this->init_handler (receiver.in (),
                                          ec,
                                          reactor));
      if (handler_rptr.get () == 0)
        {
          throw CORBA::INTERNAL ();
        }
      receiver->set_handler_shutdown (handler_rptr);
    }

  // Everything went ok - disable auto-cleanup.
  address_server_deactivator.disallow_deactivation ();
  receiver_shutdown.disallow_command ();
  sender_shutdown.disallow_command ();
}
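run() relies on "auto-cleanup" objects: each one is armed with a shutdown command as soon as a component has been set up, and all of them are disarmed only once every step has succeeded, so an exception thrown midway tears down what was already built. The idea can be sketched in standalone C++ as follows. This is a simplified model and not TAO's TAO_EC_Auto_Command implementation; AutoCommand and setup are hypothetical names, and only the set_command/disallow_command vocabulary is borrowed from the listing.

```cpp
#include <functional>
#include <utility>

// Runs a stored command on destruction unless it has been disallowed.
class AutoCommand {
public:
  AutoCommand() = default;
  AutoCommand(const AutoCommand&) = delete;
  AutoCommand& operator=(const AutoCommand&) = delete;

  ~AutoCommand() {
    if (allowed_ && command_) command_();
  }

  // Arm the guard with the cleanup to run on scope exit.
  void set_command(std::function<void()> command) {
    command_ = std::move(command);
    allowed_ = true;
  }

  // Call once setup has succeeded so that the cleanup is skipped.
  void disallow_command() { allowed_ = false; }

private:
  std::function<void()> command_;
  bool allowed_ = false;
};

// Hypothetical two-step setup: counts a shutdown if the second step fails.
inline bool setup(bool second_step_ok, int& shutdown_calls) {
  AutoCommand guard;
  guard.set_command([&shutdown_calls] { ++shutdown_calls; });
  if (!second_step_ok) return false;  // guard fires as it goes out of scope
  guard.disallow_command();           // success: keep what was built
  return true;
}
```

The early return stands in for the `throw CORBA::INTERNAL ()` statements in run(); in both cases the guard's destructor performs the rollback.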
int TAO_ECG_Mcast_Gateway::validate_configuration | ( | void | ) | [private] |
Verifies that the configuration values specified through init() make sense.
Definition at line 244 of file ECG_Mcast_Gateway.cpp.
References ACE_DEBUG, ECG_ADDRESS_SERVER_BASIC, ECG_HANDLER_BASIC, ECG_HANDLER_UDP, ECG_MCAST_SENDER, and LM_ERROR.
Referenced by init().
{
  if ((this->handler_type_ == ECG_HANDLER_BASIC
       || this->handler_type_ == ECG_HANDLER_UDP)
      && this->service_type_ != ECG_MCAST_SENDER
      && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Configurations for mcast handler and "
                  "address server do not match.\n"));
      return -1;
    }

  // Currently all Address Server implementations require an
  // initialization string. If we ever add a new Address Server
  // implementation, which does not, we'll have to remove this check.
  if (this->address_server_arg_.length () == 0)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Address server initializaton "
                  "argument not specified.\n"));
      return -1;
    }

  if (this->ip_multicast_loop_ != 0
      && this->ip_multicast_loop_ != 1)
    {
      ACE_DEBUG ((LM_ERROR,
                  "IP MULTICAST LOOP option must have a boolean value.\n"));
      return -1;
    }

  if (this->non_blocking_ != 0
      && this->non_blocking_ != 1)
    {
      ACE_DEBUG ((LM_ERROR,
                  "NON BLOCKING flag must have a boolean value.\n"));
      return -1;
    }

  return 0;
}
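The four checks in the listing above can be restated as a small standalone function, which is convenient for seeing which combinations of options are rejected. This is a sketch that mirrors the conditions in the listing without using TAO; GatewayConfig, Service, Handler, AddressServer and validate are hypothetical names, and the default member values are arbitrary placeholders rather than the values in ECG_Defaults.h.

```cpp
#include <string>

// Hypothetical stand-ins for the gateway's enums and private attributes.
enum class Service { Sender, Receiver, TwoWay };
enum class Handler { Basic, Complex, Udp };
enum class AddressServer { Basic, Source, Type };

struct GatewayConfig {
  // Placeholder defaults, NOT taken from ECG_Defaults.h.
  Service service = Service::TwoWay;
  Handler handler = Handler::Basic;
  AddressServer address_server = AddressServer::Basic;
  std::string address_server_arg;
  int ip_multicast_loop = 0;
  int non_blocking = 0;
};

// Returns 0 when the configuration is consistent and -1 otherwise,
// applying the same checks in the same order as validate_configuration().
inline int validate(const GatewayConfig& c) {
  const bool single_address_handler =
      c.handler == Handler::Basic || c.handler == Handler::Udp;

  // A receiving gateway with a single-address handler needs the basic
  // address server; pure senders are exempt from this check.
  if (single_address_handler && c.service != Service::Sender &&
      c.address_server != AddressServer::Basic)
    return -1;

  // Every address server requires an initialization string.
  if (c.address_server_arg.empty()) return -1;

  // Both flags must be strictly 0 or 1.
  if (c.ip_multicast_loop != 0 && c.ip_multicast_loop != 1) return -1;
  if (c.non_blocking != 0 && c.non_blocking != 1) return -1;

  return 0;
}
```

For example, a two-way gateway with the basic handler and the type address server is rejected, while the same address server combined with the complex handler, or with a sender-only gateway, is accepted.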
void TAO_ECG_Mcast_Gateway::verify_args | ( | CORBA::ORB_ptr | orb, | |
RtecEventChannelAdmin::EventChannel_ptr | ec | |||
) | [private] |
Check that arguments to run() are not nil.
Definition at line 563 of file ECG_Mcast_Gateway.cpp.
References ACE_ERROR, CORBA::is_nil(), and LM_ERROR.
Referenced by run().
{
  if (CORBA::is_nil (ec))
    {
      ACE_ERROR ((LM_ERROR,
                  "Nil event channel argument passed to "
                  "TAO_ECG_Mcast_Gateway::run().\n"));
      throw CORBA::INTERNAL ();
    }
  if (CORBA::is_nil (orb))
    {
      ACE_ERROR ((LM_ERROR,
                  "Nil orb argument passed to "
                  "TAO_ECG_Mcast_Gateway::run().\n"));
      throw CORBA::INTERNAL ();
    }
}
ACE_CString TAO_ECG_Mcast_Gateway::address_server_arg_ [private] |
Flags controlling configuration.
Definition at line 265 of file ECG_Mcast_Gateway.h.
Referenced by init(), init_address_server(), and init_handler().
Address_Server_Type TAO_ECG_Mcast_Gateway::address_server_type_ [private] |
Flags controlling configuration.
Definition at line 264 of file ECG_Mcast_Gateway.h.
Referenced by init().
RtecEventChannelAdmin::ConsumerQOS TAO_ECG_Mcast_Gateway::consumer_qos_ [private] |
Flags controlling configuration.
Definition at line 271 of file ECG_Mcast_Gateway.h.
Referenced by init().
Handler_Type TAO_ECG_Mcast_Gateway::handler_type_ [private] |
Flags controlling configuration.
Definition at line 263 of file ECG_Mcast_Gateway.h.
Referenced by init().
int TAO_ECG_Mcast_Gateway::ip_multicast_loop_ [private] |
Flags controlling configuration.
Definition at line 268 of file ECG_Mcast_Gateway.h.
Referenced by init().
ACE_CString TAO_ECG_Mcast_Gateway::nic_ [private] |
Flags controlling configuration.
Definition at line 267 of file ECG_Mcast_Gateway.h.
Referenced by init(), and init_handler().
int TAO_ECG_Mcast_Gateway::non_blocking_ [private] |
Flags controlling configuration.
Definition at line 269 of file ECG_Mcast_Gateway.h.
Referenced by init().
Service_Type TAO_ECG_Mcast_Gateway::service_type_ [private] |
Flags controlling configuration.
Definition at line 262 of file ECG_Mcast_Gateway.h.
Referenced by init().
u_char TAO_ECG_Mcast_Gateway::ttl_value_ [private] |
Flags controlling configuration.
Definition at line 266 of file ECG_Mcast_Gateway.h.
Referenced by init().