QoS_UDP.cpp

Go to the documentation of this file.
00001 // QoS_UDP.cpp,v 1.24 2006/03/14 06:14:24 jtc Exp
00002 
00003 
00004 #include "orbsvcs/AV/QoS_UDP.h"
00005 
00006 #if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS)
00007 
00008 #include "orbsvcs/AV/UDP.h"
00009 #include "orbsvcs/AV/AVStreams_i.h"
00010 #include "orbsvcs/AV/MCast.h"
00011 #include "orbsvcs/AV/Fill_ACE_QoS.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "orbsvcs/AV/QoS_UDP.i"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 //------------------------------------------------------------
00018 // TAO_AV_UDP_QoS_Session_Helper and TAO_AV_UDP_QoS_Flow_Handler
00019 //------------------------------------------------------------
00020 
// File-local flags raised by TAO_AV_UDP_QoS_Flow_Handler::handle_qos ()
// when an RSVP reservation error / reservation confirmation is seen.
static int resv_error   = 0;
static int resv_confirm = 0;
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 int
00027 FillQoSParams (ACE_QoS_Params &qos_params,
00028                iovec* iov,
00029                ACE_QoS* qos)
00030 {
00031   qos_params.callee_data (iov);
00032   qos_params.caller_data (0);
00033   qos_params.socket_qos (qos);
00034   qos_params.group_socket_qos (0);
00035   qos_params.flags (ACE_JL_BOTH);
00036 
00037   return 0;
00038 }
00039 
00040 TAO_AV_UDP_QoS_Session_Helper::TAO_AV_UDP_QoS_Session_Helper (void)
00041 {
00042 
00043 }
00044 
00045 TAO_AV_UDP_QoS_Session_Helper::~TAO_AV_UDP_QoS_Session_Helper (void)
00046 {
00047 }
00048 
00049 int
00050 TAO_AV_UDP_QoS_Session_Helper::set_qos (ACE_Flow_Spec &ace_flow_spec,
00051                                         TAO_AV_UDP_QoS_Flow_Handler *handler)
00052 {
00053   ACE_QoS* ace_qos = 0;
00054 
00055   ACE_NEW_RETURN (ace_qos,
00056                   ACE_QoS,
00057                   -1);
00058 
00059   Fill_ACE_QoS fill_ace_qos;
00060 
00061   if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00062     {
00063       if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
00064                                                 &ace_flow_spec) !=0)
00065         ACE_ERROR_RETURN ((LM_ERROR,
00066                            "Unable to fill simplex sender qos (%N|%l)\n"),
00067                           -1);
00068       else
00069         if (TAO_debug_level > 0)
00070           ACE_DEBUG ((LM_DEBUG,
00071                       "Filled up the Sender QoS parameters\n"));
00072     }
00073   else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00074     {
00075       if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
00076                                                   &ace_flow_spec) !=0)
00077         ACE_ERROR_RETURN ((LM_ERROR,
00078                            "Unable to fill simplex receiver qos (%N|%l)\n"),
00079                           -1);
00080       else
00081         if (TAO_debug_level > 0)
00082           ACE_DEBUG ((LM_DEBUG,
00083                       "Filled up the Receiver QoS parameters\n"));
00084 
00085     }
00086 
00087   ACE_QoS_Manager qos_manager = handler->get_socket ()->qos_manager ();
00088 
00089   // Set the QoS for the session. Replaces the ioctl () call that
00090   // was being made previously.
00091   if (handler->qos_session ()->qos (handler->get_socket (),
00092                                     &qos_manager,
00093                                     *ace_qos) == -1)
00094     ACE_ERROR_RETURN ((LM_ERROR,
00095                        "Unable to set QoS (%N|%l)\n"),
00096                       -1);
00097   else
00098     ACE_DEBUG ((LM_DEBUG,
00099                 "Setting QOS succeeds\n"));
00100 
00101   return 0;
00102 }
00103 
00104 ACE_QoS_Session *
00105 TAO_AV_UDP_QoS_Session_Helper::open_qos_session (TAO_AV_UDP_QoS_Flow_Handler *handler,
00106                                                  ACE_INET_Addr &addr)
00107 {
00108   ACE_QoS_Params qos_params;
00109 
00110   ACE_QoS* ace_qos = 0;
00111 
00112   FillQoSParams (qos_params,
00113                  0,
00114                  ace_qos);
00115 
00116 
00117   // Create a QoS Session Factory.
00118   ACE_QoS_Session_Factory session_factory;
00119 
00120   // Ask the factory to create a QoS session.
00121   ACE_QoS_Session *qos_session = session_factory.create_session ();
00122 
00123   // Create a destination address for the QoS session. The same
00124   // address should be used for the subscribe call later. A copy
00125   // is made below only to distinguish the two usages of the dest
00126   // address.
00127   ACE_INET_Addr dest_addr (addr);
00128 
00129   // A QoS session is defined by the 3-tuple [DestAddr, DestPort,
00130   // Protocol]. Initialize the QoS session.
00131   if (qos_session->open (dest_addr,
00132                          IPPROTO_UDP) == -1)
00133     ACE_ERROR_RETURN ((LM_ERROR,
00134                        "Error in opening the QoS session\n"),
00135                       0);
00136   else
00137     ACE_DEBUG ((LM_DEBUG,
00138                 "QoS session opened successfully\n"));
00139 
00140   if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00141     {
00142       // This is a sender
00143       qos_session->flags (ACE_QoS_Session::ACE_QOS_SENDER);
00144     }
00145   else if (handler->flowspec_entry ()->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00146     {
00147       // This is a receiver
00148       qos_session->flags (ACE_QoS_Session::ACE_QOS_RECEIVER);
00149     }
00150 
00151   return qos_session;
00152 }
00153 
00154 int
00155 TAO_AV_UDP_QoS_Session_Helper::activate_qos_handler (ACE_QoS_Session *qos_session,
00156                                                      TAO_AV_UDP_QoS_Flow_Handler *handler)
00157 {
00158   ACE_QoS_Decorator* qos_decorator;
00159 
00160   // Decorate the above handler with QoS functionality.
00161   ACE_NEW_RETURN (qos_decorator,
00162                   ACE_QoS_Decorator (handler,
00163                                      qos_session,
00164                                      handler->av_core ()->reactor ()),
00165                   -1);
00166 
00167   // Initialize the Decorator.
00168   if (qos_decorator->init () != 0)
00169     ACE_ERROR_RETURN ((LM_ERROR,
00170                        "QoS Decorator init () failed (%N|%l)\n"),
00171                       -1);
00172 
00173   // Register the decorated Event Handler with the Reactor.
00174   int result = handler->av_core ()->reactor ()->register_handler (qos_decorator,
00175                                                                   ACE_Event_Handler::QOS_MASK |
00176                                                                   ACE_Event_Handler::READ_MASK);
00177   if (result == -1)
00178     ACE_ERROR_RETURN ((LM_ERROR,
00179                        "Error in registering the Decorator with the Reactor (%N|%l)\n"),
00180                       -1);
00181 
00182   return 0;
00183 
00184 }
00185 
00186 TAO_AV_UDP_QoS_Flow_Handler::TAO_AV_UDP_QoS_Flow_Handler (void)
00187 {
00188   ACE_NEW (this->transport_,
00189            TAO_AV_UDP_QoS_Transport (this));
00190 }
00191 
00192 TAO_AV_UDP_QoS_Flow_Handler::~TAO_AV_UDP_QoS_Flow_Handler (void)
00193 {
00194   delete this->transport_;
00195 }
00196 
00197 TAO_AV_Transport *
00198 TAO_AV_UDP_QoS_Flow_Handler::transport (void)
00199 {
00200   return this->transport_;
00201 }
00202 
00203 int
00204 TAO_AV_UDP_QoS_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
00205 {
00206   this->protocol_object_->handle_input ();
00207   return 0;
00208 }
00209 
00210 int
00211 TAO_AV_UDP_QoS_Flow_Handler::translate (CosPropertyService::Properties &qos_params,
00212                                         ACE_Flow_Spec *ace_flow_spec)
00213 {
00214   for (unsigned int i = 0;
00215        i < qos_params.length ();
00216        i++)
00217     {
00218       if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
00219         {
00220           CORBA::Short type;
00221           qos_params [i].property_value >>= type;
00222           ace_flow_spec->service_type (type);
00223         }
00224       else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
00225         {
00226           CORBA::ULong tok_rate;
00227           qos_params [i].property_value >>= tok_rate;
00228           ace_flow_spec->token_rate (tok_rate);
00229         }
00230       else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Size") == 0)
00231         {
00232           CORBA::ULong tok_buck_size;
00233           qos_params [i].property_value >>= tok_buck_size;
00234           ace_flow_spec->token_bucket_size (tok_buck_size);
00235         }
00236       else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
00237         {
00238           CORBA::ULong peak_bw;
00239           qos_params [i].property_value >>= peak_bw;
00240           ace_flow_spec->peak_bandwidth (peak_bw);
00241         }
00242       else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
00243         {
00244           CORBA::ULong lat;
00245           qos_params [i].property_value >>= lat;
00246           ace_flow_spec->latency (lat);
00247         }
00248       else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
00249         {
00250           CORBA::ULong delay_var;
00251           qos_params [i].property_value >>= delay_var;
00252           ace_flow_spec->delay_variation (delay_var);
00253         }
00254       else if (ACE_OS::strcmp (qos_params [i].property_name, "Max_SDU_Size") == 0)
00255         {
00256           CORBA::ULong max_sdu;
00257           qos_params [i].property_value >>= max_sdu;
00258           ace_flow_spec->max_sdu_size (max_sdu);
00259         }
00260       else if (ACE_OS::strcmp (qos_params [i].property_name, "Minimum_Policed_Size") == 0)
00261         {
00262           CORBA::ULong min_pol_size;
00263           qos_params [i].property_value >>= min_pol_size;
00264           ace_flow_spec->minimum_policed_size (min_pol_size);
00265         }
00266       else if (ACE_OS::strcmp (qos_params [i].property_name, "TTL") == 0)
00267         {
00268           CORBA::ULong ttl;
00269           qos_params [i].property_value >>= ttl;
00270           ace_flow_spec->ttl (ttl);
00271         }
00272       else if (ACE_OS::strcmp (qos_params [i].property_name, "Priority") == 0)
00273         {
00274           CORBA::ULong priority;
00275           qos_params [i].property_value >>= priority;
00276           ace_flow_spec->priority (priority);
00277         }
00278     }
00279 
00280   return 0;
00281 }
00282 
00283 int
00284 TAO_AV_UDP_QoS_Flow_Handler::translate (ACE_Flow_Spec *ace_flow_spec,
00285                                         CosPropertyService::Properties &qos_params)
00286 {
00287   qos_params.length (9);
00288 
00289   qos_params [0].property_name = CORBA::string_dup ("Service_Type");
00290   qos_params [0].property_value <<= (CORBA::Short) ace_flow_spec->service_type ();
00291 
00292   qos_params [1].property_name = CORBA::string_dup ("Token_Rate");
00293   qos_params [1].property_value <<= (CORBA::ULong) ace_flow_spec->token_rate ();
00294 
00295   qos_params [2].property_name = CORBA::string_dup ("Token_Bucket_Size");
00296   qos_params [2].property_value <<= (CORBA::ULong) ace_flow_spec->token_bucket_size ();
00297 
00298   qos_params [3].property_name = CORBA::string_dup ("Peak_Bandwidth");
00299   qos_params [3].property_value <<= (CORBA::ULong) ace_flow_spec->peak_bandwidth ();
00300 
00301   qos_params [4].property_name = CORBA::string_dup ("Latency");
00302   qos_params [4].property_value <<= (CORBA::ULong) ace_flow_spec->latency ();
00303 
00304   qos_params [5].property_name = CORBA::string_dup ("Delay_Variation");
00305   qos_params [5].property_value <<= (CORBA::ULong) ace_flow_spec->delay_variation ();
00306 
00307   qos_params [6].property_name = CORBA::string_dup ("Max_SDU_Size");
00308   qos_params [6].property_value <<= (CORBA::ULong) ace_flow_spec->max_sdu_size ();
00309 
00310   qos_params [7].property_name = CORBA::string_dup ("Minimum_Policed_Size");
00311   qos_params [7].property_value <<= (CORBA::ULong) ace_flow_spec->minimum_policed_size ();
00312 
00313   qos_params [8].property_name = CORBA::string_dup ("TTL");
00314   qos_params [8].property_value <<= (CORBA::ULong) ace_flow_spec->ttl ();
00315 
00316   return 0;
00317 }
00318 
00319 int
00320 TAO_AV_UDP_QoS_Flow_Handler::handle_qos (ACE_HANDLE /*fd*/)
00321 {
00322 
00323   ACE_DECLARE_NEW_CORBA_ENV;
00324   if (TAO_debug_level > 0)
00325     ACE_DEBUG ((LM_DEBUG,
00326                 "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::handle_qos\n"));
00327 
00328   if (this->qos_session_->update_qos () == -1)
00329     ACE_ERROR_RETURN ((LM_ERROR,
00330                        "Error in updating QoS\n"),
00331                       -1);
00332   else
00333   {
00334     if(TAO_debug_level > 0)
00335       ACE_DEBUG ((LM_DEBUG,
00336                   "(%N,%l) Updating QOS succeeds.\n"));
00337   }
00338 
00339   if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_ERROR)
00340     {
00341       resv_error = 1;
00342     }
00343 
00344   if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_CONFIRM)
00345     {
00346       resv_confirm = 1;
00347     }
00348 
00349   if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_SENDER)
00350     {
00351       if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_RESV_EVENT)
00352             {
00353               if( TAO_debug_level > 0 )
00354               {
00355                  ACE_DEBUG ((LM_DEBUG,
00356                              "(%N,%l) Resv Event Received\n"));
00357               }
00358               if (!CORBA::is_nil (this->negotiator_))
00359                 {
00360                   if( TAO_debug_level > 0 )
00361                   {
00362                      ACE_DEBUG ((LM_DEBUG,
00363                                  "(%N,%l) Negotiator Specified\n"));
00364                   }
00365 
00366                   AVStreams::streamQoS new_qos;
00367                   ACE_Flow_Spec *ace_flow_spec =
00368                     this->qos_session_->qos ().sending_flowspec ();
00369 
00370                   if (ace_flow_spec != 0)
00371                     {
00372                       new_qos.length (1);
00373                       this->translate (ace_flow_spec,
00374                                        new_qos [0].QoSParams);
00375                     }
00376 
00377                   AVStreams::Negotiator_var remote_negotiator;
00378                   this->negotiator_->negotiate (remote_negotiator.in (),
00379                                                 new_qos
00380                                                 ACE_ENV_ARG_PARAMETER);
00381                 }
00382             }
00383     }
00384   else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
00385     {
00386       if (this->qos_session_->rsvp_event_type () == ACE_QoS_Session::RSVP_PATH_EVENT)
00387         {
00388           ACE_QoS_Manager qos_manager =
00389             this->get_socket ()->qos_manager ();
00390 
00391           ACE_QoS ace_qos = this->qos_session_->qos ();
00392 
00393           this->qos_session_->qos (this->get_socket (),
00394                                    &qos_manager,
00395                                    ace_qos);
00396         }
00397     }
00398 
00399   return 0;
00400 }
00401 
00402 int
00403 TAO_AV_UDP_QoS_Flow_Handler::change_qos (AVStreams::QoS new_qos)
00404 {
00405   if( TAO_debug_level > 0 )
00406   {
00407      ACE_DEBUG ((LM_DEBUG,
00408                  "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::change_qos\n"));
00409   }
00410 
00411   ACE_QoS* ace_qos = 0;
00412 
00413   ACE_NEW_RETURN (ace_qos,
00414                   ACE_QoS,
00415                   -1);
00416 
00417   if (new_qos.QoSParams.length () != 0)
00418     {
00419       ACE_DEBUG ((LM_DEBUG,
00420                   "New QoS Params are not Empty\n"));
00421 
00422       ACE_Flow_Spec *ace_flow_spec;
00423 
00424       ACE_NEW_RETURN (ace_flow_spec,
00425                       ACE_Flow_Spec,
00426                       -1);
00427 
00428       this->translate (new_qos.QoSParams,
00429                        ace_flow_spec);
00430 
00431 
00432       Fill_ACE_QoS fill_ace_qos;
00433 
00434       if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_SENDER)
00435         {
00436           if (fill_ace_qos.fill_simplex_sender_qos (*ace_qos,
00437                                                     ace_flow_spec) !=0)
00438             ACE_ERROR_RETURN ((LM_ERROR,
00439                                "Unable to fill simplex sender qos\n"),
00440                               -1);
00441           else
00442             {
00443               if( TAO_debug_level > 0 )
00444                 ACE_DEBUG ((LM_DEBUG,
00445                             "(%N,%l) Filled up the Sender QoS parameters\n"));
00446             }
00447         }
00448       else if (this->qos_session_->flags () == ACE_QoS_Session::ACE_QOS_RECEIVER)
00449         {
00450           if (fill_ace_qos.fill_simplex_receiver_qos (*ace_qos,
00451                                                       ace_flow_spec) !=0)
00452             ACE_ERROR_RETURN ((LM_ERROR,
00453                                "Unable to fill simplex receiver qos\n"),
00454                               -1);
00455           else
00456             {
00457               if( TAO_debug_level > 0 )
00458                 ACE_DEBUG ((LM_DEBUG,
00459                             "(%N,%l) Filled up the Receiver QoS parameters\n"));
00460             }
00461 
00462         }
00463 
00464       ACE_QoS_Params qos_params;
00465       FillQoSParams (qos_params,
00466                      0,
00467                      ace_qos);
00468     }
00469 
00470   ACE_QoS_Manager qos_manager =
00471     this->get_socket ()->qos_manager ();
00472 
00473   int result = this->qos_session_->qos (this->get_socket (),
00474                                         &qos_manager,
00475                                         *ace_qos);
00476   if (result != 0)
00477     return result;
00478 
00479   return 0;
00480 }
00481 
00482 int
00483 TAO_AV_UDP_QoS_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
00484                                                const void *arg)
00485 {
00486   return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
00487 }
00488 
00489 int
00490 TAO_AV_UDP_QoS_Flow_Handler::set_remote_address (ACE_Addr *address)
00491 {
00492 
00493   if (TAO_debug_level > 0)
00494     {
00495         char buf [BUFSIZ];
00496         ACE_INET_Addr *remote_addr = dynamic_cast<ACE_INET_Addr*> (address);
00497         remote_addr->addr_to_string (buf,
00498                                      BUFSIZ);
00499 
00500         ACE_DEBUG ((LM_DEBUG,
00501                     "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::set_remote_address %s\n",
00502                     buf));
00503     }
00504 
00505 
00506   ACE_INET_Addr *inet_addr =
00507     dynamic_cast<ACE_INET_Addr*> (address);
00508 
00509   this->peer_addr_ = *inet_addr;
00510 
00511   TAO_AV_UDP_QoS_Transport *transport =
00512     dynamic_cast<TAO_AV_UDP_QoS_Transport*> (this->transport_);
00513 
00514   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
00515     {
00516 
00517       TAO_AV_UDP_QoS_Session_Helper helper;
00518 
00519       this->qos_session_ = helper.open_qos_session (this,
00520                                                     *inet_addr);
00521 
00522       if (this->qos_session_ == 0)
00523         ACE_ERROR_RETURN ((LM_ERROR,
00524                            "QoS Session Open Failed (%N|%l)\n"),
00525                           -1);
00526 
00527       ACE_INET_Addr local_addr;
00528       this->get_socket ()->get_local_addr (local_addr);
00529 
00530       ACE_INET_Addr* src_addr;
00531       ACE_NEW_RETURN (src_addr,
00532                       ACE_INET_Addr (local_addr.get_port_number (),
00533                                      local_addr.get_host_name ()),
00534                       -1);
00535 
00536       this->qos_session_->source_addr (src_addr);
00537 
00538       if (helper.activate_qos_handler (this->qos_session_,
00539                                        this) == -1)
00540         ACE_ERROR_RETURN ((LM_ERROR,
00541                            "Activating QoS Handler Failed (%N|%l)\n"),
00542                           -1);
00543     }
00544   return transport->set_remote_address (*inet_addr);
00545 }
00546 
00547 
00548 ACE_HANDLE
00549 TAO_AV_UDP_QoS_Flow_Handler::get_handle (void) const
00550 {
00551   if (TAO_debug_level > 0)
00552     ACE_DEBUG ((LM_DEBUG,
00553                 "(%N,%l) TAO_AV_UDP_QoS_Flow_Handler::get_handle:%d\n",
00554                 this->qos_sock_dgram_.get_handle ()));
00555 
00556   return this->qos_sock_dgram_.get_handle () ;
00557 }
00558 
00559 //------------------------------------------------------------
00560 // TAO_AV_UDP_QoS_Transport
00561 //------------------------------------------------------------
00562 
00563 TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (void)
00564   :handler_ (0)
00565 {
00566 }
00567 
00568 TAO_AV_UDP_QoS_Transport::TAO_AV_UDP_QoS_Transport (TAO_AV_UDP_QoS_Flow_Handler *handler)
00569   :handler_ (handler),
00570    addr_ (0)
00571 {
00572 }
00573 
00574 TAO_AV_UDP_QoS_Transport::~TAO_AV_UDP_QoS_Transport (void)
00575 {
00576 }
00577 
00578 int
00579 TAO_AV_UDP_QoS_Transport::set_remote_address (const ACE_INET_Addr &address)
00580 {
00581   this->peer_addr_ = address;
00582   return 0;
00583 }
00584 
00585 int
00586 TAO_AV_UDP_QoS_Transport::open (ACE_Addr * /*address*/)
00587 {
00588   return 0;
00589 }
00590 
00591 int
00592 TAO_AV_UDP_QoS_Transport::close (void)
00593 {
00594   return 0;
00595 }
00596 
00597 int
00598 TAO_AV_UDP_QoS_Transport::mtu (void)
00599 {
00600   return ACE_MAX_DGRAM_SIZE;
00601 }
00602 
00603 ACE_Addr*
00604 TAO_AV_UDP_QoS_Transport::get_peer_addr (void)
00605 {
00606   return &this->peer_addr_;
00607 }
00608 
00609 ssize_t
00610 TAO_AV_UDP_QoS_Transport::send (const ACE_Message_Block *mblk,
00611                                 ACE_Time_Value *)
00612 {
00613   // For the most part this was copied from GIOP::send_request and
00614   // friends.
00615 
00616   iovec iov[IOV_MAX];
00617   int iovcnt = 0;
00618   ssize_t n = 0;
00619   ssize_t nbytes = 0;
00620 
00621   for (const ACE_Message_Block *i = mblk;
00622        i != 0;
00623        i = i->cont ())
00624     {
00625       // Make sure there is something to send!
00626       if (i->length () > 0)
00627         {
00628           iov[iovcnt].iov_base = i->rd_ptr ();
00629           iov[iovcnt].iov_len  = static_cast<u_long> (i->length ());
00630           iovcnt++;
00631 
00632           // The buffer is full make a OS call.  @@ TODO this should
00633           // be optimized on a per-platform basis, for instance, some
00634           // platforms do not implement writev() there we should copy
00635           // the data into a buffer and call send_n(). In other cases
00636           // there may be some limits on the size of the iovec, there
00637           // we should set IOV_MAX to that limit.
00638 
00639           size_t bytes_sent = 0;
00640 
00641           if (iovcnt == IOV_MAX)
00642             {
00643               if (this->handler_->get_socket ()->send (iov,
00644                                                        1,
00645                                                        bytes_sent,
00646                                                        0,
00647                                                        this->handler_->qos_session ()->dest_addr (),
00648                                                        0,
00649                                                        0) == -1)
00650                 ACE_ERROR_RETURN ((LM_ERROR,
00651                                    "Error in dgram_mcast.send () (%N|%l)\n"),
00652                                   -1);
00653               else
00654                 if (TAO_debug_level > 0)
00655                   ACE_DEBUG ((LM_DEBUG,
00656                               "Using ACE_OS::sendto () : Bytes sent : %d",
00657                               bytes_sent));
00658 
00659               if (n < 1)
00660                 return n;
00661 
00662               nbytes += bytes_sent;
00663               iovcnt = 0;
00664             }
00665         }
00666     }
00667 
00668   size_t bytes_sent = 0;
00669 
00670   // Check for remaining buffers to be sent!
00671   if (iovcnt != 0)
00672     {
00673       if (this->handler_->get_socket ()->send (iov,
00674                                                1,
00675                                                bytes_sent,
00676                                                0,
00677                                                this->handler_->qos_session ()->dest_addr (),
00678                                                0,
00679                                                0) == -1)
00680         ACE_ERROR_RETURN ((LM_ERROR,
00681                            "Error in dgram_mcast.send ()\n"),
00682                           -1);
00683       else
00684         if( TAO_debug_level > 0)
00685           ACE_DEBUG ((LM_DEBUG,
00686                       "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
00687                       bytes_sent));
00688 
00689       if (n < 1)
00690         return n;
00691 
00692       nbytes += bytes_sent;
00693     }
00694 
00695   return nbytes;
00696 }
00697 
00698 ssize_t
00699 TAO_AV_UDP_QoS_Transport::send (const char *buf,
00700                                 size_t len,
00701                                 ACE_Time_Value *)
00702 {
00703   if (TAO_debug_level > 0)
00704     ACE_DEBUG ((LM_DEBUG,
00705                 "(%N,%l) TAO_AV_UDP_QoS_Transport::send "));
00706 
00707   char addr [BUFSIZ];
00708   this->peer_addr_.addr_to_string (addr,BUFSIZ);
00709   if (TAO_debug_level > 0)
00710     ACE_DEBUG ((LM_DEBUG,
00711                 "(%N,%l) to %s\n",
00712                 addr));
00713 
00714   return this->handler_->get_socket ()->send (buf,
00715                                               len,
00716                                               this->handler_->qos_session ()->dest_addr (),
00717                                               0,
00718                                               0,
00719                                               0);
00720 }
00721 
00722 ssize_t
00723 TAO_AV_UDP_QoS_Transport::send (const iovec *iov,
00724                                 int /*iovcnt*/,
00725                                 ACE_Time_Value *)
00726 {
00727   size_t bytes_sent = 0;
00728   if (this->handler_->get_socket ()->send (iov,
00729                                            1,
00730                                            bytes_sent,
00731                                            0,
00732                                            this->handler_->qos_session ()->dest_addr (),
00733                                            0,
00734                                            0) == -1)
00735     ACE_ERROR_RETURN ((LM_ERROR,
00736                        "Error in dgram_mcast.send ()\n"),
00737                       -1);
00738   else
00739   {
00740     if( TAO_debug_level > 0)
00741        ACE_DEBUG ((LM_DEBUG,
00742                    "(%N,%l) Using ACE_OS::sendto () : Bytes sent : %d",
00743                    bytes_sent));
00744   }
00745 
00746 
00747   return bytes_sent;
00748 }
00749 
00750 ssize_t
00751 TAO_AV_UDP_QoS_Transport::recv (char *buf,
00752                                 size_t len,
00753                                 ACE_Time_Value *)
00754 {
00755   return this->handler_->get_socket ()->recv (buf, len,this->peer_addr_);
00756 }
00757 
00758 ssize_t
00759 TAO_AV_UDP_QoS_Transport::recv (char *buf,
00760                                 size_t len,
00761                                 int flags,
00762                                 ACE_Time_Value *timeout)
00763 {
00764   return this->handler_->get_socket ()->recv (buf,
00765                                               len,
00766                                               this->peer_addr_,
00767                                               flags,
00768                                               timeout);
00769 }
00770 
00771 ssize_t
00772 TAO_AV_UDP_QoS_Transport::recv (iovec *iov,
00773                                 int /*iovcnt*/,
00774                                 ACE_Time_Value *timeout)
00775 {
00776   return handler_->get_socket ()->recv (iov,this->peer_addr_,0,timeout);
00777 }
00778 
00779 
00780 //------------------------------------------------------------
00781 // TAO_AV_UDP_QoS_Acceptor
00782 //------------------------------------------------------------
00783 
00784 TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor (void)
00785 {
00786 }
00787 
00788 TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor (void)
00789 {
00790 }
00791 
00792 int
00793 TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
00794 {
00795   int result = 0;
00796 
00797   if (TAO_debug_level > 0)
00798     ACE_DEBUG ((LM_DEBUG,
00799                 "(%N,%l) Acceptor Svc Handler QOS ENABLED \n"));
00800 
00801   TAO_AV_UDP_QoS_Session_Helper helper;
00802 
00803   result = helper.activate_qos_handler (handler->qos_session (),
00804                                         handler);
00805   if (result == -1)
00806     ACE_ERROR_RETURN ((LM_ERROR,
00807                        "Error in registering the Decorator with the Reactor\n"),
00808                       -1);
00809 
00810   return result;
00811 }
00812 
00813 int
00814 TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
00815                                TAO_AV_Core *av_core,
00816                                TAO_FlowSpec_Entry *entry,
00817                                TAO_AV_Flow_Protocol_Factory *factory,
00818                                TAO_AV_Core::Flow_Component flow_comp)
00819 {
00820   ACE_UNUSED_ARG (flow_comp);
00821 
00822   if (TAO_debug_level > 0)
00823     ACE_DEBUG ((LM_DEBUG,
00824                 "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open "));
00825 
00826   this->av_core_ = av_core;
00827   this->endpoint_ = endpoint;
00828   this->entry_ = entry;
00829 
00830 
00831   this->flow_protocol_factory_ = factory;
00832   this->flowname_ = entry->flowname ();
00833   ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address ();
00834 //   inet_addr->set (inet_addr->get_port_number (),
00835 //                   inet_addr->get_host_name ());
00836   char buf[BUFSIZ];
00837   inet_addr->addr_to_string (buf,
00838                              BUFSIZ);
00839   if (TAO_debug_level > 0)
00840     ACE_DEBUG ((LM_DEBUG,
00841                 "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: %s",
00842                 buf));
00843 
00844   int result = this->open_i (inet_addr);
00845 
00846   if (result < 0)
00847     return result;
00848 
00849   return 0;
00850 }
00851 
00852 int
00853 TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
00854                                        TAO_AV_Core *av_core,
00855                                        TAO_FlowSpec_Entry *entry,
00856                                        TAO_AV_Flow_Protocol_Factory *factory,
00857                                        TAO_AV_Core::Flow_Component flow_comp)
00858 {
00859   ACE_UNUSED_ARG (flow_comp);
00860 
00861   this->av_core_ = av_core;
00862   this->endpoint_ = endpoint;
00863   this->entry_ = entry;
00864   this->flow_protocol_factory_ = factory;
00865   this->flowname_ = entry->flowname ();
00866   char buf [BUFSIZ];
00867   ACE_OS::hostname (buf,
00868                     BUFSIZ);
00869   qos_acceptor_addr_.set((u_short)0, buf);
00870 /*  ACE_INET_Addr *address;
00871   ACE_NEW_RETURN (address,
00872                   ACE_INET_Addr ("0"),
00873                   -1);
00874 
00875   address->addr_to_string (buf,
00876                            BUFSIZ);*/
00877   ACE_DEBUG ((LM_DEBUG,
00878               "(%N,%l) ADDRESS IS %s:%d\n",
00879               buf, qos_acceptor_addr_.get_port_number() ));
00880 
00881   int result = this->open_i (&qos_acceptor_addr_);
00882   if (result < 0)
00883     return result;
00884   return 0;
00885 }
00886 
00887 
00888 int
00889 TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
00890 {
00891   ACE_DECLARE_NEW_CORBA_ENV;
00892   int result = 0;
00893 
00894   //  TAO_AV_Callback *callback = 0;
00895   //   this->endpoint_->get_callback (this->flowname_.c_str (),
00896   //                                  callback);
00897   ACE_INET_Addr *local_addr;
00898 
00899   ACE_NEW_RETURN (local_addr,
00900                   ACE_INET_Addr (*inet_addr),
00901                   -1);
00902 
00903   ACE_QoS_Params qos_params;
00904 
00905   ACE_QoS* ace_qos = 0;
00906 
00907   FillQoSParams (qos_params,
00908                  0,
00909                  ace_qos);
00910 
00911 
00912   TAO_AV_UDP_QoS_Flow_Handler* handler;
00913   ACE_NEW_RETURN (handler,
00914                   TAO_AV_UDP_QoS_Flow_Handler,
00915                   -1);
00916 
00917 
00918   TAO_AV_Flow_Handler *flow_handler = 0;
00919   flow_handler = handler;
00920 
00921   handler->endpoint (this->endpoint_);
00922   handler->flowspec_entry (this->entry_);
00923   handler->av_core (this->av_core_);
00924 
00925   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
00926     {
00927 
00928       TAO_AV_UDP_QoS_Session_Helper helper;
00929 
00930       int result = handler->get_socket ()->open (*inet_addr,
00931                                                  qos_params,
00932                                                  AF_INET,
00933                                                  0,
00934                                                  0,
00935                                                  0,
00936                                                  ACE_OVERLAPPED_SOCKET_FLAG
00937                                                  | ACE_FLAG_MULTIPOINT_C_LEAF
00938                                                  | ACE_FLAG_MULTIPOINT_D_LEAF,
00939                                                  1);
00940 
00941       if (result < 0)
00942         ACE_ERROR_RETURN ((LM_ERROR,
00943                            "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
00944                           -1);
00945 
00946       result = handler->get_socket ()->get_local_addr (*local_addr);
00947       if (result < 0)
00948         ACE_ERROR_RETURN ((LM_ERROR,
00949                            "Error in getting Local Address (%N|%l)\n"),
00950                           -1);
00951 
00952       // Create a destination address for the QoS session. The same
00953       // address should be used for the subscribe call later. A copy
00954       // is made below only to distinguish the two usages of the dest
00955       // address.
00956       ACE_INET_Addr dest_addr;
00957       dest_addr.set  (local_addr->get_port_number (),
00958                       local_addr->get_host_name ());
00959 
00960       char dest_buf [BUFSIZ];
00961       dest_addr.addr_to_string (dest_buf,
00962                                 BUFSIZ);
00963 
00964       if (TAO_debug_level > 0)
00965         ACE_DEBUG ((LM_DEBUG,
00966                     "Session Address is %s\n",
00967                     dest_buf));
00968 
00969       this->qos_session_ = helper.open_qos_session (handler,
00970                                                     dest_addr);
00971 
00972       if (this->qos_session_ == 0)
00973         ACE_ERROR_RETURN ((LM_ERROR,
00974                            "QoS Session Open Failed (%N|%l)\n"),
00975                           -1);
00976 
00977       handler->qos_session (this->qos_session_);
00978 
00979       if (this->activate_svc_handler (handler) == -1)
00980         ACE_ERROR_RETURN ((LM_ERROR,
00981                            "Activate Svc Handler Failed (%N|%l)\n"),
00982                           -1);
00983 
00984       AVStreams::QoS qos;
00985       int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
00986                                                                 qos);
00987 
00988       if (qos_available == 0)
00989         {
00990 
00991           ACE_Flow_Spec *ace_flow_spec = 0;
00992           ACE_NEW_RETURN (ace_flow_spec,
00993                           ACE_Flow_Spec,
00994                           -1);
00995 
00996           handler->translate (qos.QoSParams,
00997                               ace_flow_spec);
00998 
00999           if (helper.set_qos (*ace_flow_spec,
01000                               handler) == -1)
01001 
01002             ACE_ERROR_RETURN ((LM_ERROR,
01003                                "Set QoS Failed (%N|%l)\n"),
01004                               -1);
01005         }
01006     }
01007   else
01008     {
01009 
01010       int result = handler->get_socket ()->open (*inet_addr,
01011                                                  qos_params,
01012                                                  AF_INET,
01013                                                  0,
01014                                                  0,
01015                                                  0,
01016                                                  ACE_OVERLAPPED_SOCKET_FLAG
01017                                                  | ACE_FLAG_MULTIPOINT_C_LEAF
01018                                                  | ACE_FLAG_MULTIPOINT_D_LEAF,
01019                                                  1);
01020 
01021       if (result < 0)
01022         ACE_ERROR_RETURN ((LM_ERROR,
01023                            "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
01024                           -1);
01025     }
01026 
01027   TAO_AV_Protocol_Object *object =
01028     this->flow_protocol_factory_->make_protocol_object (this->entry_,
01029                                                         this->endpoint_,
01030                                                         flow_handler,
01031                                                         flow_handler->transport ());
01032   flow_handler->protocol_object (object);
01033 
01034   AVStreams::Negotiator_ptr negotiator;
01035 
01036   ACE_TRY_EX (negotiator)
01037     {
01038       CORBA::Any_ptr negotiator_any =
01039         this->endpoint_->get_property_value ("Negotiator"
01040                                              ACE_ENV_ARG_PARAMETER);
01041       ACE_TRY_CHECK_EX (negotiator);
01042 
01043       *negotiator_any >>= negotiator;
01044       handler->negotiator (negotiator);
01045     }
01046   ACE_CATCHANY
01047     {
01048       ACE_DEBUG ((LM_DEBUG,
01049                   "(%N,%l) Negotiator Not Found \n"));
01050     }
01051   ACE_ENDTRY;
01052 
01053   this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
01054   this->entry_->protocol_object (object);
01055 
01056   result = handler->get_socket ()->get_local_addr (*local_addr);
01057   if (result < 0)
01058     ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Dgram_Connector::open: get_local_addr failed\n"),result);
01059   local_addr->set (local_addr->get_port_number (),
01060                    local_addr->get_host_name ());
01061 
01062   if (TAO_debug_level > 0)
01063     {
01064       char buf [BUFSIZ];
01065       local_addr->addr_to_string (buf,
01066                                   BUFSIZ);
01067       ACE_DEBUG ((LM_DEBUG,
01068                   "Local Address is %s\n",
01069                   buf));
01070     }
01071 
01072   this->entry_->set_local_addr (local_addr);
01073   this->entry_->handler (flow_handler);
01074 
01075   return 0;
01076 
01077 }
01078 
01079 int
01080 TAO_AV_UDP_QoS_Acceptor::close (void)
01081 {
01082   return 0;
01083 }
01084 
01085 //------------------------------------------------------------
01086 // TAO_AV_UDP_QoS_Connector
01087 //------------------------------------------------------------
01088 TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector (void)
01089 {
01090 }
01091 
01092 TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector (void)
01093 {
01094 }
01095 
01096 int
01097 TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint,
01098                                 TAO_AV_Core *av_core,
01099                                 TAO_AV_Flow_Protocol_Factory *factory)
01100 
01101 {
01102   if (TAO_debug_level > 0)
01103     ACE_DEBUG ((LM_DEBUG,
01104                 "TAO_AV_UDP_QoS_Connector::open "));
01105 
01106   this->endpoint_ = endpoint;
01107   this->av_core_ = av_core;
01108   this->flow_protocol_factory_ = factory;
01109   return 0;
01110 }
01111 
01112 //  int
01113 //  TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params,
01114 //                                   ACE_Flow_Spec *ace_flow_spec)
01115 //  {
01116 //    for (unsigned int i = 0;
01117 //         i < qos_params.length ();
01118 //         i++)
01119 //      {
01120 //        if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
01121 //          {
01122 //            CORBA::Short type;
01123 //            qos_params [i].property_value >>= type;
01124 //            ace_flow_spec->service_type (type);
01125 //          }
01126 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
01127 //          {
01128 //            CORBA::ULong tok_rate;
01129 //            qos_params [i].property_value >>= tok_rate;
01130 //            ace_flow_spec->token_rate (tok_rate);
01131 //          }
01132 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0)
01133 //          {
01134 //            CORBA::ULong tok_buck_size;
01135 //            qos_params [i].property_value >>= tok_buck_size;
01136 //            ace_flow_spec->token_bucket_size (tok_buck_size);
01137 //          }
01138 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
01139 //          {
01140 //            CORBA::ULong peak_bw;
01141 //            qos_params [i].property_value >>= peak_bw;
01142 //            ace_flow_spec->peak_bandwidth (peak_bw);
01143 //          }
01144 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
01145 //          {
01146 //            CORBA::ULong lat;
01147 //            qos_params [i].property_value >>= lat;
01148 //            ace_flow_spec->latency (lat);
01149 //          }
01150 //        else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
01151 //          {
01152 //            CORBA::ULong delay_var;
01153 //            qos_params [i].property_value >>= delay_var;
01154 //            ace_flow_spec->delay_variation (delay_var);
01155 //          }
01156 
01157 //      }
01158 
01159 //    return 0;
01160 //  }
01161 
01162 
01163 int
01164 TAO_AV_UDP_QoS_Connector::connect (TAO_FlowSpec_Entry *entry,
01165                                    TAO_AV_Transport *&transport,
01166                                    TAO_AV_Core::Flow_Component flow_comp)
01167 {
01168   ACE_UNUSED_ARG (flow_comp);
01169 
01170   ACE_DECLARE_NEW_CORBA_ENV;
01171   int result = 0;
01172   this->entry_ = entry;
01173   this->flowname_ = entry->flowname ();
01174 
01175   ACE_INET_Addr *local_addr;
01176 
01177   if (entry->get_peer_addr () != 0)
01178     {
01179       local_addr = dynamic_cast<ACE_INET_Addr*> (entry->get_peer_addr ());
01180     }
01181   else
01182     ACE_NEW_RETURN (local_addr,
01183                     ACE_INET_Addr ("0"),
01184                     -1);
01185 
01186   TAO_AV_Flow_Handler *flow_handler = 0;
01187 
01188   TAO_AV_UDP_QoS_Flow_Handler *handler;
01189   ACE_NEW_RETURN (handler,
01190                   TAO_AV_UDP_QoS_Flow_Handler,
01191                   -1);
01192 
01193   flow_handler = handler;
01194 
01195   handler->endpoint (this->endpoint_);
01196   handler->flowspec_entry (this->entry_);
01197   handler->av_core (this->av_core_);
01198 
01199   ACE_INET_Addr *inet_addr = dynamic_cast<ACE_INET_Addr*> (entry->address ());
01200 
01201   ACE_QoS_Params qos_params;
01202 
01203   ACE_QoS* ace_qos = 0;
01204 
01205   FillQoSParams (qos_params,
01206                  0,
01207                  ace_qos);
01208 
01209   result = handler->get_socket ()->open (*local_addr,
01210                                          qos_params,
01211                                          AF_INET,
01212                                          0,
01213                                          0,
01214                                          0,
01215                                          ACE_OVERLAPPED_SOCKET_FLAG
01216                                          | ACE_FLAG_MULTIPOINT_C_LEAF
01217                                          | ACE_FLAG_MULTIPOINT_D_LEAF,
01218                                          1);
01219 
01220   if (result < 0)
01221     ACE_ERROR_RETURN ((LM_ERROR,
01222                        "Data socket open failed (%N|%l)\n"),
01223                       -1);
01224 
01225   result = handler->get_socket ()->get_local_addr (*local_addr);
01226 
01227 
01228   ACE_INET_Addr *session_addr = 0;
01229   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
01230     {
01231       ACE_NEW_RETURN (session_addr,
01232                       ACE_INET_Addr,
01233                       -1);
01234 
01235       session_addr->set (local_addr->get_port_number (),
01236                          local_addr->get_host_name ());
01237 
01238     }
01239   else
01240     {
01241       session_addr = inet_addr;
01242     }
01243 
01244   char sess_buf [BUFSIZ];
01245   session_addr->addr_to_string (sess_buf,
01246                                 BUFSIZ);
01247 
01248   if (TAO_debug_level > 0)
01249     ACE_DEBUG ((LM_DEBUG,
01250                 "Session Address is %s\n",
01251                 sess_buf));
01252 
01253   // Create a destination address for the QoS session. The same
01254   // address should be used for the subscribe call later. A copy
01255   // is made below only to distinguish the two usages of the dest
01256   // address.
01257   ACE_INET_Addr dest_addr (*session_addr);
01258 
01259   TAO_AV_UDP_QoS_Session_Helper helper;
01260 
01261   this->qos_session_ = helper.open_qos_session (handler,
01262                                                 *session_addr);
01263 
01264   if (this->qos_session_ == 0)
01265     ACE_ERROR_RETURN ((LM_ERROR,
01266                        "QoS Session Open Failed (%N|%l)\n"),
01267                       -1);
01268   else
01269     ACE_DEBUG ((LM_DEBUG,
01270                 "QoS session opened successfully\n"));
01271 
01272   if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_PRODUCER)
01273     {
01274       //this->qos_session_->source_port (local_addr->get_port_number ());
01275       ACE_INET_Addr* src_addr;
01276       ACE_NEW_RETURN (src_addr,
01277                       ACE_INET_Addr (local_addr->get_port_number (),
01278                                      local_addr->get_host_name ()),
01279                       -1);
01280 
01281       this->qos_session_->source_addr (src_addr);
01282 
01283     }
01284 
01285   handler->qos_session (this->qos_session_);
01286 
01287   this->qos_manager_ =
01288     handler->get_socket ()->qos_manager ();
01289 
01290   AVStreams::QoS qos;
01291 
01292   int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
01293                                                             qos);
01294   if (qos_available == 0)
01295     {
01296 
01297       ACE_Flow_Spec* ace_flow_spec;
01298       ACE_NEW_RETURN (ace_flow_spec,
01299                       ACE_Flow_Spec,
01300                       -1);
01301 
01302       handler->translate (qos.QoSParams,
01303                           ace_flow_spec);
01304 
01305       if (helper.set_qos (*ace_flow_spec,
01306                           handler) == -1)
01307         ACE_ERROR_RETURN ((LM_ERROR,
01308                            "Unable to set QoS (%N|%l)\n"),
01309                           -1);
01310       else
01311       {
01312         if( TAO_debug_level > 0)
01313             ACE_DEBUG ((LM_DEBUG,
01314                     "(%N,%l) Setting QOS succeeds.\n"));
01315       }
01316     }
01317 
01318   TAO_AV_Protocol_Object *object =
01319     this->flow_protocol_factory_->make_protocol_object (this->entry_,
01320                                                         this->endpoint_,
01321                                                         flow_handler,
01322                                                         flow_handler->transport ());
01323 
01324   AVStreams::Negotiator_ptr negotiator;
01325 
01326   ACE_TRY_EX (negotiator)
01327     {
01328       CORBA::Any_ptr negotiator_any =
01329         this->endpoint_->get_property_value ("Negotiator"
01330                                              ACE_ENV_ARG_PARAMETER);
01331       ACE_TRY_CHECK_EX (negotiator);
01332 
01333       *negotiator_any >>= negotiator;
01334       handler->negotiator (negotiator);
01335     }
01336   ACE_CATCHANY
01337     {
01338       ACE_DEBUG ((LM_DEBUG,
01339                   "Negotiator not found for flow %s\n",
01340                   this->entry_->flowname ()));
01341     }
01342   ACE_ENDTRY;
01343 
01344   flow_handler->protocol_object (object);
01345 
01346   this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
01347   this->entry_->protocol_object (object);
01348 
01349   result = handler->get_socket ()->get_local_addr (*local_addr);
01350   if (result < 0)
01351     ACE_ERROR_RETURN ((LM_ERROR,
01352                        "Get local addr failed (%N|%l)\n"),
01353                       result);
01354 
01355   local_addr->set (local_addr->get_port_number (),
01356                    local_addr->get_host_name ());
01357 
01358   if (TAO_debug_level > 0)
01359     {
01360       char buf[BUFSIZ];
01361       local_addr->addr_to_string (buf,
01362                                   BUFSIZ);
01363 
01364       ACE_DEBUG ((LM_DEBUG,
01365                   "Local Address is %s\n",
01366                   buf));
01367     }
01368 
01369   entry->set_local_addr (local_addr);
01370   entry->handler (flow_handler);
01371   transport = flow_handler->transport ();
01372 
01373   // call activate svc handler.
01374   return this->activate_svc_handler (handler);
01375 }
01376 
01377 int
01378 TAO_AV_UDP_QoS_Connector::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
01379 {
01380   int result = 0;
01381 
01382   TAO_AV_UDP_QoS_Session_Helper helper;
01383 
01384   result = helper.activate_qos_handler (this->qos_session_,
01385                                         handler);
01386 
01387   if (result == -1)
01388     ACE_ERROR_RETURN ((LM_ERROR,
01389                        "(%N,%l) Error in registering the Decorator with the Reactor\n"),
01390                       -1);
01391 
01392   return result;
01393 }
01394 
01395 int
01396 TAO_AV_UDP_QoS_Connector::close (void)
01397 {
01398   return 0;
01399 }
01400 
01401 //------------------------------------------------------------
01402 // TAO_AV_UDP_QoS_Factory
01403 //------------------------------------------------------------
01404 
01405 TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory (void)
01406 {
01407   if (TAO_debug_level > 0)
01408     ACE_DEBUG ((LM_DEBUG,
01409                 "TAO_AV_UDP_QoS_Factory::TAO_AV_UDP_QoS_Factory\n"));
01410 }
01411 
01412 TAO_AV_UDP_QoS_Factory::~TAO_AV_UDP_QoS_Factory (void)
01413 {
01414 }
01415 
01416 int
01417 TAO_AV_UDP_QoS_Factory::match_protocol (const char *protocol_string)
01418 {
01419   if (TAO_debug_level > 0)
01420     ACE_DEBUG ((LM_DEBUG,
01421                 "TAO_AV_UDP_QoS_Factory::match_protocol\n"));
01422 
01423   if (ACE_OS::strcasecmp (protocol_string,"QoS_UDP") == 0)
01424     return 1;
01425   return 0;
01426 }
01427 
01428 TAO_AV_Acceptor*
01429 TAO_AV_UDP_QoS_Factory::make_acceptor (void)
01430 {
01431   if (TAO_debug_level > 0)
01432     ACE_DEBUG ((LM_DEBUG,
01433                 "TAO_AV_UDP_QoS_Factory::make_acceptor "));
01434 
01435   TAO_AV_Acceptor *acceptor = 0;
01436   ACE_NEW_RETURN (acceptor,
01437                   TAO_AV_UDP_QoS_Acceptor,
01438                   0);
01439   return acceptor;
01440 }
01441 
01442 TAO_AV_Connector*
01443 TAO_AV_UDP_QoS_Factory::make_connector (void)
01444 {
01445   if (TAO_debug_level > 0)
01446     ACE_DEBUG ((LM_DEBUG,
01447                 "TAO_AV_UDP_QoS_Factory::make_connector "));
01448 
01449   TAO_AV_Connector *connector = 0;
01450   ACE_NEW_RETURN (connector,
01451                   TAO_AV_UDP_QoS_Connector,
01452                   0);
01453   return connector;
01454 }
01455 
01456 int
01457 TAO_AV_UDP_QoS_Factory::init (int /* argc */,
01458                               char * /* argv */ [])
01459 {
01460   return 0;
01461 }
01462 
01463 
01464 //------------------------------------------------------------
01465 // TAO_AV_UDP_QoS_Flow_Factory
01466 //------------------------------------------------------------
01467 TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory (void)
01468 {
01469   if (TAO_debug_level > 0)
01470     ACE_DEBUG ((LM_DEBUG,
01471                 "TAO_AV_UDP_QoS_Flow_Factory::TAO_AV_UDP_QoS_Flow_Factory\n"));
01472 }
01473 
01474 TAO_AV_UDP_QoS_Flow_Factory::~TAO_AV_UDP_QoS_Flow_Factory (void)
01475 {
01476 }
01477 
01478 int
01479 TAO_AV_UDP_QoS_Flow_Factory::init (int /* argc */,
01480                                    char * /* argv */ [])
01481 {
01482   return 0;
01483 }
01484 
01485 int
01486 TAO_AV_UDP_QoS_Flow_Factory::match_protocol (const char *flow_string)
01487 {
01488   if (ACE_OS::strcasecmp (flow_string,"QoS_UDP") == 0)
01489     return 1;
01490   return 0;
01491 }
01492 
01493 TAO_AV_Protocol_Object*
01494 TAO_AV_UDP_QoS_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
01495                                                    TAO_Base_StreamEndPoint *endpoint,
01496                                                    TAO_AV_Flow_Handler *handler,
01497                                                    TAO_AV_Transport *transport)
01498 {
01499   TAO_AV_Callback *callback = 0;
01500   endpoint->get_callback (entry->flowname (),
01501                           callback);
01502 
01503 
01504   TAO_AV_UDP_Object *object = 0;
01505   ACE_NEW_RETURN (object,
01506                   TAO_AV_UDP_Object (callback,
01507                                      transport),
01508                   0);
01509   callback->open (object,
01510                   handler);
01511   endpoint->set_protocol_object (entry->flowname (),
01512                                  object);
01513   return object;
01514 }
01515 
01516 TAO_END_VERSIONED_NAMESPACE_DECL
01517 
01518 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_QoS_Flow_Factory)
01519 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Flow_Factory,
01520                        ACE_TEXT ("UDP_QoS_Flow_Factory"),
01521                        ACE_SVC_OBJ_T,
01522                        &ACE_SVC_NAME (TAO_AV_UDP_QoS_Flow_Factory),
01523                        ACE_Service_Type::DELETE_THIS |
01524                        ACE_Service_Type::DELETE_OBJ,
01525                        0)
01526 
01527 ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_UDP_QoS_Factory)
01528 
01529 ACE_STATIC_SVC_DEFINE (TAO_AV_UDP_QoS_Factory,
01530                        ACE_TEXT ("UDP_QoS_Factory"),
01531                        ACE_SVC_OBJ_T,
01532                        &ACE_SVC_NAME (TAO_AV_UDP_QoS_Factory),
01533                        ACE_Service_Type::DELETE_THIS |
01534                        ACE_Service_Type::DELETE_OBJ,
01535                        0)
01536 
01537 #endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */

Generated on Thu Nov 9 13:44:42 2006 for TAO_AV by doxygen 1.3.6