00001
00002 #include "tao/RTPortableServer/RT_Servant_Dispatcher.h"
00003
00004 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00005
00006 #include "tao/RTPortableServer/RT_POA.h"
00007
00008 #include "tao/ORB_Core.h"
00009 #include "tao/ORB_Core_TSS_Resources.h"
00010 #include "tao/TAO_Server_Request.h"
00011 #include "tao/Transport.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Service_Context.h"
00014 #include "tao/Protocols_Hooks.h"
00015 #include "tao/Network_Priority_Protocols_Hooks.h"
00016 #include "tao/PortableServer/Network_Priority_Hook.h"
00017 #include "tao/debug.h"
00018 #include "tao/CDR.h"
00019
00020 #include "tao/RTCORBA/Thread_Pool.h"
00021
00022 #include "ace/OS_NS_stdio.h"
00023 #include "ace/OS_NS_string.h"
00024
00025 ACE_RCSID (RTPortableServer,
00026 RT_Servant_Dispatcher,
00027 "$Id: RT_Servant_Dispatcher.cpp 84830 2009-03-16 08:34:12Z johnnyw $")
00028
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032 TAO_RT_Servant_Dispatcher::~TAO_RT_Servant_Dispatcher (void)
00033 {
00034 }
00035
00036 void
00037 TAO_RT_Servant_Dispatcher::pre_invoke_remote_request (
00038 TAO_Root_POA &poa,
00039 CORBA::Short servant_priority,
00040 TAO_ServerRequest &req,
00041 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00042 )
00043 {
00044 TAO_Service_Context &request_service_context = req.request_service_context ();
00045 TAO_Service_Context &reply_service_context = req.reply_service_context ();
00046
00047 TAO_Thread_Pool *thread_pool =
00048 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00049
00050 if (thread_pool != 0 &&
00051 thread_pool->with_lanes ())
00052 {
00053
00054
00055
00056
00057 if (TAO_debug_level > 0)
00058 {
00059
00060 TAO_ORB_Core_TSS_Resources *tss =
00061 poa.orb_core ().get_tss_resources ();
00062
00063
00064 TAO_Thread_Lane *lane =
00065 static_cast<TAO_Thread_Lane *> (tss->lane_);
00066
00067 ACE_ASSERT (lane->pool ().id () ==
00068 thread_pool->id ());
00069
00070 ACE_DEBUG ((LM_DEBUG,
00071 ACE_TEXT ("Using thread pool with lane ")
00072 ACE_TEXT ("(%P|%t|%d|%d): original thread ")
00073 ACE_TEXT ("CORBA/native priority %d/%d not changed\n"),
00074 lane->pool ().id (),
00075 lane->id (),
00076 lane->lane_priority (),
00077 lane->native_priority ()));
00078 }
00079
00080 return;
00081 }
00082
00083
00084 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00085
00086 if (tph != 0)
00087 {
00088 const char *priority_model = 0;
00089 RTCORBA::Priority target_priority = TAO_INVALID_PRIORITY;
00090
00091
00092 if (poa.priority_model () ==
00093 TAO::Portable_Server::Cached_Policies::NOT_SPECIFIED)
00094 {
00095 priority_model = "RTCORBA::NOT_SPECIFIED";
00096 }
00097
00098
00099 else if (poa.priority_model () ==
00100 TAO::Portable_Server::Cached_Policies::CLIENT_PROPAGATED)
00101 {
00102 priority_model = "RTCORBA::CLIENT_PROPAGATED";
00103
00104
00105
00106 const IOP::ServiceContext *context = 0;
00107
00108 if (request_service_context.get_context (IOP::RTCorbaPriority,
00109 &context) == 1)
00110 {
00111
00112 TAO_InputCDR cdr (reinterpret_cast
00113 <const char*>
00114 (context->context_data.get_buffer ()),
00115 context->context_data.length ());
00116 CORBA::Boolean byte_order;
00117 if (!(cdr >> ACE_InputCDR::to_boolean (byte_order)))
00118 throw ::CORBA::MARSHAL ();
00119 cdr.reset_byte_order (static_cast<int> (byte_order));
00120
00121 if (!(cdr >> target_priority))
00122 throw ::CORBA::MARSHAL ();
00123
00124
00125
00126
00127 reply_service_context.set_context (*context);
00128 }
00129 else
00130 {
00131
00132
00133 target_priority = poa.server_priority ();
00134 }
00135 }
00136 else
00137
00138 {
00139 priority_model = "RTCORBA::SERVER_DECLARED";
00140
00141
00142 target_priority = servant_priority;
00143 }
00144
00145 char thread_pool_id[BUFSIZ];
00146 if (TAO_debug_level > 0)
00147 {
00148 if (thread_pool == 0)
00149 ACE_OS::strcpy (thread_pool_id,
00150 "default thread pool");
00151 else
00152 ACE_OS::sprintf (thread_pool_id,
00153 "thread pool %d",
00154 thread_pool->id ());
00155 }
00156
00157
00158 if (target_priority == TAO_INVALID_PRIORITY)
00159 {
00160 if (TAO_debug_level > 0)
00161 {
00162
00163
00164
00165 #if defined (ACE_HAS_THREADS)
00166
00167 if (tph->get_thread_CORBA_and_native_priority (
00168 pre_invoke_state.original_CORBA_priority_,
00169 pre_invoke_state.original_native_priority_) == -1)
00170 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00171 CORBA::COMPLETED_NO);
00172
00173 ACE_DEBUG ((LM_DEBUG,
00174 ACE_TEXT ("(%P|%t): %C processing using %C ")
00175 ACE_TEXT ("original thread CORBA/native priority %d/%d ")
00176 ACE_TEXT ("not changed\n"),
00177 priority_model,
00178 thread_pool_id,
00179 pre_invoke_state.original_CORBA_priority_,
00180 pre_invoke_state.original_native_priority_));
00181
00182
00183
00184 #else
00185
00186 ACE_DEBUG ((LM_DEBUG,
00187 ACE_TEXT ("(%P|%t): %C processing using %C ")
00188 ACE_TEXT ("original thread CORBA/native priority ")
00189 ACE_TEXT ("not changed\n"),
00190 priority_model,
00191 thread_pool_id));
00192
00193 #endif
00194
00195 }
00196 }
00197 else
00198 {
00199
00200
00201 if (tph->get_thread_CORBA_and_native_priority (
00202 pre_invoke_state.original_CORBA_priority_,
00203 pre_invoke_state.original_native_priority_) == -1)
00204 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00205 CORBA::COMPLETED_NO);
00206
00207
00208
00209 if (target_priority != pre_invoke_state.original_CORBA_priority_)
00210 {
00211 if (tph->set_thread_CORBA_priority (target_priority) == -1)
00212 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00213 CORBA::COMPLETED_NO);
00214
00215 pre_invoke_state.state_ =
00216 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00217
00218 if (TAO_debug_level > 0)
00219 {
00220 CORBA::Short native_priority = 0;
00221 tph->get_thread_native_priority (native_priority);
00222
00223 ACE_DEBUG ((LM_DEBUG,
00224 ACE_TEXT ("%C processing using %C ")
00225 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00226 ACE_TEXT ("temporarily changed to CORBA/native priority %d/%d\n"),
00227 priority_model,
00228 thread_pool_id,
00229 pre_invoke_state.original_CORBA_priority_,
00230 pre_invoke_state.original_native_priority_,
00231 target_priority,
00232 native_priority));
00233 }
00234 }
00235
00236 else
00237 {
00238 if (TAO_debug_level > 0)
00239 {
00240 ACE_DEBUG ((LM_DEBUG,
00241 ACE_TEXT ("%C processing using %C ")
00242 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00243 ACE_TEXT ("is the same as the target priority\n"),
00244 priority_model,
00245 thread_pool_id,
00246 pre_invoke_state.original_CORBA_priority_,
00247 pre_invoke_state.original_native_priority_));
00248 }
00249 }
00250 }
00251 }
00252
00253 TAO_Network_Priority_Protocols_Hooks *nph =
00254 poa.orb_core ().get_network_priority_protocols_hooks ();
00255
00256 if (nph != 0)
00257 {
00258 poa.network_priority_hook ()-> set_dscp_codepoint (req, poa);
00259 }
00260 else if (tph != 0)
00261 {
00262 CORBA::Policy_var policy =
00263 poa.policies ().get_cached_policy (
00264 TAO_CACHED_POLICY_RT_SERVER_PROTOCOL);
00265 CORBA::Boolean set_server_network_priority =
00266 tph->set_server_network_priority (
00267 req.transport ()->tag (), policy.in ());
00268 TAO_Connection_Handler *connection_handler =
00269 req.transport ()->connection_handler ();
00270 connection_handler->set_dscp_codepoint (set_server_network_priority);
00271 }
00272 }
00273
00274 void
00275 TAO_RT_Servant_Dispatcher::pre_invoke_collocated_request (TAO_Root_POA &poa,
00276 CORBA::Short servant_priority,
00277 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)
00278 {
00279 TAO_Thread_Pool *thread_pool =
00280 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00281
00282 if (thread_pool == 0 || thread_pool->with_lanes ())
00283 {
00284
00285
00286
00287
00288 return;
00289 }
00290
00291 if (poa.priority_model () !=
00292 TAO::Portable_Server::Cached_Policies::SERVER_DECLARED ||
00293 servant_priority == TAO_INVALID_PRIORITY)
00294 {
00295
00296
00297
00298
00299 return;
00300 }
00301
00302
00303
00304
00305
00306
00307 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00308
00309 if (tph != 0)
00310 {
00311 if (tph->get_thread_CORBA_and_native_priority (
00312 pre_invoke_state.original_CORBA_priority_,
00313 pre_invoke_state.original_native_priority_) == -1)
00314 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00315
00316
00317
00318 if (servant_priority != pre_invoke_state.original_CORBA_priority_)
00319 {
00320 if (tph->set_thread_CORBA_priority (servant_priority) == -1)
00321 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00322
00323 pre_invoke_state.state_ =
00324 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00325 }
00326 }
00327 }
00328
00329 void
00330 TAO_RT_Servant_Dispatcher::post_invoke (TAO_Root_POA &poa,
00331 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)
00332
00333 {
00334 if (pre_invoke_state.state_ ==
00335 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED)
00336 {
00337 pre_invoke_state.state_ =
00338 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::NO_ACTION_REQUIRED;
00339
00340 try
00341 {
00342
00343
00344 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00345
00346 if (tph != 0)
00347 {
00348 if (tph->restore_thread_CORBA_and_native_priority (
00349 pre_invoke_state.original_CORBA_priority_,
00350 pre_invoke_state.original_native_priority_) == -1)
00351 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00352 CORBA::COMPLETED_NO);
00353 }
00354 }
00355 catch (const ::CORBA::Exception& ex)
00356 {
00357
00358 ex._tao_print_exception (
00359 "Exception caught: TAO - "
00360 "Priority_Model_Processing::"
00361 "~Priority_Model_Processing");
00362 }
00363 }
00364 }
00365
00366 TAO_Root_POA *
00367 TAO_RT_Servant_Dispatcher::create_Root_POA (const ACE_CString &name,
00368 PortableServer::POAManager_ptr poa_manager,
00369 const TAO_POA_Policy_Set &policies,
00370 ACE_Lock &lock,
00371 TAO_SYNCH_MUTEX &thread_lock,
00372 TAO_ORB_Core &orb_core,
00373 TAO_Object_Adapter *object_adapter)
00374 {
00375 TAO_RT_POA *poa = 0;
00376
00377 ACE_NEW_THROW_EX (poa,
00378 TAO_RT_POA (name,
00379 poa_manager,
00380 policies,
00381 0,
00382 lock,
00383 thread_lock,
00384 orb_core,
00385 object_adapter),
00386 CORBA::NO_MEMORY ());
00387
00388 return poa;
00389 }
00390
00391 TAO_END_VERSIONED_NAMESPACE_DECL
00392
00393 #endif