TAO_ECG_Mcast_Gateway Class Reference

Implements the builder for setting up an Event Channel multicast gateway. NOT THREAD-SAFE.

#include <ECG_Mcast_Gateway.h>


Public Types

enum  Service_Type { ECG_MCAST_SENDER, ECG_MCAST_RECEIVER, ECG_MCAST_TWO_WAY }
enum  Address_Server_Type { ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE }
enum  Handler_Type { ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP }

Public Member Functions

 TAO_ECG_Mcast_Gateway (void)
 Constructor.

int init (const char *address_server_arg, const Attributes &attributes=Attributes())
int init (const RtecEventChannelAdmin::ConsumerQOS &consumer_qos, const char *address_server_arg, const Attributes &attributes=Attributes())
void run (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec)
virtual int init (int argc, ACE_TCHAR *argv[])
virtual int fini (void)

Static Public Member Functions

int init_svcs (void)

Private Member Functions

void verify_args (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec)
 Check that arguments to run() are not nil.

int validate_configuration (void)
 Verifies configuration values specified through init() make sense.

PortableServer::ServantBase * init_address_server (void)
TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > init_sender (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > init_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, RtecUDPAdmin::AddrServer_ptr address_server, TAO_ECG_Refcounted_Endpoint endpoint_rptr)
TAO_ECG_Refcounted_Endpoint init_endpoint (void)
TAO_ECG_Refcounted_Handler init_handler (TAO_ECG_Dgram_Handler *recv, RtecEventChannelAdmin::EventChannel_ptr ec, ACE_Reactor *reactor)

Private Attributes

Service_Type service_type_
Handler_Type handler_type_
Address_Server_Type address_server_type_
ACE_CString address_server_arg_
u_char ttl_value_
ACE_CString nic_
int ip_multicast_loop_
int non_blocking_
RtecEventChannelAdmin::ConsumerQOS consumer_qos_

Detailed Description

Implements the builder for setting up an Event Channel multicast gateway. NOT THREAD-SAFE.

This class simplifies creation of federated Event Channels by presenting a simple unified interface for creating and configuring all components needed to federate an Event Channel. Configuration options are described below.

NOTE: This class does not own any of the components it creates and its lifetime is independent of theirs. This class acts purely as a wrapper facade for creating and wiring appropriate components together.

Todo:
This class is an ACE_Service_Object, but the only reason for it is the need for easy configuration using files. Since ACE_Service_Object provides much more than that, we should look into replacing it with a more lightweight utility that would serve our needs.

CONFIGURATION OPTIONS

There are two ways to use this class:

1) Use a service config file to specify configuration options (described below), and use the service configurator to obtain an instance of a configured TAO_ECG_Mcast_Gateway in your program. (See TAO/orbsvcs/tests/Event/Mcast/Common and TAO/orbsvcs/tests/Event/Mcast/Simple for an example.)

Service config file options:

-ECGService
  Valid values: sender, receiver, two_way
  Specifies whether this gateway should act as a multicast sender of events, a multicast receiver, or both.

-ECGAddressServer
  Valid values: basic, source, type
  Specifies which implementation of the address server the gateway should use.
    basic - the same multicast address is returned for all event headers.
    source - multicast addresses are returned based on the event source, according to the mapping provided at initialization.
    type - multicast addresses are returned based on the event type, according to the mapping provided at initialization.

-ECGAddressServerArg
  Valid value: a string whose format requirements are specific to the address server implementation in use.
  The argument is not interpreted by the gateway; it is simply passed to the address server selected by -ECGAddressServer during initialization. THIS OPTION MUST ALWAYS BE SPECIFIED BY THE USER (there is no default value for it).

-ECGHandler
  Valid values: basic, complex, udp
  Specifies which implementation of the event handler should be used when the gateway is acting as an event receiver.
    basic - a simple event handler listening on a single mcast address.
    complex - an event handler listening on multiple mcast addresses, based on the events of interest to consumers.
    udp - similar to the basic handler, except that it listens on a UDP address rather than a multicast group.

-ECGTTL
  Valid values: a number > 0
  IP multicast time-to-live value to set on the sending socket. This option matters only if the gateway is acting as a sender of mcast messages.

-ECGNIC
  Valid values: name of a network interface
  This interface is used for sending and/or receiving multicast messages.

-ECGNonBlocking
  Boolean flag that configures whether the socket is in blocking or non-blocking mode. The default is non-blocking. NOTE: Certain device drivers block the process if the physical link fails.

2) Create an instance of TAO_ECG_Mcast_Gateway in your code, on the stack or dynamically, and use the init() method to configure it. No configuration files are involved. See the service config options above for a description of the configurable options, and the init() method below for how to specify them.

Default configuration values (for either use case) can be found in ECG_Defaults.h
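
The numeric and boolean option values are converted with plain C library calls in the init (int argc, ACE_TCHAR *argv[]) listing further down this page, which leads to a few edge cases that the option descriptions do not spell out. The following is a minimal standalone sketch of those conversions, not the gateway's code: parse_ttl and parse_flag are hypothetical helper names, and std::strtoul / std::atoi stand in for the ACE_OS wrappers used in the listing.

```cpp
#include <cassert>
#include <cstdlib>

// Hypothetical helper mirroring the -ECGTTL handling in the listing:
// strtoul with base 0 (auto-detects decimal, octal "0..." and hex "0x..."),
// then masked to the low 8 bits before being stored in an unsigned char.
unsigned char parse_ttl(const char* text) {
  assert(text != nullptr);  // model-only precondition
  unsigned long tmp = std::strtoul(text, nullptr, 0) & 0xff;
  return static_cast<unsigned char>(tmp);
}

// Hypothetical helper mirroring the boolean options (-ECGNonBlocking and
// -ECGIPMulticastLoop in the listing): the flag is enabled only if atoi
// converts the text to a nonzero integer.
bool parse_flag(const char* text) {
  assert(text != nullptr);  // model-only precondition
  return std::atoi(text) != 0;
}
```

Under these conversions a TTL such as 300 wraps to 44 and 256 wraps to 0, while a flag value such as "true" reads as 0 because atoi finds no digits. According to the init_endpoint() listing, a TTL of 0 means the TTL socket option is not set at all.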

Definition at line 128 of file ECG_Mcast_Gateway.h.


Member Enumeration Documentation

enum TAO_ECG_Mcast_Gateway::Address_Server_Type
 

Enumeration values:
ECG_ADDRESS_SERVER_BASIC 
ECG_ADDRESS_SERVER_SOURCE 
ECG_ADDRESS_SERVER_TYPE 

Definition at line 152 of file ECG_Mcast_Gateway.h.

enum TAO_ECG_Mcast_Gateway::Handler_Type
 

Enumeration values:
ECG_HANDLER_BASIC 
ECG_HANDLER_COMPLEX 
ECG_HANDLER_UDP 

Definition at line 156 of file ECG_Mcast_Gateway.h.

enum TAO_ECG_Mcast_Gateway::Service_Type
 

Enumeration values:
ECG_MCAST_SENDER 
ECG_MCAST_RECEIVER 
ECG_MCAST_TWO_WAY 

Definition at line 148 of file ECG_Mcast_Gateway.h.


Constructor & Destructor Documentation

ACE_INLINE TAO_ECG_Mcast_Gateway::TAO_ECG_Mcast_Gateway (void)
 

Constructor.

Definition at line 8 of file ECG_Mcast_Gateway.i.

References consumer_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_ECG_DEFAULT_ADDRESS_SERVER, TAO_ECG_DEFAULT_ADDRESS_SERVER_ARG, TAO_ECG_DEFAULT_HANDLER, TAO_ECG_DEFAULT_IP_MULTICAST_LOOP, TAO_ECG_DEFAULT_NIC, TAO_ECG_DEFAULT_NON_BLOCKING, TAO_ECG_DEFAULT_SERVICE, and TAO_ECG_DEFAULT_TTL.

00009   : service_type_ (TAO_ECG_DEFAULT_SERVICE)
00010   , handler_type_ (TAO_ECG_DEFAULT_HANDLER)
00011   , address_server_type_ (TAO_ECG_DEFAULT_ADDRESS_SERVER)
00012   , address_server_arg_ ((const char *) TAO_ECG_DEFAULT_ADDRESS_SERVER_ARG)
00013   , ttl_value_ (TAO_ECG_DEFAULT_TTL)
00014   , nic_ (static_cast<const char *> (TAO_ECG_DEFAULT_NIC))
00015   , ip_multicast_loop_ (TAO_ECG_DEFAULT_IP_MULTICAST_LOOP)
00016   , non_blocking_ (TAO_ECG_DEFAULT_NON_BLOCKING)
00017   , consumer_qos_ ()
00018 {
00019   this->consumer_qos_.dependencies.length (0);
00020 }


Member Function Documentation

int TAO_ECG_Mcast_Gateway::fini (void) [virtual]
 

Reimplemented from ACE_Shared_Object.

Definition at line 44 of file ECG_Mcast_Gateway.cpp.

00045 {
00046   return 0;
00047 }

int TAO_ECG_Mcast_Gateway::init (const RtecEventChannelAdmin::ConsumerQOS & consumer_qos,
                                 const char * address_server_arg,
                                 const Attributes & attributes = Attributes())
 

Same as init (const char *address_server_arg, const Attributes &attributes), but also gives the client an opportunity to specify the consumer QoS, i.e., which EC traffic should be multicast.

Definition at line 233 of file ECG_Mcast_Gateway.cpp.

References consumer_qos_, and init().

00237 {
00238   this->consumer_qos_ = consumer_qos;
00239   return this->init (address_server_arg,
00240                      attributes);
00241 }

int TAO_ECG_Mcast_Gateway::init (const char * address_server_arg,
                                 const Attributes & attributes = Attributes())
 

Configure TAO_ECG_Mcast_Gateway programmatically. This method should be used when NOT using the service configurator to obtain and configure TAO_ECG_Mcast_Gateway. See the class documentation above for more information.

Definition at line 216 of file ECG_Mcast_Gateway.cpp.

References address_server_arg_, TAO_ECG_Mcast_Gateway::Attributes::address_server_type, address_server_type_, TAO_ECG_Mcast_Gateway::Attributes::handler_type, handler_type_, TAO_ECG_Mcast_Gateway::Attributes::ip_multicast_loop, ip_multicast_loop_, TAO_ECG_Mcast_Gateway::Attributes::nic, nic_, TAO_ECG_Mcast_Gateway::Attributes::non_blocking, non_blocking_, TAO_ECG_Mcast_Gateway::Attributes::service_type, TAO_ECG_Mcast_Gateway::Attributes::ttl_value, ttl_value_, and validate_configuration().

00218 {
00219   this->address_server_arg_.set (address_server_arg);
00220 
00221   this->address_server_type_ = attr.address_server_type;
00222   this->handler_type_ = attr.handler_type;
00223   this->service_type_ = attr.service_type;
00224   this->ttl_value_ = attr.ttl_value;
00225   this->nic_.set (attr.nic.c_str ());
00226   this->ip_multicast_loop_ = attr.ip_multicast_loop;
00227   this->non_blocking_ = attr.non_blocking;
00228 
00229   return this->validate_configuration ();
00230 }
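
The defaulted Attributes parameter lets a caller override only the settings it cares about and leave the rest at their defaults. The sketch below models that calling pattern in standalone C++. It is an illustration under stated assumptions: Attributes here is a stand-in struct whose field names follow the References list above, its default values are placeholders rather than the ones in ECG_Defaults.h, and Gateway_Model is a hypothetical class that always returns 0 where the real method returns the result of validate_configuration().

```cpp
#include <cassert>
#include <string>

enum Service_Type { ECG_MCAST_SENDER, ECG_MCAST_RECEIVER, ECG_MCAST_TWO_WAY };
enum Handler_Type { ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP };
enum Address_Server_Type {
  ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE
};

// Stand-in for TAO_ECG_Mcast_Gateway::Attributes. The default values are
// placeholders chosen for this sketch, NOT the values from ECG_Defaults.h.
struct Attributes {
  Address_Server_Type address_server_type = ECG_ADDRESS_SERVER_BASIC;
  Handler_Type handler_type = ECG_HANDLER_BASIC;
  Service_Type service_type = ECG_MCAST_TWO_WAY;
  unsigned char ttl_value = 0;
  std::string nic;
  int ip_multicast_loop = 0;
  int non_blocking = 1;
};

// Hypothetical model of the gateway: copies every attribute, as the listing
// does, but skips validation and always reports success.
class Gateway_Model {
public:
  int init(const char* address_server_arg,
           const Attributes& attr = Attributes()) {
    assert(address_server_arg != nullptr);  // model-only precondition
    address_server_arg_ = address_server_arg;
    attr_ = attr;
    return 0;
  }
  const Attributes& attributes() const { return attr_; }
  const std::string& address_server_arg() const { return address_server_arg_; }

private:
  std::string address_server_arg_;
  Attributes attr_;
};
```

A caller would default-construct an Attributes object, change for example service_type and ttl_value, and pass it together with the address server argument; omitting the second argument keeps every default.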

int TAO_ECG_Mcast_Gateway::init (int argc,
                                 ACE_TCHAR * argv[]) [virtual]
 

Reimplemented from ACE_Shared_Object.

Definition at line 50 of file ECG_Mcast_Gateway.cpp.

References ACE_Arg_Shifter, ACE_DEBUG, ACE_ERROR, ACE_TCHAR, ACE_TEXT(), address_server_arg_, address_server_type_, ACE_OS::atoi(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, handler_type_, ip_multicast_loop_, LM_ERROR, LM_WARNING, nic_, non_blocking_, ACE_OS::strcasecmp(), ACE_OS::strtoul(), ttl_value_, and validate_configuration().

Referenced by init().

00051 {
00052   int result = 0;
00053 
00054   ACE_Arg_Shifter arg_shifter (argc, argv);
00055 
00056   while (arg_shifter.is_anything_left ())
00057     {
00058       const ACE_TCHAR *arg = arg_shifter.get_current ();
00059 
00060       if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0)
00061         {
00062           arg_shifter.consume_arg ();
00063 
00064           if (arg_shifter.is_parameter_next ())
00065             {
00066               const ACE_TCHAR* opt = arg_shifter.get_current ();
00067               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0)
00068                 this->service_type_ = ECG_MCAST_RECEIVER;
00069               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0)
00070                 this->service_type_ = ECG_MCAST_SENDER;
00071               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0)
00072                 this->service_type_ = ECG_MCAST_TWO_WAY;
00073               else
00074                 {
00075                   ACE_ERROR ((LM_ERROR,
00076                              ACE_TEXT ("Unsupported <-ECGService> option ")
00077                              ACE_TEXT ("value: <%s>. Ignoring this option ")
00078                              ACE_TEXT ("- using defaults instead.\n"),
00079                              opt));
00080                   result = -1;
00081                 }
00082               arg_shifter.consume_arg ();
00083             }
00084         }
00085 
00086       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0)
00087         {
00088           arg_shifter.consume_arg ();
00089 
00090           if (arg_shifter.is_parameter_next ())
00091             {
00092               const ACE_TCHAR* opt = arg_shifter.get_current ();
00093               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00094                 this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC;
00095               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0)
00096                 this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE;
00097               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0)
00098                 this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE;
00099               else
00100                 {
00101                   ACE_ERROR ((LM_ERROR,
00102                               ACE_TEXT ("Unsupported <-ECGAddressServer> ")
00103                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00104                               ACE_TEXT ("option - using defaults instead.\n"),
00105                               opt));
00106                   result = -1;
00107                 }
00108               arg_shifter.consume_arg ();
00109             }
00110         }
00111 
00112       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0)
00113         {
00114           arg_shifter.consume_arg ();
00115 
00116           if (arg_shifter.is_parameter_next ())
00117             {
00118               this->address_server_arg_.set (arg_shifter.get_current ());
00119               arg_shifter.consume_arg ();
00120             }
00121         }
00122 
00123 
00124       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0)
00125         {
00126           arg_shifter.consume_arg ();
00127 
00128           if (arg_shifter.is_parameter_next ())
00129             {
00130               const ACE_TCHAR* opt = arg_shifter.get_current ();
00131               if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0)
00132                 this->handler_type_ = ECG_HANDLER_BASIC;
00133               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0)
00134                 this->handler_type_ = ECG_HANDLER_COMPLEX;
00135               else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0)
00136                 this->handler_type_ = ECG_HANDLER_UDP;
00137               else
00138                 {
00139                   ACE_ERROR ((LM_ERROR,
00140                               ACE_TEXT ("Unsupported <-ECGHandler> ")
00141                               ACE_TEXT ("option value: <%s>. Ignoring this ")
00142                               ACE_TEXT ("option - using defaults instead.\n"),
00143                               opt));
00144                   result = -1;
00145                 }
00146               arg_shifter.consume_arg ();
00147             }
00148         }
00149 
00150       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0)
00151         {
00152           arg_shifter.consume_arg ();
00153 
00154           if (arg_shifter.is_parameter_next ())
00155             {
00156               const ACE_TCHAR* opt = arg_shifter.get_current ();
00157               unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff;
00158               this->ttl_value_ = static_cast<u_char> (tmp);
00159               arg_shifter.consume_arg ();
00160             }
00161         }
00162 
00163       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0)
00164         {
00165           arg_shifter.consume_arg ();
00166 
00167           if (arg_shifter.is_parameter_next ())
00168             {
00169               this->nic_.set (arg_shifter.get_current ());
00170               arg_shifter.consume_arg ();
00171             }
00172         }
00173 
00174       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0)
00175         {
00176           arg_shifter.consume_arg ();
00177 
00178           if (arg_shifter.is_parameter_next ())
00179             {
00180               this->ip_multicast_loop_ =
00181                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00182               arg_shifter.consume_arg ();
00183             }
00184         }
00185 
00186       else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0)
00187       {
00188           arg_shifter.consume_arg ();
00189 
00190           if (arg_shifter.is_parameter_next ())
00191             {
00192               this->non_blocking_ =
00193                 (ACE_OS::atoi(arg_shifter.get_current()) != 0);
00194               arg_shifter.consume_arg ();
00195             }
00196         }
00197 
00198       else
00199         {
00200           arg_shifter.ignore_arg ();
00201           ACE_DEBUG ((LM_WARNING,
00202                       ACE_TEXT ("Ignoring <%s> option ")
00203                       ACE_TEXT ("during initialization.\n"),
00204                       arg));
00205           result = -1;
00206         }
00207     }
00208 
00209   if (this->validate_configuration () == -1)
00210     return -1;
00211   else
00212     return result;
00213 }
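
One consequence of the loop above is easy to miss: a rejected option or value sets the result to -1 but does not stop parsing, so later valid options still take effect even though the call reports failure. The following reduced model illustrates this with two options only. It is a sketch, not the gateway's code: Parse_Model is a hypothetical type, matching is exact rather than case-insensitive, the "next argument is a parameter" test is assumed to mean "does not start with '-'", and the final validate_configuration() step is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, much reduced model of the option loop.
struct Parse_Model {
  std::string service = "default";  // placeholder, not the real default
  std::string nic;

  // Returns -1 if any option or value was rejected, 0 otherwise.
  // Parsing never stops early.
  int init(const std::vector<std::string>& args) {
    int result = 0;
    std::size_t i = 0;
    while (i < args.size()) {
      const std::string& arg = args[i++];
      // Assumed meaning of is_parameter_next(): a following argument
      // exists and does not begin with '-'.
      const bool has_value =
          i < args.size() && args[i].compare(0, 1, "-") != 0;

      if (arg == "-ECGService") {
        if (has_value) {
          const std::string& value = args[i++];
          if (value == "sender" || value == "receiver" || value == "two_way")
            service = value;
          else
            result = -1;  // unsupported value: keep setting, remember error
        }
      } else if (arg == "-ECGNIC") {
        if (has_value) nic = args[i++];
      } else {
        result = -1;  // unknown option: skipped, but still reported
      }
    }
    assert(i == args.size());  // every argument has been consumed
    return result;
  }
};
```

In this model, passing -ECGService bogus -ECGNIC eth0 returns -1 while still setting the NIC to eth0, so a caller that ignores the return value ends up with a partially applied configuration.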

PortableServer::ServantBase * TAO_ECG_Mcast_Gateway::init_address_server (void) [private]
 

Definition at line 359 of file ECG_Mcast_Gateway.cpp.

References TAO_EC_Servant_Var< T >::_retn(), ACE_ERROR, address_server_arg_, address_server_type_, TAO_ECG_Complex_Address_Server::create(), TAO_ECG_Simple_Address_Server::create(), ECG_ADDRESS_SERVER_BASIC, ECG_ADDRESS_SERVER_SOURCE, ECG_ADDRESS_SERVER_TYPE, TAO_EC_Servant_Var< T >::in(), and LM_ERROR.

Referenced by run().

00360 {
00361   const char * address_server_arg =
00362     (this->address_server_arg_.length ())
00363     ? this->address_server_arg_.c_str () : 0;
00364 
00365   if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC)
00366     {
00367       TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl =
00368         TAO_ECG_Simple_Address_Server::create ();
00369       if (!impl.in ())
00370         return 0;
00371 
00372       if (impl->init (address_server_arg) == -1)
00373         {
00374           return 0;
00375         }
00376       return impl._retn ();
00377     }
00378 
00379   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE)
00380     {
00381       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00382         TAO_ECG_Complex_Address_Server::create (1);
00383       if (!impl.in ())
00384         return 0;
00385 
00386       if (impl->init (address_server_arg) == -1)
00387         {
00388           return 0;
00389         }
00390       return impl._retn ();
00391     }
00392 
00393   else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE)
00394     {
00395       TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl =
00396         TAO_ECG_Complex_Address_Server::create (0);
00397       if (!impl.in ())
00398         return 0;
00399 
00400       if (impl->init (address_server_arg) == -1)
00401         {
00402           return 0;
00403         }
00404       return impl._retn ();
00405     }
00406 
00407   else
00408     {
00409       ACE_ERROR ((LM_ERROR,
00410                   "Cannot create address server: "
00411                   "unknown address server type specified.\n"));
00412       return 0;
00413     }
00414 }

TAO_ECG_Refcounted_Endpoint TAO_ECG_Mcast_Gateway::init_endpoint (void) [private]
 

Definition at line 288 of file ECG_Mcast_Gateway.cpp.

References ACE_ERROR, ACE_NEW_NORETURN, ACE_NONBLOCK, ACE_IPC_SAP::enable(), IP_MULTICAST_LOOP, IP_MULTICAST_TTL, LM_ERROR, nic_, non_blocking_, ACE_SOCK_Dgram::open(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_SOCK_Dgram::set_nic(), TAO_ECG_Refcounted_Endpoint, and ttl_value_.

Referenced by run().

00289 {
00290   TAO_ECG_UDP_Out_Endpoint* endpoint = 0;
00291   TAO_ECG_Refcounted_Endpoint refendpoint;
00292 
00293   // Try to allocate a new endpoint from the heap
00294   ACE_NEW_NORETURN (endpoint,
00295                     TAO_ECG_UDP_Out_Endpoint);
00296 
00297   if (endpoint != 0)
00298   {
00299     refendpoint.reset (endpoint);
00300   }
00301   else
00302   {
00303     return TAO_ECG_Refcounted_Endpoint ();
00304   }
00305 
00306   ACE_SOCK_Dgram& dgram = refendpoint->dgram ();
00307 
00308   if (dgram.open (ACE_Addr::sap_any) == -1)
00309     {
00310       ACE_ERROR ((LM_ERROR,
00311                              "Cannot open dgram "
00312                              "for sending mcast messages.\n"));
00313       return TAO_ECG_Refcounted_Endpoint ();
00314     }
00315 
00316   if (this->nic_.length () != 0)
00317     {
00318       dgram.set_nic (this->nic_.c_str ());
00319     }
00320 
00321   if (this->ttl_value_ > 0)
00322     {
00323       if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00324                                       IP_MULTICAST_TTL,
00325                                       &this->ttl_value_,
00326                                       sizeof (this->ttl_value_))
00327           == -1)
00328         {
00329           ACE_ERROR ((LM_ERROR,
00330                       "Error setting TTL option on dgram "
00331                       "for sending mcast messages.\n"));
00332           return TAO_ECG_Refcounted_Endpoint ();
00333         }
00334     }
00335 
00336   if (dgram.ACE_SOCK::set_option (IPPROTO_IP,
00337                                   IP_MULTICAST_LOOP,
00338                                   &this->ip_multicast_loop_,
00339                                   sizeof (this->ip_multicast_loop_)) == -1)
00340     {
00341       ACE_ERROR ((LM_ERROR,
00342                   "Error setting MULTICAST_LOOP option "
00343                   "on dgram for sending mcast messages.\n"));
00344       return TAO_ECG_Refcounted_Endpoint ();
00345     }
00346 
00347   if (this->non_blocking_
00348       && dgram.enable(ACE_NONBLOCK) == -1)
00349     {
00350       ACE_ERROR ((LM_ERROR,
00351                   "Error setting NON BLOCKING option.\n"));
00352       return TAO_ECG_Refcounted_Endpoint ();
00353     }
00354 
00355   return refendpoint;
00356 }
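
For readers unfamiliar with the ACE wrappers, the socket settings applied above correspond to ordinary socket options. The sketch below shows the same three settings using plain POSIX calls instead of ACE_SOCK_Dgram; it is a swapped-in technique for illustration, not what the gateway executes. open_mcast_sender is a hypothetical helper, and the sketch leaves out the bind to any local address done by open (ACE_Addr::sap_any) as well as the set_nic() step.

```cpp
#include <cassert>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: opens a UDP socket and applies the TTL, loopback and
// non-blocking settings. Returns the descriptor, or -1 (after closing the
// socket) if any step fails.
int open_mcast_sender(unsigned char ttl, int loop, bool non_blocking) {
  assert(loop == 0 || loop == 1);  // the gateway stores a 0/1 value here

  int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) return -1;

  // As in the listing, the TTL option is only set for a positive value.
  if (ttl > 0 &&
      ::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof ttl) == -1) {
    ::close(fd);
    return -1;
  }

  // Controls whether multicast datagrams sent from this socket are looped
  // back to local receivers.
  if (::setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof loop) == -1) {
    ::close(fd);
    return -1;
  }

  if (non_blocking) {
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
      ::close(fd);
      return -1;
    }
  }
  return fd;
}
```

The caller owns the returned descriptor and is responsible for closing it; in the gateway that role is played by the reference-counted TAO_ECG_Refcounted_Endpoint.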

TAO_ECG_Refcounted_Handler TAO_ECG_Mcast_Gateway::init_handler (TAO_ECG_Dgram_Handler * recv,
                                                                RtecEventChannelAdmin::EventChannel_ptr ec,
                                                                ACE_Reactor * reactor) [private]
 

Definition at line 417 of file ECG_Mcast_Gateway.cpp.

References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_NEW_RETURN, address_server_arg_, ECG_HANDLER_BASIC, ECG_HANDLER_COMPLEX, ECG_HANDLER_UDP, handler_type_, LM_ERROR, nic_, TAO_ECG_UDP_EH::open(), TAO_ECG_Mcast_EH::open(), TAO_ECG_Simple_Mcast_EH::open(), ACE_Event_Handler::reactor(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::reset(), ACE_INET_Addr::set(), and TAO_ECG_Refcounted_Handler.

00421 {
00422   TAO_ECG_Refcounted_Handler handler;
00423 
00424   const char * nic =
00425     (this->nic_.length ()) ? this->nic_.c_str () : 0;
00426   const char * address_server_arg =
00427     (this->address_server_arg_.length ())
00428     ? this->address_server_arg_.c_str () : 0;
00429 
00430   if (this->handler_type_ == ECG_HANDLER_BASIC)
00431     {
00432       TAO_ECG_Simple_Mcast_EH * h = 0;
00433       ACE_NEW_RETURN (h,
00434                       TAO_ECG_Simple_Mcast_EH (receiver),
00435                       handler);
00436       handler.reset (h);
00437 
00438       h->reactor (reactor);
00439       if (h->open (address_server_arg, nic) != 0)
00440         return TAO_ECG_Refcounted_Handler ();
00441     }
00442 
00443   else if (this->handler_type_ == ECG_HANDLER_COMPLEX)
00444     {
00445       TAO_ECG_Mcast_EH * h = 0;
00446       ACE_NEW_RETURN (h,
00447                       TAO_ECG_Mcast_EH (receiver, nic),
00448                       handler);
00449       handler.reset (h);
00450 
00451       h->reactor (reactor);
00452 
00453       h->open (ec ACE_ENV_ARG_PARAMETER);
00454       ACE_CHECK_RETURN (TAO_ECG_Refcounted_Handler ());
00455     }
00456 
00457   else if (this->handler_type_ == ECG_HANDLER_UDP)
00458     {
00459       TAO_ECG_UDP_EH * h = 0;
00460       ACE_NEW_RETURN (h,
00461                       TAO_ECG_UDP_EH (receiver),
00462                       handler);
00463       handler.reset (h);
00464       h->reactor (reactor);
00465 
00466       ACE_INET_Addr ipaddr;
00467       if (ipaddr.set (address_server_arg) != 0)
00468         {
00469           ACE_ERROR ((LM_ERROR,
00470                       "ERROR using address server argument "
00471                       "in ACE_INET_Addr.set ().\n"));
00472           return TAO_ECG_Refcounted_Handler ();
00473         }
00474       if (h->open (ipaddr) != 0)
00475         return TAO_ECG_Refcounted_Handler ();
00476     }
00477 
00478   else
00479     {
00480       ACE_ERROR ((LM_ERROR,
00481                   "Cannot create handler: unknown "
00482                   "handler type specified.\n"));
00483       return handler;
00484     }
00485 
00486   return handler;
00487 }

TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > TAO_ECG_Mcast_Gateway::init_receiver (RtecEventChannelAdmin::EventChannel_ptr ec,
                                                                                 RtecUDPAdmin::AddrServer_ptr address_server,
                                                                                 TAO_ECG_Refcounted_Endpoint endpoint_rptr) [private]
 

Definition at line 538 of file ECG_Mcast_Gateway.cpp.

References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, TAO_EC_Auto_Command< T >::disallow_command(), ACE_SupplierQOS_Factory::get_SupplierQOS(), TAO_EC_Servant_Var< T >::in(), ACE_SupplierQOS_Factory::insert(), RtecEventChannelAdmin::SupplierQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), TAO_ECG_Refcounted_Endpoint, and UDP_Receiver_Shutdown.

Referenced by run().

00543 {
00544   TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver>
00545     receiver (TAO_ECG_UDP_Receiver::create ());
00546   if (!receiver.in ())
00547     return receiver;
00548 
00549   receiver->init (ec,
00550                   endpoint_rptr,
00551                   address_server
00552                   ACE_ENV_ARG_PARAMETER);
00553   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00554 
00555   TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;
00556   receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));
00557 
00558   ACE_SupplierQOS_Factory supplier_qos_factory;
00559   supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00560                                ACE_ES_EVENT_ANY,
00561                                0, 1);
00562   RtecEventChannelAdmin::SupplierQOS & qos =
00563     const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ());
00564   qos.is_gateway = 1;
00565 
00566   receiver->connect (qos ACE_ENV_ARG_PARAMETER);
00567   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ());
00568 
00569   receiver_shutdown.disallow_command ();
00570   return receiver;
00571 }

TAO_EC_Servant_Var< TAO_ECG_UDP_Sender > TAO_ECG_Mcast_Gateway::init_sender (RtecEventChannelAdmin::EventChannel_ptr ec,
                                                                             RtecUDPAdmin::AddrServer_ptr address_server,
                                                                             TAO_ECG_Refcounted_Endpoint endpoint_rptr) [private]
 

Definition at line 490 of file ECG_Mcast_Gateway.cpp.

References ACE_CHECK_RETURN, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_ANY, ACE_ES_EVENT_SOURCE_ANY, consumer_qos_, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_EC_Auto_Command< T >::disallow_command(), ACE_ConsumerQOS_Factory::get_ConsumerQOS(), TAO_EC_Servant_Var< T >::in(), ACE_ConsumerQOS_Factory::insert(), RtecEventChannelAdmin::ConsumerQOS::is_gateway, TAO_EC_Auto_Command< T >::set_command(), ACE_ConsumerQOS_Factory::start_disjunction_group(), TAO_ECG_Refcounted_Endpoint, and UDP_Sender_Shutdown.

Referenced by run().

00495 {
00496   TAO_EC_Servant_Var<TAO_ECG_UDP_Sender>
00497     sender (TAO_ECG_UDP_Sender::create ());
00498   if (!sender.in ())
00499     return sender;
00500 
00501   sender->init (ec,
00502                 address_server,
00503                 endpoint_rptr
00504                 ACE_ENV_ARG_PARAMETER);
00505   ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00506 
00507   TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
00508   sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
00509 
00510   if (this->consumer_qos_.dependencies.length () > 0)
00511     {
00512       // Client supplied consumer qos.  Use it.
00513       this->consumer_qos_.is_gateway = 1;
00514       sender->connect (this->consumer_qos_ ACE_ENV_ARG_PARAMETER);
00515       ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00516     }
00517   else
00518     {
00519       // Client did not specify anything - subscribe to all events.
00520       ACE_ConsumerQOS_Factory consumer_qos_factory;
00521       consumer_qos_factory.start_disjunction_group (1);
00522       consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY,
00523                                    ACE_ES_EVENT_ANY,
00524                                    0);
00525       RtecEventChannelAdmin::ConsumerQOS & qos =
00526         const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ());
00527       qos.is_gateway = 1;
00528 
00529       sender->connect (qos ACE_ENV_ARG_PARAMETER);
00530       ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ());
00531     }
00532 
00533   sender_shutdown.disallow_command ();
00534   return sender;
00535 }

int TAO_ECG_Mcast_Gateway::init_svcs (void) [static]
 

Helper function to register the Gateway into the service configurator.

Definition at line 36 of file ECG_Mcast_Gateway.cpp.

References ACE_Service_Config::static_svcs().

00037 {
00038   return ACE_Service_Config::static_svcs ()->
00039     insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway);
00040 }

void TAO_ECG_Mcast_Gateway::run (CORBA::ORB_ptr orb,
                                 RtecEventChannelAdmin::EventChannel_ptr ec)
 

The main method - create, configure and run federation components according to the specified configuration.

Definition at line 595 of file ECG_Mcast_Gateway.cpp.

References ACE_CHECK, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_THROW, activate(), TAO_EC_Auto_Command< T >::disallow_command(), TAO_EC_Object_Deactivator::disallow_deactivation(), ECG_MCAST_RECEIVER, ECG_MCAST_SENDER, ECG_MCAST_TWO_WAY, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), TAO_EC_Servant_Var< T >::in(), init_address_server(), init_endpoint(), init_receiver(), init_sender(), LM_ERROR, TAO_EC_Auto_Command< T >::set_command(), TAO_ECG_Refcounted_Endpoint, TAO_ECG_Refcounted_Handler, UDP_Receiver_Shutdown, UDP_Sender_Shutdown, and verify_args().

{
  // Verify args.
  this->verify_args (orb, ec ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Auto-cleanup objects.
  TAO_EC_Object_Deactivator address_server_deactivator;
  TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown;
  TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown;

  // Set up address server.
  PortableServer::ServantBase_var address_server_servant =
    this->init_address_server ();
  if (!address_server_servant.in ())
    {
      ACE_DEBUG ((LM_ERROR,
                  "Unable to create address server.\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }

  RtecUDPAdmin::AddrServer_var address_server;

  PortableServer::POA_var poa =
    address_server_servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  activate (address_server,
            poa.in (),
            address_server_servant.in (),
            address_server_deactivator
            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  TAO_ECG_Refcounted_Endpoint endpoint_rptr;
  TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender;

  // Set up event sender.
  if (this->service_type_ == ECG_MCAST_SENDER
      || this->service_type_ == ECG_MCAST_TWO_WAY)
    {
      endpoint_rptr = this->init_endpoint ();
      if (endpoint_rptr.get () == 0)
        {
          ACE_THROW (CORBA::INTERNAL ());
        }

      sender = this->init_sender (ec,
                                  address_server.in (),
                                  endpoint_rptr
                                  ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      if (!sender.in ())
        {
          ACE_THROW (CORBA::INTERNAL ());
        }

      sender_shutdown.set_command (UDP_Sender_Shutdown (sender));
    }

  // Set up event receiver.
  TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver;
  if (this->service_type_ == ECG_MCAST_RECEIVER
      || this->service_type_ == ECG_MCAST_TWO_WAY)
    {
      ACE_Reactor *reactor = orb->orb_core ()->reactor ();

      receiver = this->init_receiver (ec,
                                      address_server.in (),
                                      endpoint_rptr
                                      ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      if (!receiver.in ())
        {
          ACE_THROW (CORBA::INTERNAL ());
        }

      receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver));

      TAO_ECG_Refcounted_Handler
        handler_rptr (this->init_handler (receiver.in (),
                                          ec,
                                          reactor
                                          ACE_ENV_ARG_PARAMETER));
      ACE_CHECK;
      if (handler_rptr.get () == 0)
        {
          ACE_THROW (CORBA::INTERNAL ());
        }
      receiver->set_handler_shutdown (handler_rptr);
    }

  // Everything went ok - disable auto-cleanup.
  address_server_deactivator.disallow_deactivation ();
  receiver_shutdown.disallow_command ();
  sender_shutdown.disallow_command ();
}
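
The auto-cleanup objects in the listing above follow a guard pattern: each guard is armed once its resource has been set up, and all guards are disarmed only after the whole setup succeeds, so an early exit shuts down whatever was already created. The following is a minimal standalone sketch of that idea, not TAO's implementation. AutoCommand, LogShutdown and setup() are hypothetical names invented for illustration, and a C++ exception stands in for the ACE_THROW/ACE_CHECK macros.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a shutdown command (not part of TAO):
// when executed, it records its name in a log.
struct LogShutdown
{
  LogShutdown () : log (nullptr) {}
  LogShutdown (std::vector<std::string> *l, const std::string &n)
    : log (l), name (n) {}

  void execute ()
  {
    if (log != nullptr)
      log->push_back (name);
  }

  std::vector<std::string> *log;
  std::string name;
};

// Simplified guard (assumption: only loosely modelled on the
// set_command()/disallow_command() calls seen in the listing).
// Runs the stored command on destruction unless it was disarmed.
template <class T>
class AutoCommand
{
public:
  AutoCommand () : allowed_ (false) {}
  AutoCommand (const AutoCommand &) = delete;
  AutoCommand &operator= (const AutoCommand &) = delete;

  ~AutoCommand ()
  {
    if (allowed_)
      command_.execute ();
  }

  // Store the command and arm the guard.
  void set_command (const T &command)
  {
    command_ = command;
    allowed_ = true;
  }

  // Disarm the guard: the command will not run on destruction.
  void disallow_command () { allowed_ = false; }

private:
  T command_;
  bool allowed_;
};

// Mirrors the shape of run(): arm guards as components come up and
// disarm them at the end. Returns the names of the commands that ran.
std::vector<std::string> setup (bool fail_midway)
{
  std::vector<std::string> log;
  try
    {
      AutoCommand<LogShutdown> sender_shutdown;
      AutoCommand<LogShutdown> receiver_shutdown;

      sender_shutdown.set_command (LogShutdown (&log, "sender"));
      if (fail_midway)
        throw std::runtime_error ("receiver setup failed");
      receiver_shutdown.set_command (LogShutdown (&log, "receiver"));

      // Everything went ok - disable auto-cleanup.
      receiver_shutdown.disallow_command ();
      sender_shutdown.disallow_command ();
    }
  catch (const std::runtime_error &)
    {
      // The guards have already been destroyed during stack unwinding.
    }
  return log;
}
```

In this sketch, a failure after the sender is set up causes only the sender's command to run, because the receiver's guard was never armed. A fully successful setup runs no cleanup commands.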

int TAO_ECG_Mcast_Gateway::validate_configuration (void) [private]
 

Verifies configuration values specified through init() make sense.

Definition at line 244 of file ECG_Mcast_Gateway.cpp.

References ACE_DEBUG, address_server_arg_, address_server_type_, ECG_ADDRESS_SERVER_BASIC, ECG_HANDLER_BASIC, ECG_HANDLER_UDP, ECG_MCAST_SENDER, handler_type_, ip_multicast_loop_, LM_ERROR, and non_blocking_.

Referenced by init().

{
  if ((this->handler_type_ == ECG_HANDLER_BASIC
       || this->handler_type_ == ECG_HANDLER_UDP)
      && this->service_type_ != ECG_MCAST_SENDER
      && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Configurations for mcast handler and "
                  "address server do not match.\n"));
      return -1;
    }

  // Currently all Address Server implementations require an
  // initialization string.  If we ever add a new Address Server
  // implementation, which does not, we'll have to remove this check.
  if (this->address_server_arg_.length () == 0)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Address server initialization "
                  "argument not specified.\n"));
      return -1;
    }

  if (this->ip_multicast_loop_ != 0
      && this->ip_multicast_loop_ != 1)
    {
      ACE_DEBUG ((LM_ERROR,
                  "IP MULTICAST LOOP option must have a boolean value.\n"));
      return -1;
    }

  if (this->non_blocking_ != 0
      && this->non_blocking_ != 1)
    {
      ACE_DEBUG ((LM_ERROR,
                  "NON BLOCKING flag must have a boolean value.\n"));
      return -1;
    }

  return 0;
}
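
The checks above can be exercised in isolation. The sketch below restates them over a plain struct so the accepted and rejected combinations are easy to see. It is an illustration only: GatewayConfig, validate() and make_example_config() are hypothetical names, the enumerators are shortened stand-ins for the class's own, and the example values (including the address server argument) are placeholders rather than TAO defaults.

```cpp
#include <cassert>
#include <string>

// Shortened stand-ins for the class's Service_Type, Address_Server_Type
// and Handler_Type enums.
enum ServiceType { SENDER, RECEIVER, TWO_WAY };
enum AddressServerType { SERVER_BASIC, SERVER_SOURCE, SERVER_TYPE };
enum HandlerType { HANDLER_BASIC, HANDLER_COMPLEX, HANDLER_UDP };

// Hypothetical plain-data mirror of the members that
// validate_configuration() reads.
struct GatewayConfig
{
  ServiceType service_type;
  HandlerType handler_type;
  AddressServerType address_server_type;
  std::string address_server_arg;
  int ip_multicast_loop;
  int non_blocking;
};

// Returns 0 if the configuration passes the same four checks as the
// listing, -1 otherwise.
int validate (const GatewayConfig &c)
{
  // A basic or UDP handler on a receiving gateway requires the basic
  // address server.
  if ((c.handler_type == HANDLER_BASIC || c.handler_type == HANDLER_UDP)
      && c.service_type != SENDER
      && c.address_server_type != SERVER_BASIC)
    return -1;

  // The address server needs an initialization string.
  if (c.address_server_arg.empty ())
    return -1;

  // Both flags are stored as ints but must hold 0 or 1.
  if (c.ip_multicast_loop != 0 && c.ip_multicast_loop != 1)
    return -1;
  if (c.non_blocking != 0 && c.non_blocking != 1)
    return -1;

  return 0;
}

// Placeholder configuration that passes every check.
GatewayConfig make_example_config ()
{
  GatewayConfig c;
  c.service_type = TWO_WAY;
  c.handler_type = HANDLER_BASIC;
  c.address_server_type = SERVER_BASIC;
  c.address_server_arg = "example-arg";
  c.ip_multicast_loop = 0;
  c.non_blocking = 0;
  return c;
}
```

Note that the handler/address-server mismatch is only rejected when the gateway receives events: a sender-only configuration skips the first check because the condition requires the service type to differ from the sender type.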

void TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb, RtecEventChannelAdmin::EventChannel_ptr ec) [private]
 

Check that arguments to run() are not nil.

Definition at line 574 of file ECG_Mcast_Gateway.cpp.

References ACE_ERROR, ACE_THROW, CORBA::is_nil(), and LM_ERROR.

Referenced by run().

{
  if (CORBA::is_nil (ec))
    {
      ACE_ERROR ((LM_ERROR,
                  "Nil event channel argument passed to "
                  "TAO_ECG_Mcast_Gateway::run().\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }
  if (CORBA::is_nil (orb))
    {
      ACE_ERROR ((LM_ERROR,
                  "Nil orb argument passed to "
                  "TAO_ECG_Mcast_Gateway::run().\n"));
      ACE_THROW (CORBA::INTERNAL ());
    }
}


Member Data Documentation

ACE_CString TAO_ECG_Mcast_Gateway::address_server_arg_ [private]
 

Definition at line 270 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_address_server(), init_handler(), and validate_configuration().

Address_Server_Type TAO_ECG_Mcast_Gateway::address_server_type_ [private]
 

Definition at line 269 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_address_server(), and validate_configuration().

RtecEventChannelAdmin::ConsumerQOS TAO_ECG_Mcast_Gateway::consumer_qos_ [private]
 

Definition at line 276 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_sender(), and TAO_ECG_Mcast_Gateway().

Handler_Type TAO_ECG_Mcast_Gateway::handler_type_ [private]
 

Definition at line 268 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_handler(), and validate_configuration().

int TAO_ECG_Mcast_Gateway::ip_multicast_loop_ [private]
 

Definition at line 273 of file ECG_Mcast_Gateway.h.

Referenced by init(), and validate_configuration().

ACE_CString TAO_ECG_Mcast_Gateway::nic_ [private]
 

Definition at line 272 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_endpoint(), and init_handler().

int TAO_ECG_Mcast_Gateway::non_blocking_ [private]
 

Definition at line 274 of file ECG_Mcast_Gateway.h.

Referenced by init(), init_endpoint(), and validate_configuration().

Service_Type TAO_ECG_Mcast_Gateway::service_type_ [private]
 

Definition at line 267 of file ECG_Mcast_Gateway.h.

u_char TAO_ECG_Mcast_Gateway::ttl_value_ [private]
 

Definition at line 271 of file ECG_Mcast_Gateway.h.

Referenced by init(), and init_endpoint().


The documentation for this class was generated from the following files: ECG_Mcast_Gateway.h and ECG_Mcast_Gateway.cpp.
Generated on Thu Nov 9 13:16:13 2006 for TAO_RTEvent by doxygen 1.3.6