00001
00002
00003 #include "tao/AnyTypeCode/Any.h"
00004 #include "tao/AnyTypeCode/TypeCode.h"
00005
00006 #include "tao/RTScheduling/Request_Interceptor.h"
00007 #include "tao/RTScheduling/Current.h"
00008 #include "tao/RTScheduling/Distributable_Thread.h"
00009
00010 #include "tao/TSS_Resources.h"
00011 #include "tao/debug.h"
00012 #include "tao/ORB_Constants.h"
00013 #include "ace/OS_NS_string.h"
00014
00015 ACE_RCSID (RTScheduling,
00016 Request_Interceptor,
00017 "$Id: Request_Interceptor.cpp 89511 2010-03-17 13:55:49Z vzykov $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 const IOP::ServiceId
00022 Client_Interceptor::SchedulingInfo = 30;
00023
00024 void
00025 Client_Interceptor::send_request (PortableInterceptor::ClientRequestInfo_ptr ri)
00026 {
00027 if (TAO_debug_level > 0)
00028 ACE_DEBUG ((LM_DEBUG,
00029 "Client_Interceptor::send_request\n"));
00030
00031
00032 TAO_RTScheduler_Current_i *new_current = 0;
00033 TAO_RTScheduler_Current_i *current = 0;
00034
00035 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00036
00037 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00038
00039 if (current != 0)
00040 {
00041
00042 if (!ri->response_expected ())
00043 {
00044
00045 RTScheduling::Current::IdType guid;
00046 guid.length (sizeof(long));
00047
00048 size_t temp = ++TAO_RTScheduler_Current::guid_counter;
00049 ACE_OS::memcpy (guid.get_buffer (),
00050 &temp,
00051 sizeof(size_t));
00052
00053 size_t id;
00054 ACE_OS::memcpy (&id,
00055 guid.get_buffer (),
00056 guid.length ());
00057
00058 if (TAO_debug_level > 0)
00059 ACE_DEBUG ((LM_DEBUG,
00060 "The Guid is %d %d\n",
00061 id,
00062 TAO_RTScheduler_Current::guid_counter.value_i ()));
00063
00064
00065 RTScheduling::DistributableThread_var dt =
00066 TAO_DistributableThread_Factory::create_DT ();
00067
00068
00069 int result = current->dt_hash ()->bind (guid, dt);
00070 if (result != 0)
00071 {
00072 ACE_DEBUG ((LM_DEBUG,
00073 "No Scheduling Segment Context\n"));
00074 throw ::CORBA::INTERNAL ();
00075
00076 }
00077
00078
00079
00080
00081
00082
00083
00084 CORBA::Policy_var implicit_sched_param =
00085 current->implicit_scheduling_parameter ();
00086 ACE_NEW (new_current,
00087 TAO_RTScheduler_Current_i (current->orb (),
00088 current->dt_hash (),
00089 guid,
00090 0,
00091 implicit_sched_param.in (),
00092 0,
00093 dt.in (),
00094 current));
00095
00096
00097
00098 tss->rtscheduler_current_impl_ = new_current;
00099
00100 }
00101
00102
00103
00104 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00105 scheduler->send_request (ri);
00106
00107
00108 if (!ri->response_expected ())
00109 {
00110
00111 new_current->cleanup_DT ();
00112
00113
00114 new_current->cleanup_current ();
00115 }
00116 }
00117 }
00118
00119 void
00120 Client_Interceptor::send_poll (PortableInterceptor::ClientRequestInfo_ptr ri)
00121 {
00122 if (TAO_debug_level > 0)
00123 ACE_DEBUG ((LM_DEBUG,
00124 "Client_Interceptor::send_poll\n"));
00125
00126 TAO_RTScheduler_Current_i *current = 0;
00127
00128 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00129
00130 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00131 if (current != 0)
00132 {
00133 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00134 scheduler->send_poll (ri);
00135 }
00136 }
00137
00138 void
00139 Client_Interceptor::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri)
00140 {
00141 if (TAO_debug_level > 0)
00142 ACE_DEBUG ((LM_DEBUG,
00143 "Client_Interceptor::receive_reply\n"));
00144
00145 TAO_RTScheduler_Current_i *current = 0;
00146
00147 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00148
00149 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00150 if (current != 0)
00151 {
00152 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00153 scheduler->receive_reply (ri);
00154 }
00155 }
00156
00157 void
00158 Client_Interceptor::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri)
00159 {
00160 if (TAO_debug_level > 0)
00161 ACE_DEBUG ((LM_DEBUG,
00162 "Client_Interceptor::receive_exception\n"));
00163
00164 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00165
00166 TAO_RTScheduler_Current_i *current =
00167 static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00168
00169 if (current != 0)
00170 {
00171 if (ri == 0)
00172 {
00173 ACE_ERROR ((LM_ERROR,
00174 "ri = 0\n"));
00175 return;
00176 }
00177
00178 CORBA::Any_var ex = ri->received_exception ();
00179 CORBA::TypeCode_var type = ex->type ();
00180
00181 if (CORBA::is_nil (type.in ()))
00182 {
00183 ACE_ERROR ((LM_ERROR,
00184 "type = 0\n"));
00185 return;
00186 }
00187 const char * id = type->id ();
00188
00189 if (TAO_debug_level > 0)
00190 ACE_DEBUG ((LM_DEBUG,
00191 "Received Exception %C\n",
00192 id));
00193
00194
00195
00196
00197 if (ACE_OS::strstr (id, "CORBA::THREAD_CANCELLED") == 0)
00198 {
00199
00200
00201 current->cancel_thread ();
00202 }
00203 else
00204 {
00205
00206
00207 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00208 scheduler->receive_exception (ri);
00209 }
00210 }
00211 }
00212
00213 void
00214 Client_Interceptor::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri)
00215 {
00216 if (TAO_debug_level > 0)
00217 ACE_DEBUG ((LM_DEBUG,
00218 "Client_Interceptor::receive_other\n"));
00219
00220 TAO_RTScheduler_Current_i *current = 0;
00221
00222 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00223
00224 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00225 if (current != 0)
00226 {
00227 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00228 scheduler->receive_other (ri);
00229 }
00230 }
00231
00232 char*
00233 Client_Interceptor::name (void)
00234 {
00235 return CORBA::string_dup ("RTSchdeuler_Client_Interceptor");
00236 }
00237
00238 void
00239 Client_Interceptor::destroy (void)
00240 {
00241 }
00242
00243 const IOP::ServiceId
00244 Server_Interceptor::SchedulingInfo = 30;
00245
00246 Server_Interceptor::Server_Interceptor (TAO_RTScheduler_Current_ptr current)
00247 {
00248 this->current_ = TAO_RTScheduler_Current::_duplicate (current);
00249 }
00250
00251 void
00252 Server_Interceptor::receive_request_service_contexts (
00253 PortableInterceptor::ServerRequestInfo_ptr)
00254 {
00255 if (TAO_debug_level > 0)
00256 ACE_DEBUG ((LM_DEBUG,
00257 "Server_Interceptor::receive_request_service_contexts\n"));
00258
00259 }
00260
00261 void
00262 Server_Interceptor::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri
00263 )
00264 {
00265 if (TAO_debug_level > 0)
00266 ACE_DEBUG ((LM_DEBUG,
00267 "Server_Interceptor::receive_request\n"));
00268
00269 IOP::ServiceContext_var serv_cxt;
00270
00271 try
00272 {
00273 serv_cxt =
00274 ri->get_request_service_context (Server_Interceptor::SchedulingInfo);
00275 }
00276 catch (const ::CORBA::Exception&)
00277 {
00278 if (TAO_debug_level > 0)
00279 ACE_DEBUG ((LM_DEBUG,
00280 "Invalid Service Request\n"));
00281 return;
00282 }
00283
00284 if (TAO_debug_level > 0)
00285 ACE_DEBUG ((LM_DEBUG,
00286 "Request from Distributable Thread\n"));
00287
00288 RTScheduling::Current::IdType_var guid_var;
00289 char* name = 0;
00290 CORBA::Policy_var sched_param = 0;
00291 CORBA::Policy_var implicit_sched_param = 0;
00292
00293 TAO_RTScheduler_Current_i* new_current = 0;
00294 ACE_NEW_THROW_EX (new_current,
00295 TAO_RTScheduler_Current_i (this->current_->orb (),
00296 this->current_->dt_hash ()),
00297 CORBA::NO_MEMORY (
00298 CORBA::SystemException::_tao_minor_code (
00299 TAO::VMCID,
00300 ENOMEM),
00301 CORBA::COMPLETED_NO));
00302
00303
00304
00305
00306 RTScheduling::Scheduler_var scheduler = new_current->scheduler();
00307 scheduler->receive_request (ri,
00308 guid_var.out (),
00309 name,
00310 sched_param.out (),
00311 implicit_sched_param.out ());
00312
00313 if (guid_var->length () == 0)
00314 {
00315 ACE_ERROR ((LM_ERROR,
00316 "The scheduler MUST retreive and return the "
00317 "GUID from the service context\n"));
00318 return;
00319 }
00320 RTScheduling::Current::IdType guid;
00321 guid.length (sizeof (size_t));
00322 ACE_OS::memcpy (guid.get_buffer (),
00323 guid_var->get_buffer (),
00324 sizeof (size_t));
00325
00326 size_t id;
00327 ACE_OS::memcpy (&id,
00328 guid.get_buffer (),
00329 guid.length ());
00330
00331 if (TAO_debug_level > 0)
00332 ACE_DEBUG ((LM_DEBUG,
00333 "The Guid is %d\n",
00334 id));
00335
00336
00337 RTScheduling::DistributableThread_var dt = TAO_DistributableThread_Factory::create_DT ();
00338
00339
00340 int result = new_current->dt_hash ()->bind (guid, dt);
00341
00342 if (result != 0)
00343 {
00344 throw ::CORBA::INTERNAL ();
00345 }
00346
00347
00348
00349 new_current->id (guid);
00350 new_current->name (name);
00351 new_current->scheduling_parameter (sched_param.in ());
00352 new_current->implicit_scheduling_parameter (implicit_sched_param.in ());
00353 new_current->DT (dt.in ());
00354
00355
00356
00357
00358 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00359
00360 tss->rtscheduler_previous_current_impl_ = this->current_->implementation (new_current);
00361 }
00362
00363 void
00364 Server_Interceptor::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri)
00365 {
00366 if (TAO_debug_level > 0)
00367 ACE_DEBUG ((LM_DEBUG,
00368 "Server_Interceptor::send_reply\n"));
00369
00370 TAO_RTScheduler_Current_i *current = 0;
00371 TAO_RTScheduler_Current_i *prev_current = 0;
00372
00373 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00374
00375 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00376 if (current != 0)
00377 {
00378 RTScheduling::DistributableThread_var dt = current->DT ();
00379 if (dt->state () == RTScheduling::DistributableThread::CANCELLED)
00380 {
00381 current->cancel_thread ();
00382
00383 return;
00384 }
00385 else ACE_DEBUG ((LM_DEBUG,
00386 "Thread Not Cancelled\n"));
00387
00388
00389
00390 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00391 scheduler->send_reply (ri);
00392
00393 current->cleanup_DT ();
00394 current->cleanup_current ();
00395
00396
00397 prev_current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_previous_current_impl_);
00398
00399
00400 tss->rtscheduler_current_impl_ = prev_current;
00401
00402
00403 tss->rtscheduler_previous_current_impl_ = 0;
00404
00405 }
00406 else ACE_DEBUG ((LM_DEBUG,
00407 "Send Reply Current is 0\n"));
00408 }
00409
00410 void
00411 Server_Interceptor::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri)
00412 {
00413 if (TAO_debug_level > 0)
00414 ACE_DEBUG ((LM_DEBUG,
00415 "Server_Interceptor::send_exception\n"));
00416
00417 TAO_RTScheduler_Current_i *current = 0;
00418
00419 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00420
00421 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00422 if (current != 0)
00423 {
00424
00425 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00426 scheduler->send_exception (ri);
00427
00428 current->cleanup_DT ();
00429 current->cleanup_current ();
00430 }
00431 }
00432
00433 void
00434 Server_Interceptor::send_other (PortableInterceptor::ServerRequestInfo_ptr ri)
00435 {
00436 if (TAO_debug_level > 0)
00437 ACE_DEBUG ((LM_DEBUG,
00438 "Server_Interceptor::send_other\n"));
00439
00440 TAO_RTScheduler_Current_i *current = 0;
00441
00442 TAO_TSS_Resources *tss = TAO_TSS_Resources::instance ();
00443
00444 current = static_cast<TAO_RTScheduler_Current_i *> (tss->rtscheduler_current_impl_);
00445 if (current != 0)
00446 {
00447
00448 RTScheduling::Scheduler_var scheduler = current->scheduler ();
00449 scheduler->send_other (ri);
00450
00451 current->cleanup_DT ();
00452 current->cleanup_current ();
00453 }
00454 }
00455
00456 char*
00457 Server_Interceptor::name (void)
00458 {
00459 return CORBA::string_dup ("RTSchdeuler_Server_Interceptor");
00460 }
00461
00462 void
00463 Server_Interceptor::destroy (void)
00464 {
00465 }
00466
00467 TAO_END_VERSIONED_NAMESPACE_DECL