00001
00002
00003 #include "tao/Synch_Invocation.h"
00004 #include "tao/Profile_Transport_Resolver.h"
00005 #include "tao/Profile.h"
00006 #include "tao/Synch_Reply_Dispatcher.h"
00007 #include "tao/Transport.h"
00008 #include "tao/Stub.h"
00009 #include "tao/Bind_Dispatcher_Guard.h"
00010 #include "tao/operation_details.h"
00011 #include "tao/Wait_Strategy.h"
00012 #include "tao/debug.h"
00013 #include "tao/ORB_Constants.h"
00014 #include "tao/Messaging_SyncScopeC.h"
00015 #include "tao/ORB_Core.h"
00016 #include "tao/Service_Context.h"
00017 #include "tao/SystemException.h"
00018 #include "ace/Intrusive_Auto_Ptr.h"
00019
00020 #if TAO_HAS_INTERCEPTORS == 1
00021 # include "tao/PortableInterceptorC.h"
00022 #endif
00023
00024 #include "ace/Auto_Ptr.h"
00025 #include "ace/OS_NS_string.h"
00026 #include "ace/Countdown_Time.h"
00027
00028 #if !defined (__ACE_INLINE__)
00029 # include "tao/Synch_Invocation.inl"
00030 #endif
00031
00032
00033 ACE_RCSID (tao,
00034 Synch_Invocation,
00035 "$Id: Synch_Invocation.cpp 88333 2009-12-24 10:21:16Z johnnyw $")
00036
00037
00038 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040 namespace TAO
00041 {
00042 Synch_Twoway_Invocation::Synch_Twoway_Invocation (
00043 CORBA::Object_ptr otarget,
00044 Profile_Transport_Resolver &resolver,
00045 TAO_Operation_Details &detail,
00046 bool response_expected)
00047 : Remote_Invocation (otarget,
00048 resolver,
00049 detail,
00050 response_expected)
00051 {
00052 }
00053
00054 Invocation_Status
00055 Synch_Twoway_Invocation::remote_twoway (ACE_Time_Value *max_wait_time)
00056 {
00057 ACE_Countdown_Time countdown (max_wait_time);
00058
00059 TAO_Synch_Reply_Dispatcher *rd_p = 0;
00060 ACE_NEW_NORETURN (rd_p, TAO_Synch_Reply_Dispatcher (this->resolver_.stub ()->orb_core (),
00061 this->details_.reply_service_info ()));
00062 if (!rd_p)
00063 {
00064 throw ::CORBA::NO_MEMORY ();
00065 }
00066
00067 ACE_Intrusive_Auto_Ptr<TAO_Synch_Reply_Dispatcher> rd(rd_p, false);
00068
00069 Invocation_Status s = TAO_INVOKE_FAILURE;
00070
00071 #if TAO_HAS_INTERCEPTORS == 1
00072
00073 s = this->send_request_interception ();
00074
00075 if (s != TAO_INVOKE_SUCCESS)
00076 return s;
00077
00078
00079
00080
00081 try
00082 {
00083 #endif
00084 TAO_Transport* const transport = this->resolver_.transport ();
00085
00086 if (!transport)
00087 {
00088
00089
00090
00091 throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00092 }
00093
00094 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon,
00095 transport->output_cdr_lock (), TAO_INVOKE_FAILURE);
00096
00097 TAO_OutputCDR &cdr = transport->out_stream ();
00098
00099 cdr.message_attributes (this->details_.request_id (),
00100 this->resolver_.stub (),
00101 TAO_TWOWAY_REQUEST,
00102 max_wait_time);
00103
00104 this->write_header (cdr);
00105
00106 this->marshal_data (cdr);
00107
00108
00109
00110 TAO_Bind_Dispatcher_Guard dispatch_guard (
00111 this->details_.request_id (),
00112 rd.get (),
00113 transport->tms ());
00114
00115 if (dispatch_guard.status () != 0)
00116 {
00117
00118
00119 transport->close_connection ();
00120
00121 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
00122 }
00123
00124 countdown.update ();
00125
00126 s = this->send_message (cdr,
00127 TAO_TWOWAY_REQUEST,
00128 max_wait_time);
00129
00130 ace_mon.release();
00131
00132 #if TAO_HAS_INTERCEPTORS == 1
00133
00134
00135
00136
00137 if (s == TAO_INVOKE_RESTART)
00138 {
00139 Invocation_Status const tmp = this->receive_other_interception ();
00140
00141 if (tmp != TAO_INVOKE_SUCCESS)
00142 s = tmp;
00143 }
00144 #endif
00145
00146 if (s != TAO_INVOKE_SUCCESS)
00147 return s;
00148
00149 countdown.update ();
00150
00151
00152
00153
00154 if (transport->idle_after_send ())
00155 this->resolver_.transport_released ();
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174 s = this->wait_for_reply (max_wait_time, *rd.get (), dispatch_guard);
00175
00176 #if TAO_HAS_INTERCEPTORS == 1
00177 if (s == TAO_INVOKE_RESTART)
00178 {
00179 Invocation_Status const tmp = this->receive_other_interception ();
00180
00181
00182 if (tmp != TAO_INVOKE_SUCCESS)
00183 s = tmp;
00184 }
00185 #endif
00186
00187 if (s != TAO_INVOKE_SUCCESS)
00188 return s;
00189
00190
00191
00192
00193 s = this->check_reply_status (*rd.get ());
00194
00195
00196
00197 if (transport->idle_after_reply ())
00198 this->resolver_.transport_released ();
00199
00200 #if TAO_HAS_INTERCEPTORS == 1
00201 Invocation_Status tmp = TAO_INVOKE_FAILURE;
00202 if (s == TAO_INVOKE_RESTART)
00203 {
00204 tmp = this->receive_other_interception ();
00205 }
00206 else if (s == TAO_INVOKE_SUCCESS)
00207 {
00208 tmp = this->receive_reply_interception ();
00209 }
00210 if (tmp != TAO_INVOKE_SUCCESS)
00211 s = tmp;
00212 }
00213 catch ( ::CORBA::Exception& ex)
00214 {
00215 PortableInterceptor::ReplyStatus const status =
00216 this->handle_any_exception (&ex);
00217
00218 if (status == PortableInterceptor::LOCATION_FORWARD ||
00219 status == PortableInterceptor::TRANSPORT_RETRY)
00220 s = TAO_INVOKE_RESTART;
00221 else if (status == PortableInterceptor::SYSTEM_EXCEPTION
00222 || status == PortableInterceptor::USER_EXCEPTION)
00223 throw;
00224 }
00225 catch (...)
00226 {
00227
00228
00229
00230 PortableInterceptor::ReplyStatus const st =
00231 this->handle_all_exception ();
00232
00233 if (st == PortableInterceptor::LOCATION_FORWARD ||
00234 st == PortableInterceptor::TRANSPORT_RETRY)
00235 s = TAO_INVOKE_RESTART;
00236 else
00237 throw;
00238 }
00239 #endif
00240
00241 return s;
00242 }
00243
00244 Invocation_Status
00245 Synch_Twoway_Invocation::wait_for_reply (ACE_Time_Value *max_wait_time,
00246 TAO_Synch_Reply_Dispatcher &rd,
00247 TAO_Bind_Dispatcher_Guard &bd)
00248 {
00249
00250
00251
00252
00253
00254
00255
00256
00257 bool const
00258 expired= (max_wait_time && ACE_Time_Value::zero == *max_wait_time);
00259 if (expired)
00260 errno= ETIME;
00261 int const
00262 reply_error = expired ? -1 :
00263 this->resolver_.transport ()->wait_strategy ()->wait (max_wait_time, rd);
00264
00265 if (TAO_debug_level > 0 && max_wait_time)
00266 {
00267 ACE_DEBUG ((LM_DEBUG,
00268 "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, "
00269 "timeout after recv is <%u> status <%d>\n",
00270 max_wait_time->msec (),
00271 reply_error));
00272 }
00273
00274
00275 if (reply_error == -1)
00276 {
00277
00278
00279 if (TAO_debug_level > 3)
00280 {
00281 ACE_DEBUG ((LM_DEBUG,
00282 "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, "
00283 "recovering after an error\n"));
00284 }
00285
00286
00287
00288
00289
00290
00291 if (errno == ETIME)
00292 {
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305 if (bd.unbind_dispatcher () == 0)
00306 {
00307
00308
00309 throw ::CORBA::TIMEOUT (
00310 CORBA::SystemException::_tao_minor_code (
00311 TAO_TIMEOUT_RECV_MINOR_CODE,
00312 errno),
00313 CORBA::COMPLETED_MAYBE);
00314 }
00315 }
00316 else
00317 {
00318 (void) bd.unbind_dispatcher ();
00319 this->resolver_.transport ()->close_connection ();
00320
00321 try
00322 {
00323 return
00324 this->stub()->orb_core ()->service_raise_comm_failure (
00325 this->details_.request_service_context ().service_info (),
00326 this->resolver_.profile ());
00327
00328 }
00329 catch (const ::CORBA::Exception&)
00330 {
00331 this->resolver_.stub ()->reset_profiles ();
00332 throw;
00333 }
00334 }
00335 }
00336
00337 return TAO_INVOKE_SUCCESS;
00338 }
00339
00340 Invocation_Status
00341 Synch_Twoway_Invocation::check_reply_status (TAO_Synch_Reply_Dispatcher &rd)
00342 {
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352 TAO_InputCDR &cdr = rd.reply_cdr ();
00353
00354
00355 this->resolver_.transport ()->assign_translators (&cdr, 0);
00356
00357
00358
00359
00360
00361
00362 switch (rd.reply_status ())
00363 {
00364 case GIOP::NO_EXCEPTION:
00365 {
00366 Reply_Guard mon (this, TAO_INVOKE_FAILURE);
00367 if (this->details_.demarshal_args (cdr) == false)
00368 {
00369 throw ::CORBA::MARSHAL ();
00370 }
00371
00372 mon.set_status (TAO_INVOKE_SUCCESS);
00373 }
00374 break;
00375 case GIOP::LOCATION_FORWARD:
00376 return this->location_forward (cdr);
00377 case GIOP::LOCATION_FORWARD_PERM:
00378 {
00379
00380
00381 Invocation_Status const s = this->location_forward (cdr);
00382 if (s != TAO_INVOKE_FAILURE)
00383 {
00384
00385 CORBA::Boolean const permanent_forward_condition =
00386 this->stub ()->orb_core ()->is_permanent_forward_condition
00387 (this->forwarded_to_.in (),
00388 this->request_service_context ());
00389
00390 if (!permanent_forward_condition)
00391 {
00392
00393 if (TAO_debug_level > 3)
00394 ACE_DEBUG ((LM_DEBUG,
00395 "TAO (%P|%t) - Synch_Twoway_Invocation::"
00396 "check_reply_status: unexpected LOCATION_FORWARD_PERM reply\n"));
00397
00398 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
00399 }
00400
00401
00402 this->reply_status (GIOP::LOCATION_FORWARD_PERM);
00403 }
00404
00405 return s;
00406 }
00407 case GIOP::USER_EXCEPTION:
00408 return this->handle_user_exception (cdr);
00409 case GIOP::SYSTEM_EXCEPTION:
00410 return this->handle_system_exception (cdr);
00411
00412 case GIOP::NEEDS_ADDRESSING_MODE:
00413 {
00414 Reply_Guard mon (this, TAO_INVOKE_FAILURE);
00415
00416
00417
00418 CORBA::Short addr_mode = 0;
00419
00420 if (cdr.read_short (addr_mode) == 0)
00421 {
00422
00423
00424 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE);
00425 }
00426
00427
00428
00429 this->resolver_.profile ()->addressing_mode (addr_mode);
00430
00431 mon.set_status (TAO_INVOKE_RESTART);
00432
00433
00434 return TAO_INVOKE_RESTART;
00435 }
00436 }
00437 return TAO_INVOKE_SUCCESS;
00438 }
00439
00440 Invocation_Status
00441 Synch_Twoway_Invocation::location_forward (TAO_InputCDR &inp_stream)
00442 {
00443 Reply_Guard mon (this, TAO_INVOKE_FAILURE);
00444
00445 if (TAO_debug_level > 3)
00446 {
00447 ACE_DEBUG ((LM_DEBUG,
00448 ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::location_forward ")
00449 ACE_TEXT ("being handled\n")));
00450 }
00451
00452 CORBA::Object_var fwd;
00453
00454 if (!(inp_stream >> fwd))
00455 {
00456 throw ::CORBA::MARSHAL (
00457 CORBA::SystemException::_tao_minor_code (
00458 TAO_INVOCATION_LOCATION_FORWARD_MINOR_CODE,
00459 errno),
00460 CORBA::COMPLETED_NO);
00461 }
00462
00463 this->forwarded_reference (fwd.in ());
00464
00465 mon.set_status (TAO_INVOKE_RESTART);
00466
00467 return TAO_INVOKE_RESTART;
00468 }
00469
00470 Invocation_Status
00471 Synch_Twoway_Invocation::handle_user_exception (TAO_InputCDR &cdr)
00472 {
00473 Reply_Guard mon (this,
00474 TAO_INVOKE_FAILURE);
00475
00476 if (TAO_debug_level > 3)
00477 ACE_DEBUG ((LM_DEBUG,
00478 "TAO (%P|%t) - Synch_Twoway_Invocation::"
00479 "handle_user_exception\n"));
00480
00481
00482 CORBA::String_var buf;
00483
00484 if (!(cdr >> buf.inout ()))
00485 {
00486
00487
00488 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE);
00489 }
00490
00491 CORBA::Exception *exception = this->details_.corba_exception (buf.in ());
00492
00493 exception->_tao_decode (cdr);
00494
00495 if (TAO_debug_level > 5)
00496 {
00497 ACE_DEBUG ((LM_DEBUG,
00498 ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
00499 ACE_TEXT ("handle_user_exception - ")
00500 ACE_TEXT ("raising exception %C\n"),
00501 buf.in ()));
00502 }
00503
00504 mon.set_status (TAO_INVOKE_USER_EXCEPTION);
00505
00506
00507
00508 auto_ptr<CORBA::Exception> safety (exception);
00509
00510 exception->_raise ();
00511
00512 return TAO_INVOKE_USER_EXCEPTION;
00513 }
00514
00515 Invocation_Status
00516 Synch_Twoway_Invocation::handle_system_exception (TAO_InputCDR &cdr)
00517 {
00518 Reply_Guard mon (this, TAO_INVOKE_FAILURE);
00519
00520 if (TAO_debug_level > 3)
00521 ACE_DEBUG ((LM_DEBUG,
00522 "TAO (%P|%t) - Synch_Twoway_Invocation::"
00523 "handle_system_exception\n"));
00524
00525 CORBA::String_var type_id;
00526
00527 if (!(cdr >> type_id.inout ()))
00528 {
00529
00530
00531 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE);
00532 }
00533
00534 CORBA::ULong minor = 0;
00535 CORBA::ULong completion = 0;
00536
00537 if (!(cdr >> minor) || !(cdr >> completion))
00538 {
00539 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE);
00540 }
00541
00542 bool do_forward = false;
00543 int foe_kind = this->stub ()->orb_core ()->orb_params ()->forward_once_exception();
00544
00545 if ((CORBA::CompletionStatus) completion != CORBA::COMPLETED_YES
00546 && (((foe_kind & TAO::FOE_TRANSIENT) == 0
00547 && ACE_OS_String::strcmp (type_id.in (),
00548 "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0) ||
00549 ACE_OS_String::strcmp (type_id.in (),
00550 "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0 ||
00551 ACE_OS_String::strcmp (type_id.in (),
00552 "IDL:omg.org/CORBA/NO_RESPONSE:1.0") == 0 ||
00553 ((foe_kind & TAO::FOE_COMM_FAILURE) == 0
00554 && ACE_OS_String::strcmp (type_id.in (),
00555 "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0) ||
00556 (this->stub ()->orb_core ()->orb_params ()->forward_invocation_on_object_not_exist ()
00557 && ACE_OS_String::strcmp (type_id.in (),
00558 "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0) ||
00559 (do_forward = ! this->stub ()->forwarded_on_exception ()
00560 && ((((foe_kind & TAO::FOE_OBJECT_NOT_EXIST) == TAO::FOE_OBJECT_NOT_EXIST)
00561 && (ACE_OS_String::strcmp (type_id.in (),
00562 "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0)) ||
00563 (((foe_kind & TAO::FOE_COMM_FAILURE) == TAO::FOE_COMM_FAILURE)
00564 && (ACE_OS_String::strcmp (type_id.in (),
00565 "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)) ||
00566 (((foe_kind & TAO::FOE_TRANSIENT) == TAO::FOE_TRANSIENT)
00567 && (ACE_OS_String::strcmp (type_id.in (),
00568 "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)) ||
00569 (((foe_kind & TAO::FOE_INV_OBJREF) == TAO::FOE_INV_OBJREF)
00570 && (ACE_OS_String::strcmp (type_id.in (),
00571 "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0))))))
00572 {
00573 if (do_forward)
00574 this->stub ()->forwarded_on_exception (true);
00575
00576
00577
00578
00579
00580
00581
00582 Invocation_Status const s =
00583 this->stub ()->orb_core ()->service_raise_transient_failure (
00584 this->details_.request_service_context ().service_info (),
00585 this->resolver_.profile ());
00586
00587 if (s == TAO_INVOKE_RESTART)
00588 return s;
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599 if (completion != CORBA::COMPLETED_MAYBE &&
00600 this->resolver_.stub ()->next_profile_retry ())
00601 {
00602 return TAO_INVOKE_RESTART;
00603 }
00604
00605
00606 }
00607
00608 CORBA::SystemException *ex = TAO::create_system_exception (type_id.in ());
00609
00610 if (ex == 0)
00611 {
00612
00613
00614
00615 ACE_NEW_RETURN (ex,
00616 CORBA::UNKNOWN,
00617 TAO_INVOKE_FAILURE);
00618 }
00619
00620
00621
00622
00623 auto_ptr<CORBA::SystemException> safety (ex);
00624
00625 ex->minor (minor);
00626 ex->completed (CORBA::CompletionStatus (completion));
00627
00628 if (TAO_debug_level > 4)
00629 ACE_DEBUG ((LM_DEBUG,
00630 "TAO (%P|%t) - Synch_Twoway_Invocation::"
00631 "handle_system_exception, about to raise\n"));
00632
00633 mon.set_status (TAO_INVOKE_SYSTEM_EXCEPTION);
00634
00635
00636 ex->_raise ();
00637
00638 return TAO_INVOKE_SYSTEM_EXCEPTION;
00639 }
00640
00641
00642
00643 Synch_Oneway_Invocation::Synch_Oneway_Invocation (
00644 CORBA::Object_ptr otarget,
00645 Profile_Transport_Resolver &r,
00646 TAO_Operation_Details &d)
00647 : Synch_Twoway_Invocation (otarget, r, d, false)
00648 {
00649 }
00650
00651 Invocation_Status
00652 Synch_Oneway_Invocation::remote_oneway (ACE_Time_Value *max_wait_time)
00653 {
00654 ACE_Countdown_Time countdown (max_wait_time);
00655
00656 CORBA::Octet const response_flags = this->details_.response_flags ();
00657
00658 Invocation_Status s = TAO_INVOKE_FAILURE;
00659
00660 if (response_flags == CORBA::Octet (Messaging::SYNC_WITH_SERVER) ||
00661 response_flags == CORBA::Octet (Messaging::SYNC_WITH_TARGET))
00662 {
00663 s = Synch_Twoway_Invocation::remote_twoway (max_wait_time);
00664
00665 return s;
00666 }
00667
00668 #if TAO_HAS_INTERCEPTORS == 1
00669 s = this->send_request_interception ();
00670
00671 if (s != TAO_INVOKE_SUCCESS)
00672 return s;
00673
00674 try
00675 {
00676 #endif
00677 TAO_Transport* const transport = this->resolver_.transport ();
00678
00679 if (!transport)
00680 {
00681
00682
00683
00684 throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
00685 }
00686
00687 {
00688 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, transport->output_cdr_lock (),
00689 TAO_INVOKE_FAILURE);
00690
00691 TAO_OutputCDR &cdr = transport->out_stream ();
00692
00693 cdr.message_attributes (this->details_.request_id (),
00694 this->resolver_.stub (),
00695 TAO_ONEWAY_REQUEST,
00696 max_wait_time);
00697
00698 this->write_header (cdr);
00699
00700 this->marshal_data (cdr);
00701
00702 countdown.update ();
00703
00704 if (transport->is_connected ())
00705 {
00706
00707 s = this->send_message (cdr,
00708 TAO_ONEWAY_REQUEST,
00709 max_wait_time);
00710 }
00711 else
00712 {
00713 if (TAO_debug_level > 4)
00714 ACE_DEBUG ((LM_DEBUG,
00715 "TAO (%P|%t) - Synch_Oneway_Invocation::"
00716 "remote_oneway, queueing message\n"));
00717
00718 if (transport->format_queue_message (cdr,
00719 max_wait_time,
00720 this->resolver_.stub()) != 0)
00721 {
00722 s = TAO_INVOKE_FAILURE;
00723 }
00724 }
00725 }
00726
00727 #if TAO_HAS_INTERCEPTORS == 1
00728 s = this->receive_other_interception ();
00729 }
00730 catch ( ::CORBA::Exception& ex)
00731 {
00732 PortableInterceptor::ReplyStatus const status =
00733 this->handle_any_exception (&ex);
00734
00735 if (status == PortableInterceptor::LOCATION_FORWARD ||
00736 status == PortableInterceptor::TRANSPORT_RETRY)
00737 s = TAO_INVOKE_RESTART;
00738 else if (status == PortableInterceptor::SYSTEM_EXCEPTION
00739 || status == PortableInterceptor::USER_EXCEPTION)
00740 throw;
00741 }
00742 catch (...)
00743 {
00744
00745
00746
00747 PortableInterceptor::ReplyStatus const st =
00748 this->handle_all_exception ();
00749
00750 if (st == PortableInterceptor::LOCATION_FORWARD ||
00751 st == PortableInterceptor::TRANSPORT_RETRY)
00752 s = TAO_INVOKE_RESTART;
00753 else
00754 throw;
00755 }
00756 #endif
00757
00758 return s;
00759 }
00760 }
00761
00762 TAO_END_VERSIONED_NAMESPACE_DECL