TAO_ECG_Mcast_Gateway Class Reference

Implements the builder for setting up an Event Channel multicast gateway. NOT THREAD-SAFE.

#include <ECG_Mcast_Gateway.h>

Public Types

enum  Service_Type { ECG_MCAST_SENDER, ECG_MCAST_RECEIVER, ECG_MCAST_TWO_WAY }
 Values for some configuration parameters to init ().
enum  Address_Server_Type { ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE }
 Values for some configuration parameters to init ().
enum  Handler_Type { ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP }
 Values for some configuration parameters to init ().

Public Member Functions

 TAO_ECG_Mcast_Gateway (void)
 Constructor.
int init (const char *address_server_arg, const Attributes &attributes=Attributes())
int init (const RtecEventChannelAdmin::ConsumerQOS &consumer_qos, const char *address_server_arg, const Attributes &attributes=Attributes())
void run (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec)
virtual int init (int argc, ACE_TCHAR *argv[])
 The Service_Object entry points.
virtual int fini (void)
 The Service_Object entry points.

Static Public Member Functions

static int init_svcs (void)

Private Member Functions

void verify_args (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec)
 Check that arguments to run() are not nil.
int validate_configuration (void)
 Verifies configuration values specified through init() make sense.
PortableServer::ServantBase * init_address_server (void)
 Allocate and initialize appropriate objects.
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > init_sender (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
 Allocate and initialize appropriate objects.
TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > init_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
 Allocate and initialize appropriate objects.
TAO_ECG_Refcounted_Endpoint init_endpoint (void)
 Allocate and initialize appropriate objects.
TAO_ECG_Refcounted_Handler init_handler (TAO_ECG_Dgram_Handler *recv, RtecEventChannelAdmin::EventChannel_ptr ec, ACE_Reactor *reactor)
 Allocate and initialize appropriate objects.

Private Attributes

Service_Type service_type_
 Flags controlling configuration.
Handler_Type handler_type_
 Flags controlling configuration.
Address_Server_Type address_server_type_
 Flags controlling configuration.
ACE_CString address_server_arg_
 Flags controlling configuration.
u_char ttl_value_
 Flags controlling configuration.
ACE_CString nic_
 Flags controlling configuration.
int ip_multicast_loop_
 Flags controlling configuration.
int non_blocking_
 Flags controlling configuration.
RtecEventChannelAdmin::ConsumerQOS consumer_qos_
 Flags controlling configuration.

Classes

struct  Attributes
 Helper class to initialize a TAO_ECG_Mcast_Gateway.

Detailed Description

Implements the builder for setting up an Event Channel multicast gateway. NOT THREAD-SAFE.

This class simplifies creation of federated Event Channels by presenting a simple unified interface for creating and configuring all components needed to federate an Event Channel. Configuration options are described below.

NOTE: This class does not own any of the components it creates and its lifetime is independent of theirs. This class acts purely as a wrapper facade for creating and wiring appropriate components together.

Todo:
This class is an ACE_Service_Object, but the only reason for it is the need for easy configuration using files. Since ACE_Service_Object provides much more than that, we should look into replacing it with a more lightweight utility that would serve our needs.

CONFIGURATION OPTIONS

There are two ways to use this class:

1) Use a service config file to specify configuration options (described below), and use the service configurator to obtain a configured instance of TAO_ECG_Mcast_Gateway in your program. (See TAO/orbsvcs/tests/Event/Mcast/Common and TAO/orbsvcs/tests/Event/Mcast/Simple for an example.)

Service config file options:

-ECGService <service>
Valid values: sender, receiver, two_way
Specifies whether this gateway should act as a multicast sender of events, a multicast receiver, or both.

-ECGAddressServer <server_type>
Valid values: basic, source, type
Specifies which implementation of the address server the gateway should use.
  basic - the same multicast address is returned for all event headers.
  source - multicast addresses are returned based on the event source, according to the mapping provided at initialization.
  type - multicast addresses are returned based on the event type, according to the mapping provided at initialization.

-ECGAddressServerArg <arg>
Valid values: a string whose format requirements are specific to the address server implementation used.
The argument is not interpreted by the gateway; it is simply passed to the address server specified by -ECGAddressServer during initialization. THIS OPTION MUST ALWAYS BE SPECIFIED BY THE USER (there is no default value for it).

-ECGHandler <handler_type>
Valid values: basic, complex, udp
Specifies which implementation of the event handler should be used if the gateway is acting as an event receiver.
  basic - a simple event handler listening on a single mcast address.
  complex - an event handler listening on multiple mcast addresses based on events of interest to consumers.
  udp - similar to the basic handler, except that it listens on a udp address as opposed to a multicast group.

-ECGTTL <ttl>
Valid values: a number > 0
IP multicast time to live value that should be set on the sending socket. This option matters only if the gateway is acting as a sender of mcast messages.

-ECGNIC <nic>
Valid values: name of the network interface
This interface is used for sending and/or receiving multicast messages.

-ECGNonBlocking
Boolean flag that configures whether the socket is in blocking or non-blocking mode. As parsed by init (int, ACE_TCHAR *[]), the flag is followed by an integer, and any non-zero value selects non-blocking mode. The default is non-blocking. NOTE: Certain device drivers block the process if the physical link fails.

2) Create an instance of TAO_ECG_Mcast_Gateway in your code, on the stack or dynamically, and use the init () method to configure it. No configuration files are involved. See the service config options above for a description of the configurable options, and the init () method below for how to specify them.

Default configuration values (for either use case) can be found in ECG_Defaults.h

Definition at line 128 of file ECG_Mcast_Gateway.h.


Member Enumeration Documentation

enum TAO_ECG_Mcast_Gateway::Address_Server_Type

Values for some configuration parameters to init ().

Enumerator:
ECG_ADDRESS_SERVER_BASIC 
ECG_ADDRESS_SERVER_SOURCE 
ECG_ADDRESS_SERVER_TYPE 

Definition at line 152 of file ECG_Mcast_Gateway.h.

enum TAO_ECG_Mcast_Gateway::Handler_Type

Values for some configuration parameters to init ().

Enumerator:
ECG_HANDLER_BASIC 
ECG_HANDLER_COMPLEX 
ECG_HANDLER_UDP 

Definition at line 156 of file ECG_Mcast_Gateway.h.

enum TAO_ECG_Mcast_Gateway::Service_Type

Values for some configuration parameters to init ().

Enumerator:
ECG_MCAST_SENDER 
ECG_MCAST_RECEIVER 
ECG_MCAST_TWO_WAY 

Definition at line 148 of file ECG_Mcast_Gateway.h.


Constructor & Destructor Documentation

TAO_ECG_Mcast_Gateway::TAO_ECG_Mcast_Gateway ( void   ) 

Constructor.


Member Function Documentation

int TAO_ECG_Mcast_Gateway::fini ( void   )  [virtual]

The Service_Object entry points.

Reimplemented from ACE_Shared_Object.

Definition at line 44 of file ECG_Mcast_Gateway.cpp.

00045 {
00046   return 0;
00047 }

int TAO_ECG_Mcast_Gateway::init ( const RtecEventChannelAdmin::ConsumerQOS &  consumer_qos,
const char *  address_server_arg,
const Attributes &  attributes = Attributes() 
)

Same as init (const char *, const Attributes &), but also gives the client an opportunity to specify the consumer QoS, i.e., which EC traffic should be multicast.

Definition at line 233 of file ECG_Mcast_Gateway.cpp.

References consumer_qos_, and init().

00237 {
00238   this->consumer_qos_ = consumer_qos;
00239   return this->init (address_server_arg,
00240                      attributes);
00241 }

int TAO_ECG_Mcast_Gateway::init ( const char *  address_server_arg,
const Attributes &  attributes = Attributes() 
)

Configure TAO_ECG_Mcast_Gateway programmatically. This method should be used when NOT using the service configurator to obtain/configure TAO_ECG_Mcast_Gateway. See the class documentation above for more info.

Definition at line 216 of file ECG_Mcast_Gateway.cpp.

References address_server_arg_, TAO_ECG_Mcast_Gateway::Attributes::address_server_type, address_server_type_, ACE_String_Base< CHAR >::c_str(), TAO_ECG_Mcast_Gateway::Attributes::handler_type, handler_type_, TAO_ECG_Mcast_Gateway::Attributes::ip_multicast_loop, ip_multicast_loop_, TAO_ECG_Mcast_Gateway::Attributes::nic, nic_, TAO_ECG_Mcast_Gateway::Attributes::non_blocking, non_blocking_, TAO_ECG_Mcast_Gateway::Attributes::service_type, service_type_, ACE_String_Base< CHAR >::set(), TAO_ECG_Mcast_Gateway::Attributes::ttl_value, ttl_value_, and validate_configuration().

00218 {
00219   this->address_server_arg_.set (address_server_arg);
00220 
00221   this->address_server_type_ = attr.address_server_type;
00222   this->handler_type_ = attr.handler_type;
00223   this->service_type_ = attr.service_type;
00224   this->ttl_value_ = attr.ttl_value;
00225   this->nic_.set (attr.nic.c_str ());
00226   this->ip_multicast_loop_ = attr.ip_multicast_loop;
00227   this->non_blocking_ = attr.non_blocking;
00228 
00229   return this->validate_configuration ();
00230 }
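
The final call to validate_configuration () rejects some option combinations. The following is a rough standalone model of the condition visible in that function's listing. It is plain C++ without ACE or TAO; the enum and function names are illustrative, and the boolean return is an assumption, because this page does not show the function's return statements.

```cpp
#include <cassert>

// Illustrative stand-ins for the gateway's enums; not the TAO declarations.
enum class Service { Sender, Receiver, TwoWay };
enum class Handler { Basic, Complex, Udp };
enum class AddressServer { Basic, Source, Type };

// Returns true when the combination passes the check shown in
// validate_configuration (): a basic or udp handler on a gateway that
// receives events is only accepted together with the basic address server.
// (Assumption: the real function signals a mismatch with an error return.)
bool configuration_matches(Service s, Handler h, AddressServer a) {
  const bool single_address_handler =
      h == Handler::Basic || h == Handler::Udp;
  const bool mismatch = single_address_handler &&
                        s != Service::Sender &&
                        a != AddressServer::Basic;
  return !mismatch;
}
```

In this model a pure sender is never rejected, since the handler type only matters when the gateway receives events.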

int TAO_ECG_Mcast_Gateway::init ( int  argc,
ACE_TCHAR *  argv[] 
) [virtual]

The Service_Object entry points.

Reimplemented from ACE_Shared_Object.

Definition at line 50 of file ECG_Mcast_Gateway.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), address_server_type_, ACE_OS::atoi(), ACE_Arg_Shifter_T< CHAR_TYPE >::consume_arg(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Arg_Shifter_T< CHAR_TYPE >::get_current(), handler_type_, ACE_Arg_Shifter_T< CHAR_TYPE >::ignore_arg(), ACE_Arg_Shifter_T< CHAR_TYPE >::is_anything_left(), ACE_Arg_Shifter_T< CHAR_TYPE >::is_parameter_next(), LM_ERROR, LM_WARNING, service_type_, ACE_OS::strcasecmp(), ACE_OS::strtoul(), and ttl_value_.

Referenced by init().

00051 {
00052   int result = 0;
00053 
00054   ACE_Arg_Shifter arg_shifter (argc, argv);
00055 
00056   while (arg_shifter.is_anything_left ())
00057     {
00058       const ACE_TCHAR *arg = arg_shifter.get_current ();
00059 
00060       if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
00061         {
00062           arg_shifter.consume_arg ();
00063 
00064           if (arg_shifter.is_parameter_next ())
00065             {
00066               const ACE_TCHAR* opt = arg_shifter.get_current ();
00067               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
00068                 this->service_type_ = ECG_MCAST_RECEIVER;
00069               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
00070                 this->service_type_ = ECG_MCAST_SENDER;
00071               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
00072                 this->service_type_ = ECG_MCAST_TWO_WAY;
00073               else
00074                 {
00075                   ACE_ERROR ((LM_ERROR,
00076                              ACE_TEXT ("Unsupported <-ECGService> option ")
00077                              ACE_TEXT ("value: <%s>. Ignoring this option ")
00078                              ACE_TEXT ("- using defaults instead.\n"),
00079                              opt));
00080                   result = -1;
00081                 }
00082               arg_shifter.consume_arg ();
00083             }
00084         }
00085 
00086       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
00087         {
00088           arg_shifter.consume_arg ();
00089 
00090           if (arg_shifter.is_parameter_next ())
00091             {
00092               const ACE_TCHAR* opt = arg_shifter.get_current ();
00093               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00094                 this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
00095               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
00096                 this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
00097               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
00098                 this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
00099               else
00100                 {
00101                   ACE_ERROR ((LM_ERROR,
00102                               ACE_TEXT ("Unsupported <-ECGAddressServer> ")
00103                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00104                               ACE_TEXT ("option - using defaults instead.\n"),
00105                               opt));
00106                   result = -1;
00107                 }
00108               arg_shifter.consume_arg ();
00109             }
00110         }
00111 
00112       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
00113         {
00114           arg_shifter.consume_arg ();
00115 
00116           if (arg_shifter.is_parameter_next ())
00117             {
00118               this->address_server_arg_.set (arg_shifter.get_current ());
00119               arg_shifter.consume_arg ();
00120             }
00121         }
00122 
00123 
00124       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
00125         {
00126           arg_shifter.consume_arg ();
00127 
00128           if (arg_shifter.is_parameter_next ())
00129             {
00130               const ACE_TCHAR* opt = arg_shifter.get_current ();
00131               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00132                 this->handler_type_ = ECG_HANDLER_BASIC;
00133               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
00134                 this->handler_type_ = ECG_HANDLER_COMPLEX;
00135               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
00136                 this->handler_type_ = ECG_HANDLER_UDP;
00137               else
00138                 {
00139                   ACE_ERROR ((LM_ERROR,
00140                               ACE_TEXT ("Unsupported <-ECGHandler> ")
00141                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00142                               ACE_TEXT ("option - using defaults instead.\n"),
00143                               opt));
00144                   result = -1;
00145                 }
00146               arg_shifter.consume_arg ();
00147             }
00148         }
00149 
00150       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
00151         {
00152           arg_shifter.consume_arg ();
00153 
00154           if (arg_shifter.is_parameter_next ())
00155             {
00156               const ACE_TCHAR* opt = arg_shifter.get_current ();
00157               unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
00158               this->ttl_value_ = static_cast<u_char> (tmp);
00159               arg_shifter.consume_arg ();
00160             }
00161         }
00162 
00163       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
00164         {
00165           arg_shifter.consume_arg ();
00166 
00167           if (arg_shifter.is_parameter_next ())
00168             {
00169               this->nic_.set (arg_shifter.get_current ());
00170               arg_shifter.consume_arg ();
00171             }
00172         }
00173 
00174       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
00175         {
00176           arg_shifter.consume_arg ();
00177 
00178           if (arg_shifter.is_parameter_next ())
00179             {
00180               this->ip_multicast_loop_ =
00181                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00182               arg_shifter.consume_arg ();
00183             }
00184         }
00185 
00186       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
00187       {
00188           arg_shifter.consume_arg ();
00189 
00190           if (arg_shifter.is_parameter_next ())
00191             {
00192               this->non_blocking_ =
00193                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00194               arg_shifter.consume_arg ();
00195             }
00196         }
00197 
00198       else
00199         {
00200           arg_shifter.ignore_arg ();
00201           ACE_DEBUG ((LM_WARNING,
00202                       ACE_TEXT ("Ignoring <%s> option ")
00203                       ACE_TEXT ("during initialization.\n"),
00204                       arg));
00205           result = -1;
00206         }
00207     }
00208 
00209   if (this->validate_configuration () == -1)
00210     return -1;
00211   else
00212     return result;
00213 }
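
Two parsing rules in the listing above can be tried in isolation: option values are compared case-insensitively, and the TTL is parsed with automatic base detection and then masked to one byte. The following is a minimal sketch in standard C++ rather than ACE; iequals, parse_service and parse_ttl are hypothetical helper names, not TAO functions.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdlib>
#include <string>

enum class ServiceType { Sender, Receiver, TwoWay };

// Case-insensitive ASCII comparison, standing in for ACE_OS::strcasecmp.
bool iequals(const std::string& a, const std::string& b) {
  if (a.size() != b.size()) return false;
  for (std::size_t i = 0; i < a.size(); ++i) {
    if (std::tolower(static_cast<unsigned char>(a[i])) !=
        std::tolower(static_cast<unsigned char>(b[i])))
      return false;
  }
  return true;
}

// Maps an -ECGService value to a service type. Returns false and leaves
// `out` untouched for unsupported values, mirroring the listing, where an
// unknown value is reported and the previous setting is kept.
bool parse_service(const std::string& value, ServiceType& out) {
  if (iequals(value, "receiver")) { out = ServiceType::Receiver; return true; }
  if (iequals(value, "sender"))   { out = ServiceType::Sender;   return true; }
  if (iequals(value, "two_way"))  { out = ServiceType::TwoWay;   return true; }
  return false;
}

// Base 0 lets strtoul accept decimal, octal (leading 0) and hex (0x) input;
// the mask keeps only the low eight bits, as in the -ECGTTL branch.
unsigned char parse_ttl(const char* text) {
  return static_cast<unsigned char>(std::strtoul(text, nullptr, 0) & 0xff);
}
```

Under this masking a value such as 256 wraps to 0, and init_endpoint () only applies the TTL when it is greater than zero, so an out-of-range value can leave the socket's TTL unchanged without any diagnostic.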

PortableServer::ServantBase * TAO_ECG_Mcast_Gateway::init_address_server ( void   )  [private]

Allocate and initialize appropriate objects.

Definition at line 359 of file ECG_Mcast_Gateway.cpp.

References TAO_EC_Servant_Var< T >::_retn(), ACE_ERROR, address_server_arg_, TAO_ECG_Complex_Address_Server::create(), TAO_ECG_Simple_Address_Server::create(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, TAO_EC_Servant_Var< T >::in(), ACE_String_Base< CHAR >::length(), and LM_ERROR.

Referenced by run().

00360 {
00361   const char * address_server_arg =
00362     (this->address_server_arg_.length ())
00363     ? this->address_server_arg_.c_str () : 0;
00364 
00365   if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
00366     {
00367       TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
00368         TAO_ECG_Simple_Address_Server::create ();
00369       if (!impl.in ())
00370         return 0;
00371 
00372       if (impl->init (address_server_arg) == -1)
00373         {
00374           return 0;
00375         }
00376       return impl._retn ();
00377     }
00378 
00379   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
00380     {
00381       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00382         TAO_ECG_Complex_Address_Server::create (1);
00383       if (!impl.in ())
00384         return 0;
00385 
00386       if (impl->init (address_server_arg) == -1)
00387         {
00388           return 0;
00389         }
00390       return impl._retn ();
00391     }
00392 
00393   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
00394     {
00395       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00396         TAO_ECG_Complex_Address_Server::create (0);
00397       if (!impl.in ())
00398         return 0;
00399 
00400       if (impl->init (address_server_arg) == -1)
00401         {
00402           return 0;
00403         }
00404       return impl._retn ();
00405     }
00406 
00407   else
00408     {
00409       ACE_ERROR ((LM_ERROR,
00410                   "Cannot create address server: "
00411                   "unknown address server type specified.\n"));
00412       return 0;
00413     }
00414 }
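
To illustrate how the three address server types differ (per the -ECGAddressServer description above), here is a small self-contained model. It is not the TAO_ECG_Simple_Address_Server or TAO_ECG_Complex_Address_Server code: the EventHeader fields, the string addresses, and the fallback to a default address for unmapped keys are all assumptions made for this sketch.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical event header carrying only the two fields used as keys.
struct EventHeader {
  long source;
  long type;
};

enum class AddressServerKind { Basic, Source, Type };

// Toy address server: Basic always answers with one address, while Source
// and Type look the answer up in a mapping keyed by the chosen field.
class ModelAddressServer {
public:
  ModelAddressServer(AddressServerKind kind, std::string default_address,
                     std::map<long, std::string> mapping = {})
      : kind_(kind),
        default_address_(std::move(default_address)),
        mapping_(std::move(mapping)) {}

  std::string get_address(const EventHeader& header) const {
    if (kind_ == AddressServerKind::Basic) return default_address_;
    const long key =
        (kind_ == AddressServerKind::Source) ? header.source : header.type;
    const auto it = mapping_.find(key);
    // Assumption: unmapped keys fall back to the default address.
    return it != mapping_.end() ? it->second : default_address_;
  }

private:
  AddressServerKind kind_;
  std::string default_address_;
  std::map<long, std::string> mapping_;
};
```

The listing above selects between the source-keyed and type-keyed variants by passing 1 or 0 to TAO_ECG_Complex_Address_Server::create (); the enum in this sketch plays the same role.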

TAO_ECG_Refcounted_Endpoint TAO_ECG_Mcast_Gateway::init_endpoint ( void   )  [private]

Allocate and initialize appropriate objects.

Definition at line 288 of file ECG_Mcast_Gateway.cpp.

References ACE_ERROR, ACE_NEW_NORETURN, ACE_NONBLOCK, ACE_IPC_SAP::enable(), IP_MULTICAST_LOOP, IP_MULTICAST_TTL, IPPROTO_IP, LM_ERROR, ACE_SOCK_Dgram::open(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_Addr::sap_any, and ACE_SOCK_Dgram::set_nic().

Referenced by run().

00289 {
00290   TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
00291   TAO_ECG_Refcounted_Endpoint refendpoint;
00292 
00293   // Try to allocate a new endpoint from the heap
00294   ACE_NEW_NORETURN (endpoint,
00295                     TAO_ECG_UDP_Out_Endpoint);
00296 
00297   if (endpoint != 0)
00298   {
00299     refendpoint.reset (endpoint);
00300   }
00301   else
00302   {
00303     return TAO_ECG_Refcounted_Endpoint ();
00304   }
00305 
00306   ACE_SOCK_Dgram& dgram = refendpoint->dgram ();
00307 
00308   if (dgram.open (ACE_Addr::sap_any) == -1)
00309     {
00310       ACE_ERROR ((LM_ERROR,
00311                              "Cannot open dgram "
00312                              "for sending mcast messages.\n"));
00313       return TAO_ECG_Refcounted_Endpoint ();
00314     }
00315 
00316   if (this->nic_.length () != 0)
00317     {
00318       dgram.set_nic (this->nic_.c_str ());
00319     }
00320 
00321   if (this->ttl_value_ > 0)
00322     {
00323       if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00324                                       IP_MULTICAST_TTL,
00325                                       &this->ttl_value_,
00326                                       sizeof (this->ttl_value_))
00327           == -1)
00328         {
00329           ACE_ERROR ((LM_ERROR,
00330                       "Error setting TTL option on dgram "
00331                       "for sending mcast messages.\n"));
00332           return TAO_ECG_Refcounted_Endpoint ();
00333         }
00334     }
00335 
00336   if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00337                                   IP_MULTICAST_LOOP,
00338                                   &this->ip_multicast_loop_,
00339                                   sizeof (this->ip_multicast_loop_)) == -1)
00340     {
00341       ACE_ERROR ((LM_ERROR,
00342                   "Error setting MULTICAST_LOOP option "
00343                   "on dgram for sending mcast messages.\n"));
00344       return TAO_ECG_Refcounted_Endpoint ();
00345     }
00346 
00347   if (this->non_blocking_
00348       && dgram.enable(ACE_NONBLOCK) == -1)
00349     {
00350       ACE_ERROR ((LM_ERROR,
00351                   "Error setting NON BLOCKING option.\n"));
00352       return TAO_ECG_Refcounted_Endpoint ();
00353     }
00354 
00355   return refendpoint;
00356 }
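
For readers unfamiliar with the ACE wrappers used here, a roughly comparable socket setup can be written directly against the POSIX sockets API. This is a sketch under assumptions: it targets Linux-like systems, open_mcast_sender is a hypothetical name, NIC selection (set_nic) is omitted, and passing one-byte option values follows common BSD sockets practice rather than anything stated on this page.

```cpp
#include <cassert>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Opens a UDP socket configured for sending multicast datagrams.
// Returns the descriptor, or -1 if any step fails (the socket is closed
// again in that case, so nothing leaks).
int open_mcast_sender(unsigned char ttl, bool loop, bool non_blocking) {
  const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) return -1;

  // As in init_endpoint (), a TTL of 0 means "leave the system default".
  if (ttl > 0 &&
      ::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof ttl) == -1) {
    ::close(fd);
    return -1;
  }

  // Controls whether datagrams sent here are looped back to local receivers.
  const unsigned char loop_flag = loop ? 1 : 0;
  if (::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop_flag,
                   sizeof loop_flag) == -1) {
    ::close(fd);
    return -1;
  }

  if (non_blocking) {
    const int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
      ::close(fd);
      return -1;
    }
  }
  return fd;
}
```

The caller owns the returned descriptor and is responsible for closing it; the gateway instead hands the endpoint to a reference-counted TAO_ECG_Refcounted_Endpoint.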

TAO_ECG_Refcounted_Handler TAO_ECG_Mcast_Gateway::init_handler ( TAO_ECG_Dgram_Handler *  recv,
RtecEventChannelAdmin::EventChannel_ptr  ec,
ACE_Reactor *  reactor 
) [private]

Allocate and initialize appropriate objects.

Definition at line 417 of file ECG_Mcast_Gateway.cpp.

References ACE_ERROR, ACE_NEW_RETURN, address_server_arg_, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ACE_String_Base< CHAR >::length(), LM_ERROR, nic_, TAO_ECG_UDP_EH::open(), TAO_ECG_Mcast_EH::open(), TAO_ECG_Simple_Mcast_EH::open(), ACE_Event_Handler::reactor(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), and ACE_INET_Addr::set().

00420 {
00421   TAO_ECG_Refcounted_Handler handler;
00422 
00423   const char * nic =
00424     (this->nic_.length ()) ? this->nic_.c_str () : 0;
00425   const char * address_server_arg =
00426     (this->address_server_arg_.length ())
00427     ? this->address_server_arg_.c_str () : 0;
00428 
00429   if (this->handler_type_ == ECG_HANDLER_BASIC)
00430     {
00431       TAO_ECG_Simple_Mcast_EH * h = 0;
00432       ACE_NEW_RETURN (h,
00433                       TAO_ECG_Simple_Mcast_EH (receiver),
00434                       handler);
00435       handler.reset (h);
00436 
00437       h->reactor (reactor);
00438       if (h->open (address_server_arg, nic) != 0)
00439         return TAO_ECG_Refcounted_Handler ();
00440     }
00441 
00442   else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
00443     {
00444       TAO_ECG_Mcast_EH * h = 0;
00445       ACE_NEW_RETURN (h,
00446                       TAO_ECG_Mcast_EH (receiver, nic),
00447                       handler);
00448       handler.reset (h);
00449 
00450       h->reactor (reactor);
00451 
00452       h->open (ec);
00453     }
00454 
00455   else if (this->handler_type_ == ECG_HANDLER_UDP)
00456     {
00457       TAO_ECG_UDP_EH * h = 0;
00458       ACE_NEW_RETURN (h,
00459                       TAO_ECG_UDP_EH (receiver),
00460                       handler);
00461       handler.reset (h);
00462       h->reactor (reactor);
00463 
00464       ACE_INET_Addr ipaddr;
00465       if (ipaddr.set (address_server_arg) != 0)
00466         {
00467           ACE_ERROR ((LM_ERROR,
00468                       "ERROR using address server argument "
00469                       "in ACE_INET_Addr.set ().\n"));
00470           return TAO_ECG_Refcounted_Handler ();
00471         }
00472       if (h->open (ipaddr) != 0)
00473         return TAO_ECG_Refcounted_Handler ();
00474     }
00475 
00476   else
00477     {
00478       ACE_ERROR ((LM_ERROR,
00479                   "Cannot create handler: unknown "
00480                   "handler type specified.\n"));
00481       return handler;
00482     }
00483 
00484   return handler;
00485 }

TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > TAO_ECG_Mcast_Gateway::init_receiver ( RtecEventChannelAdmin::EventChannel_ptr  ec,
RtecUDPAdmin::AddrServer_ptr  address_server,
TAO_ECG_Refcounted_Endpoint  endpoint_rptr 
) [private]

Allocate and initialize appropriate objects.

Definition at line 531 of file ECG_Mcast_Gateway.cpp.

References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_ECG_UDP_Receiver::create(), TAO_EC_Auto_Command< T >::disallow_command(), ACE_SupplierQOS_Factory::get_SupplierQOS(), TAO_EC_Servant_Var< T >::in(), ACE_SupplierQOS_Factory::insert(), RtecEventChannelAdmin::SupplierQOS::is_gateway, and TAO_EC_Auto_Command< T >::set_command().

Referenced by run().

00535 {
00536   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00537     receiver (TAO_ECG_UDP_Receiver::create ());
00538   if (!receiver.in ())
00539     return receiver;
00540 
00541   receiver->init (ec,
00542                   endpoint_rptr,
00543                   address_server);
00544 
00545   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00546   receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00547 
00548   ACE_SupplierQOS_Factory supplier_qos_factory;
00549   supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00550                                ACE_ES_EVENT_ANY,
00551                                0, 1);
00552   RtecEventChannelAdmin::SupplierQOS & qos =
00553     const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
00554   qos.is_gateway = 1;
00555 
00556   receiver->connect (qos);
00557 
00558   receiver_shutdown.disallow_command ();
00559   return receiver;
00560 }

TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > TAO_ECG_Mcast_Gateway::init_sender ( RtecEventChannelAdmin::EventChannel_ptr  ec,
RtecUDPAdmin::AddrServer_ptr  address_server,
TAO_ECG_Refcounted_Endpoint  endpoint_rptr 
) [private]

Allocate and initialize appropriate objects.

Definition at line 488 of file ECG_Mcast_Gateway.cpp.

References ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_ECG_UDP_Sender::create(), TAO_EC_Auto_Command< T >::disallow_command(), ACE_ConsumerQOS_Factory::get_ConsumerQOS(), TAO_EC_Servant_Var< T >::in(), ACE_ConsumerQOS_Factory::insert(), RtecEventChannelAdmin::ConsumerQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), ACE_ConsumerQOS_Factory::start_disjunction_group(), and UDP_Sender_Shutdown.

Referenced by run().

00492 {
00493   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00494     sender (TAO_ECG_UDP_Sender::create ());
00495   if (!sender.in ())
00496     return sender;
00497 
00498   sender->init (ec,
00499                 address_server,
00500                 endpoint_rptr);
00501 
00502   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00503   sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00504 
00505   if (this->consumer_qos_.dependencies.length () > 0)
00506     {
00507       // Client supplied consumer qos.  Use it.
00508       this->consumer_qos_.is_gateway = 1;
00509       sender->connect (this->consumer_qos_);
00510     }
00511   else
00512     {
00513       // Client did not specify anything - subscribe to all events.
00514       ACE_ConsumerQOS_Factory consumer_qos_factory;
00515       consumer_qos_factory.start_disjunction_group (1);
00516       consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00517                                    ACE_ES_EVENT_ANY,
00518                                    0);
00519       RtecEventChannelAdmin::ConsumerQOS & qos =
00520         const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
00521       qos.is_gateway = 1;
00522 
00523       sender->connect (qos);
00524     }
00525 
00526   sender_shutdown.disallow_command ();
00527   return sender;
00528 }

int TAO_ECG_Mcast_Gateway::init_svcs ( void   )  [static]

Helper function to register the Gateway into the service configurator.

Definition at line 36 of file ECG_Mcast_Gateway.cpp.

References ACE_Service_Config::static_svcs().

00037 {
00038   return ACE_Service_Config::static_svcs ()->
00039     insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
00040 }

void TAO_ECG_Mcast_Gateway::run ( CORBA::ORB_ptr  orb,
RtecEventChannelAdmin::EventChannel_ptr  ec 
)

The main method - create, configure and run federation components according to the specified configuration.

Definition at line 583 of file ECG_Mcast_Gateway.cpp.

References ACE_DEBUG, activate(), TAO_EC_Auto_Command< T >::disallow_command(), TAO_EC_Object_Deactivator::disallow_deactivation(), ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), TAO_EC_Servant_Var< T >::in(), TAO_Objref_Var_T< T >::in(), PortableServer::Servant_var< T >::in(), init_address_server(), init_endpoint(), init_receiver(), init_sender(), LM_ERROR, CORBA::ORB::orb_core(), TAO_ORB_Core::reactor(), ACE_Event_Handler::reactor(), TAO_EC_Auto_Command< T >::set_command(), UDP_Sender_Shutdown, and verify_args().

00585 {
00586   // Verify args.
00587   this->verify_args (orb, ec);
00588 
00589   // Auto-cleanup objects.
00590   TAO_EC_Object_Deactivator address_server_deactivator;
00591   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00592   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00593 
00594   // Set up address server.
00595   PortableServer::ServantBase_var address_server_servant =
00596     this->init_address_server ();
00597   if (!address_server_servant.in ())
00598     {
00599       ACE_DEBUG ((LM_ERROR,
00600                   "Unable to create address server.\n"));
00601       throw CORBA::INTERNAL ();
00602     }
00603 
00604   RtecUDPAdmin::AddrServer_var address_server;
00605 
00606   PortableServer::POA_var poa =
00607     address_server_servant->_default_POA ();
00608 
00609   activate (address_server,
00610             poa.in (),
00611             address_server_servant.in (),
00612             address_server_deactivator);
00613 
00614   TAO_ECG_Refcounted_Endpoint endpoint_rptr;
00615   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;
00616 
00617   // Set up event sender.
00618   if (this->service_type_ == ECG_MCAST_SENDER
00619       || this->service_type_ == ECG_MCAST_TWO_WAY)
00620     {
00621       endpoint_rptr = this->init_endpoint ();
00622       if (endpoint_rptr.get () == 0)
00623         {
00624           throw CORBA::INTERNAL ();
00625         }
00626 
00627       sender = this->init_sender (ec,
00628                                   address_server.in (),
00629                                   endpoint_rptr);
00630       if (!sender.in ())
00631         {
00632           throw CORBA::INTERNAL ();
00633         }
00634 
00635       sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00636     }
00637 
00638   // Set up event receiver.
00639   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
00640   if (this->service_type_ == ECG_MCAST_RECEIVER
00641       || this->service_type_ == ECG_MCAST_TWO_WAY)
00642     {
00643       ACE_Reactor *reactor = orb->orb_core ()->reactor ();
00644 
00645       receiver = this->init_receiver (ec,
00646                                       address_server.in (),
00647                                       endpoint_rptr);
00648       if (!receiver.in ())
00649         {
00650           throw CORBA::INTERNAL ();
00651         }
00652 
00653       receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00654 
00655       TAO_ECG_Refcounted_Handler
00656         handler_rptr (this->init_handler (receiver.in (),
00657                                           ec,
00658                                           reactor));
00659       if (handler_rptr.get () == 0)
00660         {
00661           throw CORBA::INTERNAL ();
00662         }
00663       receiver->set_handler_shutdown (handler_rptr);
00664     }
00665 
00666   // Everything went ok - disable auto-cleanup.
00667   address_server_deactivator.disallow_deactivation ();
00668   receiver_shutdown.disallow_command ();
00669   sender_shutdown.disallow_command ();
00670 }
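
The listing above arms cleanup objects as each component is created and disarms them only once every step has succeeded. A minimal standalone sketch of that pattern follows. It does not use ACE or TAO: `AutoCommand`, `setup()` and `cleanup_runs()` are hypothetical names introduced for illustration, and the guard's behaviour is an assumption modelled on how `set_command()` and `disallow_command()` are used in run().

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for TAO_EC_Auto_Command (an assumption, not the real
// class): runs a stored command on destruction unless disallow_command()
// was called first.
class AutoCommand {
public:
  AutoCommand() = default;
  AutoCommand(const AutoCommand&) = delete;
  AutoCommand& operator=(const AutoCommand&) = delete;

  ~AutoCommand() {
    if (allowed_ && command_)
      command_();
  }

  // Store the cleanup command and arm the guard.
  void set_command(std::function<void()> command) {
    command_ = std::move(command);
    allowed_ = true;
  }

  // Disarm the guard; the destructor then does nothing.
  void disallow_command() { allowed_ = false; }

private:
  std::function<void()> command_;
  bool allowed_ = false;
};

// Hypothetical setup routine: returns normally on success, throws on failure.
// `shutdowns` counts how many times the cleanup command ran.
void setup(bool fail_midway, int& shutdowns) {
  AutoCommand sender_shutdown;
  sender_shutdown.set_command([&shutdowns] { ++shutdowns; });

  if (fail_midway)
    throw std::runtime_error("setup failed");  // guard fires during unwinding

  sender_shutdown.disallow_command();  // success: skip the cleanup
}

// Returns the number of cleanup runs observed for one setup attempt.
int cleanup_runs(bool fail_midway) {
  int shutdowns = 0;
  try {
    setup(fail_midway, shutdowns);
  } catch (const std::runtime_error&) {
    // Swallowed only so the caller can inspect the counter.
  }
  return shutdowns;
}
```

In this sketch a failed setup runs the cleanup exactly once and a successful one never runs it, which is the effect of the three disallow calls at the end of run().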

int TAO_ECG_Mcast_Gateway::validate_configuration (void) [private]

Verifies configuration values specified through init() make sense.

Definition at line 244 of file ECG_Mcast_Gateway.cpp.

References ACE_DEBUG, ECG_ADDRESS_SERVER_BASIC, ECG_HANDLER_BASIC, ECG_HANDLER_UDP, ECG_MCAST_SENDER, and LM_ERROR.

Referenced by init().

00245 {
00246   if ((this->handler_type_ == ECG_HANDLER_BASIC
00247        || this->handler_type_ == ECG_HANDLER_UDP)
00248       && this->service_type_ != ECG_MCAST_SENDER
00249       && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
00250     {
00251       ACE_DEBUG ((LM_ERROR,
00252                       "Configurations for mcast handler and "
00253                       "address server do not match.\n"));
00254       return -1;
00255     }
00256 
00257   // Currently all Address Server implementations require an
00258   // initialization string.  If we ever add a new Address Server
00259   // implementation, which does not, we'll have to remove this check.
00260   if (this->address_server_arg_.length () == 0)
00261     {
00262       ACE_DEBUG ((LM_ERROR,
00263                       "Address server initialization "
00264                       "argument not specified.\n"));
00265       return -1;
00266     }
00267 
00268   if (this->ip_multicast_loop_ != 0
00269       && this->ip_multicast_loop_ != 1)
00270     {
00271       ACE_DEBUG ((LM_ERROR,
00272                   "IP MULTICAST LOOP option must have a boolean value.\n"));
00273       return -1;
00274     }
00275 
00276   if (this->non_blocking_ != 0
00277       && this->non_blocking_ != 1)
00278     {
00279       ACE_DEBUG ((LM_ERROR,
00280                   "NON BLOCKING flag must have a boolean value.\n"));
00281       return -1;
00282     }
00283 
00284   return 0;
00285 }
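
The four checks in the listing above can be tried out in isolation. The following is a rough sketch, assuming a plain configuration record in place of the class's private members: `GatewayConfig`, `validate()`, the scoped enums and the default values are all hypothetical and are not part of TAO. Only the conditions themselves are taken from validate_configuration().

```cpp
#include <cassert>
#include <string>

// Hypothetical mirrors of the Service_Type, Address_Server_Type and
// Handler_Type enums documented for this class.
enum class ServiceType { Sender, Receiver, TwoWay };
enum class AddressServerType { Basic, Source, Type };
enum class HandlerType { Basic, Complex, Udp };

// Hypothetical configuration record. The defaults are arbitrary and are not
// claimed to match the real class.
struct GatewayConfig {
  ServiceType service_type = ServiceType::TwoWay;
  HandlerType handler_type = HandlerType::Basic;
  AddressServerType address_server_type = AddressServerType::Basic;
  std::string address_server_arg;
  int ip_multicast_loop = 0;
  int non_blocking = 1;
};

// Returns 0 if the configuration passes all checks, -1 otherwise,
// following the return convention of validate_configuration().
int validate(const GatewayConfig& c) {
  const bool simple_handler = c.handler_type == HandlerType::Basic
                              || c.handler_type == HandlerType::Udp;

  // Basic and UDP handlers on a receiving gateway need the basic address server.
  if (simple_handler
      && c.service_type != ServiceType::Sender
      && c.address_server_type != AddressServerType::Basic)
    return -1;

  // Every address server needs an initialization string.
  if (c.address_server_arg.empty())
    return -1;

  // Both flags are stored as int but must hold 0 or 1.
  if (c.ip_multicast_loop != 0 && c.ip_multicast_loop != 1)
    return -1;
  if (c.non_blocking != 0 && c.non_blocking != 1)
    return -1;

  return 0;
}
```

Note that the first check does not apply to a pure sender: a mismatched handler and address server pair is rejected only when the gateway also receives.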

void TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec) [private]

Check that arguments to run() are not nil.

Definition at line 563 of file ECG_Mcast_Gateway.cpp.

References ACE_ERROR, CORBA::is_nil(), and LM_ERROR.

Referenced by run().

00565 {
00566   if (CORBA::is_nil (ec))
00567     {
00568       ACE_ERROR ((LM_ERROR,
00569                   "Nil event channel argument passed to "
00570                   "TAO_ECG_Mcast_Gateway::run().\n"));
00571       throw CORBA::INTERNAL ();
00572     }
00573   if (CORBA::is_nil (orb))
00574     {
00575       ACE_ERROR ((LM_ERROR,
00576                   "Nil orb argument passed to "
00577                   "TAO_ECG_Mcast_Gateway::run().\n"));
00578       throw CORBA::INTERNAL ();
00579     }
00580 }


Member Data Documentation

ACE_CString TAO_ECG_Mcast_Gateway::address_server_arg_ [private]

Initialization string for the address server. validate_configuration() rejects an empty value.

Definition at line 265 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_address_server(), and init_handler().

Address_Server_Type TAO_ECG_Mcast_Gateway::address_server_type_ [private]

Flags controlling configuration.

Definition at line 264 of file ECG_Mcast_Gateway.h.

Referenced by init().

RtecEventChannelAdmin::ConsumerQOS TAO_ECG_Mcast_Gateway::consumer_qos_ [private]

Flags controlling configuration.

Definition at line 271 of file ECG_Mcast_Gateway.h.

Referenced by init().

Handler_Type TAO_ECG_Mcast_Gateway::handler_type_ [private]

Flags controlling configuration.

Definition at line 263 of file ECG_Mcast_Gateway.h.

Referenced by init().

int TAO_ECG_Mcast_Gateway::ip_multicast_loop_ [private]

IP multicast loop option. validate_configuration() accepts only 0 or 1.

Definition at line 268 of file ECG_Mcast_Gateway.h.

Referenced by init().

ACE_CString TAO_ECG_Mcast_Gateway::nic_ [private]

Flags controlling configuration.

Definition at line 267 of file ECG_Mcast_Gateway.h.

Referenced by init(), and init_handler().

int TAO_ECG_Mcast_Gateway::non_blocking_ [private]

Non-blocking flag. validate_configuration() accepts only 0 or 1.

Definition at line 269 of file ECG_Mcast_Gateway.h.

Referenced by init().

Service_Type TAO_ECG_Mcast_Gateway::service_type_ [private]

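Selects whether run() sets up an event sender (ECG_MCAST_SENDER), an event receiver (ECG_MCAST_RECEIVER), or both (ECG_MCAST_TWO_WAY).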

Definition at line 262 of file ECG_Mcast_Gateway.h.

Referenced by init().

u_char TAO_ECG_Mcast_Gateway::ttl_value_ [private]

Flags controlling configuration.

Definition at line 266 of file ECG_Mcast_Gateway.h.

Referenced by init().


The documentation for this class was generated from the following files:
ECG_Mcast_Gateway.h
ECG_Mcast_Gateway.cpp
Generated on Tue Feb 2 17:44:39 2010 for TAO_RTEvent by doxygen 1.4.7