QoS_UDP.cpp

Go to the documentation of this file.
00001 // $Id: QoS_UDP.cpp 76626 2007-01-26 13:50:03Z elliott_c $
00002 
00003 
00004 #include "orbsvcs/AV/QoS_UDP.h"
00005 
00006 #if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS)
00007 
00008 #include "orbsvcs/AV/UDP.h"
00009 #include "orbsvcs/AV/AVStreams_i.h"
00010 #include "orbsvcs/AV/MCast.h"
00011 #include "orbsvcs/AV/Fill_ACE_QoS.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "orbsvcs/AV/QoS_UDP.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Session_Helper and TAO_AV_UDP_QoS_Flow_Handler
//------------------------------------------------------------
00020 
// File-local flags written by TAO_AV_UDP_QoS_Flow_Handler::handle_qos ():
// resv_error is set to 1 when the session reports RSVP_RESV_ERROR and
// resv_confirm is set to 1 when it reports RSVP_RESV_CONFIRM.
// NOTE(review): no reader or reset of these flags is visible in the
// reviewed portion of this file -- confirm whether they are still needed.
static int resv_error = 0;
static int resv_confirm = 0;
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 int
00027 FillQoSParams (ACE_QoS_Params &qos_params,
00028                iovec* iov,
00029                ACE_QoS* qos)
00030 {
00031   qos_params.callee_data (iov);
00032   qos_params.caller_data (0);
00033   qos_params.socket_qos (qos);
00034   qos_params.group_socket_qos (0);
00035   qos_params.flags (ACE_JL_BOTH);
00036 
00037   return 0;
00038 }
00039 
00040 TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void)
00041 {
00042 
00043 }
00044 
00045 TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void)
00046 {
00047 }
00048 
00049 int
00050 TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec,
00051                                         TAO_AV_UDP_QoS_Flow_Handler *handler)
00052 {
00053   ACE_QoS* ace_qos = 0;
00054 
00055   ACE_NEW_RETURN (ace_qos,
00056                   ACE_QoS,
00057                   -1);
00058 
00059   Fill_ACE_QoS fill_ace_qos;
00060 
00061   if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00062     {
00063       if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
00064                                                 &ace_flow_spec) !=0)
00065         ACE_ERROR_RETURN ((LM_ERROR,
00066                            "Unable to fill simplex sender qos (%N|%l)\n"),
00067                           -1);
00068       else
00069         if (TAO_debug_level > 0)
00070           ACE_DEBUG ((LM_DEBUG,
00071                       "Filled up the Sender QoS parameters\n"));
00072     }
00073   else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00074     {
00075       if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
00076                                                   &ace_flow_spec) !=0)
00077         ACE_ERROR_RETURN ((LM_ERROR,
00078                            "Unable to fill simplex receiver qos (%N|%l)\n"),
00079                           -1);
00080       else
00081         if (TAO_debug_level > 0)
00082           ACE_DEBUG ((LM_DEBUG,
00083                       "Filled up the Receiver QoS parameters\n"));
00084 
00085     }
00086 
00087   ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager ();
00088 
00089   // Set the QoS for the session. Replaces the ioctl () call that
00090   // was being made previously.
00091   if (handler->qos_session ()->qos (handler->get_socket (),
00092                                     &qos_manager,
00093                                     *ace_qos) == -1)
00094     ACE_ERROR_RETURN ((LM_ERROR,
00095                        "Unable to set QoS (%N|%l)\n"),
00096                       -1);
00097   else
00098     ACE_DEBUG ((LM_DEBUG,
00099                 "Setting QOS succeeds\n"));
00100 
00101   return 0;
00102 }
00103 
00104 ACE_QoS_Session *
00105 TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler,
00106                                                  ACE_INET_Addr &addr)
00107 {
00108   ACE_QoS_Params qos_params;
00109 
00110   ACE_QoS* ace_qos = 0;
00111 
00112   FillQoSParams (qos_params,
00113                  0,
00114                  ace_qos);
00115 
00116 
00117   // Create a QoS Session Factory.
00118   ACE_QoS_Session_Factory session_factory;
00119 
00120   // Ask the factory to create a QoS session.
00121   ACE_QoS_Session *qos_session = session_factory.create_session ();
00122 
00123   // Create a destination address for the QoS session. The same
00124   // address should be used for the subscribe call later. A copy
00125   // is made below only to distinguish the two usages of the dest
00126   // address.
00127   ACE_INET_Addr dest_addr (addr);
00128 
00129   // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
00130   // Protocol]. Initialize the QoS session.
00131   if (qos_session->open (dest_addr,
00132                          IPPROTO_UDP) == -1)
00133     ACE_ERROR_RETURN ((LM_ERROR,
00134                        "Error in opening the QoS session\n"),
00135                       0);
00136   else
00137     ACE_DEBUG ((LM_DEBUG,
00138                 "QoS session opened successfully\n"));
00139 
00140   if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00141     {
00142       // This is a sender
00143       qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER);
00144     }
00145   else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00146     {
00147       // This is a receiver
00148       qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
00149     }
00150 
00151   return qos_session;
00152 }
00153 
00154 int
00155 TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session,
00156                                                      TAO_AV_UDP_QoS_Flow_Handler *handler)
00157 {
00158   ACE_QoS_Decorator* qos_decorator;
00159 
00160   // Decorate the above handler with QoS functionality.
00161   ACE_NEW_RETURN (qos_decorator,
00162                   ACE_QoS_Decorator (handler,
00163                                      qos_session,
00164                                      handler->av_core ()->reactor ()),
00165                   -1);
00166 
00167   // Initialize the Decorator.
00168   if (qos_decorator->init () != 0)
00169     ACE_ERROR_RETURN ((LM_ERROR,
00170                        "QoS Decorator init () failed (%N|%l)\n"),
00171                       -1);
00172 
00173   // Register the decorated Event Handler with the Reactor.
00174   int result = handler->av_core ()->reactor ()->register_handler (qos_decorator,
00175                                                                   ACE_Event_Handler::QOS_MASK |
00176                                                                   ACE_Event_Handler::READ_MASK);
00177   if (result == -1)
00178     ACE_ERROR_RETURN ((LM_ERROR,
00179                        "Error in registering the Decorator with the Reactor (%N|%l)\n"),
00180                       -1);
00181 
00182   return 0;
00183 
00184 }
00185 
00186 TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
00187 {
00188   ACE_NEW (this->transport_,
00189            TAO_AV_UDP_QoS_Transport (this));
00190 }
00191 
00192 TAO_AV_UDP_QoS_Flow_Handler::~TAO_AV_UDP_QoS_Flow_Handler (void)
00193 {
00194   delete this->transport_;
00195 }
00196 
00197 TAO_AV_Transport *
00198 TAO_AV_UDP_QoS_Flow_Handler::transport (void)
00199 {
00200   return this->transport_;
00201 }
00202 
00203 int
00204 TAO_AV_UDP_QoS_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
00205 {
00206   this->protocol_object_->handle_input ();
00207   return 0;
00208 }
00209 
00210 int
00211 TAO_AV_UDP_QoS_Flow_Handler::translate (CosPropertyService::Properties &qos_params,
00212                                         ACE_Flow_Spec *ace_flow_spec)
00213 {
00214   for (unsigned int i = 0;
00215        i < qos_params.length ();
00216        i++)
00217     {
00218       if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
00219         {
00220           CORBA::Short type;
00221           qos_params [i].property_value >>= type;
00222           ace_flow_spec->service_type (type);
00223         }
00224       else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
00225         {
00226           CORBA::ULong tok_rate;
00227           qos_params [i].property_value >>= tok_rate;
00228           ace_flow_spec->token_rate (tok_rate);
00229         }
00230       else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Size") == 0)
00231         {
00232           CORBA::ULong tok_buck_size;
00233           qos_params [i].property_value >>= tok_buck_size;
00234           ace_flow_spec->token_bucket_size (tok_buck_size);
00235         }
00236       else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
00237         {
00238           CORBA::ULong peak_bw;
00239           qos_params [i].property_value >>= peak_bw;
00240           ace_flow_spec->peak_bandwidth (peak_bw);
00241         }
00242       else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
00243         {
00244           CORBA::ULong lat;
00245           qos_params [i].property_value >>= lat;
00246           ace_flow_spec->latency (lat);
00247         }
00248       else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
00249         {
00250           CORBA::ULong delay_var;
00251           qos_params [i].property_value >>= delay_var;
00252           ace_flow_spec->delay_variation (delay_var);
00253         }
00254       else if (ACE_OS::strcmp (qos_params [i].property_name, "Max_SDU_Size") == 0)
00255         {
00256           CORBA::ULong max_sdu;
00257           qos_params [i].property_value >>= max_sdu;
00258           ace_flow_spec->max_sdu_size (max_sdu);
00259         }
00260       else if (ACE_OS::strcmp (qos_params [i].property_name, "Minimum_Policed_Size") == 0)
00261         {
00262           CORBA::ULong min_pol_size;
00263           qos_params [i].property_value >>= min_pol_size;
00264           ace_flow_spec->minimum_policed_size (min_pol_size);
00265         }
00266       else if (ACE_OS::strcmp (qos_params [i].property_name, "TTL") == 0)
00267         {
00268           CORBA::ULong ttl;
00269           qos_params [i].property_value >>= ttl;
00270           ace_flow_spec->ttl (ttl);
00271         }
00272       else if (ACE_OS::strcmp (qos_params [i].property_name, "Priority") == 0)
00273         {
00274           CORBA::ULong priority;
00275           qos_params [i].property_value >>= priority;
00276           ace_flow_spec->priority (priority);
00277         }
00278     }
00279 
00280   return 0;
00281 }
00282 
00283 int
00284 TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec,
00285                                         CosPropertyService::Properties &qos_params)
00286 {
00287   qos_params.length (9);
00288 
00289   qos_params [0].property_name = CORBA::string_dup ("Service_Type");
00290   qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
00291 
00292   qos_params [1].property_name = CORBA::string_dup ("Token_Rate");
00293   qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
00294 
00295   qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size");
00296   qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
00297 
00298   qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth");
00299   qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
00300 
00301   qos_params [4].property_name = CORBA::string_dup ("Latency");
00302   qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
00303 
00304   qos_params [5].property_name = CORBA::string_dup ("Delay_Variation");
00305   qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
00306 
00307   qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size");
00308   qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
00309 
00310   qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size");
00311   qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
00312 
00313   qos_params [8].property_name = CORBA::string_dup ("TTL");
00314   qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
00315 
00316   return 0;
00317 }
00318 
00319 int
00320 TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
00321 {
00322 
00323   if (TAO_debug_level > 0)
00324     ACE_DEBUG ((LM_DEBUG,
00325                 "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
00326 
00327   if (this->qos_session_->update_qos () == -1)
00328     ACE_ERROR_RETURN ((LM_ERROR,
00329                        "Error in updating QoS\n"),
00330                       -1);
00331   else
00332   {
00333     if(TAO_debug_level > 0)
00334       ACE_DEBUG ((LM_DEBUG,
00335                   "(%N,%l) Updating QOS succeeds.\n"));
00336   }
00337 
00338   if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_ERROR)
00339     {
00340       resv_error = 1;
00341     }
00342 
00343   if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_CONFIRM)
00344     {
00345       resv_confirm = 1;
00346     }
00347 
00348   if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_SENDER)
00349     {
00350       if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_EVENT)
00351             {
00352               if( TAO_debug_level > 0 )
00353               {
00354                  ACE_DEBUG ((LM_DEBUG,
00355                              "(%N,%l) Resv Event Received\n"));
00356               }
00357               if (!CORBA::is_nil (this->negotiator_))
00358                 {
00359                   if( TAO_debug_level > 0 )
00360                   {
00361                      ACE_DEBUG ((LM_DEBUG,
00362                                  "(%N,%l) Negotiator Specified\n"));
00363                   }
00364 
00365                   AVStreams::streamQoS new_qos;
00366                   ACE_Flow_Spec *ace_flow_spec =
00367                     this->qos_session_->qos ().sending_flowspec ();
00368 
00369                   if (ace_flow_spec != 0)
00370                     {
00371                       new_qos.length (1);
00372                       this->translate (ace_flow_spec,
00373                                        new_qos [0].QoSParams);
00374                     }
00375 
00376                   AVStreams::Negotiator_var remote_negotiator;
00377                   this->negotiator_->negotiate (remote_negotiator.in (),
00378                                                 new_qos);
00379                 }
00380             }
00381     }
00382   else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
00383     {
00384       if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_PATH_EVENT)
00385         {
00386           ACE_QoS_Manager qos_manager =
00387             this->get_socket ()->qos_manager ();
00388 
00389           ACE_QoS ace_qos = this->qos_session_->qos ();
00390 
00391           this->qos_session_->qos (this->get_socket (),
00392                                    &qos_manager,
00393                                    ace_qos);
00394         }
00395     }
00396 
00397   return 0;
00398 }
00399 
00400 int
00401 TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
00402 {
00403   if( TAO_debug_level > 0 )
00404   {
00405      ACE_DEBUG ((LM_DEBUG,
00406                  "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::change_qos\n"));
00407   }
00408 
00409   ACE_QoS* ace_qos = 0;
00410 
00411   ACE_NEW_RETURN (ace_qos,
00412                   ACE_QoS,
00413                   -1);
00414 
00415   if (new_qos.QoSParams.length () != 0)
00416     {
00417       ACE_DEBUG ((LM_DEBUG,
00418                   "New QoS Params are not Empty\n"));
00419 
00420       ACE_Flow_Spec *ace_flow_spec;
00421 
00422       ACE_NEW_RETURN (ace_flow_spec,
00423                       ACE_Flow_Spec,
00424                       -1);
00425 
00426       this->translate (new_qos.QoSParams,
00427                        ace_flow_spec);
00428 
00429 
00430       Fill_ACE_QoS fill_ace_qos;
00431 
00432       if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_SENDER)
00433         {
00434           if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
00435                                                     ace_flow_spec) !=0)
00436             ACE_ERROR_RETURN ((LM_ERROR,
00437                                "Unable to fill simplex sender qos\n"),
00438                               -1);
00439           else
00440             {
00441               if( TAO_debug_level > 0 )
00442                 ACE_DEBUG ((LM_DEBUG,
00443                             "(%N,%l) Filled up the Sender QoS parameters\n"));
00444             }
00445         }
00446       else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
00447         {
00448           if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
00449                                                       ace_flow_spec) !=0)
00450             ACE_ERROR_RETURN ((LM_ERROR,
00451                                "Unable to fill simplex receiver qos\n"),
00452                               -1);
00453           else
00454             {
00455               if( TAO_debug_level > 0 )
00456                 ACE_DEBUG ((LM_DEBUG,
00457                             "(%N,%l) Filled up the Receiver QoS parameters\n"));
00458             }
00459 
00460         }
00461 
00462       ACE_QoS_Params qos_params;
00463       FillQoSParams (qos_params,
00464                      0,
00465                      ace_qos);
00466     }
00467 
00468   ACE_QoS_Manager qos_manager =
00469     this->get_socket ()->qos_manager ();
00470 
00471   int result = this->qos_session_->qos (this->get_socket (),
00472                                         &qos_manager,
00473                                         *ace_qos);
00474   if (result != 0)
00475     return result;
00476 
00477   return 0;
00478 }
00479 
00480 int
00481 TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00482                                                const void *arg)
00483 {
00484   return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00485 }
00486 
00487 int
00488 TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
00489 {
00490 
00491   if (TAO_debug_level > 0)
00492     {
00493         char buf [BUFSIZ];
00494     ACE_INET_Addr *remote_addr = dynamic_cast<ACE_INET_Addr*> (address);
00495     remote_addr->addr_to_string (buf,
00496                      BUFSIZ);
00497 
00498     ACE_DEBUG ((LM_DEBUG,
00499             "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address %s\n",
00500             buf));
00501     }
00502 
00503 
00504   ACE_INET_Addr *inet_addr =
00505     dynamic_cast<ACE_INET_Addr*> (address);
00506 
00507   this->peer_addr_ = *inet_addr;
00508 
00509   TAO_AV_UDP_QoS_Transport *transport =
00510     dynamic_cast<TAO_AV_UDP_QoS_Transport*> (this->transport_);
00511 
00512   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00513     {
00514 
00515       TAO_AV_UDP_QoS_Session_Helper helper;
00516 
00517       this->qos_session_ = helper.open_qos_session (this,
00518                                                     *inet_addr);
00519 
00520       if (this->qos_session_ == 0)
00521         ACE_ERROR_RETURN ((LM_ERROR,
00522                            "QoS Session Open Failed (%N|%l)\n"),
00523                           -1);
00524 
00525       ACE_INET_Addr local_addr;
00526       this->get_socket ()->get_local_addr (local_addr);
00527 
00528       ACE_INET_Addr* src_addr;
00529       ACE_NEW_RETURN (src_addr,
00530               ACE_INET_Addr (local_addr.get_port_number (),
00531                      local_addr.get_host_name ()),
00532               -1);
00533 
00534       this->qos_session_->source_addr (src_addr);
00535 
00536       if (helper.activate_qos_handler (this->qos_session_,
00537                                        this) == -1)
00538         ACE_ERROR_RETURN ((LM_ERROR,
00539                            "Activating QoS Handler Failed (%N|%l)\n"),
00540                           -1);
00541     }
00542   return transport->set_remote_address (*inet_addr);
00543 }
00544 
00545 
00546 ACE_HANDLE
00547 TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
00548 {
00549   if (TAO_debug_level > 0)
00550     ACE_DEBUG ((LM_DEBUG,
00551                 "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
00552                 this->qos_sock_dgram_.get_handle ()));
00553 
00554   return this->qos_sock_dgram_.get_handle () ;
00555 }
00556 
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Transport
//------------------------------------------------------------
00560 
00561 TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void)
00562   :handler_ (0)
00563 {
00564 }
00565 
00566 TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler)
00567   :handler_ (handler),
00568    addr_ (0)
00569 {
00570 }
00571 
00572 TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void)
00573 {
00574 }
00575 
00576 int
00577 TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address)
00578 {
00579   this->peer_addr_ = address;
00580   return 0;
00581 }
00582 
00583 int
00584 TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/)
00585 {
00586   return 0;
00587 }
00588 
00589 int
00590 TAO_AV_UDP_QoS_Transport::close (void)
00591 {
00592   return 0;
00593 }
00594 
00595 int
00596 TAO_AV_UDP_QoS_Transport::mtu (void)
00597 {
00598   return ACE_MAX_DGRAM_SIZE;
00599 }
00600 
00601 ACE_Addr*
00602 TAO_AV_UDP_QoS_Transport::get_peer_addr (void)
00603 {
00604   return &this->peer_addr_;
00605 }
00606 
00607 ssize_t
00608 TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
00609                                 ACE_Time_Value *)
00610 {
00611   // For the most part this was copied from GIOP::send_request and
00612   // friends.
00613 
00614   iovec iov[IOV_MAX];
00615   int iovcnt = 0;
00616   ssize_t n = 0;
00617   ssize_t nbytes = 0;
00618 
00619   for (const ACE_Message_Block *i = mblk;
00620        i != 0;
00621        i = i->cont ())
00622     {
00623       // Make sure there is something to send!
00624       if (i->length () > 0)
00625         {
00626           iov[iovcnt].iov_base = i->rd_ptr ();
00627           iov[iovcnt].iov_len  = static_cast<u_long> (i->length ());
00628           iovcnt++;
00629 
00630           // The buffer is full make a OS call.  @@ TODO this should
00631           // be optimized on a per-platform basis, for instance, some
00632           // platforms do not implement writev() there we should copy
00633           // the data into a buffer and call send_n(). In other cases
00634           // there may be some limits on the size of the iovec, there
00635           // we should set IOV_MAX to that limit.
00636 
00637           size_t bytes_sent = 0;
00638 
00639           if (iovcnt == IOV_MAX)
00640             {
00641               if (this->handler_->get_socket ()->send (iov,
00642                                                        1,
00643                                                        bytes_sent,
00644                                                        0,
00645                                                        this->handler_->qos_session ()->dest_addr (),
00646                                                        0,
00647                                                        0) == -1)
00648                 ACE_ERROR_RETURN ((LM_ERROR,
00649                                    "Error in dgram_mcast.send () (%N|%l)\n"),
00650                                   -1);
00651               else
00652                 if (TAO_debug_level > 0)
00653                   ACE_DEBUG ((LM_DEBUG,
00654                               "Using ACE_OS::sendto () : Bytes sent : %d",
00655                               bytes_sent));
00656 
00657               if (n < 1)
00658                 return n;
00659 
00660               nbytes += bytes_sent;
00661               iovcnt = 0;
00662             }
00663         }
00664     }
00665 
00666   size_t bytes_sent = 0;
00667 
00668   // Check for remaining buffers to be sent!
00669   if (iovcnt != 0)
00670     {
00671       if (this->handler_->get_socket ()->send (iov,
00672                                                1,
00673                                                bytes_sent,
00674                                                0,
00675                                                this->handler_->qos_session ()->dest_addr (),
00676                                                0,
00677                                                0) == -1)
00678         ACE_ERROR_RETURN ((LM_ERROR,
00679                            "Error in dgram_mcast.send ()\n"),
00680                           -1);
00681       else
00682         if( TAO_debug_level > 0)
00683           ACE_DEBUG ((LM_DEBUG,
00684                       "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
00685                       bytes_sent));
00686 
00687       if (n < 1)
00688         return n;
00689 
00690       nbytes += bytes_sent;
00691     }
00692 
00693   return nbytes;
00694 }
00695 
00696 ssize_t
00697 TAO_AV_UDP_QoS_Transport::send (const char *buf,
00698                                 size_t len,
00699                                 ACE_Time_Value *)
00700 {
00701   if (TAO_debug_level > 0)
00702     ACE_DEBUG ((LM_DEBUG,
00703                 "(%N,%l) TAO_AV_UDP_QoS_Transport::send "));
00704 
00705   char addr [BUFSIZ];
00706   this->peer_addr_.addr_to_string (addr,BUFSIZ);
00707   if (TAO_debug_level > 0)
00708     ACE_DEBUG ((LM_DEBUG,
00709                 "(%N,%l) to %s\n",
00710                 addr));
00711 
00712   return this->handler_->get_socket ()->send (buf,
00713                                               len,
00714                                               this->handler_->qos_session ()->dest_addr (),
00715                                               0,
00716                                               0,
00717                                               0);
00718 }
00719 
00720 ssize_t
00721 TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
00722                                 int /*iovcnt*/,
00723                                 ACE_Time_Value *)
00724 {
00725   size_t bytes_sent = 0;
00726   if (this->handler_->get_socket ()->send (iov,
00727                                            1,
00728                                            bytes_sent,
00729                                            0,
00730                                            this->handler_->qos_session ()->dest_addr (),
00731                                            0,
00732                                            0) == -1)
00733     ACE_ERROR_RETURN ((LM_ERROR,
00734                        "Error in dgram_mcast.send ()\n"),
00735                       -1);
00736   else
00737   {
00738     if( TAO_debug_level > 0)
00739        ACE_DEBUG ((LM_DEBUG,
00740                    "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
00741                    bytes_sent));
00742   }
00743 
00744 
00745   return bytes_sent;
00746 }
00747 
00748 ssize_t
00749 TAO_AV_UDP_QoS_Transport::recv (char *buf,
00750                                 size_t len,
00751                                 ACE_Time_Value *)
00752 {
00753   return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00754 }
00755 
00756 ssize_t
00757 TAO_AV_UDP_QoS_Transport::recv (char *buf,
00758                                 size_t len,
00759                                 int flags,
00760                                 ACE_Time_Value *timeout)
00761 {
00762   return this->handler_->get_socket ()->recv (buf,
00763                                               len,
00764                                               this->peer_addr_,
00765                                               flags,
00766                                               timeout);
00767 }
00768 
00769 ssize_t
00770 TAO_AV_UDP_QoS_Transport::recv (iovec *iov,
00771                                 int /*iovcnt*/,
00772                                 ACE_Time_Value *timeout)
00773 {
00774   return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00775 }
00776 
00777 
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Acceptor
//------------------------------------------------------------
00781 
00782 TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor (void)
00783 {
00784 }
00785 
00786 TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor (void)
00787 {
00788 }
00789 
00790 int
00791 TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
00792 {
00793   int result = 0;
00794 
00795   if (TAO_debug_level > 0)
00796     ACE_DEBUG ((LM_DEBUG,
00797                 "(%N,%l) Acceptor Svc Handler QOS ENABLED \n"));
00798 
00799   TAO_AV_UDP_QoS_Session_Helper helper;
00800 
00801   result = helper.activate_qos_handler (handler->qos_session (),
00802                                         handler);
00803   if (result == -1)
00804     ACE_ERROR_RETURN ((LM_ERROR,
00805                        "Error in registering the Decorator with the Reactor\n"),
00806                       -1);
00807 
00808   return result;
00809 }
00810 
00811 int
00812 TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00813                                TAO_AV_Core *av_core,
00814                                TAO_FlowSpec_Entry *entry,
00815                                TAO_AV_Flow_Protocol_Factory *factory,
00816                                TAO_AV_Core::Flow_Component flow_comp)
00817 {
00818   ACE_UNUSED_ARG (flow_comp);
00819 
00820   if (TAO_debug_level > 0)
00821     ACE_DEBUG ((LM_DEBUG,
00822                 "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open "));
00823 
00824   this->av_core_ = av_core;
00825   this->endpoint_ = endpoint;
00826   this->entry_ = entry;
00827 
00828 
00829   this->flow_protocol_factory_ = factory;
00830   this->flowname_ = entry->flowname ();
00831   ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address ();
00832 //   inet_addr->set (inet_addr->get_port_number (),
00833 //                   inet_addr->get_host_name ());
00834   char buf[BUFSIZ];
00835   inet_addr->addr_to_string (buf,
00836                              BUFSIZ);
00837   if (TAO_debug_level > 0)
00838     ACE_DEBUG ((LM_DEBUG,
00839                 "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: %s",
00840                 buf));
00841 
00842   int result = this->open_i (inet_addr);
00843 
00844   if (result < 0)
00845     return result;
00846 
00847   return 0;
00848 }
00849 
00850 int
00851 TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00852                                        TAO_AV_Core *av_core,
00853                                        TAO_FlowSpec_Entry *entry,
00854                                        TAO_AV_Flow_Protocol_Factory *factory,
00855                                        TAO_AV_Core::Flow_Component flow_comp)
00856 {
00857   ACE_UNUSED_ARG (flow_comp);
00858 
00859   this->av_core_ = av_core;
00860   this->endpoint_ = endpoint;
00861   this->entry_ = entry;
00862   this->flow_protocol_factory_ = factory;
00863   this->flowname_ = entry->flowname ();
00864   char buf [BUFSIZ];
00865   ACE_OS::hostname (buf,
00866                     BUFSIZ);
00867   qos_acceptor_addr_.set((u_short)0, buf);
00868 /*  ACE_INET_Addr *address;
00869   ACE_NEW_RETURN (address,
00870                   ACE_INET_Addr ("0"),
00871                   -1);
00872 
00873   address->addr_to_string (buf,
00874                            BUFSIZ);*/
00875   ACE_DEBUG ((LM_DEBUG,
00876               "(%N,%l) ADDRESS IS %s:%d\n",
00877               buf, qos_acceptor_addr_.get_port_number() ));
00878 
00879   int result = this->open_i (&qos_acceptor_addr_);
00880   if (result < 0)
00881     return result;
00882   return 0;
00883 }
00884 
00885 
00886 int
00887 TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
00888 {
00889   int result = 0;
00890 
00891   //  TAO_AV_Callback *callback = 0;
00892   //   this->endpoint_->get_callback (this->flowname_.c_str (),
00893   //                                  callback);
00894   ACE_INET_Addr *local_addr;
00895 
00896   ACE_NEW_RETURN (local_addr,
00897                   ACE_INET_Addr (*inet_addr),
00898                   -1);
00899 
00900   ACE_QoS_Params qos_params;
00901 
00902   ACE_QoS* ace_qos = 0;
00903 
00904   FillQoSParams (qos_params,
00905                  0,
00906                  ace_qos);
00907 
00908 
00909   TAO_AV_UDP_QoS_Flow_Handler* handler;
00910   ACE_NEW_RETURN (handler,
00911                   TAO_AV_UDP_QoS_Flow_Handler,
00912                   -1);
00913 
00914 
00915   TAO_AV_Flow_Handler *flow_handler = 0;
00916   flow_handler = handler;
00917 
00918   handler->endpoint (this->endpoint_);
00919   handler->flowspec_entry (this->entry_);
00920   handler->av_core (this->av_core_);
00921 
00922   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00923     {
00924 
00925       TAO_AV_UDP_QoS_Session_Helper helper;
00926 
00927       int result = handler->get_socket ()->open (*inet_addr,
00928                                                  qos_params,
00929                                                  AF_INET,
00930                                                  0,
00931                                                  0,
00932                                                  0,
00933                                                  ACE_OVERLAPPED_SOCKET_FLAG
00934                                                  | ACE_FLAG_MULTIPOINT_C_LEAF
00935                                                  | ACE_FLAG_MULTIPOINT_D_LEAF,
00936                                                  1);
00937 
00938       if (result < 0)
00939         ACE_ERROR_RETURN ((LM_ERROR,
00940                            "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
00941                           -1);
00942 
00943       result = handler->get_socket ()->get_local_addr (*local_addr);
00944       if (result < 0)
00945         ACE_ERROR_RETURN ((LM_ERROR,
00946                            "Error in getting Local Address (%N|%l)\n"),
00947                           -1);
00948 
00949       // Create a destination address for the QoS session. The same
00950       // address should be used for the subscribe call later. A copy
00951       // is made below only to distinguish the two usages of the dest
00952       // address.
00953       ACE_INET_Addr dest_addr;
00954       dest_addr.set  (local_addr->get_port_number (),
00955                       local_addr->get_host_name ());
00956 
00957       char dest_buf [BUFSIZ];
00958       dest_addr.addr_to_string (dest_buf,
00959                                 BUFSIZ);
00960 
00961       if (TAO_debug_level > 0)
00962         ACE_DEBUG ((LM_DEBUG,
00963                     "Session Address is %s\n",
00964                     dest_buf));
00965 
00966       this->qos_session_ = helper.open_qos_session (handler,
00967                                                     dest_addr);
00968 
00969       if (this->qos_session_ == 0)
00970         ACE_ERROR_RETURN ((LM_ERROR,
00971                            "QoS Session Open Failed (%N|%l)\n"),
00972                           -1);
00973 
00974       handler->qos_session (this->qos_session_);
00975 
00976       if (this->activate_svc_handler (handler) == -1)
00977         ACE_ERROR_RETURN ((LM_ERROR,
00978                            "Activate Svc Handler Failed (%N|%l)\n"),
00979                           -1);
00980 
00981       AVStreams::QoS qos;
00982       int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
00983                                                                 qos);
00984 
00985       if (qos_available == 0)
00986         {
00987 
00988           ACE_Flow_Spec *ace_flow_spec = 0;
00989           ACE_NEW_RETURN (ace_flow_spec,
00990                           ACE_Flow_Spec,
00991                           -1);
00992 
00993           handler->translate (qos.QoSParams,
00994                               ace_flow_spec);
00995 
00996           if (helper.set_qos (*ace_flow_spec,
00997                               handler) == -1)
00998 
00999             ACE_ERROR_RETURN ((LM_ERROR,
01000                                "Set QoS Failed (%N|%l)\n"),
01001                               -1);
01002         }
01003     }
01004   else
01005     {
01006 
01007       int result = handler->get_socket ()->open (*inet_addr,
01008                                                  qos_params,
01009                                                  AF_INET,
01010                                                  0,
01011                                                  0,
01012                                                  0,
01013                                                  ACE_OVERLAPPED_SOCKET_FLAG
01014                                                  | ACE_FLAG_MULTIPOINT_C_LEAF
01015                                                  | ACE_FLAG_MULTIPOINT_D_LEAF,
01016                                                  1);
01017 
01018       if (result < 0)
01019         ACE_ERROR_RETURN ((LM_ERROR,
01020                            "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
01021                           -1);
01022     }
01023 
01024   TAO_AV_Protocol_Object *object =
01025     this->flow_protocol_factory_->make_protocol_object (this->entry_,
01026                                                         this->endpoint_,
01027                                                         flow_handler,
01028                                                         flow_handler->transport ());
01029   flow_handler->protocol_object (object);
01030 
01031   AVStreams::Negotiator_ptr negotiator;
01032 
01033   try
01034     {
01035       CORBA::Any_ptr negotiator_any =
01036         this->endpoint_->get_property_value ("Negotiator");
01037 
01038       *negotiator_any >>= negotiator;
01039       handler->negotiator (negotiator);
01040     }
01041   catch (const CORBA::Exception&)
01042     {
01043       ACE_DEBUG ((LM_DEBUG,
01044                   "(%N,%l) Negotiator Not Found \n"));
01045     }
01046 
01047   this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
01048   this->entry_->protocol_object (object);
01049 
01050   result = handler->get_socket ()->get_local_addr (*local_addr);
01051   if (result < 0)
01052     ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
01053   local_addr->set (local_addr->get_port_number (),
01054                    local_addr->get_host_name ());
01055 
01056   if (TAO_debug_level > 0)
01057     {
01058       char buf [BUFSIZ];
01059       local_addr->addr_to_string (buf,
01060                                   BUFSIZ);
01061       ACE_DEBUG ((LM_DEBUG,
01062                   "Local Address is %s\n",
01063                   buf));
01064     }
01065 
01066   this->entry_->set_local_addr (local_addr);
01067   this->entry_->handler (flow_handler);
01068 
01069   return 0;
01070 
01071 }
01072 
01073 int
01074 TAO_AV_UDP_QoS_Acceptor::close (void)
01075 {
01076   return 0;
01077 }
01078 
01079 //------------------------------------------------------------
01080 // TAO_AV_UDP_QoS_Connector
01081 //------------------------------------------------------------
01082 TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector (void)
01083 {
01084 }
01085 
01086 TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector (void)
01087 {
01088 }
01089 
01090 int
01091 TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint,
01092                                 TAO_AV_Core *av_core,
01093                                 TAO_AV_Flow_Protocol_Factory *factory)
01094 
01095 {
01096   if (TAO_debug_level > 0)
01097     ACE_DEBUG ((LM_DEBUG,
01098                 "TAO_AV_UDP_QoS_Connector::open "));
01099 
01100   this->endpoint_ = endpoint;
01101   this->av_core_ = av_core;
01102   this->flow_protocol_factory_ = factory;
01103   return 0;
01104 }
01105 
01106 //  int
01107 //  TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params,
01108 //                                   ACE_Flow_Spec *ace_flow_spec)
01109 //  {
01110 //    for (unsigned int i = 0;
01111 //         i < qos_params.length ();
01112 //         i++)
01113 //      {
01114 //        if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
01115 //          {
01116 //            CORBA::Short type;
01117 //            qos_params [i].property_value >>= type;
01118 //            ace_flow_spec->service_type (type);
01119 //          }
01120 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
01121 //          {
01122 //            CORBA::ULong tok_rate;
01123 //            qos_params [i].property_value >>= tok_rate;
01124 //            ace_flow_spec->token_rate (tok_rate);
01125 //          }
01126 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0)
01127 //          {
01128 //            CORBA::ULong tok_buck_size;
01129 //            qos_params [i].property_value >>= tok_buck_size;
01130 //            ace_flow_spec->token_bucket_size (tok_buck_size);
01131 //          }
01132 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
01133 //          {
01134 //            CORBA::ULong peak_bw;
01135 //            qos_params [i].property_value >>= peak_bw;
01136 //            ace_flow_spec->peak_bandwidth (peak_bw);
01137 //          }
01138 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
01139 //          {
01140 //            CORBA::ULong lat;
01141 //            qos_params [i].property_value >>= lat;
01142 //            ace_flow_spec->latency (lat);
01143 //          }
01144 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
01145 //          {
01146 //            CORBA::ULong delay_var;
01147 //            qos_params [i].property_value >>= delay_var;
01148 //            ace_flow_spec->delay_variation (delay_var);
01149 //          }
01150 
01151 //      }
01152 
01153 //    return 0;
01154 //  }
01155 
01156 
01157 int
01158 TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
01159                                    TAO_AV_Transport *&transport,
01160                                    TAO_AV_Core::Flow_Component flow_comp)
01161 {
01162   ACE_UNUSED_ARG (flow_comp);
01163 
01164   int result = 0;
01165   this->entry_ = entry;
01166   this->flowname_ = entry->flowname ();
01167 
01168   ACE_INET_Addr *local_addr;
01169 
01170   if (entry->get_peer_addr () != 0)
01171     {
01172       local_addr = dynamic_cast<ACE_INET_Addr*> (entry->get_peer_addr ());
01173     }
01174   else
01175     ACE_NEW_RETURN (local_addr,
01176             ACE_INET_Addr ("0"),
01177             -1);
01178 
01179   TAO_AV_Flow_Handler *flow_handler = 0;
01180 
01181   TAO_AV_UDP_QoS_Flow_Handler *handler;
01182   ACE_NEW_RETURN (handler,
01183                   TAO_AV_UDP_QoS_Flow_Handler,
01184                   -1);
01185 
01186   flow_handler = handler;
01187 
01188   handler->endpoint (this->endpoint_);
01189   handler->flowspec_entry (this->entry_);
01190   handler->av_core (this->av_core_);
01191 
01192   ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
01193 
01194   ACE_QoS_Params qos_params;
01195 
01196   ACE_QoS* ace_qos = 0;
01197 
01198   FillQoSParams (qos_params,
01199                  0,
01200                  ace_qos);
01201 
01202   result = handler->get_socket ()->open (*local_addr,
01203                                          qos_params,
01204                                          AF_INET,
01205                                          0,
01206                                          0,
01207                                          0,
01208                                          ACE_OVERLAPPED_SOCKET_FLAG
01209                                          | ACE_FLAG_MULTIPOINT_C_LEAF
01210                                          | ACE_FLAG_MULTIPOINT_D_LEAF,
01211                                          1);
01212 
01213   if (result < 0)
01214     ACE_ERROR_RETURN ((LM_ERROR,
01215                        "Data socket open failed (%N|%l)\n"),
01216                       -1);
01217 
01218   result = handler->get_socket ()->get_local_addr (*local_addr);
01219 
01220 
01221   ACE_INET_Addr *session_addr = 0;
01222   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
01223     {
01224       ACE_NEW_RETURN (session_addr,
01225                       ACE_INET_Addr,
01226                       -1);
01227 
01228       session_addr->set (local_addr->get_port_number (),
01229                          local_addr->get_host_name ());
01230 
01231     }
01232   else
01233     {
01234       session_addr = inet_addr;
01235     }
01236 
01237   char sess_buf [BUFSIZ];
01238   session_addr->addr_to_string (sess_buf,
01239                                 BUFSIZ);
01240 
01241   if (TAO_debug_level > 0)
01242     ACE_DEBUG ((LM_DEBUG,
01243                 "Session Address is %s\n",
01244                 sess_buf));
01245 
01246   // Create a destination address for the QoS session. The same
01247   // address should be used for the subscribe call later. A copy
01248   // is made below only to distinguish the two usages of the dest
01249   // address.
01250   ACE_INET_Addr dest_addr (*session_addr);
01251 
01252   TAO_AV_UDP_QoS_Session_Helper helper;
01253 
01254   this->qos_session_ = helper.open_qos_session (handler,
01255                                                 *session_addr);
01256 
01257   if (this->qos_session_ == 0)
01258     ACE_ERROR_RETURN ((LM_ERROR,
01259                        "QoS Session Open Failed (%N|%l)\n"),
01260                       -1);
01261   else
01262     ACE_DEBUG ((LM_DEBUG,
01263                 "QoS session opened successfully\n"));
01264 
01265   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
01266     {
01267       //this->qos_session_->source_port (local_addr->get_port_number ());
01268       ACE_INET_Addr* src_addr;
01269       ACE_NEW_RETURN (src_addr,
01270               ACE_INET_Addr (local_addr->get_port_number (),
01271                      local_addr->get_host_name ()),
01272               -1);
01273 
01274       this->qos_session_->source_addr (src_addr);
01275 
01276     }
01277 
01278   handler->qos_session (this->qos_session_);
01279 
01280   this->qos_manager_ =
01281     handler->get_socket ()->qos_manager ();
01282 
01283   AVStreams::QoS qos;
01284 
01285   int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
01286                                                             qos);
01287   if (qos_available == 0)
01288     {
01289 
01290       ACE_Flow_Spec* ace_flow_spec;
01291       ACE_NEW_RETURN (ace_flow_spec,
01292                       ACE_Flow_Spec,
01293                       -1);
01294 
01295       handler->translate (qos.QoSParams,
01296                           ace_flow_spec);
01297 
01298       if (helper.set_qos (*ace_flow_spec,
01299                           handler) == -1)
01300         ACE_ERROR_RETURN ((LM_ERROR,
01301                            "Unable to set QoS (%N|%l)\n"),
01302                           -1);
01303       else
01304       {
01305         if( TAO_debug_level > 0)
01306             ACE_DEBUG ((LM_DEBUG,
01307                     "(%N,%l) Setting QOS succeeds.\n"));
01308       }
01309     }
01310 
01311   TAO_AV_Protocol_Object *object =
01312     this->flow_protocol_factory_->make_protocol_object (this->entry_,
01313                                                         this->endpoint_,
01314                                                         flow_handler,
01315                                                         flow_handler->transport ());
01316 
01317   AVStreams::Negotiator_ptr negotiator;
01318 
01319   try
01320     {
01321       CORBA::Any_ptr negotiator_any =
01322         this->endpoint_->get_property_value ("Negotiator");
01323 
01324       *negotiator_any >>= negotiator;
01325       handler->negotiator (negotiator);
01326     }
01327   catch (const CORBA::Exception&)
01328     {
01329       ACE_DEBUG ((LM_DEBUG,
01330                   "Negotiator not found for flow %s\n",
01331                   this->entry_->flowname ()));
01332     }
01333 
01334   flow_handler->protocol_object (object);
01335 
01336   this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
01337   this->entry_->protocol_object (object);
01338 
01339   result = handler->get_socket ()->get_local_addr (*local_addr);
01340   if (result < 0)
01341     ACE_ERROR_RETURN ((LM_ERROR,
01342                        "Get local addr failed (%N|%l)\n"),
01343                       result);
01344 
01345   local_addr->set (local_addr->get_port_number (),
01346                    local_addr->get_host_name ());
01347 
01348   if (TAO_debug_level > 0)
01349     {
01350       char buf[BUFSIZ];
01351       local_addr->addr_to_string (buf,
01352                                   BUFSIZ);
01353 
01354       ACE_DEBUG ((LM_DEBUG,
01355                   "Local Address is %s\n",
01356                   buf));
01357     }
01358 
01359   entry->set_local_addr (local_addr);
01360   entry->handler (flow_handler);
01361   transport = flow_handler->transport ();
01362 
01363   // call activate svc handler.
01364   return this->activate_svc_handler (handler);
01365 }
01366 
01367 int
01368 TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
01369 {
01370   int result = 0;
01371 
01372   TAO_AV_UDP_QoS_Session_Helper helper;
01373 
01374   result = helper.activate_qos_handler (this->qos_session_,
01375                                         handler);
01376 
01377   if (result == -1)
01378     ACE_ERROR_RETURN ((LM_ERROR,
01379                        "(%N,%l) Error in registering the Decorator with the Reactor\n"),
01380                       -1);
01381 
01382   return result;
01383 }
01384 
01385 int
01386 TAO_AV_UDP_QoS_Connector::close (void)
01387 {
01388   return 0;
01389 }
01390 
01391 //------------------------------------------------------------
01392 // TAO_AV_UDP_QoS_Factory
01393 //------------------------------------------------------------
01394 
01395 TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory (void)
01396 {
01397   if (TAO_debug_level > 0)
01398     ACE_DEBUG ((LM_DEBUG,
01399                 "TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory\n"));
01400 }
01401 
01402 TAO_AV_UDP_QoS_Factory::~TAO_AV_UDP_QoS_Factory (void)
01403 {
01404 }
01405 
01406 int
01407 TAO_AV_UDP_QoS_Factory::match_protocol (const char *protocol_string)
01408 {
01409   if (TAO_debug_level > 0)
01410     ACE_DEBUG ((LM_DEBUG,
01411                 "TAO_AV_UDP_QoS_Factory::match_protocol\n"));
01412 
01413   if (ACE_OS::strcasecmp (protocol_string,"QoS_UDP") == 0)
01414     return 1;
01415   return 0;
01416 }
01417 
01418 TAO_AV_Acceptor*
01419 TAO_AV_UDP_QoS_Factory::make_acceptor (void)
01420 {
01421   if (TAO_debug_level > 0)
01422     ACE_DEBUG ((LM_DEBUG,
01423                 "TAO_AV_UDP_QoS_Factory::make_acceptor "));
01424 
01425   TAO_AV_Acceptor *acceptor = 0;
01426   ACE_NEW_RETURN (acceptor,
01427                   TAO_AV_UDP_QoS_Acceptor,
01428                   0);
01429   return acceptor;
01430 }
01431 
01432 TAO_AV_Connector*
01433 TAO_AV_UDP_QoS_Factory::make_connector (void)
01434 {
01435   if (TAO_debug_level > 0)
01436     ACE_DEBUG ((LM_DEBUG,
01437                 "TAO_AV_UDP_QoS_Factory::make_connector "));
01438 
01439   TAO_AV_Connector *connector = 0;
01440   ACE_NEW_RETURN (connector,
01441                   TAO_AV_UDP_QoS_Connector,
01442                   0);
01443   return connector;
01444 }
01445 
01446 int
01447 TAO_AV_UDP_QoS_Factory::init (int /* argc */,
01448                               char * /* argv */ [])
01449 {
01450   return 0;
01451 }
01452 
01453 
01454 //------------------------------------------------------------
01455 // TAO_AV_UDP_QoS_Flow_Factory
01456 //------------------------------------------------------------
01457 TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory (void)
01458 {
01459   if (TAO_debug_level > 0)
01460     ACE_DEBUG ((LM_DEBUG,
01461                 "TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory\n"));
01462 }
01463 
01464 TAO_AV_UDP_QoS_Flow_Factory::~TAO_AV_UDP_QoS_Flow_Factory (void)
01465 {
01466 }
01467 
01468 int
01469 TAO_AV_UDP_QoS_Flow_Factory::init (int /* argc */,
01470                                    char * /* argv */ [])
01471 {
01472   return 0;
01473 }
01474 
01475 int
01476 TAO_AV_UDP_QoS_Flow_Factory::match_protocol (const char *flow_string)
01477 {
01478   if (ACE_OS::strcasecmp (flow_string,"QoS_UDP") == 0)
01479     return 1;
01480   return 0;
01481 }
01482 
01483 TAO_AV_Protocol_Object*
01484 TAO_AV_UDP_QoS_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01485                                                    TAO_Base_StreamEndPoint *endpoint,
01486                                                    TAO_AV_Flow_Handler *handler,
01487                                                    TAO_AV_Transport *transport)
01488 {
01489   TAO_AV_Callback *callback = 0;
01490   endpoint->get_callback (entry->flowname (),
01491                           callback);
01492 
01493 
01494   TAO_AV_UDP_Object *object = 0;
01495   ACE_NEW_RETURN (object,
01496                   TAO_AV_UDP_Object (callback,
01497                                      transport),
01498                   0);
01499   callback->open (object,
01500                   handler);
01501   endpoint->set_protocol_object (entry->flowname (),
01502                                  object);
01503   return object;
01504 }
01505 
01506 TAO_END_VERSIONED_NAMESPACE_DECL
01507 
01508 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory)
01509 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Flow_Factory,
01510                        ACE_TEXT ("UDP_QoS_Flow_Factory"),
01511                        ACE_SVC_OBJ_T,
01512                        &ACE_SVC_NAME (TAO_AV_UDP_QoS_Flow_Factory),
01513                        ACE_Service_Type::DELETE_THIS |
01514                        ACE_Service_Type::DELETE_OBJ,
01515                        0)
01516 
01517 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_QoS_Factory)
01518 
01519 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Factory,
01520                        ACE_TEXT ("UDP_QoS_Factory"),
01521                        ACE_SVC_OBJ_T,
01522                        &ACE_SVC_NAME (TAO_AV_UDP_QoS_Factory),
01523                        ACE_Service_Type::DELETE_THIS |
01524                        ACE_Service_Type::DELETE_OBJ,
01525                        0)
01526 
01527 #endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */

Generated on Tue Feb 2 17:47:49 2010 for TAO_AV by  doxygen 1.4.7