00001
00002 #include "tao/RTPortableServer/RT_Servant_Dispatcher.h"
00003
00004 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00005
00006 #include "tao/RTPortableServer/RT_POA.h"
00007
00008 #include "tao/ORB_Core.h"
00009 #include "tao/ORB_Core_TSS_Resources.h"
00010 #include "tao/TAO_Server_Request.h"
00011 #include "tao/Transport.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Service_Context.h"
00014 #include "tao/Protocols_Hooks.h"
00015 #include "tao/debug.h"
00016 #include "tao/CDR.h"
00017
00018 #include "tao/RTCORBA/Thread_Pool.h"
00019
00020 #include "ace/OS_NS_stdio.h"
00021 #include "ace/OS_NS_string.h"
00022
00023 ACE_RCSID (RTPortableServer,
00024 RT_Servant_Dispatcher,
00025 "RT_Servant_Dispatcher.cpp,v 1.30 2006/04/26 13:42:42 mesnier_p Exp")
00026
00027
00028 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00029
00030 TAO_RT_Servant_Dispatcher::~TAO_RT_Servant_Dispatcher (void)
00031 {
00032 }
00033
00034 void
00035 TAO_RT_Servant_Dispatcher::pre_invoke_remote_request (
00036 TAO_Root_POA &poa,
00037 CORBA::Short servant_priority,
00038 TAO_ServerRequest &req,
00039 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00040 ACE_ENV_ARG_DECL)
00041 {
00042 TAO_Service_Context &request_service_context =
00043 req.request_service_context ();
00044 TAO_Service_Context &reply_service_context = req.reply_service_context ();
00045
00046 TAO_Thread_Pool *thread_pool =
00047 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00048
00049 if (thread_pool != 0 &&
00050 thread_pool->with_lanes ())
00051 {
00052
00053
00054
00055
00056 if (TAO_debug_level > 0)
00057 {
00058
00059 TAO_ORB_Core_TSS_Resources *tss =
00060 poa.orb_core ().get_tss_resources ();
00061
00062
00063 TAO_Thread_Lane *lane =
00064 static_cast<TAO_Thread_Lane *> (tss->lane_);
00065
00066 ACE_ASSERT (lane->pool ().id () ==
00067 thread_pool->id ());
00068
00069 ACE_DEBUG ((LM_DEBUG,
00070 ACE_TEXT ("Using thread pool with lane ")
00071 ACE_TEXT ("(%P|%t|%d|%d): original thread ")
00072 ACE_TEXT ("CORBA/native priority %d/%d not changed\n"),
00073 lane->pool ().id (),
00074 lane->id (),
00075 lane->lane_priority (),
00076 lane->native_priority ()));
00077 }
00078
00079 return;
00080 }
00081
00082
00083 TAO_Protocols_Hooks *tph =
00084 poa.orb_core ().get_protocols_hooks ();
00085
00086 const char *priority_model = 0;
00087 RTCORBA::Priority target_priority = TAO_INVALID_PRIORITY;
00088
00089
00090 if (poa.priority_model () ==
00091 TAO::Portable_Server::Cached_Policies::NOT_SPECIFIED)
00092 {
00093 priority_model = "RTCORBA::NOT_SPECIFIED";
00094 }
00095
00096
00097 else if (poa.priority_model () ==
00098 TAO::Portable_Server::Cached_Policies::CLIENT_PROPAGATED)
00099 {
00100 priority_model = "RTCORBA::CLIENT_PROPAGATED";
00101
00102
00103
00104 const IOP::ServiceContext *context;
00105
00106 if (request_service_context.get_context (IOP::RTCorbaPriority,
00107 &context) == 1)
00108 {
00109
00110 TAO_InputCDR cdr (reinterpret_cast
00111 <const char*>
00112 (context->context_data.get_buffer ()),
00113 context->context_data.length ());
00114 CORBA::Boolean byte_order;
00115 if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
00116 ACE_THROW (CORBA::MARSHAL ());
00117 cdr.reset_byte_order (static_cast<int> (byte_order));
00118
00119 if ((cdr >> target_priority) == 0)
00120 ACE_THROW (CORBA::MARSHAL ());
00121
00122
00123
00124
00125 reply_service_context.set_context (*context);
00126 }
00127 else
00128 {
00129
00130
00131 target_priority = poa.server_priority ();
00132 }
00133 }
00134 else
00135
00136 {
00137 priority_model = "RTCORBA::SERVER_DECLARED";
00138
00139
00140 target_priority = servant_priority;
00141 }
00142
00143 char thread_pool_id[BUFSIZ];
00144 if (TAO_debug_level > 0)
00145 {
00146 if (thread_pool == 0)
00147 ACE_OS::strcpy (thread_pool_id,
00148 "default thread pool");
00149 else
00150 ACE_OS::sprintf (thread_pool_id,
00151 "thread pool %d",
00152 thread_pool->id ());
00153 }
00154
00155
00156 if (target_priority == TAO_INVALID_PRIORITY)
00157 {
00158 if (TAO_debug_level > 0)
00159 {
00160
00161
00162
00163 #if defined (ACE_HAS_THREADS)
00164
00165 if (tph->get_thread_CORBA_and_native_priority (
00166 pre_invoke_state.original_CORBA_priority_,
00167 pre_invoke_state.original_native_priority_
00168 ACE_ENV_ARG_PARAMETER) == -1)
00169 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00170 CORBA::COMPLETED_NO));
00171
00172 ACE_DEBUG ((LM_DEBUG,
00173 ACE_TEXT ("(%P|%t): %s processing using %s ")
00174 ACE_TEXT ("original thread CORBA/native priority %d/%d ")
00175 ACE_TEXT ("not changed\n"),
00176 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00177 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00178 pre_invoke_state.original_CORBA_priority_,
00179 pre_invoke_state.original_native_priority_));
00180
00181
00182
00183 #else
00184
00185 ACE_DEBUG ((LM_DEBUG,
00186 ACE_TEXT ("(%P|%t): %s processing using %s ")
00187 ACE_TEXT ("original thread CORBA/native priority ")
00188 ACE_TEXT ("not changed\n"),
00189 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00190 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id)));
00191
00192 #endif
00193
00194 }
00195 }
00196 else
00197 {
00198
00199
00200 if (tph->get_thread_CORBA_and_native_priority (
00201 pre_invoke_state.original_CORBA_priority_,
00202 pre_invoke_state.original_native_priority_
00203 ACE_ENV_ARG_PARAMETER) == -1)
00204 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00205 CORBA::COMPLETED_NO));
00206
00207
00208
00209 if (target_priority != pre_invoke_state.original_CORBA_priority_)
00210 {
00211 if (tph->set_thread_CORBA_priority (target_priority
00212 ACE_ENV_ARG_PARAMETER)
00213 == -1)
00214 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00215 CORBA::COMPLETED_NO));
00216
00217 pre_invoke_state.state_ =
00218 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00219
00220 if (TAO_debug_level > 0)
00221 {
00222 CORBA::Short native_priority;
00223 tph->get_thread_native_priority (native_priority
00224 ACE_ENV_ARG_PARAMETER);
00225
00226 ACE_DEBUG ((LM_DEBUG,
00227 ACE_TEXT ("%s processing using %s ")
00228 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00229 ACE_TEXT ("temporarily changed to CORBA/native priority %d/%d\n"),
00230 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00231 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00232 pre_invoke_state.original_CORBA_priority_,
00233 pre_invoke_state.original_native_priority_,
00234 target_priority,
00235 native_priority));
00236 }
00237 }
00238
00239 else
00240 {
00241 if (TAO_debug_level > 0)
00242 {
00243 ACE_DEBUG ((LM_DEBUG,
00244 ACE_TEXT ("%s processing using %s ")
00245 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00246 ACE_TEXT ("is the same as the target priority\n"),
00247 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00248 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00249 pre_invoke_state.original_CORBA_priority_,
00250 pre_invoke_state.original_native_priority_));
00251 }
00252 }
00253 }
00254
00255 CORBA::Policy_var policy =
00256 poa.policies ().get_cached_policy (
00257 TAO_CACHED_POLICY_RT_SERVER_PROTOCOL
00258 ACE_ENV_ARG_PARAMETER);
00259 ACE_CHECK;
00260
00261 CORBA::Boolean set_server_network_priority =
00262 tph->set_server_network_priority (req.transport ()->tag (),
00263 policy.in ()
00264 ACE_ENV_ARG_PARAMETER);
00265 ACE_CHECK;
00266
00267 TAO_Connection_Handler *connection_handler =
00268 req.transport ()->connection_handler ();
00269
00270 connection_handler->set_dscp_codepoint (set_server_network_priority);
00271 }
00272
00273 void
00274 TAO_RT_Servant_Dispatcher::pre_invoke_collocated_request (TAO_Root_POA &poa,
00275 CORBA::Short servant_priority,
00276 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00277 ACE_ENV_ARG_DECL)
00278 {
00279 TAO_Thread_Pool *thread_pool =
00280 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00281
00282 if (thread_pool == 0 ||
00283 thread_pool->with_lanes ())
00284 {
00285
00286
00287
00288
00289 return;
00290 }
00291
00292 if (poa.priority_model () !=
00293 TAO::Portable_Server::Cached_Policies::SERVER_DECLARED ||
00294 servant_priority == TAO_INVALID_PRIORITY)
00295 {
00296
00297
00298
00299
00300 return;
00301 }
00302
00303
00304
00305
00306
00307
00308 TAO_Protocols_Hooks *tph =
00309 poa.orb_core ().get_protocols_hooks ();
00310
00311 if (tph->get_thread_CORBA_and_native_priority (pre_invoke_state.original_CORBA_priority_,
00312 pre_invoke_state.original_native_priority_
00313 ACE_ENV_ARG_PARAMETER)
00314 == -1)
00315 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00316 CORBA::COMPLETED_NO));
00317
00318
00319
00320 if (servant_priority != pre_invoke_state.original_CORBA_priority_)
00321 {
00322 if (tph->set_thread_CORBA_priority (servant_priority
00323 ACE_ENV_ARG_PARAMETER)
00324 == -1)
00325 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00326 CORBA::COMPLETED_NO));
00327
00328 pre_invoke_state.state_ =
00329 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00330 }
00331 }
00332
00333 void
00334 TAO_RT_Servant_Dispatcher::post_invoke (TAO_Root_POA &poa,
00335 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)
00336
00337 {
00338 if (pre_invoke_state.state_ ==
00339 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED)
00340 {
00341 pre_invoke_state.state_ =
00342 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::NO_ACTION_REQUIRED;
00343
00344 ACE_DECLARE_NEW_CORBA_ENV;
00345
00346 ACE_TRY
00347 {
00348
00349
00350 TAO_Protocols_Hooks *tph =
00351 poa.orb_core ().get_protocols_hooks ();
00352
00353 if (tph->set_thread_native_priority (
00354 pre_invoke_state.original_native_priority_
00355 ACE_ENV_ARG_PARAMETER)
00356 == -1)
00357 ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00358 CORBA::COMPLETED_NO));
00359 ACE_TRY_CHECK;
00360 }
00361 ACE_CATCHANY
00362 {
00363
00364 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00365 "Exception caught: TAO - "
00366 "Priority_Model_Processing::"
00367 "~Priority_Model_Processing");
00368 }
00369 ACE_ENDTRY;
00370 }
00371 }
00372
00373 TAO_Root_POA *
00374 TAO_RT_Servant_Dispatcher::create_Root_POA (const ACE_CString &name,
00375 PortableServer::POAManager_ptr poa_manager,
00376 const TAO_POA_Policy_Set &policies,
00377 ACE_Lock &lock,
00378 TAO_SYNCH_MUTEX &thread_lock,
00379 TAO_ORB_Core &orb_core,
00380 TAO_Object_Adapter *object_adapter
00381 ACE_ENV_ARG_DECL)
00382 {
00383 TAO_RT_POA *poa = 0;
00384
00385 ACE_NEW_THROW_EX (poa,
00386 TAO_RT_POA (name,
00387 poa_manager,
00388 policies,
00389 0,
00390 lock,
00391 thread_lock,
00392 orb_core,
00393 object_adapter
00394 ACE_ENV_ARG_PARAMETER),
00395 CORBA::NO_MEMORY ());
00396 ACE_CHECK_RETURN (0);
00397
00398 return poa;
00399 }
00400
00401 TAO_END_VERSIONED_NAMESPACE_DECL
00402
00403 #endif