00001
00002
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 #include "ace/Min_Max.h"
00016
00017
00018
00019
00020
00021
00022 ACE_RCSID (tao,
00023 GIOP_Message_Base,
00024 "$Id: GIOP_Message_Base.cpp 81663 2008-05-09 12:43:47Z johnnyw $")
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
00029 TAO_Transport *transport,
00030 size_t input_cdr_size)
00031 : orb_core_ (orb_core)
00032 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033 , out_stream_ (0,
00034 input_cdr_size,
00035 TAO_ENCAP_BYTE_ORDER,
00036 orb_core->output_cdr_buffer_allocator (),
00037 orb_core->output_cdr_dblock_allocator (),
00038 orb_core->output_cdr_msgblock_allocator (),
00039 orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040 fragmentation_strategy_.get (),
00041 TAO_DEF_GIOP_MAJOR,
00042 TAO_DEF_GIOP_MINOR)
00043 {
00044 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00045 const int nibbles = 2 * sizeof (size_t);
00046 char hex_string[nibbles + 1];
00047 ACE_OS::sprintf (hex_string,
00048 "%8.8X",
00049 transport->id ());
00050 hex_string[nibbles] = '\0';
00051 ACE_CString monitor_name ("OutputCDR_");
00052 monitor_name += hex_string;
00053 this->out_stream_.register_monitor (monitor_name.c_str ());
00054 #endif
00055 }
00056
00057
00058 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00059 {
00060 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00061 this->out_stream_.unregister_monitor ();
00062 #endif
00063 }
00064
00065 void
00066 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
00067 {
00068
00069 this->out_stream_.set_version (major, minor);
00070 }
00071
00072 TAO_OutputCDR &
00073 TAO_GIOP_Message_Base::out_stream (void)
00074 {
00075 return this->out_stream_;
00076 }
00077
00078 int
00079 TAO_GIOP_Message_Base::generate_request_header (
00080 TAO_Operation_Details &op,
00081 TAO_Target_Specification &spec,
00082 TAO_OutputCDR &cdr)
00083 {
00084
00085 TAO_GIOP_Message_Version giop_version;
00086
00087 cdr.get_version (giop_version);
00088
00089
00090 if (!this->write_protocol_header (GIOP::Request, giop_version, cdr))
00091 {
00092 if (TAO_debug_level)
00093 {
00094 ACE_ERROR ((LM_ERROR,
00095 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00096 }
00097
00098 return -1;
00099 }
00100
00101
00102 TAO_GIOP_Message_Generator_Parser *generator_parser =
00103 this->get_parser (giop_version);
00104
00105
00106 if (!generator_parser->write_request_header (op, spec, cdr))
00107 {
00108 if (TAO_debug_level)
00109 ACE_ERROR ((LM_ERROR,
00110 ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00111
00112 return -1;
00113 }
00114
00115 return 0;
00116 }
00117
00118 int
00119 TAO_GIOP_Message_Base::generate_locate_request_header (
00120 TAO_Operation_Details &op,
00121 TAO_Target_Specification &spec,
00122 TAO_OutputCDR &cdr)
00123 {
00124 TAO_GIOP_Message_Version giop_version;
00125
00126 cdr.get_version (giop_version);
00127
00128
00129 TAO_GIOP_Message_Generator_Parser *generator_parser =
00130 this->get_parser (giop_version);
00131
00132
00133 if (!this->write_protocol_header (GIOP::LocateRequest, giop_version, cdr))
00134 {
00135 if (TAO_debug_level)
00136 ACE_ERROR ((LM_ERROR,
00137 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00138
00139 return -1;
00140 }
00141
00142
00143 if (!generator_parser->write_locate_request_header
00144 (op.request_id (), spec, cdr))
00145 {
00146 if (TAO_debug_level)
00147 ACE_ERROR ((LM_ERROR,
00148 ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00149
00150
00151 return -1;
00152
00153 }
00154
00155 return 0;
00156 }
00157
00158 int
00159 TAO_GIOP_Message_Base::generate_reply_header (
00160 TAO_OutputCDR &cdr,
00161 TAO_Pluggable_Reply_Params_Base ¶ms)
00162 {
00163 TAO_GIOP_Message_Version giop_version;
00164
00165 cdr.get_version (giop_version);
00166
00167
00168 if (!this->write_protocol_header (GIOP::Reply, giop_version, cdr))
00169 {
00170 if (TAO_debug_level)
00171 ACE_ERROR ((LM_ERROR,
00172 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00173
00174 return -1;
00175 }
00176
00177 try
00178 {
00179
00180 TAO_GIOP_Message_Generator_Parser *generator_parser =
00181 this->get_parser (giop_version);
00182
00183
00184 if (!generator_parser->write_reply_header (cdr, params))
00185 {
00186 if (TAO_debug_level > 4)
00187 ACE_ERROR ((LM_ERROR,
00188 ACE_TEXT ("(%P|%t) Error in writing reply ")
00189 ACE_TEXT ("header\n")));
00190
00191 return -1;
00192 }
00193 }
00194 catch (const ::CORBA::Exception& ex)
00195 {
00196 if (TAO_debug_level > 4)
00197 ex._tao_print_exception (
00198 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00199
00200 return -1;
00201 }
00202
00203 return 0;
00204 }
00205
00206 int
00207 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00208 CORBA::ULong request_id)
00209 {
00210 TAO_GIOP_Message_Version giop_version;
00211
00212 cdr.get_version (giop_version);
00213
00214
00215
00216
00217 if (giop_version.major == 1 && giop_version.minor < 2)
00218 return -1;
00219
00220
00221 TAO_GIOP_Message_Generator_Parser *generator_parser =
00222 this->get_parser (giop_version);
00223
00224
00225 if (!this->write_protocol_header (GIOP::Fragment, giop_version, cdr)
00226 || !generator_parser->write_fragment_header (cdr, request_id))
00227 {
00228 if (TAO_debug_level)
00229 ACE_ERROR ((LM_ERROR,
00230 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00231
00232 return -1;
00233 }
00234
00235 return 0;
00236 }
00237
00238 int
00239 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00240 {
00241
00242 char *buf = const_cast <char*> (stream.buffer ());
00243
00244 this->set_giop_flags (stream);
00245
00246
00247 size_t const total_len = stream.total_length ();
00248
00249
00250
00251
00252
00253
00254
00255
00256 CORBA::ULong bodylen = static_cast <CORBA::ULong>
00257 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00258
00259 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00260 *(reinterpret_cast <CORBA::ULong *> (buf +
00261 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00262 #else
00263 if (!stream.do_byte_swap ())
00264 *(reinterpret_cast <CORBA::ULong *>
00265 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00266 else
00267 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00268 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00269 #endif
00270
00271 if (TAO_debug_level >= 5)
00272 {
00273
00274
00275
00276 ACE_Message_Block* consolidated_block = 0;
00277 if (stream.begin()->cont () != 0)
00278 {
00279 ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00280 ACE_CDR::consolidate (consolidated_block, stream.begin ());
00281 buf = (char *) (consolidated_block->rd_ptr ());
00282 }
00283
00284 this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00285
00286
00287 delete consolidated_block;
00288 consolidated_block = 0;
00289
00290 }
00291
00292 return 0;
00293 }
00294
00295 int
00296 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
00297 size_t &mesg_length)
00298 {
00299 if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00300 {
00301 qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00302
00303 return 0;
00304 }
00305 else
00306 {
00307 TAO_GIOP_Message_State state;
00308
00309 if (state.parse_message_header (*(qd.msg_block ())) == -1)
00310 {
00311 return -1;
00312 }
00313
00314 size_t const message_size = state.message_size ();
00315
00316 if (message_size > qd.msg_block ()->length ())
00317 {
00318 qd.missing_data (message_size - qd.msg_block ()->length ());
00319 }
00320 else
00321 {
00322 qd.missing_data (0);
00323 }
00324
00325
00326 qd.state (state);
00327 mesg_length = message_size;
00328
00329 return 1;
00330 }
00331 }
00332
00333 int
00334 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00335 TAO_Queued_Data *&qd)
00336 {
00337 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00338 {
00339 if (incoming.length () > 0)
00340 {
00341
00342
00343
00344
00345 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00346
00347
00348
00349 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00350 default_buf_size);
00351
00352
00353
00354 qd = this->make_queued_data (buf_size);
00355
00356 if (qd == 0)
00357 {
00358 if (TAO_debug_level > 0)
00359 {
00360 ACE_ERROR((LM_ERROR,
00361 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00362 ACE_TEXT ("out of memory\n")));
00363 }
00364 return -1;
00365 }
00366
00367 qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00368
00369 incoming.rd_ptr (incoming.length ());
00370
00371 qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00372 }
00373 else
00374 {
00375
00376 qd = 0;
00377 }
00378
00379 return 0;
00380 }
00381
00382 TAO_GIOP_Message_State state;
00383 if (state.parse_message_header (incoming) == -1)
00384 {
00385 return -1;
00386 }
00387
00388 size_t copying_len = state.message_size ();
00389
00390 qd = this->make_queued_data (copying_len);
00391
00392 if (qd == 0)
00393 {
00394 if (TAO_debug_level > 0)
00395 {
00396 ACE_ERROR ((LM_ERROR,
00397 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00398 ACE_TEXT ("out of memory\n")));
00399 }
00400 return -1;
00401 }
00402
00403 if (copying_len > incoming.length ())
00404 {
00405 qd->missing_data (copying_len - incoming.length ());
00406 copying_len = incoming.length ();
00407 }
00408 else
00409 {
00410 qd->missing_data (0);
00411 }
00412
00413 qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00414
00415 incoming.rd_ptr (copying_len);
00416 qd->state (state);
00417
00418 return 1;
00419 }
00420
00421 int
00422 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00423 ACE_Message_Block &incoming)
00424 {
00425
00426 if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00427 {
00428
00429
00430 size_t const len = qd->msg_block ()->length ();
00431
00432
00433 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00434 {
00435
00436
00437 return -1;
00438 }
00439
00440
00441
00442
00443 size_t const available = incoming.length ();
00444 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00445 size_t const n_copy = ace_min (available, desired);
00446
00447
00448 if (n_copy == 0)
00449 {
00450 return -1;
00451 }
00452
00453 if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00454 {
00455 return -1;
00456 }
00457
00458
00459 incoming.rd_ptr (n_copy);
00460
00461
00462 if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00463 {
00464 return 0;
00465 }
00466
00467 TAO_GIOP_Message_State state;
00468
00469
00470 if (state.parse_message_header (*qd->msg_block ()) == -1)
00471 {
00472 if (TAO_debug_level > 0)
00473 {
00474 ACE_ERROR ((LM_ERROR,
00475 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00476 ACE_TEXT ("error parsing header\n") ));
00477 }
00478 return -1;
00479 }
00480
00481
00482 if (ACE_CDR::grow (qd->msg_block (),
00483 state.message_size ()) == -1)
00484 {
00485
00486
00487 return -1;
00488 }
00489
00490
00491
00492 size_t copy_len = state.payload_size ();
00493
00494
00495
00496 if (copy_len > incoming.length ())
00497 {
00498
00499 qd->missing_data (copy_len - incoming.length ());
00500
00501
00502 copy_len = incoming.length ();
00503 }
00504 else
00505 {
00506 qd->missing_data (0);
00507 }
00508
00509
00510
00511 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00512 {
00513 return -1;
00514 }
00515
00516
00517 incoming.rd_ptr (copy_len);
00518
00519
00520 qd->state (state);
00521 }
00522 else
00523 {
00524
00525 size_t copy_len = qd->missing_data ();
00526
00527 if (copy_len > incoming.length ())
00528 {
00529
00530 qd->missing_data (copy_len - incoming.length ());
00531
00532
00533 copy_len = incoming.length ();
00534 }
00535
00536
00537 if (copy_len == 0)
00538 {
00539 return -1;
00540 }
00541
00542
00543 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00544 {
00545 return -1;
00546 }
00547
00548
00549 qd->msg_block ()->rd_ptr (copy_len);
00550
00551 }
00552
00553 return 0;
00554 }
00555
00556 int
00557 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00558 TAO_Queued_Data *qd)
00559 {
00560
00561 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00562
00563
00564 TAO_GIOP_Message_Generator_Parser *generator_parser =
00565 this->get_parser (qd->giop_version ());
00566
00567
00568
00569
00570
00571 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00572 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00573 #else
00574 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00575 #endif
00576 ACE_Data_Block out_db (sizeof (repbuf),
00577 ACE_Message_Block::MB_DATA,
00578 repbuf,
00579 this->orb_core_->input_cdr_buffer_allocator (),
00580 0,
00581 ACE_Message_Block::DONT_DELETE,
00582 this->orb_core_->input_cdr_dblock_allocator ());
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594 TAO_OutputCDR output (&out_db,
00595 TAO_ENCAP_BYTE_ORDER,
00596 this->orb_core_->input_cdr_msgblock_allocator (),
00597 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00598 this->fragmentation_strategy_.get (),
00599 qd->giop_version ().major_version (),
00600 qd->giop_version ().minor_version ());
00601
00602
00603 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00604 size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00605 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00606
00607 if (TAO_debug_level >= 5)
00608 {
00609 this->dump_msg ("recv",
00610 reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00611 qd->msg_block ()->length ());
00612 }
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624 ACE_Message_Block::Message_Flags flg = 0;
00625 ACE_Data_Block *db = 0;
00626
00627
00628 flg = qd->msg_block ()->self_flags ();
00629
00630 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00631 {
00632
00633 db = qd->msg_block ()->data_block ();
00634 }
00635 else
00636 {
00637
00638
00639 db = qd->msg_block ()->data_block ()->duplicate ();
00640 }
00641
00642 TAO_InputCDR input_cdr (db,
00643 flg,
00644 rd_pos,
00645 wr_pos,
00646 qd->byte_order (),
00647 qd->giop_version ().major_version (),
00648 qd->giop_version ().minor_version (),
00649 this->orb_core_);
00650
00651 transport->assign_translators(&input_cdr,&output);
00652
00653
00654
00655
00656
00657
00658
00659
00660 switch (qd->msg_type ())
00661 {
00662 case GIOP::Request:
00663
00664
00665
00666 return this->process_request (transport,
00667 input_cdr,
00668 output,
00669 generator_parser);
00670
00671 case GIOP::LocateRequest:
00672 return this->process_locate_request (transport,
00673 input_cdr,
00674 output,
00675 generator_parser);
00676 default:
00677 return -1;
00678 }
00679 }
00680
00681 int
00682 TAO_GIOP_Message_Base::process_reply_message (
00683 TAO_Pluggable_Reply_Params ¶ms,
00684 TAO_Queued_Data *qd)
00685 {
00686
00687 TAO_GIOP_Message_Generator_Parser *generator_parser =
00688 this->get_parser (qd->giop_version ());
00689
00690
00691 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00692 size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00693 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00694
00695 if (TAO_debug_level >= 5)
00696 {
00697 this->dump_msg ("recv",
00698 reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00699 qd->msg_block ()->length ());
00700 }
00701
00702
00703
00704
00705
00706
00707 TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
00708 ACE_Message_Block::DONT_DELETE,
00709 rd_pos,
00710 wr_pos,
00711 qd->byte_order (),
00712 qd->giop_version ().major_version (),
00713 qd->giop_version ().minor_version (),
00714 this->orb_core_);
00715
00716
00717
00718
00719
00720
00721
00722 int retval = 0;
00723
00724 switch (qd->msg_type ())
00725 {
00726 case GIOP::Reply:
00727
00728 retval = generator_parser->parse_reply (input_cdr, params);
00729 break;
00730 case GIOP::LocateReply:
00731 retval = generator_parser->parse_locate_reply (input_cdr, params);
00732 break;
00733 default:
00734 retval = -1;
00735 }
00736
00737 if (retval == -1)
00738 return retval;
00739
00740 params.input_cdr_ = &input_cdr;
00741 params.transport_->assign_translators (params.input_cdr_, 0);
00742
00743 retval = params.transport_->tms ()->dispatch_reply (params);
00744
00745 if (retval == -1)
00746 {
00747
00748
00749 if (TAO_debug_level > 0)
00750 ACE_ERROR ((LM_ERROR,
00751 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00752 ACE_TEXT ("dispatch reply failed\n"),
00753 params.transport_->id ()));
00754 }
00755
00756 return retval;
00757 }
00758
00759 int
00760 TAO_GIOP_Message_Base::generate_exception_reply (
00761 TAO_OutputCDR &cdr,
00762 TAO_Pluggable_Reply_Params_Base ¶ms,
00763 const CORBA::Exception &x)
00764 {
00765
00766
00767
00768 try
00769 {
00770
00771 this->generate_reply_header (cdr, params);
00772 x._tao_encode (cdr);
00773 }
00774 catch (const ::CORBA::Exception&)
00775 {
00776
00777
00778
00779
00780 if (TAO_debug_level > 0)
00781 ACE_DEBUG ((LM_DEBUG,
00782 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00783 ACE_TEXT ("generate_exception_reply ()\n")));
00784 return -1;
00785 }
00786
00787 return 0;
00788 }
00789
00790 int
00791 TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type,
00792 const TAO_GIOP_Message_Version &version,
00793 TAO_OutputCDR &msg)
00794 {
00795
00796 msg.reset ();
00797
00798 CORBA::Octet header[12] =
00799 {
00800
00801
00802 0x47,
00803 0x49,
00804 0x4f,
00805 0x50
00806 };
00807
00808 header[4] = version.major;
00809 header[5] = version.minor;
00810
00811
00812
00813
00814 header[7] = static_cast <CORBA::Octet> (type);
00815
00816 static ACE_CDR::ULong const header_size =
00817 sizeof (header) / sizeof (header[0]);
00818
00819
00820
00821
00822 msg.write_octet_array (header, header_size);
00823
00824 return msg.good_bit ();
00825 }
00826
00827 int
00828 TAO_GIOP_Message_Base::process_request (
00829 TAO_Transport * transport,
00830 TAO_InputCDR & cdr,
00831 TAO_OutputCDR & output,
00832 TAO_GIOP_Message_Generator_Parser * parser)
00833 {
00834
00835
00836 TAO_ServerRequest request (this,
00837 cdr,
00838 output,
00839 transport,
00840 this->orb_core_);
00841
00842 CORBA::ULong request_id = 0;
00843 CORBA::Boolean response_required = false;
00844 int parse_error = 0;
00845
00846 try
00847 {
00848 parse_error = parser->parse_request_header (request);
00849
00850
00851 if (parse_error != 0)
00852 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00853
00854 TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00855 if (csm)
00856 {
00857 csm->process_service_context (request);
00858 transport->assign_translators (&cdr, &output);
00859 }
00860
00861 request_id = request.request_id ();
00862
00863 response_required = request.response_expected ();
00864
00865 CORBA::Object_var forward_to;
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875 this->orb_core_->request_dispatcher ()->dispatch (
00876 this->orb_core_,
00877 request,
00878 forward_to);
00879
00880
00881
00882 if (request.is_forwarded ())
00883 {
00884 CORBA::Boolean const permanent_forward_condition =
00885 this->orb_core_->is_permanent_forward_condition
00886 (forward_to.in (),
00887 request.request_service_context ());
00888
00889
00890 TAO_Pluggable_Reply_Params_Base reply_params;
00891 reply_params.request_id_ = request_id;
00892 reply_params.reply_status (
00893 permanent_forward_condition
00894 ? GIOP::LOCATION_FORWARD_PERM
00895 : GIOP::LOCATION_FORWARD);
00896 reply_params.svc_ctx_.length (0);
00897
00898
00899 reply_params.service_context_notowned (
00900 &request.reply_service_info ());
00901
00902 output.message_attributes (request_id,
00903 0,
00904 TAO_Transport::TAO_REPLY,
00905 0);
00906
00907
00908 this->generate_reply_header (output, reply_params);
00909
00910 if (!(output << forward_to.in ()))
00911 {
00912 if (TAO_debug_level > 0)
00913 ACE_ERROR ((LM_ERROR,
00914 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
00915 ACE_TEXT ("forward reference.\n")));
00916
00917 return -1;
00918 }
00919
00920 output.more_fragments (false);
00921
00922 int result = transport->send_message (output,
00923 0,
00924 TAO_Transport::TAO_REPLY);
00925 if (result == -1)
00926 {
00927 if (TAO_debug_level > 0)
00928 {
00929
00930
00931 ACE_ERROR ((LM_ERROR,
00932 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00933 ACE_TEXT ("cannot send reply\n"),
00934 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
00935 }
00936 }
00937 return result;
00938 }
00939 }
00940
00941 catch ( ::CORBA::Exception& ex)
00942 {
00943 int result = 0;
00944
00945 if (response_required)
00946 {
00947 result = this->send_reply_exception (transport,
00948 output,
00949 request_id,
00950 &request.reply_service_info (),
00951 &ex);
00952 if (result == -1)
00953 {
00954 if (TAO_debug_level > 0)
00955 {
00956 ACE_ERROR ((LM_ERROR,
00957 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00958 ACE_TEXT ("cannot send exception\n"),
00959 ACE_TEXT ("process_connector_request ()")));
00960
00961 ex._tao_print_exception (
00962 "TAO_GIOP_Message_Base::process_request[1]");
00963 }
00964 }
00965
00966 }
00967 else if (TAO_debug_level > 0)
00968 {
00969
00970
00971
00972
00973
00974
00975 ACE_ERROR ((LM_ERROR,
00976 ACE_TEXT ("(%P|%t) exception thrown ")
00977 ACE_TEXT ("but client is not waiting a response\n")));
00978
00979 ex._tao_print_exception (
00980 "TAO_GIOP_Message_Base::process_request[2]");
00981 }
00982
00983 return result;
00984 }
00985 catch (...)
00986 {
00987
00988
00989
00990
00991
00992
00993
00994 int result = 0;
00995
00996 if (response_required)
00997 {
00998 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
00999 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01000 CORBA::COMPLETED_MAYBE);
01001
01002 if (this->send_reply_exception (transport,
01003 output,
01004 request_id,
01005 &request.reply_service_info (),
01006 &exception) == -1
01007 && TAO_debug_level > 0)
01008 {
01009 ACE_ERROR ((LM_ERROR,
01010 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01011 ACE_TEXT ("%p: ")
01012 ACE_TEXT ("cannot send exception\n"),
01013 ACE_TEXT ("process_request ()")));
01014 exception._tao_print_exception (
01015 "TAO_GIOP_Message_Base::process_request[3]");
01016 }
01017 }
01018 else if (TAO_debug_level > 0)
01019 {
01020
01021
01022
01023
01024
01025 ACE_ERROR ((LM_ERROR,
01026 ACE_TEXT ("TAO (%P|%t) exception thrown ")
01027 ACE_TEXT ("but client is not waiting a response\n")));
01028 }
01029
01030 return result;
01031 }
01032
01033 return 0;
01034 }
01035
01036
01037 int
01038 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01039 TAO_InputCDR &input,
01040 TAO_OutputCDR &output,
01041 TAO_GIOP_Message_Generator_Parser *parser)
01042 {
01043
01044
01045 TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01046
01047 TAO_GIOP_Locate_Status_Msg status_info;
01048
01049
01050 status_info.status = GIOP::UNKNOWN_OBJECT;
01051
01052 CORBA::Boolean response_required = true;
01053
01054 try
01055 {
01056 int parse_error = parser->parse_locate_header (locate_request);
01057
01058 if (parse_error != 0)
01059 {
01060 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01061 }
01062
01063 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01064 locate_request.object_key ().length (),
01065 locate_request.object_key ().get_buffer (),
01066 0);
01067
01068
01069 parse_error = 1;
01070 CORBA::ULong req_id = locate_request.request_id ();
01071
01072
01073
01074 CORBA::Boolean deferred_reply = true;
01075 TAO_ServerRequest server_request (this,
01076 req_id,
01077 response_required,
01078 deferred_reply,
01079 tmp_key,
01080 "_non_existent",
01081 output,
01082 transport,
01083 this->orb_core_,
01084 parse_error);
01085
01086 if (parse_error != 0)
01087 {
01088 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01089 }
01090
01091 CORBA::Object_var forward_to;
01092
01093 this->orb_core_->request_dispatcher ()->dispatch (
01094 this->orb_core_,
01095 server_request,
01096 forward_to);
01097
01098 if (server_request.is_forwarded ())
01099 {
01100 status_info.status = GIOP::OBJECT_FORWARD;
01101 status_info.forward_location_var = forward_to;
01102 if (TAO_debug_level > 0)
01103 ACE_DEBUG ((LM_DEBUG,
01104 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01105 ACE_TEXT ("called: forwarding\n")));
01106 }
01107 else if (server_request.reply_status () == GIOP::NO_EXCEPTION)
01108 {
01109
01110 status_info.status = GIOP::OBJECT_HERE;
01111 if (TAO_debug_level > 0)
01112 ACE_DEBUG ((LM_DEBUG,
01113 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01114 ACE_TEXT ("found\n")));
01115 }
01116 else
01117 {
01118
01119 status_info.status = GIOP::UNKNOWN_OBJECT;
01120 ACE_DEBUG ((LM_DEBUG,
01121 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01122 ACE_TEXT ("not here\n")));
01123 }
01124 }
01125
01126 catch (const ::CORBA::Exception&)
01127 {
01128
01129 status_info.status = GIOP::UNKNOWN_OBJECT;
01130 if (TAO_debug_level > 0)
01131 ACE_DEBUG ((LM_DEBUG,
01132 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01133 ACE_TEXT ("CORBA exception raised\n")));
01134 }
01135 catch (...)
01136 {
01137
01138 status_info.status = GIOP::UNKNOWN_OBJECT;
01139 if (TAO_debug_level > 0)
01140 ACE_DEBUG ((LM_DEBUG,
01141 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01142 ACE_TEXT ("C++ exception raised\n")));
01143 }
01144
01145 return this->make_send_locate_reply (transport,
01146 locate_request,
01147 status_info,
01148 output,
01149 parser);
01150 }
01151
01152 int
01153 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01154 TAO_GIOP_Locate_Request_Header &request,
01155 TAO_GIOP_Locate_Status_Msg &status_info,
01156 TAO_OutputCDR &output,
01157 TAO_GIOP_Message_Generator_Parser *parser)
01158 {
01159 TAO_GIOP_Message_Version giop_version;
01160 output.get_version (giop_version);
01161
01162
01163
01164
01165 this->write_protocol_header (GIOP::LocateReply, giop_version, output);
01166
01167
01168 parser->write_locate_reply_mesg (output,
01169 request.request_id (),
01170 status_info);
01171
01172 output.more_fragments (false);
01173
01174
01175 int const result = transport->send_message (output,
01176 0,
01177 TAO_Transport::TAO_REPLY);
01178
01179
01180 if (result == -1)
01181 {
01182 if (TAO_debug_level > 0)
01183 {
01184 ACE_ERROR ((LM_ERROR,
01185 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01186 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01187 }
01188 }
01189
01190 return result;
01191 }
01192
01193
01194
01195
01196
01197
01198 int
01199 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01200 {
01201 const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01202 {
01203
01204
01205 0x47,
01206 0x49,
01207 0x4f,
01208 0x50,
01209 (CORBA::Octet) 1,
01210 (CORBA::Octet) 0,
01211 TAO_ENCAP_BYTE_ORDER,
01212 GIOP::MessageError,
01213 0, 0, 0, 0
01214 };
01215
01216 if (TAO_debug_level >= 5)
01217 {
01218 this->dump_msg ("send_error",
01219 reinterpret_cast <const u_char *> (error_message),
01220 TAO_GIOP_MESSAGE_HEADER_LEN);
01221 }
01222
01223 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01224 ACE_Message_Block::MB_DATA,
01225 error_message,
01226 0,
01227 0,
01228 ACE_Message_Block::DONT_DELETE,
01229 0);
01230 ACE_Message_Block message_block(&data_block,
01231 ACE_Message_Block::DONT_DELETE);
01232 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01233
01234 size_t bt;
01235 int const result = transport->send_message_block_chain (&message_block, bt);
01236 if (result == -1)
01237 {
01238 if (TAO_debug_level > 0)
01239 ACE_DEBUG ((LM_DEBUG,
01240 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01241 transport->id ()));
01242 }
01243
01244 return result;
01245 }
01246
01247 TAO_GIOP_Message_Generator_Parser*
01248 TAO_GIOP_Message_Base::get_parser (
01249 const TAO_GIOP_Message_Version &version) const
01250 {
01251 switch (version.major)
01252 {
01253 case 1:
01254 switch (version.minor)
01255 {
01256 case 0:
01257 return
01258 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01259 &this->tao_giop_impl_.tao_giop_10);
01260 break;
01261 case 1:
01262 return
01263 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01264 &this->tao_giop_impl_.tao_giop_11);
01265 break;
01266 case 2:
01267 return
01268 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01269 &this->tao_giop_impl_.tao_giop_12);
01270 break;
01271 default:
01272 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01273 break;
01274 }
01275 break;
01276 default:
01277 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01278 break;
01279 }
01280 }
01281
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297 void
01298 TAO_GIOP_Message_Base::
01299 send_close_connection (const TAO_GIOP_Message_Version &version,
01300 TAO_Transport *transport)
01301 {
01302
01303
01304
01305 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01306 {
01307
01308
01309 0x47,
01310 0x49,
01311 0x4f,
01312 0x50,
01313 version.major,
01314 version.minor,
01315 TAO_ENCAP_BYTE_ORDER,
01316 GIOP::CloseConnection,
01317 0, 0, 0, 0
01318 };
01319
01320
01321
01322
01323
01324
01325
01326 if (TAO_debug_level >= 5)
01327 {
01328 this->dump_msg ("send_close_connection",
01329 reinterpret_cast <const u_char *> (close_message),
01330 TAO_GIOP_MESSAGE_HEADER_LEN);
01331 }
01332
01333 #if 0
01334
01335
01336
01337
01338
01339
01340
01341 ACE_HANDLE which = transport->handle ();
01342 if (which == ACE_INVALID_HANDLE)
01343 {
01344 if (TAO_debug_level > 0)
01345 ACE_DEBUG ((LM_DEBUG,
01346 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01347 ACE_TEXT (" connection already closed\n")));
01348 return;
01349 }
01350 #endif
01351
01352 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01353 ACE_Message_Block::MB_DATA,
01354 close_message,
01355 0,
01356 0,
01357 ACE_Message_Block::DONT_DELETE,
01358 0);
01359 ACE_Message_Block message_block(&data_block);
01360 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01361
01362 size_t bt;
01363 int const result = transport->send_message_block_chain (&message_block, bt);
01364 if (result == -1)
01365 {
01366 if (TAO_debug_level > 0)
01367 ACE_ERROR ((LM_ERROR,
01368 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01369 transport->id (), errno));
01370 }
01371
01372 transport->close_connection ();
01373 ACE_DEBUG ((LM_DEBUG,
01374 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01375 transport-> id ()));
01376 }
01377
01378
01379 int
01380 TAO_GIOP_Message_Base::send_reply_exception (
01381 TAO_Transport *transport,
01382 TAO_OutputCDR &output,
01383 CORBA::ULong request_id,
01384 IOP::ServiceContextList *svc_info,
01385 CORBA::Exception *x
01386 )
01387 {
01388 TAO_Pluggable_Reply_Params_Base reply_params;
01389 reply_params.request_id_ = request_id;
01390 reply_params.svc_ctx_.length (0);
01391
01392
01393 reply_params.argument_flag_ = true;
01394
01395
01396
01397 reply_params.service_context_notowned (svc_info);
01398
01399 if (CORBA::SystemException::_downcast (x) != 0)
01400 {
01401 reply_params.reply_status (GIOP::SYSTEM_EXCEPTION);
01402 }
01403 else
01404 {
01405 reply_params.reply_status (GIOP::USER_EXCEPTION);
01406 }
01407
01408 if (this->generate_exception_reply (output, reply_params, *x) == -1)
01409 return -1;
01410
01411 output.more_fragments (false);
01412
01413 return transport->send_message (output, 0, TAO_Transport::TAO_REPLY);
01414 }
01415
01416 void
01417 TAO_GIOP_Message_Base::dump_msg (const char *label,
01418 const u_char *ptr,
01419 size_t len)
01420 {
01421 static const char digits[] = "0123456789ABCD";
01422 static const char *names[] =
01423 {
01424 "Request",
01425 "Reply",
01426 "CancelRequest",
01427 "LocateRequest",
01428 "LocateReply",
01429 "CloseConnection",
01430 "MessageError",
01431 "Fragment"
01432 };
01433
01434
01435 const char *message_name = "UNKNOWN MESSAGE";
01436 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01437 if (slot < sizeof (names) / sizeof (names[0]))
01438 message_name = names[slot];
01439
01440
01441 int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01442
01443
01444 CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01445 CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01446
01447
01448 CORBA::ULong tmp = 0;
01449 CORBA::ULong *id = &tmp;
01450 char *tmp_id = 0;
01451
01452 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Request ||
01453 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Reply ||
01454 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Fragment)
01455 {
01456 if (major == 1 && minor < 2)
01457 {
01458
01459 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
01460 }
01461 else
01462 {
01463 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01464 }
01465 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01466 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01467 {
01468 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01469 }
01470 else
01471 {
01472 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01473 }
01474 #else
01475 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01476 #endif
01477
01478 }
01479
01480
01481 ACE_DEBUG ((LM_DEBUG,
01482 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01483 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01484 "Type %s[%u]\n",
01485 ACE_TEXT_CHAR_TO_TCHAR (label),
01486 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01487 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01488 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01489 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01490 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01491 *id));
01492
01493 if (TAO_debug_level >= 10)
01494 ACE_HEX_DUMP ((LM_DEBUG,
01495 (const char *) ptr,
01496 len,
01497 ACE_TEXT ("GIOP message")));
01498 }
01499
01500 int
01501 TAO_GIOP_Message_Base::generate_locate_reply_header (
01502 TAO_OutputCDR & ,
01503 TAO_Pluggable_Reply_Params_Base & )
01504 {
01505 return 0;
01506 }
01507
01508 bool
01509 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) const
01510 {
01511 TAO_GIOP_Message_Version giop_version;
01512
01513 msg.get_version (giop_version);
01514
01515
01516 TAO_GIOP_Message_Generator_Parser *generator_parser =
01517 this->get_parser (giop_version);
01518
01519
01520
01521
01522
01523 return generator_parser->is_ready_for_bidirectional ();
01524 }
01525
01526
01527 TAO_Queued_Data *
01528 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01529 {
01530
01531
01532
01533
01534
01535 ACE_Data_Block *db =
01536 this->orb_core_->create_input_cdr_data_block (sz +
01537 ACE_CDR::MAX_ALIGNMENT);
01538
01539 TAO_Queued_Data *qd =
01540 TAO_Queued_Data::make_queued_data (
01541 this->orb_core_->transport_message_buffer_allocator (),
01542 this->orb_core_->input_cdr_msgblock_allocator (),
01543 db);
01544
01545 if (qd == 0)
01546 {
01547 if (TAO_debug_level > 0)
01548 {
01549 ACE_ERROR ((LM_ERROR,
01550 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01551 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01552 }
01553 db->release ();
01554 return 0;
01555 }
01556
01557 return qd;
01558 }
01559
01560 size_t
01561 TAO_GIOP_Message_Base::header_length (void) const
01562 {
01563 return TAO_GIOP_MESSAGE_HEADER_LEN;
01564 }
01565
01566 size_t
01567 TAO_GIOP_Message_Base::fragment_header_length (
01568 const TAO_GIOP_Message_Version& giop_version) const
01569 {
01570
01571 TAO_GIOP_Message_Generator_Parser *generator_parser =
01572 this->get_parser (giop_version);
01573
01574 return generator_parser->fragment_header_length ();
01575 }
01576
01577 int
01578 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
01579 CORBA::ULong &request_id) const
01580 {
01581
01582 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01583 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01584 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596 ACE_Message_Block::Message_Flags flg = 0;
01597 ACE_Data_Block *db = 0;
01598
01599
01600 flg = qd->msg_block ()->self_flags ();
01601
01602 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
01603 {
01604
01605 db = qd->msg_block ()->data_block ();
01606 }
01607 else
01608 {
01609
01610
01611 db = qd->msg_block ()->data_block ()->duplicate ();
01612 }
01613
01614 TAO_InputCDR input_cdr (db,
01615 flg,
01616 rd_pos,
01617 wr_pos,
01618 qd->byte_order (),
01619 qd->giop_version ().major_version (),
01620 qd->giop_version ().minor_version (),
01621 this->orb_core_);
01622
01623 if (qd->giop_version ().major == 1 &&
01624 (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01625 {
01626 switch (qd->msg_type ())
01627 {
01628 case GIOP::Request:
01629 case GIOP::Reply:
01630 {
01631 IOP::ServiceContextList service_context;
01632
01633 if ((input_cdr >> service_context)
01634 && (input_cdr >> request_id))
01635 {
01636 return 0;
01637 }
01638 }
01639 break;
01640 case GIOP::CancelRequest:
01641 case GIOP::LocateRequest:
01642 case GIOP::LocateReply:
01643 {
01644 if ((input_cdr >> request_id))
01645 {
01646 return 0;
01647 }
01648 }
01649 break;
01650 default:
01651 break;
01652 }
01653 }
01654 else
01655 {
01656 switch (qd->msg_type ())
01657 {
01658 case GIOP::Request:
01659 case GIOP::Reply:
01660 case GIOP::Fragment:
01661 case GIOP::CancelRequest:
01662 case GIOP::LocateRequest:
01663 case GIOP::LocateReply:
01664 {
01665
01666
01667
01668
01669 if ((input_cdr >> request_id))
01670 {
01671 return 0;
01672 }
01673 }
01674 break;
01675 default:
01676 break;
01677 }
01678 }
01679
01680 return -1;
01681 }
01682
01683
01684 int
01685 TAO_GIOP_Message_Base::consolidate_fragmented_message (
01686 TAO_Queued_Data * qd,
01687 TAO_Queued_Data *& msg)
01688 {
01689 TAO::Incoming_Message_Stack reverse_stack;
01690
01691 TAO_Queued_Data *tail = 0;
01692 TAO_Queued_Data *head = 0;
01693
01694
01695
01696
01697
01698
01699 if (qd == 0)
01700 {
01701 return -1;
01702 }
01703
01704 if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01705 {
01706 TAO_Queued_Data::release (qd);
01707 return -1;
01708 }
01709
01710
01711 if (qd->more_fragments ())
01712 {
01713 this->fragment_stack_.push (qd);
01714
01715 msg = 0;
01716 return 1;
01717 }
01718
01719 tail = qd;
01720
01721
01722
01723 size_t const header_adjustment =
01724 this->header_length () +
01725 this->fragment_header_length (tail->giop_version ().major_version ());
01726
01727 if (tail->msg_block ()->length () < header_adjustment)
01728 {
01729
01730 TAO_Queued_Data::release (qd);
01731 return -1;
01732 }
01733
01734
01735 if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01736 {
01737
01738
01739 while (this->fragment_stack_.pop (head) != -1)
01740 {
01741 if (head->more_fragments () &&
01742 head->giop_version ().major_version () == 1 &&
01743 head->giop_version ().minor_version () == 1 &&
01744 head->msg_block ()->length () >= header_adjustment)
01745 {
01746
01747 tail->msg_block ()->rd_ptr(header_adjustment);
01748
01749 head->msg_block ()->cont (tail->msg_block ());
01750
01751 tail->msg_block (0);
01752
01753 TAO_Queued_Data::release (tail);
01754
01755 tail = head;
01756 }
01757 else
01758 {
01759 reverse_stack.push (head);
01760 }
01761 }
01762 }
01763 else
01764 {
01765
01766
01767 CORBA::ULong tmp_request_id = 0;
01768 if (this->parse_request_id (tail, tmp_request_id) == -1)
01769 {
01770 return -1;
01771 }
01772
01773 const CORBA::ULong request_id = tmp_request_id;
01774
01775 while (this->fragment_stack_.pop (head) != -1)
01776 {
01777 CORBA::ULong head_request_id = 0;
01778 int parse_status = 0;
01779
01780 if (head->more_fragments () &&
01781 head->giop_version ().major_version () >= 1 &&
01782 head->giop_version ().minor_version () >= 2 &&
01783 head->msg_block ()->length () >= header_adjustment &&
01784 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01785 request_id == head_request_id)
01786 {
01787
01788 tail->msg_block ()->rd_ptr(header_adjustment);
01789
01790 head->msg_block ()->cont (tail->msg_block ());
01791
01792 tail->msg_block (0);
01793
01794 TAO_Queued_Data::release (tail);
01795
01796 tail = head;
01797 }
01798 else
01799 {
01800 if (parse_status == -1)
01801 {
01802 TAO_Queued_Data::release (head);
01803 return -1;
01804 }
01805
01806 reverse_stack.push (head);
01807 }
01808 }
01809 }
01810
01811
01812 while (reverse_stack.pop (head) != -1)
01813 {
01814 this->fragment_stack_.push (head);
01815 }
01816
01817 if (tail->consolidate () == -1)
01818 {
01819
01820 TAO_Queued_Data::release (tail);
01821 return -1;
01822 }
01823
01824
01825 msg = tail;
01826
01827 return 0;
01828 }
01829
01830
01831 int
01832 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
01833 {
01834
01835
01836
01837 TAO::Incoming_Message_Stack reverse_stack;
01838
01839 CORBA::ULong cancel_request_id;
01840
01841 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01842 {
01843 return -1;
01844 }
01845
01846 TAO_Queued_Data *head = 0;
01847
01848
01849 while (this->fragment_stack_.pop (head) != -1)
01850 {
01851 reverse_stack.push (head);
01852 }
01853
01854 bool discard_all_GIOP11_messages = false;
01855
01856
01857
01858
01859
01860
01861
01862 while (reverse_stack.pop (head) != -1)
01863 {
01864 CORBA::ULong head_request_id;
01865
01866 if (head->giop_version ().major_version () == 1 &&
01867 head->giop_version ().minor_version () <= 1 &&
01868 head->msg_type () != GIOP::Fragment &&
01869 this->parse_request_id (head, head_request_id) >= 0 &&
01870 cancel_request_id == head_request_id)
01871 {
01872 TAO_Queued_Data::release (head);
01873 discard_all_GIOP11_messages = true;
01874 }
01875 else if (head->giop_version ().major_version () == 1 &&
01876 head->giop_version ().minor_version () <= 1 &&
01877 discard_all_GIOP11_messages)
01878 {
01879 TAO_Queued_Data::release (head);
01880 }
01881 else if (head->giop_version ().major_version () >= 1 &&
01882 head->giop_version ().minor_version () >= 2 &&
01883 this->parse_request_id (head, head_request_id) >= 0 &&
01884 cancel_request_id == head_request_id)
01885 {
01886 TAO_Queued_Data::release (head);
01887 }
01888 else
01889 {
01890 this->fragment_stack_.push (head);
01891 }
01892 }
01893
01894 return 0;
01895 }
01896
01897 TAO_GIOP_Fragmentation_Strategy *
01898 TAO_GIOP_Message_Base::fragmentation_strategy (void)
01899 {
01900 return this->fragmentation_strategy_.get ();
01901 }
01902
01903 void
01904 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
01905 {
01906 CORBA::Octet * const buf =
01907 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
01908
01909 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
01910 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
01911
01912
01913 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
01914
01915
01916 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
01917
01918
01919
01920
01921 if (!(major <= 1 && minor == 0))
01922 ACE_SET_BITS (flags, msg.more_fragments () << 1);
01923 }
01924
01925 TAO_END_VERSIONED_NAMESPACE_DECL