RT_Servant_Dispatcher.cpp

Go to the documentation of this file.
00001 
00002 #include "tao/RTPortableServer/RT_Servant_Dispatcher.h"
00003 
00004 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00005 
00006 #include "tao/RTPortableServer/RT_POA.h"
00007 
00008 #include "tao/ORB_Core.h"
00009 #include "tao/ORB_Core_TSS_Resources.h"
00010 #include "tao/TAO_Server_Request.h"
00011 #include "tao/Transport.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Service_Context.h"
00014 #include "tao/Protocols_Hooks.h"
00015 #include "tao/debug.h"
00016 #include "tao/CDR.h"
00017 
00018 #include "tao/RTCORBA/Thread_Pool.h"
00019 
00020 #include "ace/OS_NS_stdio.h"
00021 #include "ace/OS_NS_string.h"
00022 
00023 ACE_RCSID (RTPortableServer,
00024            RT_Servant_Dispatcher,
00025            "RT_Servant_Dispatcher.cpp,v 1.30 2006/04/26 13:42:42 mesnier_p Exp")
00026 
00027 
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 TAO_RT_Servant_Dispatcher::~TAO_RT_Servant_Dispatcher (void)
00031 {
00032 }
00033 
00034 void
00035 TAO_RT_Servant_Dispatcher::pre_invoke_remote_request (
00036   TAO_Root_POA &poa,
00037   CORBA::Short servant_priority,
00038   TAO_ServerRequest &req,
00039   TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00040   ACE_ENV_ARG_DECL)
00041 {
00042   TAO_Service_Context &request_service_context =
00043     req.request_service_context ();
00044   TAO_Service_Context &reply_service_context = req.reply_service_context ();
00045 
00046   TAO_Thread_Pool *thread_pool =
00047     static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00048 
00049   if (thread_pool != 0 &&
00050       thread_pool->with_lanes ())
00051     {
00052       //
00053       // We don't mess with the priority of threads in lanes.
00054       //
00055 
00056       if (TAO_debug_level > 0)
00057         {
00058           // Get the ORB_Core's TSS resources.
00059           TAO_ORB_Core_TSS_Resources *tss =
00060             poa.orb_core ().get_tss_resources ();
00061 
00062           /// Get the lane attribute in TSS.
00063           TAO_Thread_Lane *lane =
00064             static_cast<TAO_Thread_Lane *> (tss->lane_);
00065 
00066           ACE_ASSERT (lane->pool ().id () ==
00067                       thread_pool->id ());
00068 
00069           ACE_DEBUG ((LM_DEBUG,
00070                       ACE_TEXT ("Using thread pool with lane ")
00071                       ACE_TEXT ("(%P|%t|%d|%d): original thread ")
00072                       ACE_TEXT ("CORBA/native priority %d/%d not changed\n"),
00073                       lane->pool ().id (),
00074                       lane->id (),
00075                       lane->lane_priority (),
00076                       lane->native_priority ()));
00077         }
00078 
00079       return;
00080     }
00081 
00082   // Remember current thread's priority.
00083   TAO_Protocols_Hooks *tph =
00084     poa.orb_core ().get_protocols_hooks ();
00085 
00086   const char *priority_model = 0;
00087   RTCORBA::Priority target_priority = TAO_INVALID_PRIORITY;
00088 
00089   // NOT_SPECIFIED PriorityModel processing.
00090   if (poa.priority_model () ==
00091       TAO::Portable_Server::Cached_Policies::NOT_SPECIFIED)
00092     {
00093       priority_model = "RTCORBA::NOT_SPECIFIED";
00094     }
00095 
00096   // CLIENT_PROPAGATED PriorityModel processing.
00097   else if (poa.priority_model () ==
00098       TAO::Portable_Server::Cached_Policies::CLIENT_PROPAGATED)
00099     {
00100       priority_model = "RTCORBA::CLIENT_PROPAGATED";
00101 
00102       // Attempt to extract client-propagated priority from the
00103       // ServiceContextList of the request.
00104       const IOP::ServiceContext *context;
00105 
00106       if (request_service_context.get_context (IOP::RTCorbaPriority,
00107                                                &context) == 1)
00108         {
00109           // Extract the target priority
00110           TAO_InputCDR cdr (reinterpret_cast
00111                             <const char*>
00112                              (context->context_data.get_buffer ()),
00113                             context->context_data.length ());
00114           CORBA::Boolean byte_order;
00115           if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
00116             ACE_THROW (CORBA::MARSHAL ());
00117           cdr.reset_byte_order (static_cast<int> (byte_order));
00118 
00119           if ((cdr >> target_priority) == 0)
00120             ACE_THROW (CORBA::MARSHAL ());
00121 
00122           // Save the target priority in the response service
00123           // context to propagate back to the client as specified
00124           // by the RTCORBA specification.
00125           reply_service_context.set_context (*context);
00126         }
00127       else
00128         {
00129           // Use default priority if none came in the request.
00130           // (Request must have come from a non-RT ORB.)
00131           target_priority = poa.server_priority ();
00132         }
00133     }
00134   else
00135     // SERVER_DECLARED PriorityModel processing.
00136     {
00137       priority_model = "RTCORBA::SERVER_DECLARED";
00138 
00139       // Use the request associated with the servant.
00140       target_priority = servant_priority;
00141     }
00142 
00143   char thread_pool_id[BUFSIZ];
00144   if (TAO_debug_level > 0)
00145     {
00146       if (thread_pool == 0)
00147         ACE_OS::strcpy (thread_pool_id,
00148                         "default thread pool");
00149       else
00150         ACE_OS::sprintf (thread_pool_id,
00151                          "thread pool %d",
00152                          thread_pool->id ());
00153     }
00154 
00155   // Target priority is invalid.
00156   if (target_priority == TAO_INVALID_PRIORITY)
00157     {
00158       if (TAO_debug_level > 0)
00159         {
00160 
00161 // If we are in a multi-threaded configuration, print out the current
00162 // thread priority.
00163 #if defined (ACE_HAS_THREADS)
00164 
00165           if (tph->get_thread_CORBA_and_native_priority (
00166                 pre_invoke_state.original_CORBA_priority_,
00167                 pre_invoke_state.original_native_priority_
00168                 ACE_ENV_ARG_PARAMETER) == -1)
00169             ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00170                                                CORBA::COMPLETED_NO));
00171 
00172           ACE_DEBUG ((LM_DEBUG,
00173                       ACE_TEXT ("(%P|%t): %s processing using %s ")
00174                       ACE_TEXT ("original thread CORBA/native priority %d/%d ")
00175                       ACE_TEXT ("not changed\n"),
00176                       ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00177                       ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00178                       pre_invoke_state.original_CORBA_priority_,
00179                       pre_invoke_state.original_native_priority_));
00180 
00181 // If we are in a single-threaded configuration, we cannot get the
00182 // current thread priority.  Therefore, print out a simpler message.
00183 #else /* ACE_HAS_THREADS */
00184 
00185           ACE_DEBUG ((LM_DEBUG,
00186                       ACE_TEXT ("(%P|%t): %s processing using %s ")
00187                       ACE_TEXT ("original thread CORBA/native priority ")
00188                       ACE_TEXT ("not changed\n"),
00189                       ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00190                       ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id)));
00191 
00192 #endif /* ACE_HAS_THREADS */
00193 
00194         }
00195     }
00196   else
00197     {
00198       // Get the current thread's priority.
00199 
00200       if (tph->get_thread_CORBA_and_native_priority (
00201             pre_invoke_state.original_CORBA_priority_,
00202             pre_invoke_state.original_native_priority_
00203             ACE_ENV_ARG_PARAMETER) == -1)
00204         ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00205                                            CORBA::COMPLETED_NO));
00206 
00207       // Priority needs to be changed temporarily changed for the
00208       // duration of request.
00209       if (target_priority != pre_invoke_state.original_CORBA_priority_)
00210         {
00211           if (tph->set_thread_CORBA_priority (target_priority
00212                                               ACE_ENV_ARG_PARAMETER)
00213               == -1)
00214             ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00215                                                CORBA::COMPLETED_NO));
00216 
00217           pre_invoke_state.state_ =
00218             TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00219 
00220           if (TAO_debug_level > 0)
00221             {
00222               CORBA::Short native_priority;
00223               tph->get_thread_native_priority (native_priority
00224                                                ACE_ENV_ARG_PARAMETER);
00225 
00226               ACE_DEBUG ((LM_DEBUG,
00227                           ACE_TEXT ("%s processing using %s ")
00228                           ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00229                           ACE_TEXT ("temporarily changed to CORBA/native priority %d/%d\n"),
00230                           ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00231                           ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00232                           pre_invoke_state.original_CORBA_priority_,
00233                           pre_invoke_state.original_native_priority_,
00234                           target_priority,
00235                           native_priority));
00236             }
00237         }
00238       // No change in priority required.
00239       else
00240         {
00241           if (TAO_debug_level > 0)
00242             {
00243               ACE_DEBUG ((LM_DEBUG,
00244                           ACE_TEXT ("%s processing using %s ")
00245                           ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00246                           ACE_TEXT ("is the same as the target priority\n"),
00247                           ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00248                           ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00249                           pre_invoke_state.original_CORBA_priority_,
00250                           pre_invoke_state.original_native_priority_));
00251             }
00252         }
00253     }
00254 
00255   CORBA::Policy_var policy =
00256     poa.policies ().get_cached_policy (
00257       TAO_CACHED_POLICY_RT_SERVER_PROTOCOL
00258       ACE_ENV_ARG_PARAMETER);
00259   ACE_CHECK;
00260 
00261   CORBA::Boolean set_server_network_priority =
00262     tph->set_server_network_priority (req.transport ()->tag (),
00263                                       policy.in ()
00264                                       ACE_ENV_ARG_PARAMETER);
00265   ACE_CHECK;
00266 
00267   TAO_Connection_Handler *connection_handler =
00268     req.transport ()->connection_handler ();
00269 
00270   connection_handler->set_dscp_codepoint (set_server_network_priority);
00271 }
00272 
00273 void
00274 TAO_RT_Servant_Dispatcher::pre_invoke_collocated_request (TAO_Root_POA &poa,
00275                                                           CORBA::Short servant_priority,
00276                                                           TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00277                                                           ACE_ENV_ARG_DECL)
00278 {
00279   TAO_Thread_Pool *thread_pool =
00280     static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00281 
00282   if (thread_pool == 0 ||
00283       thread_pool->with_lanes ())
00284     {
00285       //
00286       // We don't mess with the priority of threads in lanes or for
00287       // the default thread pool.
00288       //
00289       return;
00290     }
00291 
00292   if (poa.priority_model () !=
00293       TAO::Portable_Server::Cached_Policies::SERVER_DECLARED ||
00294       servant_priority == TAO_INVALID_PRIORITY)
00295     {
00296       //
00297       // We either don't have server declared model or servant
00298       // priority is invalid.
00299       //
00300       return;
00301     }
00302 
00303   //
00304   // SERVER_DECLARED PriorityModel processing.
00305   //
00306 
00307   // Remember current thread's priority.
00308   TAO_Protocols_Hooks *tph =
00309     poa.orb_core ().get_protocols_hooks ();
00310 
00311   if (tph->get_thread_CORBA_and_native_priority (pre_invoke_state.original_CORBA_priority_,
00312                                                  pre_invoke_state.original_native_priority_
00313                                                  ACE_ENV_ARG_PARAMETER)
00314       == -1)
00315     ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00316                                        CORBA::COMPLETED_NO));
00317 
00318   // Change the priority of the current thread for the duration of
00319   // request.
00320   if (servant_priority != pre_invoke_state.original_CORBA_priority_)
00321     {
00322       if (tph->set_thread_CORBA_priority (servant_priority
00323                                           ACE_ENV_ARG_PARAMETER)
00324           == -1)
00325         ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00326                                            CORBA::COMPLETED_NO));
00327 
00328       pre_invoke_state.state_ =
00329         TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00330     }
00331 }
00332 
00333 void
00334 TAO_RT_Servant_Dispatcher::post_invoke (TAO_Root_POA &poa,
00335                                         TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)
00336 
00337 {
00338   if (pre_invoke_state.state_ ==
00339       TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED)
00340     {
00341       pre_invoke_state.state_ =
00342         TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::NO_ACTION_REQUIRED;
00343 
00344       ACE_DECLARE_NEW_CORBA_ENV;
00345 
00346       ACE_TRY
00347         {
00348           // Reset the priority of the current thread back to its original
00349           // value.
00350           TAO_Protocols_Hooks *tph =
00351             poa.orb_core ().get_protocols_hooks ();
00352 
00353           if (tph->set_thread_native_priority (
00354                  pre_invoke_state.original_native_priority_
00355                                                ACE_ENV_ARG_PARAMETER)
00356               == -1)
00357             ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00358                                                CORBA::COMPLETED_NO));
00359           ACE_TRY_CHECK;
00360         }
00361       ACE_CATCHANY
00362         {
00363           // Eat up the exception.
00364           ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00365                                "Exception caught: TAO - "
00366                                "Priority_Model_Processing::"
00367                                "~Priority_Model_Processing");
00368         }
00369       ACE_ENDTRY;
00370     }
00371 }
00372 
00373 TAO_Root_POA *
00374 TAO_RT_Servant_Dispatcher::create_Root_POA (const ACE_CString &name,
00375                                             PortableServer::POAManager_ptr poa_manager,
00376                                             const TAO_POA_Policy_Set &policies,
00377                                             ACE_Lock &lock,
00378                                             TAO_SYNCH_MUTEX &thread_lock,
00379                                             TAO_ORB_Core &orb_core,
00380                                             TAO_Object_Adapter *object_adapter
00381                                             ACE_ENV_ARG_DECL)
00382 {
00383   TAO_RT_POA *poa = 0;
00384 
00385   ACE_NEW_THROW_EX (poa,
00386                     TAO_RT_POA (name,
00387                                 poa_manager,
00388                                 policies,
00389                                 0,
00390                                 lock,
00391                                 thread_lock,
00392                                 orb_core,
00393                                 object_adapter
00394                                 ACE_ENV_ARG_PARAMETER),
00395                     CORBA::NO_MEMORY ());
00396   ACE_CHECK_RETURN (0);
00397 
00398   return poa;
00399 }
00400 
00401 TAO_END_VERSIONED_NAMESPACE_DECL
00402 
00403 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */

Generated on Thu Nov 9 12:55:46 2006 for TAO_RTPortableServer by doxygen 1.3.6