00001
00002
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 #include "ace/Min_Max.h"
00016
00017
00018
00019
00020
00021
00022 ACE_RCSID (tao,
00023 GIOP_Message_Base,
00024 "$Id: GIOP_Message_Base.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
00029 TAO_Transport *transport,
00030 size_t input_cdr_size)
00031 : orb_core_ (orb_core)
00032 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033 , out_stream_ (0,
00034 input_cdr_size,
00035 TAO_ENCAP_BYTE_ORDER,
00036 orb_core->output_cdr_buffer_allocator (),
00037 orb_core->output_cdr_dblock_allocator (),
00038 orb_core->output_cdr_msgblock_allocator (),
00039 orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040 fragmentation_strategy_.get (),
00041 TAO_DEF_GIOP_MAJOR,
00042 TAO_DEF_GIOP_MINOR)
00043 {
00044 }
00045
00046
00047 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00048 {
00049 }
00050
00051 void
00052 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
00053 {
00054
00055 this->out_stream_.set_version (major, minor);
00056 }
00057
00058 TAO_OutputCDR &
00059 TAO_GIOP_Message_Base::out_stream (void)
00060 {
00061 return this->out_stream_;
00062 }
00063
00064 void
00065 TAO_GIOP_Message_Base::reset (void)
00066 {
00067
00068 }
00069
00070 int
00071 TAO_GIOP_Message_Base::generate_request_header (
00072 TAO_Operation_Details &op,
00073 TAO_Target_Specification &spec,
00074 TAO_OutputCDR &cdr)
00075 {
00076
00077 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00078 TAO_GIOP_Message_Version giop_version;
00079
00080 cdr.get_version (giop_version);
00081
00082
00083 this->set_state (giop_version, generator_parser);
00084
00085
00086 if (!this->write_protocol_header (TAO_GIOP_REQUEST, cdr))
00087 {
00088 if (TAO_debug_level)
00089 {
00090 ACE_ERROR ((LM_ERROR,
00091 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00092 }
00093
00094 return -1;
00095 }
00096
00097
00098 if (!generator_parser->write_request_header (op, spec, cdr))
00099 {
00100 if (TAO_debug_level)
00101 ACE_ERROR ((LM_ERROR,
00102 ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00103
00104 return -1;
00105 }
00106
00107 return 0;
00108 }
00109
00110 int
00111 TAO_GIOP_Message_Base::generate_locate_request_header (
00112 TAO_Operation_Details &op,
00113 TAO_Target_Specification &spec,
00114 TAO_OutputCDR &cdr)
00115 {
00116
00117 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00118 TAO_GIOP_Message_Version giop_version;
00119
00120 cdr.get_version (giop_version);
00121
00122
00123 this->set_state (giop_version, generator_parser);
00124
00125
00126 if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, cdr))
00127 {
00128 if (TAO_debug_level)
00129 ACE_ERROR ((LM_ERROR,
00130 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00131
00132 return -1;
00133 }
00134
00135
00136 if (!generator_parser->write_locate_request_header
00137 (op.request_id (), spec, cdr))
00138 {
00139 if (TAO_debug_level)
00140 ACE_ERROR ((LM_ERROR,
00141 ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00142
00143
00144 return -1;
00145
00146 }
00147
00148 return 0;
00149 }
00150
00151 int
00152 TAO_GIOP_Message_Base::generate_reply_header (
00153 TAO_OutputCDR &cdr,
00154 TAO_Pluggable_Reply_Params_Base ¶ms)
00155 {
00156
00157 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00158 TAO_GIOP_Message_Version giop_version;
00159
00160 cdr.get_version (giop_version);
00161
00162
00163 this->set_state (giop_version, generator_parser);
00164
00165
00166 if (!this->write_protocol_header (TAO_GIOP_REPLY, cdr))
00167 {
00168 if (TAO_debug_level)
00169 ACE_ERROR ((LM_ERROR,
00170 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00171
00172 return -1;
00173 }
00174
00175 try
00176 {
00177
00178 int const result =
00179 generator_parser->write_reply_header (cdr, params);
00180
00181 if (!result)
00182 {
00183 if (TAO_debug_level > 4)
00184 ACE_ERROR ((LM_ERROR,
00185 ACE_TEXT ("(%P|%t) Error in writing reply ")
00186 ACE_TEXT ("header\n")));
00187
00188 return -1;
00189 }
00190 }
00191 catch (const ::CORBA::Exception& ex)
00192 {
00193 if (TAO_debug_level > 4)
00194 ex._tao_print_exception (
00195 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00196
00197 return -1;
00198 }
00199
00200 return 0;
00201 }
00202
00203 int
00204 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00205 CORBA::ULong request_id)
00206 {
00207
00208 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00209 TAO_GIOP_Message_Version giop_version;
00210
00211 cdr.get_version (giop_version);
00212
00213
00214
00215
00216 if (giop_version.major == 1 && giop_version.minor < 2)
00217 return -1;
00218
00219
00220 this->set_state (giop_version, generator_parser);
00221
00222
00223 if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
00224 || !generator_parser->write_fragment_header (cdr, request_id))
00225 {
00226 if (TAO_debug_level)
00227 ACE_ERROR ((LM_ERROR,
00228 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00229
00230 return -1;
00231 }
00232
00233 return 0;
00234 }
00235
00236 int
00237 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00238 {
00239
00240 char * buf = (char *) stream.buffer ();
00241
00242 this->set_giop_flags (stream);
00243
00244
00245 size_t const total_len = stream.total_length ();
00246
00247
00248
00249
00250
00251
00252
00253
00254 CORBA::ULong bodylen = static_cast <CORBA::ULong>
00255 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00256
00257 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00258 *(reinterpret_cast <CORBA::ULong *> (buf +
00259 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00260 #else
00261 if (!stream.do_byte_swap ())
00262 *(reinterpret_cast <CORBA::ULong *>
00263 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00264 else
00265 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00266 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00267 #endif
00268
00269 if (TAO_debug_level > 2)
00270 {
00271
00272
00273
00274 ACE_Message_Block* consolidated_block = 0;
00275 if (stream.begin()->cont () != 0)
00276 {
00277 ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00278 ACE_CDR::consolidate (consolidated_block, stream.begin ());
00279 buf = (char *) (consolidated_block->rd_ptr ());
00280 }
00281
00282 this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00283
00284
00285 delete consolidated_block;
00286 consolidated_block = 0;
00287
00288 }
00289
00290 return 0;
00291 }
00292
00293 int
00294 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
00295 size_t &mesg_length)
00296 {
00297 if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00298 {
00299 qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00300
00301 return 0;
00302 }
00303 else
00304 {
00305 TAO_GIOP_Message_State state;
00306
00307 if (state.parse_message_header (*(qd.msg_block ())) == -1)
00308 {
00309 return -1;
00310 }
00311
00312 size_t const message_size = state.message_size ();
00313
00314 if (message_size > qd.msg_block ()->length ())
00315 {
00316 qd.missing_data (message_size - qd.msg_block ()->length ());
00317 }
00318 else
00319 {
00320 qd.missing_data (0);
00321 }
00322
00323
00324 qd.set_state (state);
00325 mesg_length = message_size;
00326
00327 return 1;
00328 }
00329 }
00330
00331 int
00332 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00333 TAO_Queued_Data *&qd)
00334 {
00335 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00336 {
00337 if (incoming.length () > 0)
00338 {
00339
00340
00341
00342
00343 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00344
00345
00346
00347 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00348 default_buf_size);
00349
00350
00351
00352 qd = this->make_queued_data (buf_size);
00353
00354 if (qd == 0)
00355 {
00356 if (TAO_debug_level > 0)
00357 {
00358 ACE_ERROR((LM_ERROR,
00359 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00360 ACE_TEXT ("out of memory\n")));
00361 }
00362 return -1;
00363 }
00364
00365 qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00366
00367 incoming.rd_ptr (incoming.length ());
00368
00369 qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00370 }
00371 else
00372 {
00373
00374 qd = 0;
00375 }
00376
00377 return 0;
00378 }
00379
00380 TAO_GIOP_Message_State state;
00381 if (state.parse_message_header (incoming) == -1)
00382 {
00383 return -1;
00384 }
00385
00386 size_t copying_len = state.message_size ();
00387
00388 qd = this->make_queued_data (copying_len);
00389
00390 if (qd == 0)
00391 {
00392 if (TAO_debug_level > 0)
00393 {
00394 ACE_ERROR ((LM_ERROR,
00395 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00396 ACE_TEXT ("out of memory\n")));
00397 }
00398 return -1;
00399 }
00400
00401 if (copying_len > incoming.length ())
00402 {
00403 qd->missing_data (copying_len - incoming.length ());
00404 copying_len = incoming.length ();
00405 }
00406 else
00407 {
00408 qd->missing_data (0);
00409 }
00410
00411 qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00412
00413 incoming.rd_ptr (copying_len);
00414 qd->set_state (state);
00415
00416 return 1;
00417 }
00418
00419 int
00420 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00421 ACE_Message_Block &incoming)
00422 {
00423
00424 if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00425 {
00426
00427
00428 size_t const len = qd->msg_block ()->length ();
00429
00430
00431 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00432 {
00433
00434
00435 return -1;
00436 }
00437
00438
00439
00440
00441 size_t const available = incoming.length ();
00442 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00443 size_t const n_copy = ace_min (available, desired);
00444
00445
00446 if (n_copy == 0)
00447 {
00448 return -1;
00449 }
00450
00451 if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00452 {
00453 return -1;
00454 }
00455
00456
00457 incoming.rd_ptr (n_copy);
00458
00459
00460 if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00461 {
00462 return 0;
00463 }
00464
00465 TAO_GIOP_Message_State state;
00466
00467
00468 if (state.parse_message_header (*qd->msg_block ()) == -1)
00469 {
00470 if (TAO_debug_level > 0)
00471 {
00472 ACE_ERROR ((LM_ERROR,
00473 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00474 ACE_TEXT ("error parsing header\n") ));
00475 }
00476 return -1;
00477 }
00478
00479
00480 if (ACE_CDR::grow (qd->msg_block (),
00481 state.message_size ()) == -1)
00482 {
00483
00484
00485 return -1;
00486 }
00487
00488
00489
00490 size_t copy_len = state.payload_size ();
00491
00492
00493
00494 if (copy_len > incoming.length ())
00495 {
00496
00497 qd->missing_data (copy_len - incoming.length ());
00498
00499
00500 copy_len = incoming.length ();
00501 }
00502 else
00503 {
00504 qd->missing_data (0);
00505 }
00506
00507
00508
00509 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00510 {
00511 return -1;
00512 }
00513
00514
00515 incoming.rd_ptr (copy_len);
00516
00517
00518 qd->set_state (state);
00519 }
00520 else
00521 {
00522
00523 size_t copy_len = qd->missing_data ();
00524
00525 if (copy_len > incoming.length ())
00526 {
00527
00528 qd->missing_data (copy_len - incoming.length ());
00529
00530
00531 copy_len = incoming.length ();
00532 }
00533
00534
00535 if (copy_len == 0)
00536 {
00537 return -1;
00538 }
00539
00540
00541 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00542 {
00543 return -1;
00544 }
00545
00546
00547 qd->msg_block ()->rd_ptr (copy_len);
00548
00549 }
00550
00551 return 0;
00552 }
00553
00554 int
00555 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00556 TAO_Queued_Data *qd)
00557 {
00558
00559 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00560
00561
00562 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00563
00564
00565 this->set_state (qd->giop_version (), generator_parser);
00566
00567
00568
00569
00570
00571 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00572 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00573 #else
00574 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00575 #endif
00576 ACE_Data_Block out_db (sizeof (repbuf),
00577 ACE_Message_Block::MB_DATA,
00578 repbuf,
00579 this->orb_core_->input_cdr_buffer_allocator (),
00580 0,
00581 ACE_Message_Block::DONT_DELETE,
00582 this->orb_core_->input_cdr_dblock_allocator ());
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594 TAO_OutputCDR output (&out_db,
00595 TAO_ENCAP_BYTE_ORDER,
00596 this->orb_core_->input_cdr_msgblock_allocator (),
00597 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00598 this->fragmentation_strategy_.get (),
00599 qd->giop_version ().major_version (),
00600 qd->giop_version ().minor_version ());
00601
00602
00603 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00604 size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00605 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00606
00607 if (TAO_debug_level > 0)
00608 this->dump_msg ("recv",
00609 reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00610 qd->msg_block ()->length ());
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623 ACE_Message_Block::Message_Flags flg = 0;
00624 ACE_Data_Block *db = 0;
00625
00626
00627 flg = qd->msg_block ()->self_flags ();
00628
00629 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00630 {
00631
00632 db = qd->msg_block ()->data_block ();
00633 }
00634 else
00635 {
00636
00637
00638 db = qd->msg_block ()->data_block ()->duplicate ();
00639 }
00640
00641 TAO_InputCDR input_cdr (db,
00642 flg,
00643 rd_pos,
00644 wr_pos,
00645 qd->byte_order (),
00646 qd->giop_version ().major_version (),
00647 qd->giop_version ().minor_version (),
00648 this->orb_core_);
00649
00650 transport->assign_translators(&input_cdr,&output);
00651
00652
00653
00654
00655
00656
00657
00658
00659 switch (qd->msg_type ())
00660 {
00661 case TAO_PLUGGABLE_MESSAGE_REQUEST:
00662
00663
00664
00665 return this->process_request (transport,
00666 input_cdr,
00667 output,
00668 generator_parser);
00669
00670 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
00671 return this->process_locate_request (transport,
00672 input_cdr,
00673 output,
00674 generator_parser);
00675 default:
00676 return -1;
00677 }
00678 }
00679
00680 int
00681 TAO_GIOP_Message_Base::process_reply_message (
00682 TAO_Pluggable_Reply_Params ¶ms,
00683 TAO_Queued_Data *qd)
00684 {
00685
00686 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00687
00688
00689 this->set_state (qd->giop_version (), generator_parser);
00690
00691
00692 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00693 size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00694 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00695
00696 if (TAO_debug_level > 0)
00697 this->dump_msg ("recv",
00698 reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00699 qd->msg_block ()->length ());
00700
00701
00702
00703
00704
00705
00706 TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
00707 ACE_Message_Block::DONT_DELETE,
00708 rd_pos,
00709 wr_pos,
00710 qd->byte_order (),
00711 qd->giop_version ().major_version (),
00712 qd->giop_version ().minor_version (),
00713 this->orb_core_);
00714
00715
00716
00717
00718
00719
00720
00721 int retval = 0;
00722
00723 switch (qd->msg_type ())
00724 {
00725 case TAO_PLUGGABLE_MESSAGE_REPLY:
00726
00727 retval = generator_parser->parse_reply (input_cdr, params);
00728
00729 break;
00730 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
00731 retval = generator_parser->parse_locate_reply (input_cdr, params);
00732 break;
00733 default:
00734 retval = -1;
00735 }
00736
00737 if (retval == -1)
00738 return retval;
00739
00740 params.input_cdr_ = &input_cdr;
00741 params.transport_->assign_translators (params.input_cdr_, 0);
00742
00743 retval = params.transport_->tms ()->dispatch_reply (params);
00744
00745 if (retval == -1)
00746 {
00747
00748
00749 if (TAO_debug_level > 0)
00750 ACE_ERROR ((LM_ERROR,
00751 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00752 ACE_TEXT ("dispatch reply failed\n"),
00753 params.transport_->id ()));
00754 }
00755
00756 return retval;
00757 }
00758
00759 int
00760 TAO_GIOP_Message_Base::generate_exception_reply (
00761 TAO_OutputCDR &cdr,
00762 TAO_Pluggable_Reply_Params_Base ¶ms,
00763 const CORBA::Exception &x)
00764 {
00765
00766
00767
00768 try
00769 {
00770
00771 this->generate_reply_header (cdr, params);
00772 x._tao_encode (cdr);
00773 }
00774 catch (const ::CORBA::Exception&)
00775 {
00776
00777
00778
00779
00780 if (TAO_debug_level > 0)
00781 ACE_DEBUG ((LM_DEBUG,
00782 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00783 ACE_TEXT ("generate_exception_reply ()\n")));
00784 return -1;
00785 }
00786
00787 return 0;
00788 }
00789
00790 int
00791 TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
00792 TAO_OutputCDR &msg)
00793 {
00794
00795 msg.reset ();
00796
00797 CORBA::Octet header[12] =
00798 {
00799
00800
00801 0x47,
00802 0x49,
00803 0x4f,
00804 0x50
00805 };
00806
00807 TAO_GIOP_Message_Version giop_version;
00808
00809 msg.get_version (giop_version);
00810
00811 header[4] = giop_version.major;
00812 header[5] = giop_version.minor;
00813
00814
00815
00816
00817 header[7] = CORBA::Octet (type);
00818
00819 static ACE_CDR::ULong const header_size =
00820 sizeof (header) / sizeof (header[0]);
00821
00822
00823
00824
00825 msg.write_octet_array (header, header_size);
00826
00827 return msg.good_bit ();
00828 }
00829
00830 int
00831 TAO_GIOP_Message_Base::process_request (
00832 TAO_Transport * transport,
00833 TAO_InputCDR & cdr,
00834 TAO_OutputCDR & output,
00835 TAO_GIOP_Message_Generator_Parser * parser)
00836 {
00837
00838
00839 TAO_ServerRequest request (this,
00840 cdr,
00841 output,
00842 transport,
00843 this->orb_core_);
00844
00845 CORBA::ULong request_id = 0;
00846 CORBA::Boolean response_required = false;
00847 int parse_error = 0;
00848
00849 try
00850 {
00851 parse_error = parser->parse_request_header (request);
00852
00853
00854 if (parse_error != 0)
00855 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00856
00857 TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00858 if (csm)
00859 {
00860 csm->process_service_context (request);
00861 transport->assign_translators (&cdr, &output);
00862 }
00863
00864 request_id = request.request_id ();
00865
00866 response_required = request.response_expected ();
00867
00868 CORBA::Object_var forward_to;
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878 this->orb_core_->request_dispatcher ()->dispatch (
00879 this->orb_core_,
00880 request,
00881 forward_to);
00882
00883
00884
00885 if (!CORBA::is_nil (forward_to.in ()))
00886 {
00887 CORBA::Boolean const permanent_forward_condition =
00888 this->orb_core_->is_permanent_forward_condition
00889 (forward_to.in (),
00890 request.request_service_context ());
00891
00892
00893 TAO_Pluggable_Reply_Params_Base reply_params;
00894 reply_params.request_id_ = request_id;
00895 reply_params.reply_status_ =
00896 permanent_forward_condition
00897 ? TAO_GIOP_LOCATION_FORWARD_PERM
00898 : TAO_GIOP_LOCATION_FORWARD;
00899 reply_params.svc_ctx_.length (0);
00900
00901
00902 reply_params.service_context_notowned (
00903 &request.reply_service_info ());
00904
00905 output.message_attributes (request_id,
00906 0,
00907 TAO_Transport::TAO_REPLY,
00908 0);
00909
00910
00911 this->generate_reply_header (output, reply_params);
00912
00913 if (!(output << forward_to.in ()))
00914 {
00915 if (TAO_debug_level > 0)
00916 ACE_ERROR ((LM_ERROR,
00917 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
00918 ACE_TEXT ("forward reference.\n")));
00919
00920 return -1;
00921 }
00922
00923 output.more_fragments (false);
00924
00925 int result = transport->send_message (output,
00926 0,
00927 TAO_Transport::TAO_REPLY);
00928 if (result == -1)
00929 {
00930 if (TAO_debug_level > 0)
00931 {
00932
00933
00934 ACE_ERROR ((LM_ERROR,
00935 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00936 ACE_TEXT ("cannot send reply\n"),
00937 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
00938 }
00939 }
00940 return result;
00941 }
00942 }
00943
00944 catch ( ::CORBA::Exception& ex)
00945 {
00946 int result = 0;
00947
00948 if (response_required)
00949 {
00950 result = this->send_reply_exception (transport,
00951 output,
00952 request_id,
00953 &request.reply_service_info (),
00954 &ex);
00955 if (result == -1)
00956 {
00957 if (TAO_debug_level > 0)
00958 {
00959 ACE_ERROR ((LM_ERROR,
00960 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00961 ACE_TEXT ("cannot send exception\n"),
00962 ACE_TEXT ("process_connector_request ()")));
00963
00964 ex._tao_print_exception (
00965 "TAO_GIOP_Message_Base::process_request[1]");
00966 }
00967 }
00968
00969 }
00970 else if (TAO_debug_level > 0)
00971 {
00972
00973
00974
00975
00976
00977
00978 ACE_ERROR ((LM_ERROR,
00979 ACE_TEXT ("(%P|%t) exception thrown ")
00980 ACE_TEXT ("but client is not waiting a response\n")));
00981
00982 ex._tao_print_exception (
00983 "TAO_GIOP_Message_Base::process_request[2]");
00984 }
00985
00986 return result;
00987 }
00988 catch (...)
00989 {
00990
00991
00992
00993
00994
00995
00996
00997 int result = 0;
00998
00999 if (response_required)
01000 {
01001 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01002 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01003 CORBA::COMPLETED_MAYBE);
01004
01005 if (this->send_reply_exception (transport,
01006 output,
01007 request_id,
01008 &request.reply_service_info (),
01009 &exception) == -1
01010 && TAO_debug_level > 0)
01011 {
01012 ACE_ERROR ((LM_ERROR,
01013 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01014 ACE_TEXT ("%p: ")
01015 ACE_TEXT ("cannot send exception\n"),
01016 ACE_TEXT ("process_request ()")));
01017 exception._tao_print_exception (
01018 "TAO_GIOP_Message_Base::process_request[3]");
01019 }
01020 }
01021 else if (TAO_debug_level > 0)
01022 {
01023
01024
01025
01026
01027
01028 ACE_ERROR ((LM_ERROR,
01029 ACE_TEXT ("TAO (%P|%t) exception thrown ")
01030 ACE_TEXT ("but client is not waiting a response\n")));
01031 }
01032
01033 return result;
01034 }
01035
01036 return 0;
01037 }
01038
01039
01040 int
01041 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01042 TAO_InputCDR &input,
01043 TAO_OutputCDR &output,
01044 TAO_GIOP_Message_Generator_Parser *parser)
01045 {
01046
01047
01048 TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01049
01050 TAO_GIOP_Locate_Status_Msg status_info;
01051
01052
01053 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01054
01055 CORBA::Boolean response_required = true;
01056
01057 try
01058 {
01059 int parse_error = parser->parse_locate_header (locate_request);
01060
01061 if (parse_error != 0)
01062 {
01063 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01064 }
01065
01066 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01067 locate_request.object_key ().length (),
01068 locate_request.object_key ().get_buffer (),
01069 0);
01070
01071
01072 parse_error = 1;
01073 CORBA::ULong req_id = locate_request.request_id ();
01074
01075
01076
01077 CORBA::Boolean deferred_reply = true;
01078 TAO_ServerRequest server_request (this,
01079 req_id,
01080 response_required,
01081 deferred_reply,
01082 tmp_key,
01083 "_non_existent",
01084 output,
01085 transport,
01086 this->orb_core_,
01087 parse_error);
01088
01089 if (parse_error != 0)
01090 {
01091 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01092 }
01093
01094 CORBA::Object_var forward_to;
01095
01096 this->orb_core_->request_dispatcher ()->dispatch (
01097 this->orb_core_,
01098 server_request,
01099 forward_to);
01100
01101 if (!CORBA::is_nil (forward_to.in ()))
01102 {
01103 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01104 status_info.forward_location_var = forward_to;
01105 if (TAO_debug_level > 0)
01106 ACE_DEBUG ((LM_DEBUG,
01107 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01108 ACE_TEXT ("called: forwarding\n")));
01109 }
01110 else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
01111 {
01112
01113 status_info.status = TAO_GIOP_OBJECT_HERE;
01114 if (TAO_debug_level > 0)
01115 ACE_DEBUG ((LM_DEBUG,
01116 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01117 ACE_TEXT ("found\n")));
01118 }
01119 else
01120 {
01121 status_info.forward_location_var = server_request.forward_location ();
01122
01123 if (!CORBA::is_nil (status_info.forward_location_var.in ()))
01124 {
01125 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01126 ACE_DEBUG ((LM_DEBUG,
01127 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01128 ACE_TEXT ("forwarding\n")));
01129 }
01130 else
01131 {
01132
01133 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01134 ACE_DEBUG ((LM_DEBUG,
01135 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01136 ACE_TEXT ("not here\n")));
01137 }
01138 }
01139 }
01140
01141 catch (const ::CORBA::Exception&)
01142 {
01143
01144 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01145 if (TAO_debug_level > 0)
01146 ACE_DEBUG ((LM_DEBUG,
01147 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01148 ACE_TEXT ("CORBA exception raised\n")));
01149 }
01150 catch (...)
01151 {
01152
01153 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01154 if (TAO_debug_level > 0)
01155 ACE_DEBUG ((LM_DEBUG,
01156 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01157 ACE_TEXT ("C++ exception raised\n")));
01158 }
01159
01160 return this->make_send_locate_reply (transport,
01161 locate_request,
01162 status_info,
01163 output,
01164 parser);
01165 }
01166
01167 int
01168 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01169 TAO_GIOP_Locate_Request_Header &request,
01170 TAO_GIOP_Locate_Status_Msg &status_info,
01171 TAO_OutputCDR &output,
01172 TAO_GIOP_Message_Generator_Parser *parser)
01173 {
01174
01175
01176
01177 this->write_protocol_header (TAO_GIOP_LOCATEREPLY, output);
01178
01179
01180 parser->write_locate_reply_mesg (output,
01181 request.request_id (),
01182 status_info);
01183
01184 output.more_fragments (false);
01185
01186
01187 int const result = transport->send_message (output,
01188 0,
01189 TAO_Transport::TAO_REPLY);
01190
01191
01192 if (result == -1)
01193 {
01194 if (TAO_debug_level > 0)
01195 {
01196 ACE_ERROR ((LM_ERROR,
01197 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01198 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01199 }
01200 }
01201
01202 return result;
01203 }
01204
01205
01206
01207
01208
01209
01210 int
01211 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01212 {
01213 const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01214 {
01215
01216
01217 0x47,
01218 0x49,
01219 0x4f,
01220 0x50,
01221 (CORBA::Octet) 1,
01222 (CORBA::Octet) 0,
01223 TAO_ENCAP_BYTE_ORDER,
01224 TAO_GIOP_MESSAGERROR,
01225 0, 0, 0, 0
01226 };
01227
01228 this->dump_msg ("send_error",
01229 (const u_char *) error_message,
01230 TAO_GIOP_MESSAGE_HEADER_LEN);
01231
01232 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01233 ACE_Message_Block::MB_DATA,
01234 error_message,
01235 0,
01236 0,
01237 ACE_Message_Block::DONT_DELETE,
01238 0);
01239 ACE_Message_Block message_block(&data_block,
01240 ACE_Message_Block::DONT_DELETE);
01241 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01242
01243 size_t bt;
01244 int const result = transport->send_message_block_chain (&message_block, bt);
01245 if (result == -1)
01246 {
01247 if (TAO_debug_level > 0)
01248 ACE_DEBUG ((LM_DEBUG,
01249 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01250 transport->id ()));
01251 }
01252
01253 return result;
01254 }
01255
01256 void
01257 TAO_GIOP_Message_Base::set_state (
01258 const TAO_GIOP_Message_Version &version,
01259 TAO_GIOP_Message_Generator_Parser *&gen_parser) const
01260 {
01261 switch (version.major)
01262 {
01263 case 1:
01264 switch (version.minor)
01265 {
01266 case 0:
01267 gen_parser =
01268 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01269 &this->tao_giop_impl_.tao_giop_10);
01270 break;
01271 case 1:
01272 gen_parser =
01273 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01274 &this->tao_giop_impl_.tao_giop_11);
01275 break;
01276 case 2:
01277 gen_parser =
01278 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01279 &this->tao_giop_impl_.tao_giop_12);
01280 break;
01281 default:
01282 break;
01283 }
01284 break;
01285 default:
01286 break;
01287 }
01288 }
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306 void
01307 TAO_GIOP_Message_Base::
01308 send_close_connection (const TAO_GIOP_Message_Version &version,
01309 TAO_Transport *transport,
01310 void *)
01311 {
01312
01313
01314
01315 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01316 {
01317
01318
01319 0x47,
01320 0x49,
01321 0x4f,
01322 0x50,
01323 version.major,
01324 version.minor,
01325 TAO_ENCAP_BYTE_ORDER,
01326 TAO_GIOP_CLOSECONNECTION,
01327 0, 0, 0, 0
01328 };
01329
01330
01331
01332
01333
01334
01335
01336 this->dump_msg ("send_close_connection",
01337 (const u_char *) close_message,
01338 TAO_GIOP_MESSAGE_HEADER_LEN);
01339
01340 #if 0
01341
01342
01343
01344
01345
01346
01347
01348 ACE_HANDLE which = transport->handle ();
01349 if (which == ACE_INVALID_HANDLE)
01350 {
01351 if (TAO_debug_level > 0)
01352 ACE_DEBUG ((LM_DEBUG,
01353 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01354 ACE_TEXT (" connection already closed\n")));
01355 return;
01356 }
01357 #endif
01358
01359 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01360 ACE_Message_Block::MB_DATA,
01361 close_message,
01362 0,
01363 0,
01364 ACE_Message_Block::DONT_DELETE,
01365 0);
01366 ACE_Message_Block message_block(&data_block);
01367 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01368
01369 size_t bt;
01370 int const result = transport->send_message_block_chain (&message_block, bt);
01371 if (result == -1)
01372 {
01373 if (TAO_debug_level > 0)
01374 ACE_ERROR ((LM_ERROR,
01375 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01376 transport->id (), errno));
01377 }
01378
01379 transport->close_connection ();
01380 ACE_DEBUG ((LM_DEBUG,
01381 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01382 transport-> id ()));
01383 }
01384
01385
01386 int
01387 TAO_GIOP_Message_Base::send_reply_exception (
01388 TAO_Transport *transport,
01389 TAO_OutputCDR &output,
01390 CORBA::ULong request_id,
01391 IOP::ServiceContextList *svc_info,
01392 CORBA::Exception *x
01393 )
01394 {
01395 TAO_Pluggable_Reply_Params_Base reply_params;
01396 reply_params.request_id_ = request_id;
01397 reply_params.svc_ctx_.length (0);
01398
01399
01400 reply_params.argument_flag_ = true;
01401
01402
01403
01404 reply_params.service_context_notowned (svc_info);
01405
01406 reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION;
01407
01408 if (CORBA::SystemException::_downcast (x) != 0)
01409 {
01410 reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION;
01411 }
01412
01413 if (this->generate_exception_reply (output, reply_params, *x) == -1)
01414 return -1;
01415
01416 output.more_fragments (false);
01417
01418 return transport->send_message (output, 0, TAO_Transport::TAO_REPLY);
01419 }
01420
01421 void
01422 TAO_GIOP_Message_Base::dump_msg (const char *label,
01423 const u_char *ptr,
01424 size_t len)
01425 {
01426
01427 if (TAO_debug_level >= 5)
01428 {
01429 static const char digits[] = "0123456789ABCD";
01430 static const char *names[] =
01431 {
01432 "Request",
01433 "Reply",
01434 "CancelRequest",
01435 "LocateRequest",
01436 "LocateReply",
01437 "CloseConnection",
01438 "MessageError",
01439 "Fragment"
01440 };
01441
01442
01443 const char *message_name = "UNKNOWN MESSAGE";
01444 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01445 if (slot < sizeof (names) / sizeof (names[0]))
01446 message_name = names[slot];
01447
01448
01449 int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01450
01451
01452 CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01453 CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01454
01455
01456 CORBA::ULong tmp = 0;
01457 CORBA::ULong *id = &tmp;
01458 char *tmp_id = 0;
01459
01460 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
01461 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
01462 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
01463 {
01464 if (major == 1 && minor < 2)
01465 {
01466
01467 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
01468 }
01469 else
01470 {
01471 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01472 }
01473 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01474 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01475 {
01476 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01477 }
01478 else
01479 {
01480 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01481 }
01482 #else
01483 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01484 #endif
01485
01486 }
01487
01488
01489 ACE_DEBUG ((LM_DEBUG,
01490 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01491 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01492 "Type %s[%u]\n",
01493 ACE_TEXT_CHAR_TO_TCHAR (label),
01494 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01495 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01496 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01497 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01498 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01499 *id));
01500
01501 if (TAO_debug_level >= 10)
01502 ACE_HEX_DUMP ((LM_DEBUG,
01503 (const char *) ptr,
01504 len,
01505 ACE_TEXT ("GIOP message")));
01506 }
01507 }
01508
01509 int
01510 TAO_GIOP_Message_Base::generate_locate_reply_header (
01511 TAO_OutputCDR & ,
01512 TAO_Pluggable_Reply_Params_Base & )
01513 {
01514 return 0;
01515 }
01516
01517 int
01518 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
01519 {
01520
01521 TAO_GIOP_Message_Generator_Parser *parser = 0;
01522 TAO_GIOP_Message_Version giop_version;
01523
01524 msg.get_version (giop_version);
01525
01526
01527 this->set_state (giop_version, parser);
01528
01529
01530
01531
01532
01533 return parser->is_ready_for_bidirectional ();
01534 }
01535
01536
01537 TAO_Queued_Data *
01538 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01539 {
01540
01541
01542
01543
01544
01545 ACE_Data_Block *db =
01546 this->orb_core_->create_input_cdr_data_block (sz +
01547 ACE_CDR::MAX_ALIGNMENT);
01548
01549 TAO_Queued_Data *qd =
01550 TAO_Queued_Data::make_queued_data (
01551 this->orb_core_->transport_message_buffer_allocator (),
01552 this->orb_core_->input_cdr_msgblock_allocator (),
01553 db);
01554
01555 if (qd == 0)
01556 {
01557 if (TAO_debug_level > 0)
01558 {
01559 ACE_ERROR ((LM_ERROR,
01560 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01561 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01562 }
01563 db->release ();
01564 return 0;
01565 }
01566
01567 return qd;
01568 }
01569
01570 size_t
01571 TAO_GIOP_Message_Base::header_length (void) const
01572 {
01573 return TAO_GIOP_MESSAGE_HEADER_LEN;
01574 }
01575
01576 size_t
01577 TAO_GIOP_Message_Base::fragment_header_length (
01578 const TAO_GIOP_Message_Version& giop_version) const
01579 {
01580 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01581
01582
01583 this->set_state (giop_version, generator_parser);
01584
01585 return generator_parser->fragment_header_length ();
01586 }
01587
01588 int
01589 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
01590 CORBA::ULong &request_id) const
01591 {
01592
01593 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01594
01595
01596 this->set_state (qd->giop_version (), generator_parser);
01597
01598
01599 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01600 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01601 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01602
01603
01604
01605
01606
01607
01608
01609
01610
01611
01612
01613 ACE_Message_Block::Message_Flags flg = 0;
01614 ACE_Data_Block *db = 0;
01615
01616
01617 flg = qd->msg_block ()->self_flags ();
01618
01619 if (ACE_BIT_ENABLED (flg,
01620 ACE_Message_Block::DONT_DELETE))
01621 {
01622
01623 db = qd->msg_block ()->data_block ();
01624 }
01625 else
01626 {
01627
01628
01629 db = qd->msg_block ()->data_block ()->duplicate ();
01630 }
01631
01632 TAO_InputCDR input_cdr (db,
01633 flg,
01634 rd_pos,
01635 wr_pos,
01636 qd->byte_order (),
01637 qd->giop_version ().major_version (),
01638 qd->giop_version ().minor_version (),
01639 this->orb_core_);
01640
01641 if (qd->giop_version ().major == 1 &&
01642 (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01643 {
01644 switch (qd->msg_type ())
01645 {
01646 case TAO_PLUGGABLE_MESSAGE_REQUEST:
01647 case TAO_PLUGGABLE_MESSAGE_REPLY:
01648 {
01649 IOP::ServiceContextList service_context;
01650
01651 if ((input_cdr >> service_context)
01652 && (input_cdr >> request_id))
01653 {
01654 return 0;
01655 }
01656 }
01657 break;
01658 case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
01659 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
01660 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
01661 {
01662 if ((input_cdr >> request_id))
01663 {
01664 return 0;
01665 }
01666 }
01667 break;
01668 default:
01669 break;
01670 }
01671 }
01672 else
01673 {
01674 switch (qd->msg_type ())
01675 {
01676 case TAO_PLUGGABLE_MESSAGE_REQUEST:
01677 case TAO_PLUGGABLE_MESSAGE_REPLY:
01678 case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
01679 case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
01680 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
01681 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
01682 {
01683
01684
01685
01686
01687 if ((input_cdr >> request_id))
01688 {
01689 return 0;
01690 }
01691 }
01692 break;
01693 default:
01694 break;
01695 }
01696 }
01697
01698 return -1;
01699 }
01700
01701
01702 int
01703 TAO_GIOP_Message_Base::consolidate_fragmented_message (
01704 TAO_Queued_Data * qd,
01705 TAO_Queued_Data *& msg)
01706 {
01707 TAO::Incoming_Message_Stack reverse_stack;
01708
01709 TAO_Queued_Data *tail = 0;
01710 TAO_Queued_Data *head = 0;
01711
01712
01713
01714
01715
01716
01717 if (qd == 0)
01718 {
01719 return -1;
01720 }
01721
01722 if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01723 {
01724 TAO_Queued_Data::release (qd);
01725 return -1;
01726 }
01727
01728
01729 if (qd->more_fragments ())
01730 {
01731 this->fragment_stack_.push (qd);
01732
01733 msg = 0;
01734 return 1;
01735 }
01736
01737 tail = qd;
01738
01739
01740
01741 size_t const header_adjustment =
01742 this->header_length () +
01743 this->fragment_header_length (tail->giop_version ().major_version ());
01744
01745 if (tail->msg_block ()->length () < header_adjustment)
01746 {
01747
01748 TAO_Queued_Data::release (qd);
01749 return -1;
01750 }
01751
01752
01753 if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01754 {
01755
01756
01757 while (this->fragment_stack_.pop (head) != -1)
01758 {
01759 if (head->more_fragments () &&
01760 head->giop_version ().major_version () == 1 &&
01761 head->giop_version ().minor_version () == 1 &&
01762 head->msg_block ()->length () >= header_adjustment)
01763 {
01764
01765 tail->msg_block ()->rd_ptr(header_adjustment);
01766
01767 head->msg_block ()->cont (tail->msg_block ());
01768
01769 tail->msg_block (0);
01770
01771 TAO_Queued_Data::release (tail);
01772
01773 tail = head;
01774 }
01775 else
01776 {
01777 reverse_stack.push (head);
01778 }
01779 }
01780 }
01781 else
01782 {
01783
01784
01785 CORBA::ULong tmp_request_id = 0;
01786 if (this->parse_request_id (tail, tmp_request_id) == -1)
01787 {
01788 return -1;
01789 }
01790
01791 const CORBA::ULong request_id = tmp_request_id;
01792
01793 while (this->fragment_stack_.pop (head) != -1)
01794 {
01795 CORBA::ULong head_request_id = 0;
01796 int parse_status = 0;
01797
01798 if (head->more_fragments () &&
01799 head->giop_version ().major_version () >= 1 &&
01800 head->giop_version ().minor_version () >= 2 &&
01801 head->msg_block ()->length () >= header_adjustment &&
01802 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01803 request_id == head_request_id)
01804 {
01805
01806 tail->msg_block ()->rd_ptr(header_adjustment);
01807
01808 head->msg_block ()->cont (tail->msg_block ());
01809
01810 tail->msg_block (0);
01811
01812 TAO_Queued_Data::release (tail);
01813
01814 tail = head;
01815 }
01816 else
01817 {
01818 if (parse_status == -1)
01819 {
01820 TAO_Queued_Data::release (head);
01821 return -1;
01822 }
01823
01824 reverse_stack.push (head);
01825 }
01826 }
01827 }
01828
01829
01830 while (reverse_stack.pop (head) != -1)
01831 {
01832 this->fragment_stack_.push (head);
01833 }
01834
01835 if (tail->consolidate () == -1)
01836 {
01837
01838 TAO_Queued_Data::release (tail);
01839 return -1;
01840 }
01841
01842
01843 msg = tail;
01844
01845 return 0;
01846 }
01847
01848
01849 int
01850 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
01851 {
01852
01853
01854
01855 TAO::Incoming_Message_Stack reverse_stack;
01856
01857 CORBA::ULong cancel_request_id;
01858
01859 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01860 {
01861 return -1;
01862 }
01863
01864 TAO_Queued_Data *head = 0;
01865
01866
01867 while (this->fragment_stack_.pop (head) != -1)
01868 {
01869 reverse_stack.push (head);
01870 }
01871
01872 bool discard_all_GIOP11_messages = false;
01873
01874
01875
01876
01877
01878
01879
01880 while (reverse_stack.pop (head) != -1)
01881 {
01882 CORBA::ULong head_request_id;
01883
01884 if (head->giop_version ().major_version () == 1 &&
01885 head->giop_version ().minor_version () <= 1 &&
01886 head->msg_type () != TAO_PLUGGABLE_MESSAGE_FRAGMENT &&
01887 this->parse_request_id (head, head_request_id) >= 0 &&
01888 cancel_request_id == head_request_id)
01889 {
01890 TAO_Queued_Data::release (head);
01891 discard_all_GIOP11_messages = true;
01892 }
01893 else if (head->giop_version ().major_version () == 1 &&
01894 head->giop_version ().minor_version () <= 1 &&
01895 discard_all_GIOP11_messages)
01896 {
01897 TAO_Queued_Data::release (head);
01898 }
01899 else if (head->giop_version ().major_version () >= 1 &&
01900 head->giop_version ().minor_version () >= 2 &&
01901 this->parse_request_id (head, head_request_id) >= 0 &&
01902 cancel_request_id == head_request_id)
01903 {
01904 TAO_Queued_Data::release (head);
01905 }
01906 else
01907 {
01908 this->fragment_stack_.push (head);
01909 }
01910 }
01911
01912 return 0;
01913 }
01914
01915 TAO_GIOP_Fragmentation_Strategy *
01916 TAO_GIOP_Message_Base::fragmentation_strategy (void)
01917 {
01918 return this->fragmentation_strategy_.get ();
01919 }
01920
01921 void
01922 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
01923 {
01924 CORBA::Octet * const buf =
01925 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
01926
01927 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
01928 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
01929
01930
01931 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
01932
01933
01934 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
01935
01936
01937
01938
01939 if (!(major <= 1 && minor == 0))
01940 ACE_SET_BITS (flags, msg.more_fragments () << 1);
01941 }
01942
01943 TAO_END_VERSIONED_NAMESPACE_DECL