00001
00002 #include "tao/RTPortableServer/RT_Servant_Dispatcher.h"
00003
00004 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00005
00006 #include "tao/RTPortableServer/RT_POA.h"
00007
00008 #include "tao/ORB_Core.h"
00009 #include "tao/ORB_Core_TSS_Resources.h"
00010 #include "tao/TAO_Server_Request.h"
00011 #include "tao/Transport.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Service_Context.h"
00014 #include "tao/Protocols_Hooks.h"
00015 #include "tao/Network_Priority_Protocols_Hooks.h"
00016 #include "tao/PortableServer/Network_Priority_Hook.h"
00017 #include "tao/debug.h"
00018 #include "tao/CDR.h"
00019
00020 #include "tao/RTCORBA/Thread_Pool.h"
00021
00022 #include "ace/OS_NS_stdio.h"
00023 #include "ace/OS_NS_string.h"
00024
00025 ACE_RCSID (RTPortableServer,
00026 RT_Servant_Dispatcher,
00027 "$Id: RT_Servant_Dispatcher.cpp 77287 2007-02-21 14:17:18Z jai $")
00028
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032 TAO_RT_Servant_Dispatcher::~TAO_RT_Servant_Dispatcher (void)
00033 {
00034 }
00035
00036 void
00037 TAO_RT_Servant_Dispatcher::pre_invoke_remote_request (
00038 TAO_Root_POA &poa,
00039 CORBA::Short servant_priority,
00040 TAO_ServerRequest &req,
00041 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00042 )
00043 {
00044 TAO_Service_Context &request_service_context =
00045 req.request_service_context ();
00046 TAO_Service_Context &reply_service_context = req.reply_service_context ();
00047
00048 TAO_Thread_Pool *thread_pool =
00049 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00050
00051 if (thread_pool != 0 &&
00052 thread_pool->with_lanes ())
00053 {
00054
00055
00056
00057
00058 if (TAO_debug_level > 0)
00059 {
00060
00061 TAO_ORB_Core_TSS_Resources *tss =
00062 poa.orb_core ().get_tss_resources ();
00063
00064
00065 TAO_Thread_Lane *lane =
00066 static_cast<TAO_Thread_Lane *> (tss->lane_);
00067
00068 ACE_ASSERT (lane->pool ().id () ==
00069 thread_pool->id ());
00070
00071 ACE_DEBUG ((LM_DEBUG,
00072 ACE_TEXT ("Using thread pool with lane ")
00073 ACE_TEXT ("(%P|%t|%d|%d): original thread ")
00074 ACE_TEXT ("CORBA/native priority %d/%d not changed\n"),
00075 lane->pool ().id (),
00076 lane->id (),
00077 lane->lane_priority (),
00078 lane->native_priority ()));
00079 }
00080
00081 return;
00082 }
00083
00084
00085 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00086
00087 if (tph != 0)
00088 {
00089 const char *priority_model = 0;
00090 RTCORBA::Priority target_priority = TAO_INVALID_PRIORITY;
00091
00092
00093 if (poa.priority_model () ==
00094 TAO::Portable_Server::Cached_Policies::NOT_SPECIFIED)
00095 {
00096 priority_model = "RTCORBA::NOT_SPECIFIED";
00097 }
00098
00099
00100 else if (poa.priority_model () ==
00101 TAO::Portable_Server::Cached_Policies::CLIENT_PROPAGATED)
00102 {
00103 priority_model = "RTCORBA::CLIENT_PROPAGATED";
00104
00105
00106
00107 const IOP::ServiceContext *context = 0;
00108
00109 if (request_service_context.get_context (IOP::RTCorbaPriority,
00110 &context) == 1)
00111 {
00112
00113 TAO_InputCDR cdr (reinterpret_cast
00114 <const char*>
00115 (context->context_data.get_buffer ()),
00116 context->context_data.length ());
00117 CORBA::Boolean byte_order;
00118 if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
00119 throw ::CORBA::MARSHAL ();
00120 cdr.reset_byte_order (static_cast<int> (byte_order));
00121
00122 if ((cdr >> target_priority) == 0)
00123 throw ::CORBA::MARSHAL ();
00124
00125
00126
00127
00128 reply_service_context.set_context (*context);
00129 }
00130 else
00131 {
00132
00133
00134 target_priority = poa.server_priority ();
00135 }
00136 }
00137 else
00138
00139 {
00140 priority_model = "RTCORBA::SERVER_DECLARED";
00141
00142
00143 target_priority = servant_priority;
00144 }
00145
00146 char thread_pool_id[BUFSIZ];
00147 if (TAO_debug_level > 0)
00148 {
00149 if (thread_pool == 0)
00150 ACE_OS::strcpy (thread_pool_id,
00151 "default thread pool");
00152 else
00153 ACE_OS::sprintf (thread_pool_id,
00154 "thread pool %d",
00155 thread_pool->id ());
00156 }
00157
00158
00159 if (target_priority == TAO_INVALID_PRIORITY)
00160 {
00161 if (TAO_debug_level > 0)
00162 {
00163
00164
00165
00166 #if defined (ACE_HAS_THREADS)
00167
00168 if (tph->get_thread_CORBA_and_native_priority (
00169 pre_invoke_state.original_CORBA_priority_,
00170 pre_invoke_state.original_native_priority_) == -1)
00171 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00172 CORBA::COMPLETED_NO);
00173
00174 ACE_DEBUG ((LM_DEBUG,
00175 ACE_TEXT ("(%P|%t): %s processing using %s ")
00176 ACE_TEXT ("original thread CORBA/native priority %d/%d ")
00177 ACE_TEXT ("not changed\n"),
00178 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00179 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00180 pre_invoke_state.original_CORBA_priority_,
00181 pre_invoke_state.original_native_priority_));
00182
00183
00184
00185 #else
00186
00187 ACE_DEBUG ((LM_DEBUG,
00188 ACE_TEXT ("(%P|%t): %s processing using %s ")
00189 ACE_TEXT ("original thread CORBA/native priority ")
00190 ACE_TEXT ("not changed\n"),
00191 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00192 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id)));
00193
00194 #endif
00195
00196 }
00197 }
00198 else
00199 {
00200
00201
00202 if (tph->get_thread_CORBA_and_native_priority (
00203 pre_invoke_state.original_CORBA_priority_,
00204 pre_invoke_state.original_native_priority_) == -1)
00205 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00206 CORBA::COMPLETED_NO);
00207
00208
00209
00210 if (target_priority != pre_invoke_state.original_CORBA_priority_)
00211 {
00212 if (tph->set_thread_CORBA_priority (target_priority) == -1)
00213 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00214 CORBA::COMPLETED_NO);
00215
00216 pre_invoke_state.state_ =
00217 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00218
00219 if (TAO_debug_level > 0)
00220 {
00221 CORBA::Short native_priority = 0;
00222 tph->get_thread_native_priority (native_priority);
00223
00224 ACE_DEBUG ((LM_DEBUG,
00225 ACE_TEXT ("%s processing using %s ")
00226 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00227 ACE_TEXT ("temporarily changed to CORBA/native priority %d/%d\n"),
00228 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00229 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00230 pre_invoke_state.original_CORBA_priority_,
00231 pre_invoke_state.original_native_priority_,
00232 target_priority,
00233 native_priority));
00234 }
00235 }
00236
00237 else
00238 {
00239 if (TAO_debug_level > 0)
00240 {
00241 ACE_DEBUG ((LM_DEBUG,
00242 ACE_TEXT ("%s processing using %s ")
00243 ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
00244 ACE_TEXT ("is the same as the target priority\n"),
00245 ACE_TEXT_CHAR_TO_TCHAR (priority_model),
00246 ACE_TEXT_CHAR_TO_TCHAR (thread_pool_id),
00247 pre_invoke_state.original_CORBA_priority_,
00248 pre_invoke_state.original_native_priority_));
00249 }
00250 }
00251 }
00252 }
00253
00254 TAO_Network_Priority_Protocols_Hooks *nph =
00255 poa.orb_core ().get_network_priority_protocols_hooks ();
00256
00257 if (nph != 0)
00258 {
00259 poa.network_priority_hook ()-> set_dscp_codepoint (req, poa);
00260 }
00261 else if (tph != 0)
00262 {
00263 CORBA::Policy_var policy =
00264 poa.policies ().get_cached_policy (
00265 TAO_CACHED_POLICY_RT_SERVER_PROTOCOL);
00266 CORBA::Boolean set_server_network_priority =
00267 tph->set_server_network_priority (
00268 req.transport ()->tag (), policy.in ());
00269 TAO_Connection_Handler *connection_handler =
00270 req.transport ()->connection_handler ();
00271 connection_handler->set_dscp_codepoint (set_server_network_priority);
00272 }
00273 }
00274
00275 void
00276 TAO_RT_Servant_Dispatcher::pre_invoke_collocated_request (TAO_Root_POA &poa,
00277 CORBA::Short servant_priority,
00278 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
00279 )
00280 {
00281 TAO_Thread_Pool *thread_pool =
00282 static_cast <TAO_Thread_Pool *> (poa.thread_pool ());
00283
00284 if (thread_pool == 0 || thread_pool->with_lanes ())
00285 {
00286
00287
00288
00289
00290 return;
00291 }
00292
00293 if (poa.priority_model () !=
00294 TAO::Portable_Server::Cached_Policies::SERVER_DECLARED ||
00295 servant_priority == TAO_INVALID_PRIORITY)
00296 {
00297
00298
00299
00300
00301 return;
00302 }
00303
00304
00305
00306
00307
00308
00309 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00310
00311 if (tph != 0)
00312 {
00313 if (tph->get_thread_CORBA_and_native_priority (
00314 pre_invoke_state.original_CORBA_priority_,
00315 pre_invoke_state.original_native_priority_) == -1)
00316 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00317
00318
00319
00320 if (servant_priority != pre_invoke_state.original_CORBA_priority_)
00321 {
00322 if (tph->set_thread_CORBA_priority (servant_priority) == -1)
00323 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00324
00325 pre_invoke_state.state_ =
00326 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
00327 }
00328 }
00329 }
00330
00331 void
00332 TAO_RT_Servant_Dispatcher::post_invoke (TAO_Root_POA &poa,
00333 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)
00334
00335 {
00336 if (pre_invoke_state.state_ ==
00337 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED)
00338 {
00339 pre_invoke_state.state_ =
00340 TAO::Portable_Server::Servant_Upcall::Pre_Invoke_State::NO_ACTION_REQUIRED;
00341
00342 try
00343 {
00344
00345
00346 TAO_Protocols_Hooks *tph = poa.orb_core ().get_protocols_hooks ();
00347
00348 if (tph != 0)
00349 {
00350 if (tph->set_thread_native_priority (
00351 pre_invoke_state.original_native_priority_) == -1)
00352 throw ::CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
00353 CORBA::COMPLETED_NO);
00354 }
00355 }
00356 catch (const ::CORBA::Exception& ex)
00357 {
00358
00359 ex._tao_print_exception (
00360 "Exception caught: TAO - "
00361 "Priority_Model_Processing::"
00362 "~Priority_Model_Processing");
00363 }
00364 }
00365 }
00366
00367 TAO_Root_POA *
00368 TAO_RT_Servant_Dispatcher::create_Root_POA (const ACE_CString &name,
00369 PortableServer::POAManager_ptr poa_manager,
00370 const TAO_POA_Policy_Set &policies,
00371 ACE_Lock &lock,
00372 TAO_SYNCH_MUTEX &thread_lock,
00373 TAO_ORB_Core &orb_core,
00374 TAO_Object_Adapter *object_adapter)
00375 {
00376 TAO_RT_POA *poa = 0;
00377
00378 ACE_NEW_THROW_EX (poa,
00379 TAO_RT_POA (name,
00380 poa_manager,
00381 policies,
00382 0,
00383 lock,
00384 thread_lock,
00385 orb_core,
00386 object_adapter),
00387 CORBA::NO_MEMORY ());
00388
00389 return poa;
00390 }
00391
00392 TAO_END_VERSIONED_NAMESPACE_DECL
00393
00394 #endif