UDP.cpp

Go to the documentation of this file.
00001 // $Id: UDP.cpp 81401 2008-04-23 18:12:56Z elliott_c $
00002 
00003 #include "orbsvcs/AV/UDP.h"
00004 #include "orbsvcs/AV/AVStreams_i.h"
00005 #include "orbsvcs/AV/MCast.h"
00006 
00007 #include "tao/debug.h"
00008 #include "ace/OS_NS_strings.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "orbsvcs/AV/UDP.inl"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (AV,
00015            UDP,
00016            "$Id: UDP.cpp 81401 2008-04-23 18:12:56Z elliott_c $")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 //------------------------------------------------------------
00021 // TAO_AV_UDP_Flow_Handler
00022 //------------------------------------------------------------
00023 
00024 TAO_AV_UDP_Flow_Handler::TAO_AV_UDP_Flow_Handler (void)
00025 {
00026   ACE_NEW (this->transport_,
00027            TAO_AV_UDP_Transport (this));
00028 }
00029 
00030 TAO_AV_UDP_Flow_Handler::~TAO_AV_UDP_Flow_Handler (void)
00031 {
00032   // remove the event handler from the reactor.
00033   TAO_AV_CORE::instance()->reactor ()->remove_handler (this->event_handler(),
00034                                               ACE_Event_Handler::READ_MASK);
00035 
00036   // close the socket.
00037   this->close ();
00038   delete this->transport_;
00039 }
00040 
00041 TAO_AV_Transport *
00042 TAO_AV_UDP_Flow_Handler::transport (void)
00043 {
00044   return this->transport_;
00045 }
00046 
00047 int
00048 TAO_AV_UDP_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
00049 {
00050   return this->protocol_object_->handle_input ();
00051 }
00052 
00053 int
00054 TAO_AV_UDP_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00055                                          const void *arg)
00056 {
00057   return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00058 }
00059 
00060 int
00061 TAO_AV_UDP_Flow_Handler::set_remote_address (ACE_Addr *address)
00062 {
00063   if (TAO_debug_level > 0)
00064     ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Flow_Handler::set_remote_address\n"));
00065 
00066   ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (address);
00067   this->peer_addr_ = *inet_addr;
00068   TAO_AV_UDP_Transport *transport = dynamic_cast<TAO_AV_UDP_Transport*> (this->transport_);
00069 
00070   return transport->set_remote_address (*inet_addr);
00071 }
00072 
00073 
00074 ACE_HANDLE
00075 TAO_AV_UDP_Flow_Handler::get_handle (void) const
00076 {
00077   if (TAO_debug_level > 0)
00078     ACE_DEBUG ((LM_DEBUG,
00079                 "TAO_AV_UDP_Flow_Handler::get_handle:%d\n",
00080                 this->sock_dgram_.get_handle ()));
00081 
00082   return this->sock_dgram_.get_handle () ;
00083 }
00084 
00085 int
00086 TAO_AV_UDP_Flow_Handler::change_qos(AVStreams::QoS qos)
00087 {
00088   if( TAO_debug_level > 0 )
00089     {
00090       ACE_DEBUG ((LM_DEBUG,
00091                   "(%N,%l) TAO_AV_UDP_Flow_Handler::change_qos\n"));
00092     }
00093 
00094   unsigned int i=0;
00095 
00096   int ret = 0;
00097   CORBA::Long dscp = 0;
00098   CORBA::Long ecn = 0;
00099   int dscp_flag=0;
00100   for(i=0; i < qos.QoSParams.length(); i++)
00101     {
00102 
00103       if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "Diffserv_Codepoint") == 0)
00104         {
00105           qos.QoSParams[i].property_value >>= dscp;
00106           dscp_flag=1;
00107           // DSCP value can only be 6 bits wide
00108           if(!((dscp >= 0) && (dscp <= 63)))
00109             {
00110               dscp_flag = 0;
00111               ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00112               return -1;
00113             }
00114         }
00115 
00116       if( ACE_OS::strcmp( qos.QoSParams[i].property_name.in(), "ECN") == 0)
00117         {
00118           qos.QoSParams[i].property_value >>= ecn;
00119           // ECN value can only occupy bits 6 and 7 of the
00120           // IP Diffserv byte
00121           if(!((ecn >= 0) && (ecn <= 3)))
00122             {
00123               ACE_DEBUG((LM_DEBUG, "(%N,%l) ECN value can only be (0-3) not %d\n", ecn));
00124               ecn = 0;
00125             }
00126 
00127         }
00128     }
00129       // Set the Diffserv byte only if we specified
00130       // the Diffserv Codepoint (DSCP) or ECN via QoSParams
00131       // passed into this method
00132       if(dscp_flag || ecn)
00133         {
00134           int tos;
00135           tos = (int)(dscp << 2);
00136           if(ecn)
00137             {
00138               tos |= ecn;
00139             }
00140           ret = sock_dgram_.set_option(IPPROTO_IP, IP_TOS, (int *)&tos , (int)sizeof(tos));
00141 
00142           if(TAO_debug_level > 1)
00143             {
00144               ACE_DEBUG((LM_DEBUG, "(%N,%l) set tos: ret: %d\n", ret));
00145             }
00146         }
00147 
00148       if(TAO_debug_level > 1)
00149         {
00150            if(ret < 0 )
00151              {
00152                 ACE_DEBUG((LM_DEBUG, "(%N,%l) errno: %p\n"));
00153              }
00154         }
00155       return ret;
00156 }
00157 
00158 //------------------------------------------------------------
00159 // TAO_AV_UDP_Transport
00160 //------------------------------------------------------------
00161 
00162 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (void)
00163   :handler_ (0)
00164 {
00165 }
00166 
00167 TAO_AV_UDP_Transport::TAO_AV_UDP_Transport (TAO_AV_UDP_Flow_Handler *handler)
00168   :handler_ (handler),
00169    addr_ (0)
00170 {
00171 }
00172 
00173 TAO_AV_UDP_Transport::~TAO_AV_UDP_Transport (void)
00174 {
00175 }
00176 
00177 int
00178 TAO_AV_UDP_Transport::set_remote_address (const ACE_INET_Addr &address)
00179 {
00180   this->peer_addr_ = address;
00181   return 0;
00182 }
00183 
00184 int
00185 TAO_AV_UDP_Transport::open (ACE_Addr * /*address*/)
00186 {
00187   return 0;
00188 }
00189 
00190 int
00191 TAO_AV_UDP_Transport::close (void)
00192 {
00193   return 0;
00194 }
00195 
00196 int
00197 TAO_AV_UDP_Transport::mtu (void)
00198 {
00199   return 65535;
00200 }
00201 
00202 ACE_Addr*
00203 TAO_AV_UDP_Transport::get_peer_addr (void)
00204 {
00205   return &this->peer_addr_;
00206 }
00207 
00208 ssize_t
00209 TAO_AV_UDP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
00210 {
00211   // For the most part this was copied from GIOP::send_request and
00212   // friends.
00213 
00214   iovec iov[ACE_IOV_MAX];
00215   int iovcnt = 0;
00216   ssize_t n = 0;
00217   ssize_t nbytes = 0;
00218 
00219   for (const ACE_Message_Block *i = mblk;
00220        i != 0;
00221        i = i->cont ())
00222     {
00223       // Make sure there is something to send!
00224       if (i->length () > 0)
00225         {
00226           iov[iovcnt].iov_base = i->rd_ptr ();
00227           iov[iovcnt].iov_len  = static_cast<u_long> (i->length ());
00228           iovcnt++;
00229 
00230           // The buffer is full make a OS call.  @@ TODO this should
00231           // be optimized on a per-platform basis, for instance, some
00232           // platforms do not implement writev() there we should copy
00233           // the data into a buffer and call send_n(). In other cases
00234           // there may be some limits on the size of the iovec, there
00235           // we should set ACE_IOV_MAX to that limit.
00236           if (iovcnt == ACE_IOV_MAX)
00237             {
00238                n = this->handler_->get_socket ()->send ((const iovec *) iov,
00239                                                         iovcnt,
00240                                                         this->peer_addr_);
00241 
00242               if (n < 1)
00243                 return n;
00244 
00245               nbytes += n;
00246               iovcnt = 0;
00247             }
00248         }
00249     }
00250 
00251   // Check for remaining buffers to be sent!
00252   if (iovcnt != 0)
00253     {
00254       n = this->handler_->get_socket ()->send ((const iovec *) iov,
00255                                                iovcnt,
00256                                                this->peer_addr_);
00257 
00258       if (n < 1)
00259         return n;
00260 
00261       nbytes += n;
00262     }
00263 
00264   return nbytes;
00265 }
00266 
00267 ssize_t
00268 TAO_AV_UDP_Transport::send (const char *buf,
00269                             size_t len,
00270                             ACE_Time_Value *)
00271 {
00272   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Transport::send "));
00273   char addr [BUFSIZ];
00274   this->peer_addr_.addr_to_string (addr,BUFSIZ);
00275   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"to %s\n",addr));
00276 
00277   return this->handler_->get_socket ()->send (buf, len,this->peer_addr_);
00278 }
00279 
00280 ssize_t
00281 TAO_AV_UDP_Transport::send (const iovec *iov,
00282                           int iovcnt,
00283                           ACE_Time_Value *)
00284 {
00285   return this->handler_->get_socket ()->send ((const iovec *) iov,
00286                                               iovcnt,
00287                                               this->peer_addr_);
00288 
00289 }
00290 
00291 ssize_t
00292 TAO_AV_UDP_Transport::recv (char *buf,
00293                             size_t len,
00294                             ACE_Time_Value *)
00295 {
00296   return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00297 }
00298 
00299 ssize_t
00300 TAO_AV_UDP_Transport::recv (char *buf,
00301                             size_t len,
00302                             int flags,
00303                             ACE_Time_Value *timeout)
00304 {
00305   return this->handler_->get_socket ()->recv (buf,
00306                                               len,
00307                                               this->peer_addr_,
00308                                               flags,
00309                                               timeout);
00310 }
00311 
00312 ssize_t
00313 TAO_AV_UDP_Transport::recv (iovec *iov,
00314                             int /*iovcnt*/,
00315                             ACE_Time_Value *timeout)
00316 {
00317   return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00318 }
00319 
00320 
00321 //------------------------------------------------------------
00322 // TAO_AV_UDP_Acceptor
00323 //------------------------------------------------------------
00324 
00325 TAO_AV_UDP_Acceptor::TAO_AV_UDP_Acceptor (void)
00326   : address_ (0),
00327     control_inet_address_ (0)
00328 {
00329 }
00330 
00331 TAO_AV_UDP_Acceptor::~TAO_AV_UDP_Acceptor (void)
00332 {
00333   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00334     delete this->entry_->control_handler ();
00335 
00336   delete this->address_;
00337   delete this->control_inet_address_;
00338 }
00339 
00340 int
00341 TAO_AV_UDP_Acceptor::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00342 {
00343   ACE_Event_Handler *event_handler = handler->event_handler ();
00344   int result = this->av_core_->reactor ()->register_handler (event_handler,
00345                                                              ACE_Event_Handler::READ_MASK);
00346 
00347   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00348     handler->schedule_timer ();
00349 
00350  return result;
00351 }
00352 
00353 int
00354 TAO_AV_UDP_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00355                            TAO_AV_Core *av_core,
00356                            TAO_FlowSpec_Entry *entry,
00357                            TAO_AV_Flow_Protocol_Factory *factory,
00358                            TAO_AV_Core::Flow_Component flow_comp)
00359 {
00360   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Acceptor::open\n"));
00361   this->av_core_ = av_core;
00362   this->endpoint_ = endpoint;
00363   this->entry_ = entry;
00364   this->flow_component_ = flow_comp;
00365   this->flow_protocol_factory_ = factory;
00366   ACE_INET_Addr *inet_addr;
00367   if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00368     {
00369       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00370       inet_addr = (ACE_INET_Addr *) entry->control_address ();
00371     }
00372   else
00373     {
00374       this->flowname_ = entry->flowname ();
00375       inet_addr = (ACE_INET_Addr *) entry->address ();
00376     }
00377 
00378   if (inet_addr != 0)
00379     {
00380       char buf[BUFSIZ];
00381       inet_addr->addr_to_string (buf,
00382                  BUFSIZ);
00383 
00384       if (TAO_debug_level > 0)
00385     ACE_DEBUG ((LM_DEBUG,
00386             "TAO_AV_UDP_Acceptor::open: %s\n",
00387             buf));
00388     }
00389 
00390   int result = this->open_i (inet_addr, 0);
00391 
00392   if (result < 0)
00393     return result;
00394   return 0;
00395 }
00396 
00397 int
00398 TAO_AV_UDP_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00399                                    TAO_AV_Core *av_core,
00400                                    TAO_FlowSpec_Entry *entry,
00401                                    TAO_AV_Flow_Protocol_Factory *factory,
00402                                    TAO_AV_Core::Flow_Component flow_comp)
00403 {
00404   this->av_core_ = av_core;
00405   this->endpoint_ = endpoint;
00406   this->entry_ = entry;
00407   this->flow_component_ = flow_comp;
00408   this->flow_protocol_factory_ = factory;
00409   if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
00410     {
00411       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
00412     }
00413   else
00414     {
00415       this->flowname_ = entry->flowname ();
00416       ACE_NEW_RETURN (this->address_,
00417                     ACE_INET_Addr ("0"),
00418                     -1);
00419     }
00420 
00421   int result = this->open_i (this->address_, 1);
00422   if (result < 0)
00423     return result;
00424 
00425   return 0;
00426 }
00427 
00428 int
00429 TAO_AV_UDP_Acceptor::open_i (ACE_INET_Addr *inet_addr,
00430                              int is_default_addr)
00431 {
00432   int result = -1;
00433   ACE_INET_Addr *local_addr = 0;
00434 
00435   TAO_AV_Flow_Handler *flow_handler = 0;
00436 
00437   // if using a default address and this is the control flow component, the
00438   //  handler and local address are already set in the flow spec entry
00439   if (is_default_addr &&
00440       (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)  &&
00441       (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0))
00442     {
00443       flow_handler = this->entry_->control_handler ();
00444 
00445       local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00446     }
00447   else
00448     {
00449       // this variable is only used for RTP/UDP; RFC 1889 requires an even/odd
00450       //  consecutive port pair
00451       int get_new_port = 1;
00452 
00453       while (get_new_port)
00454         {
00455           // assume the ports will be OK
00456           get_new_port = 0;
00457 
00458           result = TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00459                                                        inet_addr,
00460                                                        local_addr,
00461                                                        this->entry_->is_multicast (),
00462                                                        TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00463 
00464       if (result < 0)
00465           {
00466              ACE_DEBUG((LM_DEBUG,"(%N,%l) Error during connection setup: %d\n", result));
00467           }
00468 
00469           local_addr->set (local_addr->get_port_number (),
00470                            local_addr->get_host_name ());
00471 
00472           if (is_default_addr)
00473             {
00474               if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00475                   (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA))
00476                 {
00477                   if (is_default_addr && local_addr->get_port_number ()%2 != 0)
00478                     {
00479                       // RTP port should be even
00480                       delete local_addr;
00481                       local_addr = 0;
00482                       delete flow_handler;
00483                       get_new_port = 1;
00484                     }
00485                   else
00486                     {
00487                       ACE_INET_Addr *local_control_addr = 0;
00488                       TAO_AV_Flow_Handler *control_flow_handler = 0;
00489 
00490                       ACE_NEW_RETURN (this->control_inet_address_,
00491                                       ACE_INET_Addr ("0"),
00492                                       -1);
00493 
00494                       TAO_AV_UDP_Connection_Setup::setup(control_flow_handler,
00495                                                          this->control_inet_address_,
00496                                                          local_control_addr,
00497                                                          this->entry_->is_multicast (),
00498                                                          TAO_AV_UDP_Connection_Setup::ACCEPTOR);
00499 
00500                       if (local_control_addr->get_port_number () !=
00501                           local_addr->get_port_number () +1)
00502                         {
00503                           delete this->control_inet_address_;
00504                           delete local_addr;
00505                           local_addr = 0;
00506                           delete flow_handler;
00507                           delete local_control_addr;
00508                           delete control_flow_handler;
00509                           get_new_port = 1;
00510                         }
00511                       else
00512                         {
00513                           this->entry_->control_address (this->control_inet_address_);
00514                           this->entry_->set_local_control_addr (local_control_addr);
00515                           this->entry_->control_handler (control_flow_handler);
00516                         }
00517                     }
00518                 }
00519             }
00520         }
00521     }
00522 
00523   TAO_AV_Protocol_Object *object =
00524     this->flow_protocol_factory_->make_protocol_object (this->entry_,
00525                                                         this->endpoint_,
00526                                                         flow_handler,
00527                                                         flow_handler->transport ());
00528   flow_handler->protocol_object (object);
00529 
00530   if (this->flow_component_ == TAO_AV_Core::TAO_AV_DATA)
00531     {
00532       this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
00533 
00534       this->entry_->protocol_object (object);
00535       this->entry_->set_local_addr (local_addr);
00536       this->entry_->handler (flow_handler);
00537       //this->entry_->address (inet_addr);
00538       this->entry_->address (local_addr);
00539     }
00540   else
00541     {
00542       this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),flow_handler);
00543 
00544       this->entry_->control_protocol_object (object);
00545       this->entry_->set_local_control_addr (local_addr);
00546       this->entry_->control_handler (flow_handler);
00547     }
00548 
00549   if (local_addr != 0)
00550     {
00551       char buf[BUFSIZ];
00552       local_addr->addr_to_string (buf,BUFSIZ);
00553       if (TAO_debug_level > 0)
00554         ACE_DEBUG ((LM_DEBUG,
00555                     "TAO_AV_UDP_ACCEPTOR::open:%s \n",
00556                     buf));
00557     }
00558 
00559   // call activate svc handler.
00560   return this->activate_svc_handler (flow_handler);
00561 }
00562 
00563 int
00564 TAO_AV_UDP_Acceptor::close (void)
00565 {
00566   return 0;
00567 }
00568 
00569 //------------------------------------------------------------
00570 // TAO_AV_UDP_Connector
00571 //------------------------------------------------------------
00572 TAO_AV_UDP_Connector::TAO_AV_UDP_Connector (void)
00573   : control_inet_address_ (0)
00574 {
00575 }
00576 
00577 TAO_AV_UDP_Connector::~TAO_AV_UDP_Connector (void)
00578 {
00579   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00580     {
00581       delete this->entry_->control_handler ();
00582     }
00583 
00584   if (this->control_inet_address_ != 0)
00585     delete this->control_inet_address_;
00586 }
00587 
00588 int
00589 TAO_AV_UDP_Connector::open (TAO_Base_StreamEndPoint *endpoint,
00590                             TAO_AV_Core *av_core,
00591                             TAO_AV_Flow_Protocol_Factory *factory)
00592 
00593 {
00594   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Connector::open "));
00595   this->endpoint_ = endpoint;
00596   this->av_core_ = av_core;
00597   this->flow_protocol_factory_ = factory;
00598   return 0;
00599 }
00600 
00601 int
00602 TAO_AV_UDP_Connector::connect (TAO_FlowSpec_Entry *entry,
00603                                TAO_AV_Transport *&transport,
00604                                TAO_AV_Core::Flow_Component flow_component)
00605 {
00606   ACE_INET_Addr *local_addr = 0;
00607   ACE_INET_Addr *control_inet_addr = 0;
00608 
00609   this->entry_ = entry;
00610   this->flow_component_ = flow_component;
00611 
00612   ACE_INET_Addr *inet_addr;
00613 
00614   if (flow_component == TAO_AV_Core::TAO_AV_CONTROL)
00615     {
00616       this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
00617     inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00618     }
00619   else
00620     {
00621       this->flowname_ = entry->flowname ();
00622       inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
00623       control_inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00624     }
00625 
00626   TAO_AV_Flow_Handler *flow_handler = 0;
00627 
00628   // if this is the control flow component, the
00629   //  handler and local address are already set in the flow spec entry
00630   if ((flow_component == TAO_AV_Core::TAO_AV_CONTROL)  &&
00631       (ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00632       !entry->is_multicast ())
00633     {
00634       flow_handler = this->entry_->control_handler ();
00635       flow_handler->set_remote_address (inet_addr);
00636 
00637       local_addr = dynamic_cast<ACE_INET_Addr*> (this->entry_->get_local_control_addr ());
00638     }
00639   else
00640     {
00641       // this variable is only used for RTP/UDP; RFC 1889 requires an even/odd
00642       //  consecutive port pair
00643       int get_new_port = 1;
00644 
00645       while (get_new_port)
00646         {
00647           // assume the ports will be OK
00648           get_new_port = 0;
00649 
00650       ACE_Addr *addr;
00651       if ((addr = entry->get_peer_addr ()) != 0)
00652         {
00653           local_addr = dynamic_cast<ACE_INET_Addr*> (addr);
00654           char buf [BUFSIZ];
00655           local_addr->addr_to_string (buf, BUFSIZ);
00656         }
00657 
00658           TAO_AV_UDP_Connection_Setup::setup (flow_handler,
00659                                               inet_addr,
00660                                               local_addr,
00661                                               entry->is_multicast (),
00662                                               TAO_AV_UDP_Connection_Setup::CONNECTOR);
00663 
00664           if ((ACE_OS::strcasecmp(this->entry_->flow_protocol_str (), "RTP") == 0) &&
00665               (flow_component == TAO_AV_Core::TAO_AV_DATA) &&
00666               !entry->is_multicast ())
00667             {
00668               if (local_addr->get_port_number ()%2 != 0)
00669                 {
00670                   // RTP port should be even
00671                   delete local_addr;
00672                   local_addr = 0;
00673                   delete flow_handler;
00674                   get_new_port = 1;
00675                 }
00676               else
00677                 {
00678                   ACE_INET_Addr *local_control_addr = 0;
00679                   TAO_AV_Flow_Handler *control_flow_handler = 0;
00680 
00681                   if (entry->is_multicast ())
00682                     control_inet_addr =  dynamic_cast<ACE_INET_Addr*> (entry->control_address ()) ;
00683                   else
00684                     {
00685 
00686                       if (local_addr != 0)
00687                         {
00688                           char buf [BUFSIZ];
00689                           ACE_CString addr_str (local_addr->get_host_name ());
00690                           addr_str += ":";
00691                           addr_str += ACE_OS::itoa (local_addr->get_port_number () + 1, buf, 10);
00692                           ACE_NEW_RETURN (local_control_addr,
00693                                           ACE_INET_Addr (addr_str.c_str ()),
00694                                           -1);
00695                           local_control_addr->addr_to_string (buf, BUFSIZ);
00696                         }
00697 
00698 
00699                       if (entry->control_address () == 0)
00700                         ACE_NEW_RETURN (this->control_inet_address_,
00701                                         ACE_INET_Addr ("0"),
00702                                         -1);
00703                       else
00704                         control_inet_address_ = dynamic_cast<ACE_INET_Addr*> (entry->control_address ());
00705                     }
00706 
00707                   TAO_AV_UDP_Connection_Setup::setup (control_flow_handler,
00708                                                       control_inet_addr,
00709                                                       local_control_addr,
00710                                                       entry->is_multicast (),
00711                                                       TAO_AV_UDP_Connection_Setup::CONNECTOR);
00712 
00713                   if (local_control_addr->get_port_number () !=
00714                       local_addr->get_port_number () +1)
00715                     {
00716                       delete local_addr;
00717                       local_addr = 0;
00718                       delete flow_handler;
00719                       delete local_control_addr;
00720                       delete control_flow_handler;
00721                       get_new_port = 1;
00722                     }
00723                   else
00724                     {
00725                       this->entry_->set_local_control_addr (local_control_addr);
00726                       this->entry_->control_handler (control_flow_handler);
00727                     }
00728                 }
00729             }
00730         }
00731     }
00732 
00733   TAO_AV_Protocol_Object *object =
00734     this->flow_protocol_factory_->make_protocol_object (this->entry_,
00735                                                         this->endpoint_,
00736                                                         flow_handler,
00737                                                         flow_handler->transport ());
00738 
00739   flow_handler->protocol_object (object);
00740 
00741   if (flow_component == TAO_AV_Core::TAO_AV_DATA)
00742     {
00743       this->endpoint_->set_flow_handler (this->flowname_.c_str (),
00744                                          flow_handler);
00745       this->entry_->protocol_object (object);
00746       entry->set_local_addr (local_addr);
00747       entry->handler (flow_handler);
00748       transport = flow_handler->transport ();
00749     }
00750   else
00751     {
00752       this->endpoint_->set_control_flow_handler (this->flowname_.c_str (),
00753                                                  flow_handler);
00754       this->entry_->control_protocol_object (object);
00755       entry->set_local_control_addr (local_addr);
00756       entry->control_handler (flow_handler);
00757       transport = flow_handler->transport ();
00758     }
00759 
00760   if (local_addr != 0)
00761     {
00762       char buf[BUFSIZ];
00763       local_addr->addr_to_string (buf,BUFSIZ);
00764 
00765       if (TAO_debug_level > 0)
00766         ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_CONNECTOR::connect:%s \n",buf));
00767     }
00768 
00769   // call activate svc handler.
00770   return this->activate_svc_handler (flow_handler);
00771 }
00772 
00773 int
00774 TAO_AV_UDP_Connector::activate_svc_handler (TAO_AV_Flow_Handler *handler)
00775 {
00776   ACE_Event_Handler *event_handler = handler->event_handler ();
00777   int result = this->av_core_->reactor ()->register_handler (event_handler,
00778                                                              ACE_Event_Handler::READ_MASK);
00779 
00780   if (this->flow_component_ == TAO_AV_Core::TAO_AV_CONTROL)
00781     handler->schedule_timer ();
00782 
00783   return result;
00784 }
00785 
00786 int
00787 TAO_AV_UDP_Connector::close (void)
00788 {
00789   return 0;
00790 }
00791 
00792 //------------------------------------------------------------
00793 // TAO_AV_UDP_Connection_Setup
00794 //------------------------------------------------------------
00795 
00796 int
00797 TAO_AV_UDP_Connection_Setup::setup (TAO_AV_Flow_Handler *&flow_handler,
00798                                     ACE_INET_Addr *inet_addr,
00799                                     ACE_INET_Addr *&local_addr,
00800                                     int is_multicast,
00801                                     ConnectionType ct)
00802 {
00803   int result;
00804 
00805   if (is_multicast)
00806     {
00807       TAO_AV_UDP_MCast_Flow_Handler *handler;
00808       ACE_NEW_RETURN (handler,
00809                       TAO_AV_UDP_MCast_Flow_Handler,
00810                       -1);
00811 
00812       flow_handler = handler;
00813 
00814       result = handler->get_mcast_socket ()->join (*inet_addr);
00815       if (result < 0)
00816         ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_MCast_connector::open failed\n"),-1);
00817 
00818       // Now disable Multicast loopback.
00819       // @@Should we make this a policy?
00820 #if defined (ACE_HAS_IP_MULTICAST)
00821       if (handler->get_mcast_socket ()->set_option (IP_MULTICAST_LOOP,
00822                                                     0) < 0)
00823         if (TAO_debug_level > 0)
00824           ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_MCast_Acceptor::multicast loop disable failed\n"));
00825       // @@ This should also be policies.
00826 #endif /*ACE_HAS_IP_MULTICAST*/
00827 
00828       int bufsize = 80 * 1024;
00829       if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00830                                                               SO_RCVBUF,
00831                                                               (char *)&bufsize,
00832                                                               sizeof(bufsize)) < 0)
00833         {
00834           bufsize = 32 * 1024;
00835           if (handler->get_mcast_socket ()->ACE_SOCK::set_option (SOL_SOCKET,
00836                                                                   SO_RCVBUF,
00837                                                                   (char *)&bufsize,
00838                                                                   sizeof(bufsize)) < 0)
00839             ACE_OS::perror("SO_RCVBUF");
00840         }
00841       ACE_NEW_RETURN (local_addr,
00842                       ACE_INET_Addr ("0"),
00843                       -1);
00844 
00845       if (ct == ACCEPTOR)
00846         {
00847           result = handler->get_mcast_socket ()->get_local_addr (*local_addr);
00848           if (result < 0)
00849             ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00850 
00851           local_addr->set (local_addr->get_port_number (),
00852                            local_addr->get_host_name ());
00853           handler->set_peer_addr (local_addr);
00854         }
00855     }
00856   else
00857     {
00858       if (local_addr == 0)
00859         ACE_NEW_RETURN (local_addr,
00860                         ACE_INET_Addr ("0"),
00861                         -1);
00862 
00863       TAO_AV_UDP_Flow_Handler *handler;
00864       ACE_NEW_RETURN (handler,
00865                       TAO_AV_UDP_Flow_Handler,
00866                       -1);
00867 
00868       flow_handler = handler;
00869 
00870       if (ct == ACCEPTOR)
00871         result = handler->open (*inet_addr);
00872       else
00873         result = handler->open (*local_addr);
00874       if (result < 0)
00875         ACE_ERROR_RETURN ((LM_ERROR,"handler::open failed\n"),-1);
00876 
00877       // set the socket buffer sizes to 64k.
00878       int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00879       int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00880 
00881       if (handler->get_socket ()->set_option (SOL_SOCKET,
00882                                               SO_SNDBUF,
00883                                               (void *) &sndbufsize,
00884                                               sizeof (sndbufsize)) == -1
00885           && errno != ENOTSUP)
00886         return 0;
00887       else if (handler->get_socket ()->set_option (SOL_SOCKET,
00888                                                    SO_RCVBUF,
00889                                                    (void *) &rcvbufsize,
00890                                                    sizeof (rcvbufsize)) == -1
00891                && errno != ENOTSUP)
00892         return 0;
00893 
00894       if (ct == CONNECTOR)
00895         handler->set_remote_address  (inet_addr);
00896 
00897       result = handler->get_socket ()->get_local_addr (*local_addr);
00898 
00899       local_addr->set (local_addr->get_port_number (),
00900                local_addr->get_host_name ());
00901 
00902       char buf [BUFSIZ];
00903       local_addr->addr_to_string (buf, BUFSIZ);
00904 
00905       if (result < 0)
00906         ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
00907     }
00908 
00909   return 1;
00910 }
00911 
00912 //------------------------------------------------------------
00913 // TAO_AV_UDP_Factory
00914 //------------------------------------------------------------
00915 
00916 TAO_AV_UDP_Factory::TAO_AV_UDP_Factory (void)
00917 {
00918 }
00919 
00920 TAO_AV_UDP_Factory::~TAO_AV_UDP_Factory (void)
00921 {
00922 }
00923 
00924 int
00925 TAO_AV_UDP_Factory::match_protocol (const char *protocol_string)
00926 {
00927   if (ACE_OS::strcasecmp (protocol_string,"UDP") == 0)
00928     return 1;
00929   if (ACE_OS::strcasecmp (protocol_string,"RTP/UDP") == 0)
00930     return 1;
00931   return 0;
00932 }
00933 
00934 TAO_AV_Acceptor*
00935 TAO_AV_UDP_Factory::make_acceptor (void)
00936 {
00937   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_acceptor\n"));
00938   TAO_AV_Acceptor *acceptor = 0;
00939   ACE_NEW_RETURN (acceptor,
00940                   TAO_AV_UDP_Acceptor,
00941                   0);
00942   return acceptor;
00943 }
00944 
00945 TAO_AV_Connector*
00946 TAO_AV_UDP_Factory::make_connector (void)
00947 {
00948   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Factory::make_connector\n"));
00949   TAO_AV_Connector *connector = 0;
00950   ACE_NEW_RETURN (connector,
00951                   TAO_AV_UDP_Connector,
00952                   0);
00953   return connector;
00954 }
00955 
00956 int
00957 TAO_AV_UDP_Factory::init (int /* argc */,
00958                           char * /* argv */ [])
00959 {
00960   return 0;
00961 }
00962 
00963 //------------------------------------------------------------
00964 // TAO_AV_UDP_Object
00965 //------------------------------------------------------------
00966 
00967 int
00968 TAO_AV_UDP_Object::handle_input (void)
00969 {
00970   int n = this->transport_->recv (this->frame_.rd_ptr (),
00971                                   this->frame_.size ());
00972   if (n == -1)
00973     ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) TAO_AV_UDP_Flow_Handler::handle_input recv failed: errno: %m\n"),-1);
00974 
00975   this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
00976 
00977   return this->callback_->receive_frame (&this->frame_);
00978 }
00979 
00980 int
00981 TAO_AV_UDP_Object::send_frame (ACE_Message_Block *frame,
00982                                TAO_AV_frame_info * /*frame_info*/)
00983 {
00984   if (TAO_debug_level > 0)
00985     ACE_DEBUG ((LM_DEBUG,"TAO_AV_UDP_Object::send_frame\n"));
00986   int const result = this->transport_->send (frame);
00987   if (result < 0)
00988     return result;
00989   return 0;
00990 }
00991 
00992 int
00993 TAO_AV_UDP_Object::send_frame (const iovec *iov,
00994                                int iovcnt,
00995                                TAO_AV_frame_info * /*frame_info*/)
00996 {
00997   int const result = this->transport_->send (iov,iovcnt);
00998   if (result < 0)
00999     return result;
01000   return 0;
01001 }
01002 
01003 int
01004 TAO_AV_UDP_Object::send_frame (const char*buf,
01005                                size_t len)
01006 {
01007   int const result = this->transport_->send (buf, len, 0);
01008   if (result < 0)
01009     return result;
01010   return 0;
01011 }
01012 
01013 TAO_AV_UDP_Object::TAO_AV_UDP_Object (TAO_AV_Callback *callback,
01014                                       TAO_AV_Transport *transport)
01015   :TAO_AV_Protocol_Object (callback,transport)
01016 {
01017   this->frame_.size (this->transport_->mtu ());
01018 }
01019 
01020 TAO_AV_UDP_Object::~TAO_AV_UDP_Object (void)
01021 {
01022   //no-op
01023 }
01024 
01025 int
01026 TAO_AV_UDP_Object::destroy (void)
01027 {
01028   this->callback_->handle_destroy ();
01029   delete this;
01030 
01031   return 0;
01032 }
01033 
01034 
01035 //------------------------------------------------------------
01036 // TAO_AV_UDP_Flow_Factory
01037 //------------------------------------------------------------
01038 TAO_AV_UDP_Flow_Factory::TAO_AV_UDP_Flow_Factory (void)
01039 {
01040 }
01041 
01042 TAO_AV_UDP_Flow_Factory::~TAO_AV_UDP_Flow_Factory (void)
01043 {
01044 }
01045 
01046 int
01047 TAO_AV_UDP_Flow_Factory::init (int /* argc */,
01048                                char * /* argv */ [])
01049 {
01050   return 0;
01051 }
01052 
01053 int
01054 TAO_AV_UDP_Flow_Factory::match_protocol (const char *flow_string)
01055 {
01056   if (ACE_OS::strcasecmp (flow_string,"UDP") == 0)
01057     return 1;
01058   return 0;
01059 }
01060 
01061 TAO_AV_Protocol_Object*
01062 TAO_AV_UDP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01063                                                TAO_Base_StreamEndPoint *endpoint,
01064                                                TAO_AV_Flow_Handler *handler,
01065                                                TAO_AV_Transport *transport)
01066 {
01067   TAO_AV_Callback *callback = 0;
01068   if( endpoint->get_callback (entry->flowname (), callback) ) {
01069     ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
01070   }
01071 
01072   TAO_AV_UDP_Object *object = 0;
01073   ACE_NEW_RETURN (object,
01074                   TAO_AV_UDP_Object (callback,
01075                                      transport),
01076                   0);
01077   callback->open (object,
01078                   handler);
01079   endpoint->set_protocol_object (entry->flowname (),
01080                                  object);
01081 
01082   endpoint->protocol_object_set ();
01083 
01084   return object;
01085 }
01086 
01087 TAO_END_VERSIONED_NAMESPACE_DECL
01088 
01089 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Flow_Factory)
01090 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Flow_Factory,
01091                        ACE_TEXT ("UDP_Flow_Factory"),
01092                        ACE_SVC_OBJ_T,
01093                        &ACE_SVC_NAME (TAO_AV_UDP_Flow_Factory),
01094                        ACE_Service_Type::DELETE_THIS |
01095                        ACE_Service_Type::DELETE_OBJ,
01096                        0)
01097 
01098 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_Factory)
01099 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_Factory,
01100                        ACE_TEXT ("UDP_Factory"),
01101                        ACE_SVC_OBJ_T,
01102                        &ACE_SVC_NAME (TAO_AV_UDP_Factory),
01103                        ACE_Service_Type::DELETE_THIS |
01104                        ACE_Service_Type::DELETE_OBJ,
01105                        0)

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7