UDP.cpp

Go to the documentation of this file.
00001 // UDP.cpp,v 5.35 2006/03/14 06:14:24 jtc Exp
00002 
00003 #include "orbsvcs/AV/UDP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005 #include "orbsvcs/AV/MCast.h"
00006 
00007 #include "tao/debug.h"
00008 #include "ace/OS_NS_strings.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "orbsvcs/AV/UDP.i"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (AV,
00015            UDP,
00016            "UDP.cpp,v 5.35 2006/03/14 06:14:24 jtc Exp")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 //------------------------------------------------------------
00021 // TAO_AV_UDP_Flow_Handler
00022 //------------------------------------------------------------
00023 
00024 TAO_AV_UDP_Flow_Handler::TAO_AV_UDP_Flow_Handler (void)
00025 {
00026   ACE_NEW (this->transport_,
00027            TAO_AV_UDP_Transport (this));
00028 }
00029 
00030 TAO_AV_UDP_Flow_Handler::~TAO_AV_UDP_Flow_Handler (void)
00031 {
00032   // remove the event handler from the reactor.
00033   TAO_AV_CORE::instance()->reactor ()->remove_handler (this->event_handler(),
00034                                               ACE_Event_Handler::READ_MASK);
00035 
00036   // close the socket.
00037   this->close ();
00038   delete this->transport_;
00039 }
00040 
00041 TAO_AV_Transport *
00042 TAO_AV_UDP_Flow_Handler::transport (void)
00043 {
00044   return this->transport_;
00045 }
00046 
00047 int
00048 TAO_AV_UDP_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
00049 {
00050   return this->protocol_object_->handle_input ();
00051 }
00052 
00053 int
00054 TAO_AV_UDP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00055                                          const void *arg)
00056 {
00057   return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00058 }
00059 
00060 int
00061 TAO_AV_UDP_Flow_Handler::set_remote_address (ACE_Addr *address)
00062 {
00063   if (TAO_debug_level > 0)
00064     ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Flow_Handler::set_remote_address\n"));
00065 
00066   ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (address);
00067   this->peer_addr_ = *inet_addr;
00068   TAO_AV_UDP_Transport *transport = dynamic_cast<TAO_AV_UDP_Transport*> (this->transport_);
00069 
00070   return transport->set_remote_address (*inet_addr);
00071 }
00072 
00073 
00074 ACE_HANDLE
00075 TAO_AV_UDP_Flow_Handler::get_handle (void) const
00076 {
00077   if (TAO_debug_level > 0)
00078     ACE_DEBUG ((LM_DEBUG,
00079                 "TAO_AV_UDP_Flow_Handler::get_handle:%d\n",
00080                 this->sock_dgram_.get_handle ()));
00081 
00082   return this->sock_dgram_.get_handle () ;
00083 }
00084 
00085 int
00086 TAO_AV_UDP_Flow_Handler::change_qos(AVStreams::QoS qos)
00087 {
00088   if( TAO_debug_level > 0 )
00089     {
00090       ACE_DEBUG ((LM_DEBUG,
00091                   "(%N,%l) TAO_AV_UDP_Flow_Handler::change_qos\n"));
00092     }
00093 
00094   unsigned int i=0;
00095 
00096   int ret = 0;
00097   CORBA::Long dscp = 0;
00098   CORBA::Long ecn = 0;
00099   int dscp_flag=0;
00100   for(i=0; i < qos.QoSParams.length(); i++)
00101     {
00102 
00103       if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "Diffserv_Codepoint") == 0)
00104         {
00105           qos.QoSParams[i].property_value >>= dscp;
00106           dscp_flag=1;
00107           // DSCP value can only be 6 bits wide
00108           if(!((dscp >= 0) && (dscp <= 63)))
00109             {
00110               dscp_flag = 0;
00111               ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00112               return -1;
00113             }
00114         }
00115 
00116       if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "ECN") == 0)
00117         {
00118           qos.QoSParams[i].property_value >>= ecn;
00119           // ECN value can only occupy bits 6 and 7 of the
00120           // IP Diffserv byte
00121           if(!((ecn >= 0) && (ecn <= 3)))
00122             {
00123               ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00124               ecn = 0;
00125             }
00126 
00127         }
00128     }
00129       // Set the Diffserv byte only if we specified
00130       // the Diffserv Codepoint (DSCP) or ECN via QoSParams
00131       // passed into this method
00132       if(dscp_flag || ecn)
00133         {
00134           int tos;
00135           tos = (int)(dscp << 2);
00136           if(ecn)
00137             {
00138               tos |= ecn;
00139             }
00140           ret = sock_dgram_.set_option(IPPROTO_IP, IP_TOS, (int *)&tos , (int)sizeof(tos));
00141 
00142           if(TAO_debug_level > 1)
00143             {
00144               ACE_DEBUG((LM_DEBUG, "(%N,%l) set tos: ret: %d\n", ret));
00145             }
00146         }
00147 
00148       if(TAO_debug_level > 1)
00149         {
00150            if(ret < 0 )
00151              {
00152                 ACE_DEBUG((LM_DEBUG, "(%N,%l) errno: %p\n"));
00153              }
00154         }
00155       return ret;
00156 }
00157 
00158 //------------------------------------------------------------
00159 // TAO_AV_UDP_Transport
00160 //------------------------------------------------------------
00161 
00162 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (void)
00163   :handler_ (0)
00164 {
00165 }
00166 
00167 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (TAO_AV_UDP_Flow_Handler *handler)
00168   :handler_ (handler),
00169    addr_ (0)
00170 {
00171 }
00172 
00173 TAO_AV_UDP_Transport::~TAO_AV_UDP_Transport (void)
00174 {
00175 }
00176 
00177 int
00178 TAO_AV_UDP_Transport::set_remote_address (const ACE_INET_Addr &address)
00179 {
00180   this->peer_addr_ = address;
00181   return 0;
00182 }
00183 
00184 int
00185 TAO_AV_UDP_Transport::open (ACE_Addr * /*address*/)
00186 {
00187   return 0;
00188 }
00189 
00190 int
00191 TAO_AV_UDP_Transport::close (void)
00192 {
00193   return 0;
00194 }
00195 
00196 int
00197 TAO_AV_UDP_Transport::mtu (void)
00198 {
00199   return 65535;
00200 }
00201 
00202 ACE_Addr*
00203 TAO_AV_UDP_Transport::get_peer_addr (void)
00204 {
00205   return &this->peer_addr_;
00206 }
00207 
00208 ssize_t
00209 TAO_AV_UDP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00210 {
00211   // For the most part this was copied from GIOP::send_request and
00212   // friends.
00213 
00214   iovec iov[ACE_IOV_MAX];
00215   int iovcnt = 0;
00216   ssize_t n = 0;
00217   ssize_t nbytes = 0;
00218 
00219   for (const ACE_Message_Block *i = mblk;
00220        i != 0;
00221        i = i->cont ())
00222     {
00223       // Make sure there is something to send!
00224       if (i->length () > 0)
00225         {
00226           iov[iovcnt].iov_base = i->rd_ptr ();
00227           iov[iovcnt].iov_len  = static_cast<u_long> (i->length ());
00228           iovcnt++;
00229 
00230           // The buffer is full make a OS call.  @@ TODO this should
00231           // be optimized on a per-platform basis, for instance, some
00232           // platforms do not implement writev() there we should copy
00233           // the data into a buffer and call send_n(). In other cases
00234           // there may be some limits on the size of the iovec, there
00235           // we should set ACE_IOV_MAX to that limit.
00236           if (iovcnt == ACE_IOV_MAX)
00237             {
00238                n = this->handler_->get_socket ()->send ((const iovec *) iov,
00239                                                         iovcnt,
00240                                                         this->peer_addr_);
00241 
00242               if (n < 1)
00243                 return n;
00244 
00245               nbytes += n;
00246               iovcnt = 0;
00247             }
00248         }
00249     }
00250 
00251   // Check for remaining buffers to be sent!
00252   if (iovcnt != 0)
00253     {
00254       n = this->handler_->get_socket ()->send ((const iovec *) iov,
00255                                                iovcnt,
00256                                                this->peer_addr_);
00257 
00258       if (n < 1)
00259         return n;
00260 
00261       nbytes += n;
00262     }
00263 
00264   return nbytes;
00265 }
00266 
00267 ssize_t
00268 TAO_AV_UDP_Transport::send (const char *buf,
00269                             size_t len,
00270                             ACE_Time_Value *)
00271 {
00272   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Transport::send "));
00273   char addr [BUFSIZ];
00274   this->peer_addr_.addr_to_string (addr,BUFSIZ);
00275   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"to %s\n",addr));
00276 
00277   return this->handler_->get_socket ()->send (buf, len,this->peer_addr_);
00278 }
00279 
00280 ssize_t
00281 TAO_AV_UDP_Transport::send (const iovec *iov,
00282                           int iovcnt,
00283                           ACE_Time_Value *)
00284 {
00285   return this->handler_->get_socket ()->send ((const iovec *) iov,
00286                                               iovcnt,
00287                                               this->peer_addr_);
00288 
00289 }
00290 
00291 ssize_t
00292 TAO_AV_UDP_Transport::recv (char *buf,
00293                             size_t len,
00294                             ACE_Time_Value *)
00295 {
00296   return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00297 }
00298 
00299 ssize_t
00300 TAO_AV_UDP_Transport::recv (char *buf,
00301                             size_t len,
00302                             int flags,
00303                             ACE_Time_Value *timeout)
00304 {
00305   return this->handler_->get_socket ()->recv (buf,
00306                                               len,
00307                                               this->peer_addr_,
00308                                               flags,
00309                                               timeout);
00310 }
00311 
00312 ssize_t
00313 TAO_AV_UDP_Transport::recv (iovec *iov,
00314                             int /*iovcnt*/,
00315                             ACE_Time_Value *timeout)
00316 {
00317   return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00318 }
00319 
00320 
00321 //------------------------------------------------------------
00322 // TAO_AV_UDP_Acceptor
00323 //------------------------------------------------------------
00324 
00325 TAO_AV_UDP_Acceptor::TAO_AV_UDP_Acceptor (void)
00326   : address_ (0),
00327     control_inet_address_ (0)
00328 {
00329 }
00330 
00331 TAO_AV_UDP_Acceptor::~TAO_AV_UDP_Acceptor (void)
00332 {
00333   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00334     delete this->entry_->control_handler ();
00335 
00336   delete this->address_;
00337   delete this->control_inet_address_;
00338 }
00339 
00340 int
00341 TAO_AV_UDP_Acceptor::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00342 {
00343   ACE_Event_Handler *event_handler = handler->event_handler ();
00344   int result = this->av_core_->reactor ()->register_handler (event_handler,
00345                                                              ACE_Event_Handler::READ_MASK);
00346 
00347   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00348     handler->schedule_timer ();
00349 
00350  return result;
00351 }
00352 
00353 int
00354 TAO_AV_UDP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00355                            TAO_AV_Core *av_core,
00356                            TAO_FlowSpec_Entry *entry,
00357                            TAO_AV_Flow_Protocol_Factory *factory,
00358                            TAO_AV_Core::Flow_Component flow_comp)
00359 {
00360   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Acceptor::open\n"));
00361   this->av_core_ = av_core;
00362   this->endpoint_ = endpoint;
00363   this->entry_ = entry;
00364   this->flow_component_ = flow_comp;
00365   this->flow_protocol_factory_ = factory;
00366   ACE_INET_Addr *inet_addr;
00367   if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00368     {
00369       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00370       inet_addr = (ACE_INET_Addr *) entry->control_address ();
00371     }
00372   else
00373     {
00374       this->flowname_ = entry->flowname ();
00375       inet_addr = (ACE_INET_Addr *) entry->address ();
00376     }
00377 
00378   if (inet_addr != 0)
00379     {
00380       char buf[BUFSIZ];
00381       inet_addr->addr_to_string (buf,
00382                                  BUFSIZ);
00383 
00384       if (TAO_debug_level > 0)
00385         ACE_DEBUG ((LM_DEBUG,
00386                     "TAO_AV_UDP_Acceptor::open: %s\n",
00387                     buf));
00388     }
00389 
00390   int result = this->open_i (inet_addr, 0);
00391 
00392   if (result < 0)
00393     return result;
00394   return 0;
00395 }
00396 
00397 int
00398 TAO_AV_UDP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00399                                    TAO_AV_Core *av_core,
00400                                    TAO_FlowSpec_Entry *entry,
00401                                    TAO_AV_Flow_Protocol_Factory *factory,
00402                                    TAO_AV_Core::Flow_Component flow_comp)
00403 {
00404   this->av_core_ = av_core;
00405   this->endpoint_ = endpoint;
00406   this->entry_ = entry;
00407   this->flow_component_ = flow_comp;
00408   this->flow_protocol_factory_ = factory;
00409   if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00410     {
00411       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00412     }
00413   else
00414     {
00415       this->flowname_ = entry->flowname ();
00416       ACE_NEW_RETURN (this->address_,
00417                     ACE_INET_Addr ("0"),
00418                     -1);
00419     }
00420 
00421   int result = this->open_i (this->address_, 1);
00422   if (result < 0)
00423     return result;
00424 
00425   return 0;
00426 }
00427 
00428 int
00429 TAO_AV_UDP_Acceptor::open_i (ACE_INET_Addr *inet_addr,
00430                              int is_default_addr)
00431 {
00432   int result = -1;
00433   ACE_INET_Addr *local_addr = 0;
00434 
00435   TAO_AV_Flow_Handler *flow_handler = 0;
00436 
00437   // if using a default address and this is the control flow component, the
00438   //  handler and local address are already set in the flow spec entry
00439   if (is_default_addr &&
00440       (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)  &&
00441       (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0))
00442     {
00443       flow_handler = this->entry_->control_handler ();
00444 
00445       local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00446     }
00447   else
00448     {
00449       // this variable is only used for RTP/UDP; RFC 1889 requires an even/odd
00450       //  consecutive port pair
00451       int get_new_port = 1;
00452 
00453       while (get_new_port)
00454         {
00455           // assume the ports will be OK
00456           get_new_port = 0;
00457 
00458           result = TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00459                                                        inet_addr,
00460                                                        local_addr,
00461                                                        this->entry_->is_multicast (),
00462                                                        TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00463 
00464           if (result < 0)
00465           {
00466              ACE_DEBUG((LM_DEBUG,"(%N,%l) Error during connection setup: %d\n", result));
00467           }
00468 
00469           local_addr->set (local_addr->get_port_number (),
00470                            local_addr->get_host_name ());
00471 
00472           if (is_default_addr)
00473             {
00474               if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00475                   (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA))
00476                 {
00477                   if (is_default_addr && local_addr->get_port_number ()%2 != 0)
00478                     {
00479                       // RTP port should be even
00480                       delete local_addr;
00481                       delete flow_handler;
00482                       get_new_port = 1;
00483                     }
00484                   else
00485                     {
00486                       ACE_INET_Addr *local_control_addr;
00487                       TAO_AV_Flow_Handler *control_flow_handler = 0;
00488 
00489                       ACE_NEW_RETURN (this->control_inet_address_,
00490                                       ACE_INET_Addr ("0"),
00491                                       -1);
00492 
00493                       TAO_AV_UDP_Connection_Setup::setup(control_flow_handler,
00494                                                          this->control_inet_address_,
00495                                                          local_control_addr,
00496                                                          this->entry_->is_multicast (),
00497                                                          TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00498 
00499                       if (local_control_addr->get_port_number () !=
00500                           local_addr->get_port_number () +1)
00501                         {
00502                           delete this->control_inet_address_;
00503                           delete local_addr;
00504                           delete flow_handler;
00505                           delete local_control_addr;
00506                           delete control_flow_handler;
00507                           get_new_port = 1;
00508                         }
00509                       else
00510                         {
00511                           this->entry_->control_address (this->control_inet_address_);
00512                           this->entry_->set_local_control_addr (local_control_addr);
00513                           this->entry_->control_handler (control_flow_handler);
00514                         }
00515                     }
00516                 }
00517             }
00518         }
00519     }
00520 
00521   TAO_AV_Protocol_Object *object =
00522     this->flow_protocol_factory_->make_protocol_object (this->entry_,
00523                                                         this->endpoint_,
00524                                                         flow_handler,
00525                                                         flow_handler->transport ());
00526   flow_handler->protocol_object (object);
00527 
00528   if (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA)
00529     {
00530       this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
00531 
00532       this->entry_->protocol_object (object);
00533       this->entry_->set_local_addr (local_addr);
00534       this->entry_->handler (flow_handler);
00535       //this->entry_->address (inet_addr);
00536       this->entry_->address (local_addr);
00537     }
00538   else
00539     {
00540       this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),flow_handler);
00541 
00542       this->entry_->control_protocol_object (object);
00543       this->entry_->set_local_control_addr (local_addr);
00544       this->entry_->control_handler (flow_handler);
00545     }
00546 
00547   char buf[BUFSIZ];
00548   local_addr->addr_to_string (buf,BUFSIZ);
00549   if (TAO_debug_level > 0)
00550     ACE_DEBUG ((LM_DEBUG,
00551                 "TAO_AV_UDP_ACCEPTOR::open:%s \n",
00552                 buf));
00553 
00554   // call activate svc handler.
00555   return this->activate_svc_handler (flow_handler);
00556 }
00557 
00558 int
00559 TAO_AV_UDP_Acceptor::close (void)
00560 {
00561   return 0;
00562 }
00563 
00564 //------------------------------------------------------------
00565 // TAO_AV_UDP_Connector
00566 //------------------------------------------------------------
00567 TAO_AV_UDP_Connector::TAO_AV_UDP_Connector (void)
00568   : control_inet_address_ (0)
00569 {
00570 }
00571 
00572 TAO_AV_UDP_Connector::~TAO_AV_UDP_Connector (void)
00573 {
00574   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00575     {
00576       delete this->entry_->control_handler ();
00577     }
00578 
00579   if (this->control_inet_address_ != 0)
00580     delete this->control_inet_address_;
00581 }
00582 
00583 int
00584 TAO_AV_UDP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00585                             TAO_AV_Core *av_core,
00586                             TAO_AV_Flow_Protocol_Factory *factory)
00587 
00588 {
00589   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Connector::open "));
00590   this->endpoint_ = endpoint;
00591   this->av_core_ = av_core;
00592   this->flow_protocol_factory_ = factory;
00593   return 0;
00594 }
00595 
00596 int
00597 TAO_AV_UDP_Connector::connect (TAO_FlowSpec_Entry *entry,
00598                                TAO_AV_Transport *&transport,
00599                                TAO_AV_Core::Flow_Component flow_component)
00600 {
00601   ACE_INET_Addr *local_addr = 0;
00602   ACE_INET_Addr *control_inet_addr = 0;
00603 
00604   this->entry_ = entry;
00605   this->flow_component_ = flow_component;
00606 
00607   ACE_INET_Addr *inet_addr;
00608 
00609   if (flow_component == TAO_AV_Core::TAO_AV_CONTROL)
00610     {
00611       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00612     inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00613     }
00614   else
00615     {
00616       this->flowname_ = entry->flowname ();
00617       inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
00618       control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00619     }
00620 
00621   TAO_AV_Flow_Handler *flow_handler = 0;
00622 
00623   // if this is the control flow component, the
00624   //  handler and local address are already set in the flow spec entry
00625   if ((flow_component == TAO_AV_Core::TAO_AV_CONTROL)  &&
00626       (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00627       !entry->is_multicast ())
00628     {
00629       flow_handler = this->entry_->control_handler ();
00630       flow_handler->set_remote_address (inet_addr);
00631 
00632       local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00633     }
00634   else
00635     {
00636       // this variable is only used for RTP/UDP; RFC 1889 requires an even/odd
00637       //  consecutive port pair
00638       int get_new_port = 1;
00639 
00640       while (get_new_port)
00641         {
00642           // assume the ports will be OK
00643           get_new_port = 0;
00644 
00645           ACE_Addr *addr;
00646           if ((addr = entry->get_peer_addr ()) != 0)
00647             {
00648               local_addr = dynamic_cast<ACE_INET_Addr*> (addr);
00649               char buf [BUFSIZ];
00650               local_addr->addr_to_string (buf, BUFSIZ);
00651             }
00652 
00653           TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00654                                               inet_addr,
00655                                               local_addr,
00656                                               entry->is_multicast (),
00657                                               TAO_AV_UDP_Connection_Setup::CONNECTOR);
00658 
00659           if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00660               (flow_component == TAO_AV_Core::TAO_AV_DATA) &&
00661               !entry->is_multicast ())
00662             {
00663               if (local_addr->get_port_number ()%2 != 0)
00664                 {
00665                   // RTP port should be even
00666                   delete local_addr;
00667                   delete flow_handler;
00668                   get_new_port = 1;
00669                 }
00670               else
00671                 {
00672                   ACE_INET_Addr *local_control_addr;
00673                   TAO_AV_Flow_Handler *control_flow_handler = 0;
00674 
00675                   if (entry->is_multicast ())
00676                     control_inet_addr =  dynamic_cast<ACE_INET_Addr*> (entry->control_address ()) ;
00677                   else
00678                     {
00679 
00680                       if (local_addr != 0)
00681                         {
00682                           char buf [BUFSIZ];
00683                           ACE_CString addr_str (local_addr->get_host_name ());
00684                           addr_str += ":";
00685                           addr_str += ACE_OS::itoa (local_addr->get_port_number () + 1, buf, 10);
00686                           ACE_NEW_RETURN (local_control_addr,
00687                                           ACE_INET_Addr (addr_str.c_str ()),
00688                                           -1);
00689                           local_control_addr->addr_to_string (buf, BUFSIZ);
00690                         }
00691 
00692 
00693                       if (entry->control_address () == 0)
00694                         ACE_NEW_RETURN (this->control_inet_address_,
00695                                         ACE_INET_Addr ("0"),
00696                                         -1);
00697                       else
00698                         control_inet_address_ = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00699                     }
00700 
00701                   TAO_AV_UDP_Connection_Setup::setup (control_flow_handler,
00702                                                       control_inet_addr,
00703                                                       local_control_addr,
00704                                                       entry->is_multicast (),
00705                                                       TAO_AV_UDP_Connection_Setup::CONNECTOR);
00706 
00707                   if (local_control_addr->get_port_number () !=
00708                       local_addr->get_port_number () +1)
00709                     {
00710                       delete local_addr;
00711                       delete flow_handler;
00712                       delete local_control_addr;
00713                       delete control_flow_handler;
00714                       get_new_port = 1;
00715                     }
00716                   else
00717                     {
00718                       this->entry_->set_local_control_addr (local_control_addr);
00719                       this->entry_->control_handler (control_flow_handler);
00720                     }
00721                 }
00722             }
00723         }
00724     }
00725 
00726   TAO_AV_Protocol_Object *object =
00727     this->flow_protocol_factory_->make_protocol_object (this->entry_,
00728                                                         this->endpoint_,
00729                                                         flow_handler,
00730                                                         flow_handler->transport ());
00731 
00732   flow_handler->protocol_object (object);
00733 
00734   if (flow_component == TAO_AV_Core::TAO_AV_DATA)
00735     {
00736       this->endpoint_->set_flow_handler (this->flowname_.c_str (),
00737                                          flow_handler);
00738       this->entry_->protocol_object (object);
00739       entry->set_local_addr (local_addr);
00740       entry->handler (flow_handler);
00741       transport = flow_handler->transport ();
00742     }
00743   else
00744     {
00745       this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),
00746                                                  flow_handler);
00747       this->entry_->control_protocol_object (object);
00748       entry->set_local_control_addr (local_addr);
00749       entry->control_handler (flow_handler);
00750       transport = flow_handler->transport ();
00751     }
00752 
00753   char buf[BUFSIZ];
00754   local_addr->addr_to_string (buf,BUFSIZ);
00755 
00756   if (TAO_debug_level > 0)
00757     ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_CONNECTOR::connect:%s \n",buf));
00758 
00759   // call activate svc handler.
00760   return this->activate_svc_handler (flow_handler);
00761 }
00762 
00763 int
00764 TAO_AV_UDP_Connector::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00765 {
00766   ACE_Event_Handler *event_handler = handler->event_handler ();
00767   int result = this->av_core_->reactor ()->register_handler (event_handler,
00768                                                              ACE_Event_Handler::READ_MASK);
00769 
00770   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00771     handler->schedule_timer ();
00772 
00773   return result;
00774 }
00775 
00776 int
00777 TAO_AV_UDP_Connector::close (void)
00778 {
00779   return 0;
00780 }
00781 
00782 //------------------------------------------------------------
00783 // TAO_AV_UDP_Connection_Setup
00784 //------------------------------------------------------------
00785 
00786 int
00787 TAO_AV_UDP_Connection_Setup::setup (TAO_AV_Flow_Handler *&flow_handler,
00788                                     ACE_INET_Addr *inet_addr,
00789                                     ACE_INET_Addr *&local_addr,
00790                                     int is_multicast,
00791                                     ConnectionType ct)
00792 {
00793   int result;
00794 
00795   if (is_multicast)
00796     {
00797       TAO_AV_UDP_MCast_Flow_Handler *handler;
00798       ACE_NEW_RETURN (handler,
00799                       TAO_AV_UDP_MCast_Flow_Handler,
00800                       -1);
00801 
00802       flow_handler = handler;
00803 
00804       result = handler->get_mcast_socket ()->subscribe (*inet_addr);
00805       if (result < 0)
00806         ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_MCast_connector::open failed\n"),-1);
00807 
00808       // Now disable Multicast loopback.
00809       // @@Should we make this a policy?
00810 #if defined (ACE_HAS_IP_MULTICAST)
00811       if (handler->get_mcast_socket ()->set_option (IP_MULTICAST_LOOP,
00812                                                     0) < 0)
00813         if (TAO_debug_level > 0)
00814           ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_MCast_Acceptor::multicast loop disable failed\n"));
00815       // @@ This should also be policies.
00816 #endif /*ACE_HAS_IP_MULTICAST*/
00817 
00818       int bufsize = 80 * 1024;
00819       if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00820                                                               SO_RCVBUF,
00821                                                               (char *)&bufsize,
00822                                                               sizeof(bufsize)) < 0)
00823         {
00824           bufsize = 32 * 1024;
00825           if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00826                                                                   SO_RCVBUF,
00827                                                                   (char *)&bufsize,
00828                                                                   sizeof(bufsize)) < 0)
00829             perror("SO_RCVBUF");
00830         }
00831       ACE_NEW_RETURN (local_addr,
00832                       ACE_INET_Addr ("0"),
00833                       -1);
00834 
00835       if (ct == ACCEPTOR)
00836         {
00837           result = handler->get_mcast_socket ()->get_local_addr (*local_addr);
00838           if (result < 0)
00839             ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00840 
00841           local_addr->set (local_addr->get_port_number (),
00842                            local_addr->get_host_name ());
00843           handler->set_peer_addr (local_addr);
00844         }
00845     }
00846   else
00847     {
00848       if (local_addr == 0)
00849         ACE_NEW_RETURN (local_addr,
00850                         ACE_INET_Addr ("0"),
00851                         -1);
00852 
00853       TAO_AV_UDP_Flow_Handler *handler;
00854       ACE_NEW_RETURN (handler,
00855                       TAO_AV_UDP_Flow_Handler,
00856                       -1);
00857 
00858       flow_handler = handler;
00859 
00860       if (ct == ACCEPTOR)
00861         result = handler->open (*inet_addr);
00862       else
00863         result = handler->open (*local_addr);
00864       if (result < 0)
00865         ACE_ERROR_RETURN ((LM_ERROR,"handler::open failed\n"),-1);
00866 
00867       // set the socket buffer sizes to 64k.
00868       int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00869       int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00870 
00871       if (handler->get_socket ()->set_option (SOL_SOCKET,
00872                                               SO_SNDBUF,
00873                                               (void *) &sndbufsize,
00874                                               sizeof (sndbufsize)) == -1
00875           && errno != ENOTSUP)
00876         return 0;
00877       else if (handler->get_socket ()->set_option (SOL_SOCKET,
00878                                                    SO_RCVBUF,
00879                                                    (void *) &rcvbufsize,
00880                                                    sizeof (rcvbufsize)) == -1
00881                && errno != ENOTSUP)
00882         return 0;
00883 
00884       if (ct == CONNECTOR)
00885         handler->set_remote_address  (inet_addr);
00886 
00887       result = handler->get_socket ()->get_local_addr (*local_addr);
00888 
00889       local_addr->set (local_addr->get_port_number (),
00890                        local_addr->get_host_name ());
00891 
00892       char buf [BUFSIZ];
00893       local_addr->addr_to_string (buf, BUFSIZ);
00894 
00895       if (result < 0)
00896         ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00897     }
00898 
00899   return 1;
00900 }
00901 
00902 //------------------------------------------------------------
00903 // TAO_AV_UDP_Protocol_Factory
00904 //------------------------------------------------------------
00905 
00906 TAO_AV_UDP_Factory::TAO_AV_UDP_Factory (void)
00907 {
00908 }
00909 
00910 TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
00911 {
00912 }
00913 
00914 int
00915 TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
00916 {
00917   if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
00918     return 1;
00919   if (ACE_OS::strcasecmp (protocol_string,"RTP/UDP") == 0)
00920     return 1;
00921   return 0;
00922 }
00923 
00924 TAO_AV_Acceptor*
00925 TAO_AV_UDP_Factory::make_acceptor (void)
00926 {
00927   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_acceptor\n"));
00928   TAO_AV_Acceptor *acceptor = 0;
00929   ACE_NEW_RETURN (acceptor,
00930                   TAO_AV_UDP_Acceptor,
00931                   0);
00932   return acceptor;
00933 }
00934 
00935 TAO_AV_Connector*
00936 TAO_AV_UDP_Factory::make_connector (void)
00937 {
00938   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_connector\n"));
00939   TAO_AV_Connector *connector = 0;
00940   ACE_NEW_RETURN (connector,
00941                   TAO_AV_UDP_Connector,
00942                   0);
00943   return connector;
00944 }
00945 
00946 int
00947 TAO_AV_UDP_Factory::init (int /* argc */,
00948                           char * /* argv */ [])
00949 {
00950   return 0;
00951 }
00952 
00953 //------------------------------------------------------------
00954 // TAO_AV_UDP_Object
00955 //------------------------------------------------------------
00956 
00957 int
00958 TAO_AV_UDP_Object::handle_input (void)
00959 {
00960   int n = this->transport_->recv (this->frame_.rd_ptr (),
00961                                   this->frame_.size ());
00962   if (n == -1)
00963     ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) TAO_AV_UDP_Flow_Handler::handle_input recv failed: errno: %m\n"),-1);
00964 
00965   this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00966 
00967   return this->callback_->receive_frame (&this->frame_);
00968 }
00969 
00970 int
00971 TAO_AV_UDP_Object::send_frame (ACE_Message_Block *frame,
00972                                TAO_AV_frame_info * /*frame_info*/)
00973 {
00974   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Object::send_frame\n"));
00975   int result = this->transport_->send (frame);
00976   if (result < 0)
00977     return result;
00978   return 0;
00979 }
00980 
00981 int
00982 TAO_AV_UDP_Object::send_frame (const iovec *iov,
00983                                int iovcnt,
00984                                TAO_AV_frame_info * /*frame_info*/)
00985 {
00986   int result = this->transport_->send (iov,iovcnt);
00987   if (result < 0)
00988     return result;
00989   return 0;
00990 }
00991 
00992 int
00993 TAO_AV_UDP_Object::send_frame (const char*buf,
00994                                size_t len)
00995 {
00996   int result = this->transport_->send (buf, len, 0);
00997   if (result < 0)
00998     return result;
00999   return 0;
01000 }
01001 
01002 TAO_AV_UDP_Object::TAO_AV_UDP_Object (TAO_AV_Callback *callback,
01003                                       TAO_AV_Transport *transport)
01004   :TAO_AV_Protocol_Object (callback,transport)
01005 {
01006   this->frame_.size (this->transport_->mtu ());
01007 }
01008 
01009 TAO_AV_UDP_Object::~TAO_AV_UDP_Object (void)
01010 {
01011   //no-op
01012 }
01013 
01014 int
01015 TAO_AV_UDP_Object::destroy (void)
01016 {
01017   this->callback_->handle_destroy ();
01018   delete this;
01019 
01020   return 0;
01021 }
01022 
01023 
01024 //------------------------------------------------------------
01025 // TAO_AV_UDP_Flow_Factory
01026 //------------------------------------------------------------
01027 TAO_AV_UDP_Flow_Factory::TAO_AV_UDP_Flow_Factory (void)
01028 {
01029 }
01030 
01031 TAO_AV_UDP_Flow_Factory::~TAO_AV_UDP_Flow_Factory (void)
01032 {
01033 }
01034 
01035 int
01036 TAO_AV_UDP_Flow_Factory::init (int /* argc */,
01037                                char * /* argv */ [])
01038 {
01039   return 0;
01040 }
01041 
01042 int
01043 TAO_AV_UDP_Flow_Factory::match_protocol (const char *flow_string)
01044 {
01045   if (ACE_OS::strcasecmp (flow_string,"UDP") == 0)
01046     return 1;
01047   return 0;
01048 }
01049 
01050 TAO_AV_Protocol_Object*
01051 TAO_AV_UDP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01052                                                TAO_Base_StreamEndPoint *endpoint,
01053                                                TAO_AV_Flow_Handler *handler,
01054                                                TAO_AV_Transport *transport)
01055 {
01056   TAO_AV_Callback *callback = 0;
01057   if( endpoint->get_callback (entry->flowname (), callback) ) {
01058     ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
01059   }
01060 
01061   TAO_AV_UDP_Object *object = 0;
01062   ACE_NEW_RETURN (object,
01063                   TAO_AV_UDP_Object (callback,
01064                                      transport),
01065                   0);
01066   callback->open (object,
01067                   handler);
01068   endpoint->set_protocol_object (entry->flowname (),
01069                                  object);
01070 
01071   endpoint->protocol_object_set ();
01072 
01073   return object;
01074 }
01075 
01076 TAO_END_VERSIONED_NAMESPACE_DECL
01077 
01078 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Flow_Factory)
01079 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Flow_Factory,
01080                        ACE_TEXT ("UDP_Flow_Factory"),
01081                        ACE_SVC_OBJ_T,
01082                        &ACE_SVC_NAME (TAO_AV_UDP_Flow_Factory),
01083                        ACE_Service_Type::DELETE_THIS |
01084                        ACE_Service_Type::DELETE_OBJ,
01085                        0)
01086 
01087 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Factory)
01088 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Factory,
01089                        ACE_TEXT ("UDP_Factory"),
01090                        ACE_SVC_OBJ_T,
01091                        &ACE_SVC_NAME (TAO_AV_UDP_Factory),
01092                        ACE_Service_Type::DELETE_THIS |
01093                        ACE_Service_Type::DELETE_OBJ,
01094                        0)

Generated on Thu Nov 9 13:44:47 2006 for TAO_AV by doxygen 1.3.6