00001
00002
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 #include "tao/ZIOP_Adapter.h"
00016 #include "ace/Min_Max.h"
00017
00018
00019
00020
00021
00022
00023 ACE_RCSID (tao,
00024 GIOP_Message_Base,
00025 "$Id: GIOP_Message_Base.cpp 88212 2009-12-17 10:23:30Z mcorino $")
00026
00027 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00028
00029 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
00030 TAO_Transport *transport,
00031 size_t input_cdr_size)
00032 : orb_core_ (orb_core)
00033 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00034 , out_stream_ (0,
00035 input_cdr_size,
00036 TAO_ENCAP_BYTE_ORDER,
00037 orb_core->output_cdr_buffer_allocator (),
00038 orb_core->output_cdr_dblock_allocator (),
00039 orb_core->output_cdr_msgblock_allocator (),
00040 orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00041 fragmentation_strategy_.get (),
00042 TAO_DEF_GIOP_MAJOR,
00043 TAO_DEF_GIOP_MINOR)
00044 {
00045 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00046 const int nibbles = 2 * sizeof (size_t);
00047 char hex_string[nibbles + 1];
00048 ACE_OS::sprintf (hex_string,
00049 "%8.8X",
00050 transport->id ());
00051 hex_string[nibbles] = '\0';
00052 ACE_CString monitor_name ("OutputCDR_");
00053 monitor_name += hex_string;
00054 this->out_stream_.register_monitor (monitor_name.c_str ());
00055 #endif
00056 }
00057
00058
00059 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00060 {
00061 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00062 this->out_stream_.unregister_monitor ();
00063 #endif
00064 }
00065
00066 void
00067 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
00068 {
00069
00070 this->out_stream_.set_version (major, minor);
00071 }
00072
00073 TAO_OutputCDR &
00074 TAO_GIOP_Message_Base::out_stream (void)
00075 {
00076 return this->out_stream_;
00077 }
00078
00079 int
00080 TAO_GIOP_Message_Base::generate_request_header (
00081 TAO_Operation_Details &op,
00082 TAO_Target_Specification &spec,
00083 TAO_OutputCDR &cdr)
00084 {
00085
00086 TAO_GIOP_Message_Version giop_version;
00087
00088 cdr.get_version (giop_version);
00089
00090
00091 if (!this->write_protocol_header (GIOP::Request, giop_version, cdr))
00092 {
00093 if (TAO_debug_level)
00094 {
00095 ACE_ERROR ((LM_ERROR,
00096 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
00097 }
00098
00099 return -1;
00100 }
00101
00102
00103 TAO_GIOP_Message_Generator_Parser *generator_parser =
00104 this->get_parser (giop_version);
00105
00106
00107 if (!generator_parser->write_request_header (op, spec, cdr))
00108 {
00109 if (TAO_debug_level)
00110 ACE_ERROR ((LM_ERROR,
00111 ACE_TEXT ("(%P|%t) Error in writing request header\n")));
00112
00113 return -1;
00114 }
00115
00116 return 0;
00117 }
00118
00119 int
00120 TAO_GIOP_Message_Base::generate_locate_request_header (
00121 TAO_Operation_Details &op,
00122 TAO_Target_Specification &spec,
00123 TAO_OutputCDR &cdr)
00124 {
00125 TAO_GIOP_Message_Version giop_version;
00126
00127 cdr.get_version (giop_version);
00128
00129
00130 TAO_GIOP_Message_Generator_Parser *generator_parser =
00131 this->get_parser (giop_version);
00132
00133
00134 if (!this->write_protocol_header (GIOP::LocateRequest, giop_version, cdr))
00135 {
00136 if (TAO_debug_level)
00137 ACE_ERROR ((LM_ERROR,
00138 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
00139
00140 return -1;
00141 }
00142
00143
00144 if (!generator_parser->write_locate_request_header
00145 (op.request_id (), spec, cdr))
00146 {
00147 if (TAO_debug_level)
00148 ACE_ERROR ((LM_ERROR,
00149 ACE_TEXT ("(%P|%t) Error in writing locate request header\n")));
00150
00151
00152 return -1;
00153
00154 }
00155
00156 return 0;
00157 }
00158
00159 int
00160 TAO_GIOP_Message_Base::generate_reply_header (
00161 TAO_OutputCDR &cdr,
00162 TAO_Pluggable_Reply_Params_Base ¶ms)
00163 {
00164 TAO_GIOP_Message_Version giop_version;
00165
00166 cdr.get_version (giop_version);
00167
00168
00169 if (!this->write_protocol_header (GIOP::Reply, giop_version, cdr))
00170 {
00171 if (TAO_debug_level)
00172 ACE_ERROR ((LM_ERROR,
00173 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
00174
00175 return -1;
00176 }
00177
00178 try
00179 {
00180
00181 TAO_GIOP_Message_Generator_Parser *generator_parser =
00182 this->get_parser (giop_version);
00183
00184
00185 if (!generator_parser->write_reply_header (cdr, params))
00186 {
00187 if (TAO_debug_level > 4)
00188 ACE_ERROR ((LM_ERROR,
00189 ACE_TEXT ("(%P|%t) Error in writing reply ")
00190 ACE_TEXT ("header\n")));
00191
00192 return -1;
00193 }
00194 }
00195 catch (const ::CORBA::Exception& ex)
00196 {
00197 if (TAO_debug_level > 4)
00198 ex._tao_print_exception (
00199 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00200
00201 return -1;
00202 }
00203
00204 return 0;
00205 }
00206
00207 int
00208 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00209 CORBA::ULong request_id)
00210 {
00211 TAO_GIOP_Message_Version giop_version;
00212
00213 cdr.get_version (giop_version);
00214
00215
00216
00217
00218 if (giop_version.major == 1 && giop_version.minor < 2)
00219 return -1;
00220
00221
00222 TAO_GIOP_Message_Generator_Parser *generator_parser =
00223 this->get_parser (giop_version);
00224
00225
00226 if (!this->write_protocol_header (GIOP::Fragment, giop_version, cdr)
00227 || !generator_parser->write_fragment_header (cdr, request_id))
00228 {
00229 if (TAO_debug_level)
00230 ACE_ERROR ((LM_ERROR,
00231 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
00232
00233 return -1;
00234 }
00235
00236 return 0;
00237 }
00238
00239 int
00240 TAO_GIOP_Message_Base::dump_consolidated_msg (TAO_OutputCDR &stream)
00241 {
00242
00243
00244
00245 ACE_Message_Block* consolidated_block = 0;
00246 char *buf = const_cast <char*> (stream.buffer ());
00247 size_t const total_len = stream.total_length ();
00248 if (stream.begin()->cont () != 0)
00249 {
00250 ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00251 ACE_CDR::consolidate (consolidated_block, stream.begin ());
00252 buf = (char *) (consolidated_block->rd_ptr ());
00253 }
00254
00255 this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00256
00257 delete consolidated_block;
00258 consolidated_block = 0;
00259
00260 return 0;
00261 }
00262
00263 int
00264 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream, TAO_Stub* stub)
00265 {
00266 this->set_giop_flags (stream);
00267
00268 bool log_msg = TAO_debug_level > 9;
00269
00270 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
00271 TAO_ZIOP_Adapter* ziop_adapter = this->orb_core_->ziop_adapter ();
00272
00273
00274 if (ziop_adapter)
00275 {
00276 if (TAO_debug_level > 9)
00277 {
00278 this->dump_consolidated_msg (stream);
00279 }
00280 bool compressed;
00281 if (stub)
00282 {
00283 compressed = ziop_adapter->marshal_data (stream, *stub);
00284 }
00285 else
00286 {
00287 compressed = ziop_adapter->marshal_data (stream, *this->orb_core_);
00288 }
00289
00290 if (TAO_debug_level > 9)
00291 {
00292 if (!compressed)
00293 ACE_DEBUG ((LM_DEBUG,
00294 ACE_TEXT ("TAO (%P|%t) - ")
00295 ACE_TEXT ("TAO_GIOP_Message_Base::format_message, ")
00296 ACE_TEXT ("GIOP message not compressed\n")));
00297
00298
00299
00300
00301 log_msg = false;
00302 }
00303 }
00304 #else
00305 ACE_UNUSED_ARG (stub);
00306 #endif
00307
00308
00309 size_t const total_len = stream.total_length ();
00310
00311
00312
00313
00314
00315
00316
00317
00318 char *buf = const_cast <char*> (stream.buffer ());
00319
00320 CORBA::ULong bodylen = static_cast <CORBA::ULong>
00321 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00322
00323 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00324 *(reinterpret_cast <CORBA::ULong *> (buf +
00325 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00326 #else
00327 if (!stream.do_byte_swap ())
00328 *(reinterpret_cast <CORBA::ULong *>
00329 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00330 else
00331 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00332 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00333 #endif
00334
00335 if (log_msg)
00336 {
00337 this->dump_consolidated_msg (stream);
00338 }
00339
00340 return 0;
00341 }
00342
00343 int
00344 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
00345 size_t &mesg_length)
00346 {
00347 if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00348 {
00349 qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00350
00351 return 0;
00352 }
00353 else
00354 {
00355 TAO_GIOP_Message_State state;
00356
00357 if (state.parse_message_header (*(qd.msg_block ())) == -1)
00358 {
00359 return -1;
00360 }
00361
00362 size_t const message_size = state.message_size ();
00363
00364 if (message_size > qd.msg_block ()->length ())
00365 {
00366 qd.missing_data (message_size - qd.msg_block ()->length ());
00367 }
00368 else
00369 {
00370 qd.missing_data (0);
00371 }
00372
00373
00374 qd.state (state);
00375 mesg_length = message_size;
00376
00377 return 1;
00378 }
00379 }
00380
00381 int
00382 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00383 TAO_Queued_Data *&qd)
00384 {
00385 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00386 {
00387 if (incoming.length () > 0)
00388 {
00389
00390
00391
00392
00393 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00394
00395
00396
00397 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00398 default_buf_size);
00399
00400
00401
00402 qd = this->make_queued_data (buf_size);
00403
00404 if (qd == 0)
00405 {
00406 if (TAO_debug_level > 0)
00407 {
00408 ACE_ERROR((LM_ERROR,
00409 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00410 ACE_TEXT ("out of memory\n")));
00411 }
00412 return -1;
00413 }
00414
00415 qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00416
00417 incoming.rd_ptr (incoming.length ());
00418
00419 qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00420 }
00421 else
00422 {
00423
00424 qd = 0;
00425 }
00426
00427 return 0;
00428 }
00429
00430 TAO_GIOP_Message_State state;
00431 if (state.parse_message_header (incoming) == -1)
00432 {
00433 return -1;
00434 }
00435
00436 size_t copying_len = state.message_size ();
00437
00438 qd = this->make_queued_data (copying_len);
00439
00440 if (qd == 0)
00441 {
00442 if (TAO_debug_level > 0)
00443 {
00444 ACE_ERROR ((LM_ERROR,
00445 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00446 ACE_TEXT ("out of memory\n")));
00447 }
00448 return -1;
00449 }
00450
00451 if (copying_len > incoming.length ())
00452 {
00453 qd->missing_data (copying_len - incoming.length ());
00454 copying_len = incoming.length ();
00455 }
00456 else
00457 {
00458 qd->missing_data (0);
00459 }
00460
00461 qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00462
00463 incoming.rd_ptr (copying_len);
00464 qd->state (state);
00465
00466 return 1;
00467 }
00468
00469 int
00470 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00471 ACE_Message_Block &incoming)
00472 {
00473
00474 if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00475 {
00476
00477
00478 size_t const len = qd->msg_block ()->length ();
00479
00480
00481 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00482 {
00483
00484
00485 return -1;
00486 }
00487
00488
00489
00490
00491 size_t const available = incoming.length ();
00492 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00493 size_t const n_copy = ace_min (available, desired);
00494
00495
00496 if (n_copy == 0)
00497 {
00498 return -1;
00499 }
00500
00501 if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00502 {
00503 return -1;
00504 }
00505
00506
00507 incoming.rd_ptr (n_copy);
00508
00509
00510 if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00511 {
00512 return 0;
00513 }
00514
00515 TAO_GIOP_Message_State state;
00516
00517
00518 if (state.parse_message_header (*qd->msg_block ()) == -1)
00519 {
00520 if (TAO_debug_level > 0)
00521 {
00522 ACE_ERROR ((LM_ERROR,
00523 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00524 ACE_TEXT ("error parsing header\n") ));
00525 }
00526 return -1;
00527 }
00528
00529
00530 if (ACE_CDR::grow (qd->msg_block (),
00531 state.message_size ()) == -1)
00532 {
00533
00534
00535 return -1;
00536 }
00537
00538
00539
00540 size_t copy_len = state.payload_size ();
00541
00542
00543
00544 if (copy_len > incoming.length ())
00545 {
00546
00547 qd->missing_data (copy_len - incoming.length ());
00548
00549
00550 copy_len = incoming.length ();
00551 }
00552 else
00553 {
00554 qd->missing_data (0);
00555 }
00556
00557
00558
00559 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00560 {
00561 return -1;
00562 }
00563
00564
00565 incoming.rd_ptr (copy_len);
00566
00567
00568 qd->state (state);
00569 }
00570 else
00571 {
00572
00573 size_t copy_len = qd->missing_data ();
00574
00575 if (copy_len > incoming.length ())
00576 {
00577
00578 qd->missing_data (copy_len - incoming.length ());
00579
00580
00581 copy_len = incoming.length ();
00582 }
00583
00584
00585 if (copy_len == 0)
00586 {
00587 return -1;
00588 }
00589
00590
00591 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00592 {
00593 return -1;
00594 }
00595
00596
00597 qd->msg_block ()->rd_ptr (copy_len);
00598
00599 }
00600
00601 return 0;
00602 }
00603
00604 int
00605 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00606 TAO_Queued_Data *qd)
00607 {
00608
00609 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00610
00611
00612 TAO_GIOP_Message_Generator_Parser *generator_parser =
00613 this->get_parser (qd->giop_version ());
00614
00615
00616
00617
00618
00619 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00620 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00621 #else
00622 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00623 #endif
00624 ACE_Data_Block out_db (sizeof (repbuf),
00625 ACE_Message_Block::MB_DATA,
00626 repbuf,
00627 this->orb_core_->input_cdr_buffer_allocator (),
00628 0,
00629 ACE_Message_Block::DONT_DELETE,
00630 this->orb_core_->input_cdr_dblock_allocator ());
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642 TAO_OutputCDR output (&out_db,
00643 TAO_ENCAP_BYTE_ORDER,
00644 this->orb_core_->input_cdr_msgblock_allocator (),
00645 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00646 this->fragmentation_strategy_.get (),
00647 qd->giop_version ().major_version (),
00648 qd->giop_version ().minor_version ());
00649
00650
00651 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00652 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00653 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665 ACE_Data_Block *db = 0;
00666
00667
00668 ACE_Message_Block::Message_Flags flg = qd->msg_block ()->self_flags ();
00669
00670 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00671 {
00672
00673 db = qd->msg_block ()->data_block ();
00674 }
00675 else
00676 {
00677
00678
00679 db = qd->msg_block ()->data_block ()->duplicate ();
00680 }
00681 db->size (qd->msg_block ()->length ());
00682
00683 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
00684 if (qd->state ().compressed ())
00685 {
00686 if (!this->decompress (&db, *qd, rd_pos, wr_pos))
00687 return -1;
00688 }
00689 #endif
00690 if (TAO_debug_level > 9)
00691 {
00692 this->dump_msg ("recv",
00693 reinterpret_cast <u_char *> (db->base () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN),
00694 db->size () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN);
00695 }
00696
00697 TAO_InputCDR input_cdr (db,
00698 db->flags (),
00699 rd_pos,
00700 wr_pos,
00701 qd->byte_order (),
00702 qd->giop_version ().major_version (),
00703 qd->giop_version ().minor_version (),
00704 this->orb_core_);
00705
00706 transport->assign_translators(&input_cdr,&output);
00707
00708
00709
00710
00711
00712
00713
00714
00715 switch (qd->msg_type ())
00716 {
00717 case GIOP::Request:
00718
00719
00720
00721 return this->process_request (transport,
00722 input_cdr,
00723 output,
00724 generator_parser);
00725
00726 case GIOP::LocateRequest:
00727 return this->process_locate_request (transport,
00728 input_cdr,
00729 output,
00730 generator_parser);
00731 default:
00732 return -1;
00733 }
00734 }
00735
00736 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
00737 bool
00738 TAO_GIOP_Message_Base::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd,
00739 size_t& rd_pos, size_t& wr_pos)
00740 {
00741 TAO_ZIOP_Adapter* adapter = this->orb_core_->ziop_adapter ();
00742 if (adapter)
00743 {
00744 if (!adapter->decompress (db, qd, *this->orb_core_))
00745 return false;
00746 rd_pos = TAO_GIOP_MESSAGE_HEADER_LEN;
00747 wr_pos = (*db)->size();
00748 }
00749 else
00750 {
00751 if (TAO_debug_level > 0)
00752 ACE_ERROR ((LM_ERROR,
00753 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ")
00754 ACE_TEXT ("data.\n")));
00755
00756 return false;
00757 }
00758 return true;
00759 }
00760 #endif
00761
00762 int
00763 TAO_GIOP_Message_Base::process_reply_message (
00764 TAO_Pluggable_Reply_Params ¶ms,
00765 TAO_Queued_Data *qd)
00766 {
00767
00768 TAO_GIOP_Message_Generator_Parser *generator_parser =
00769 this->get_parser (qd->giop_version ());
00770
00771
00772 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00773 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00774 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00775
00776 ACE_Data_Block *db = 0;
00777
00778
00779 ACE_Message_Block::Message_Flags flg = qd->msg_block ()->self_flags ();
00780
00781 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00782 {
00783
00784 db = qd->msg_block ()->data_block ();
00785 }
00786 else
00787 {
00788
00789
00790 db = qd->msg_block ()->data_block ()->duplicate ();
00791 }
00792 db->size (qd->msg_block ()->length ());
00793
00794 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
00795 if (qd->state ().compressed ())
00796 {
00797 if (!this->decompress (&db, *qd, rd_pos, wr_pos))
00798 return -1;
00799 }
00800 #endif
00801 if (TAO_debug_level > 9)
00802 {
00803 this->dump_msg ("recv",
00804 reinterpret_cast <u_char *> (db->base () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN),
00805 db->size () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN);
00806 }
00807
00808
00809
00810
00811
00812 TAO_InputCDR input_cdr (db,
00813 db->flags (),
00814 rd_pos,
00815 wr_pos,
00816 qd->byte_order (),
00817 qd->giop_version ().major_version (),
00818 qd->giop_version ().minor_version (),
00819 this->orb_core_);
00820
00821
00822
00823
00824
00825
00826
00827 int retval = 0;
00828
00829 switch (qd->msg_type ())
00830 {
00831 case GIOP::Reply:
00832
00833 retval = generator_parser->parse_reply (input_cdr, params);
00834 break;
00835 case GIOP::LocateReply:
00836 retval = generator_parser->parse_locate_reply (input_cdr, params);
00837 break;
00838 default:
00839 retval = -1;
00840 }
00841
00842 if (retval == -1)
00843 return retval;
00844
00845 params.input_cdr_ = &input_cdr;
00846 params.transport_->assign_translators (params.input_cdr_, 0);
00847
00848 retval = params.transport_->tms ()->dispatch_reply (params);
00849
00850 if (retval == -1)
00851 {
00852
00853
00854 if (TAO_debug_level > 0)
00855 ACE_ERROR ((LM_ERROR,
00856 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00857 ACE_TEXT ("dispatch reply failed\n"),
00858 params.transport_->id ()));
00859 }
00860 return retval;
00861 }
00862
00863 int
00864 TAO_GIOP_Message_Base::generate_exception_reply (
00865 TAO_OutputCDR &cdr,
00866 TAO_Pluggable_Reply_Params_Base ¶ms,
00867 const CORBA::Exception &x)
00868 {
00869
00870
00871
00872 try
00873 {
00874
00875 this->generate_reply_header (cdr, params);
00876 x._tao_encode (cdr);
00877 }
00878 catch (const ::CORBA::BAD_PARAM&)
00879 {
00880 throw;
00881 }
00882 catch (const ::CORBA::Exception&)
00883 {
00884
00885
00886
00887
00888 if (TAO_debug_level > 0)
00889 ACE_DEBUG ((LM_DEBUG,
00890 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00891 ACE_TEXT ("generate_exception_reply ()\n")));
00892 return -1;
00893 }
00894
00895 return 0;
00896 }
00897
00898 int
00899 TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type,
00900 const TAO_GIOP_Message_Version &version,
00901 TAO_OutputCDR &msg)
00902 {
00903
00904 msg.reset ();
00905
00906 CORBA::Octet header[12] =
00907 {
00908
00909
00910 0x47,
00911 0x49,
00912 0x4f,
00913 0x50
00914 };
00915
00916 header[4] = version.major;
00917 header[5] = version.minor;
00918
00919
00920
00921
00922 header[7] = static_cast <CORBA::Octet> (type);
00923
00924 static ACE_CDR::ULong const header_size =
00925 sizeof (header) / sizeof (header[0]);
00926
00927
00928
00929
00930 msg.write_octet_array (header, header_size);
00931
00932 return msg.good_bit ();
00933 }
00934
00935 int
00936 TAO_GIOP_Message_Base::process_request (
00937 TAO_Transport * transport,
00938 TAO_InputCDR & cdr,
00939 TAO_OutputCDR & output,
00940 TAO_GIOP_Message_Generator_Parser * parser)
00941 {
00942
00943
00944 TAO_ServerRequest request (this,
00945 cdr,
00946 output,
00947 transport,
00948 this->orb_core_);
00949
00950 CORBA::ULong request_id = 0;
00951 CORBA::Boolean response_required = false;
00952
00953 try
00954 {
00955 int const parse_error = parser->parse_request_header (request);
00956
00957
00958 if (parse_error != 0)
00959 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00960
00961 TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00962 if (csm)
00963 {
00964 csm->process_service_context (request);
00965 transport->assign_translators (&cdr, &output);
00966 }
00967
00968 request_id = request.request_id ();
00969
00970 response_required = request.response_expected ();
00971
00972 CORBA::Object_var forward_to;
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982 this->orb_core_->request_dispatcher ()->dispatch (
00983 this->orb_core_,
00984 request,
00985 forward_to);
00986
00987
00988
00989 if (request.is_forwarded ())
00990 {
00991 CORBA::Boolean const permanent_forward_condition =
00992 this->orb_core_->is_permanent_forward_condition
00993 (forward_to.in (),
00994 request.request_service_context ());
00995
00996
00997 TAO_Pluggable_Reply_Params_Base reply_params;
00998 reply_params.request_id_ = request_id;
00999 reply_params.reply_status (
01000 permanent_forward_condition
01001 ? GIOP::LOCATION_FORWARD_PERM
01002 : GIOP::LOCATION_FORWARD);
01003 reply_params.svc_ctx_.length (0);
01004
01005
01006 reply_params.service_context_notowned (
01007 &request.reply_service_info ());
01008
01009 output.message_attributes (request_id,
01010 0,
01011 TAO_REPLY,
01012 0);
01013
01014
01015 this->generate_reply_header (output, reply_params);
01016
01017 if (!(output << forward_to.in ()))
01018 {
01019 if (TAO_debug_level > 0)
01020 ACE_ERROR ((LM_ERROR,
01021 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
01022 ACE_TEXT ("forward reference.\n")));
01023
01024 return -1;
01025 }
01026
01027 output.more_fragments (false);
01028
01029 int const result = transport->send_message (output,
01030 0,
01031 TAO_REPLY);
01032 if (result == -1)
01033 {
01034 if (TAO_debug_level > 0)
01035 {
01036
01037
01038 ACE_ERROR ((LM_ERROR,
01039 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01040 ACE_TEXT ("cannot send reply\n"),
01041 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
01042 }
01043 }
01044 return result;
01045 }
01046 }
01047
01048 catch ( ::CORBA::Exception& ex)
01049 {
01050 int result = 0;
01051
01052 if (response_required)
01053 {
01054 result = this->send_reply_exception (transport,
01055 output,
01056 request_id,
01057 &request.reply_service_info (),
01058 &ex);
01059 if (result == -1)
01060 {
01061 if (TAO_debug_level > 0)
01062 {
01063 ACE_ERROR ((LM_ERROR,
01064 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01065 ACE_TEXT ("cannot send exception\n"),
01066 ACE_TEXT ("process_connector_request ()")));
01067
01068 ex._tao_print_exception (
01069 "TAO_GIOP_Message_Base::process_request[1]");
01070 }
01071 }
01072
01073 }
01074 else if (TAO_debug_level > 0)
01075 {
01076
01077
01078
01079
01080
01081
01082 ACE_ERROR ((LM_ERROR,
01083 ACE_TEXT ("(%P|%t) exception thrown ")
01084 ACE_TEXT ("but client is not waiting a response\n")));
01085
01086 ex._tao_print_exception (
01087 "TAO_GIOP_Message_Base::process_request[2]");
01088 }
01089
01090 return result;
01091 }
01092 catch (...)
01093 {
01094
01095
01096
01097
01098
01099
01100
01101 int result = 0;
01102
01103 if (response_required)
01104 {
01105 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01106 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01107 CORBA::COMPLETED_MAYBE);
01108
01109 if (this->send_reply_exception (transport,
01110 output,
01111 request_id,
01112 &request.reply_service_info (),
01113 &exception) == -1
01114 && TAO_debug_level > 0)
01115 {
01116 ACE_ERROR ((LM_ERROR,
01117 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01118 ACE_TEXT ("%p: ")
01119 ACE_TEXT ("cannot send exception\n"),
01120 ACE_TEXT ("process_request ()")));
01121 exception._tao_print_exception (
01122 "TAO_GIOP_Message_Base::process_request[3]");
01123 }
01124 }
01125 else if (TAO_debug_level > 0)
01126 {
01127
01128
01129
01130
01131
01132 ACE_ERROR ((LM_ERROR,
01133 ACE_TEXT ("TAO (%P|%t) exception thrown ")
01134 ACE_TEXT ("but client is not waiting a response\n")));
01135 }
01136
01137 return result;
01138 }
01139
01140 return 0;
01141 }
01142
01143
01144 int
01145 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01146 TAO_InputCDR &input,
01147 TAO_OutputCDR &output,
01148 TAO_GIOP_Message_Generator_Parser *parser)
01149 {
01150
01151
01152 TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01153
01154 TAO_GIOP_Locate_Status_Msg status_info;
01155
01156
01157 status_info.status = GIOP::UNKNOWN_OBJECT;
01158
01159 CORBA::Boolean response_required = true;
01160
01161 try
01162 {
01163 int parse_error = parser->parse_locate_header (locate_request);
01164
01165 if (parse_error != 0)
01166 {
01167 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01168 }
01169
01170 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01171 locate_request.object_key ().length (),
01172 locate_request.object_key ().get_buffer (),
01173 0);
01174
01175
01176 parse_error = 1;
01177 CORBA::ULong req_id = locate_request.request_id ();
01178
01179
01180
01181 CORBA::Boolean deferred_reply = true;
01182 TAO_ServerRequest server_request (this,
01183 req_id,
01184 response_required,
01185 deferred_reply,
01186 tmp_key,
01187 "_non_existent",
01188 output,
01189 transport,
01190 this->orb_core_,
01191 parse_error);
01192
01193 if (parse_error != 0)
01194 {
01195 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01196 }
01197
01198 CORBA::Object_var forward_to;
01199
01200 this->orb_core_->request_dispatcher ()->dispatch (
01201 this->orb_core_,
01202 server_request,
01203 forward_to);
01204
01205 if (server_request.is_forwarded ())
01206 {
01207 status_info.status = GIOP::OBJECT_FORWARD;
01208 status_info.forward_location_var = forward_to;
01209 if (TAO_debug_level > 0)
01210 ACE_DEBUG ((LM_DEBUG,
01211 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01212 ACE_TEXT ("called: forwarding\n")));
01213 }
01214 else if (server_request.reply_status () == GIOP::NO_EXCEPTION)
01215 {
01216
01217 status_info.status = GIOP::OBJECT_HERE;
01218 if (TAO_debug_level > 0)
01219 ACE_DEBUG ((LM_DEBUG,
01220 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01221 ACE_TEXT ("found\n")));
01222 }
01223 else
01224 {
01225
01226 status_info.status = GIOP::UNKNOWN_OBJECT;
01227 ACE_DEBUG ((LM_DEBUG,
01228 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01229 ACE_TEXT ("not here\n")));
01230 }
01231 }
01232
01233 catch (const ::CORBA::Exception&)
01234 {
01235
01236 status_info.status = GIOP::UNKNOWN_OBJECT;
01237 if (TAO_debug_level > 0)
01238 ACE_DEBUG ((LM_DEBUG,
01239 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01240 ACE_TEXT ("CORBA exception raised\n")));
01241 }
01242 catch (...)
01243 {
01244
01245 status_info.status = GIOP::UNKNOWN_OBJECT;
01246 if (TAO_debug_level > 0)
01247 ACE_DEBUG ((LM_DEBUG,
01248 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01249 ACE_TEXT ("C++ exception raised\n")));
01250 }
01251
01252 return this->make_send_locate_reply (transport,
01253 locate_request,
01254 status_info,
01255 output,
01256 parser);
01257 }
01258
01259 int
01260 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01261 TAO_GIOP_Locate_Request_Header &request,
01262 TAO_GIOP_Locate_Status_Msg &status_info,
01263 TAO_OutputCDR &output,
01264 TAO_GIOP_Message_Generator_Parser *parser)
01265 {
01266 TAO_GIOP_Message_Version giop_version;
01267 output.get_version (giop_version);
01268
01269
01270
01271
01272 this->write_protocol_header (GIOP::LocateReply, giop_version, output);
01273
01274
01275 parser->write_locate_reply_mesg (output,
01276 request.request_id (),
01277 status_info);
01278
01279 output.more_fragments (false);
01280
01281
01282 int const result = transport->send_message (output,
01283 0,
01284 TAO_REPLY);
01285
01286
01287 if (result == -1)
01288 {
01289 if (TAO_debug_level > 0)
01290 {
01291 ACE_ERROR ((LM_ERROR,
01292 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01293 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01294 }
01295 }
01296
01297 return result;
01298 }
01299
01300
01301
01302
01303
01304
01305 int
01306 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01307 {
01308 const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01309 {
01310
01311
01312 0x47,
01313 0x49,
01314 0x4f,
01315 0x50,
01316 (CORBA::Octet) 1,
01317 (CORBA::Octet) 0,
01318 TAO_ENCAP_BYTE_ORDER,
01319 GIOP::MessageError,
01320 0, 0, 0, 0
01321 };
01322
01323 if (TAO_debug_level > 9)
01324 {
01325 this->dump_msg ("send_error",
01326 reinterpret_cast <const u_char *> (error_message),
01327 TAO_GIOP_MESSAGE_HEADER_LEN);
01328 }
01329
01330 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01331 ACE_Message_Block::MB_DATA,
01332 error_message,
01333 0,
01334 0,
01335 ACE_Message_Block::DONT_DELETE,
01336 0);
01337 ACE_Message_Block message_block(&data_block,
01338 ACE_Message_Block::DONT_DELETE);
01339 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01340
01341 size_t bt;
01342 int const result = transport->send_message_block_chain (&message_block, bt);
01343 if (result == -1)
01344 {
01345 if (TAO_debug_level > 0)
01346 ACE_DEBUG ((LM_DEBUG,
01347 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01348 transport->id ()));
01349 }
01350
01351 return result;
01352 }
01353
01354 TAO_GIOP_Message_Generator_Parser*
01355 TAO_GIOP_Message_Base::get_parser (
01356 const TAO_GIOP_Message_Version &version) const
01357 {
01358 switch (version.major)
01359 {
01360 case 1:
01361 switch (version.minor)
01362 {
01363 case 0:
01364 return
01365 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01366 &this->tao_giop_impl_.tao_giop_10);
01367 break;
01368 case 1:
01369 return
01370 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01371 &this->tao_giop_impl_.tao_giop_11);
01372 break;
01373 case 2:
01374 return
01375 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01376 &this->tao_giop_impl_.tao_giop_12);
01377 break;
01378 default:
01379 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01380 break;
01381 }
01382 break;
01383 default:
01384 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01385 break;
01386 }
01387 }
01388
01389
01390
01391
01392
01393
01394
01395
01396
01397
01398
01399
01400
01401
01402
01403
01404 void
01405 TAO_GIOP_Message_Base::
01406 send_close_connection (const TAO_GIOP_Message_Version &version,
01407 TAO_Transport *transport)
01408 {
01409
01410
01411
01412 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01413 {
01414
01415
01416 0x47,
01417 0x49,
01418 0x4f,
01419 0x50,
01420 version.major,
01421 version.minor,
01422 TAO_ENCAP_BYTE_ORDER,
01423 GIOP::CloseConnection,
01424 0, 0, 0, 0
01425 };
01426
01427
01428
01429
01430
01431
01432
01433 if (TAO_debug_level > 9)
01434 {
01435 this->dump_msg ("send_close_connection",
01436 reinterpret_cast <const u_char *> (close_message),
01437 TAO_GIOP_MESSAGE_HEADER_LEN);
01438 }
01439
01440 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01441 ACE_Message_Block::MB_DATA,
01442 close_message,
01443 0,
01444 0,
01445 ACE_Message_Block::DONT_DELETE,
01446 0);
01447 ACE_Message_Block message_block(&data_block);
01448 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01449
01450 size_t bt;
01451 int const result = transport->send_message_block_chain (&message_block, bt);
01452 if (result == -1)
01453 {
01454 if (TAO_debug_level > 0)
01455 ACE_ERROR ((LM_ERROR,
01456 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01457 transport->id (), ACE_ERRNO_GET));
01458 }
01459
01460 transport->close_connection ();
01461 ACE_DEBUG ((LM_DEBUG,
01462 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01463 transport-> id ()));
01464 }
01465
01466
01467 int
01468 TAO_GIOP_Message_Base::send_reply_exception (
01469 TAO_Transport *transport,
01470 TAO_OutputCDR &output,
01471 CORBA::ULong request_id,
01472 IOP::ServiceContextList *svc_info,
01473 CORBA::Exception *x
01474 )
01475 {
01476 TAO_Pluggable_Reply_Params_Base reply_params;
01477 reply_params.request_id_ = request_id;
01478 reply_params.svc_ctx_.length (0);
01479
01480
01481 reply_params.argument_flag_ = true;
01482
01483
01484
01485 reply_params.service_context_notowned (svc_info);
01486
01487 if (CORBA::SystemException::_downcast (x) != 0)
01488 {
01489 reply_params.reply_status (GIOP::SYSTEM_EXCEPTION);
01490 }
01491 else
01492 {
01493 reply_params.reply_status (GIOP::USER_EXCEPTION);
01494 }
01495
01496 if (this->generate_exception_reply (output, reply_params, *x) == -1)
01497 return -1;
01498
01499 output.more_fragments (false);
01500
01501 return transport->send_message (output, 0, TAO_REPLY);
01502 }
01503
01504 void
01505 TAO_GIOP_Message_Base::dump_msg (const char *label,
01506 const u_char *ptr,
01507 size_t len)
01508 {
01509 if (TAO_debug_level < 10)
01510 {
01511 return;
01512 }
01513
01514 static const char digits[] = "0123456789ABCD";
01515 static const char *names[] =
01516 {
01517 "Request",
01518 "Reply",
01519 "CancelRequest",
01520 "LocateRequest",
01521 "LocateReply",
01522 "CloseConnection",
01523 "MessageError",
01524 "Fragment"
01525 };
01526
01527
01528 const char *message_name = "UNKNOWN MESSAGE";
01529 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01530 if (slot < sizeof (names) / sizeof (names[0]))
01531 message_name = names[slot];
01532
01533
01534 int const byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01535
01536
01537 CORBA::Octet const major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01538 CORBA::Octet const minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01539
01540
01541 CORBA::ULong tmp = 0;
01542 CORBA::ULong *id = &tmp;
01543 char *tmp_id = 0;
01544
01545 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Request ||
01546 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Reply ||
01547 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Fragment)
01548 {
01549 if (major == 1 && minor < 2)
01550 {
01551
01552 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
01553 }
01554 else
01555 {
01556 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01557 }
01558 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01559 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01560 {
01561 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01562 }
01563 else
01564 {
01565 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01566 }
01567 #else
01568 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01569 #endif
01570
01571 }
01572 else if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::CancelRequest ||
01573 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::LocateRequest ||
01574 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::LocateReply)
01575 {
01576 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01577 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01578 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01579 {
01580 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01581 }
01582 else
01583 {
01584 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01585 }
01586 #else
01587 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01588 #endif
01589 }
01590
01591
01592 ACE_DEBUG ((LM_DEBUG,
01593 ACE_TEXT("TAO (%P|%t) - GIOP_Message_Base::dump_msg, ")
01594 ACE_TEXT("%C GIOP message v%c.%c, %d data bytes, %s endian, ")
01595 ACE_TEXT("Type %C[%u]\n"),
01596 label,
01597 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01598 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01599 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01600 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01601 message_name,
01602 *id));
01603 ACE_HEX_DUMP ((LM_DEBUG,
01604 (const char *) ptr,
01605 len,
01606 ACE_TEXT ("GIOP message")));
01607 }
01608
01609 int
01610 TAO_GIOP_Message_Base::generate_locate_reply_header (
01611 TAO_OutputCDR & ,
01612 TAO_Pluggable_Reply_Params_Base & )
01613 {
01614 return 0;
01615 }
01616
01617 bool
01618 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) const
01619 {
01620 TAO_GIOP_Message_Version giop_version;
01621
01622 msg.get_version (giop_version);
01623
01624
01625 TAO_GIOP_Message_Generator_Parser *generator_parser =
01626 this->get_parser (giop_version);
01627
01628
01629
01630
01631
01632 return generator_parser->is_ready_for_bidirectional ();
01633 }
01634
01635
01636 TAO_Queued_Data *
01637 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01638 {
01639
01640
01641
01642
01643
01644 ACE_Data_Block *db =
01645 this->orb_core_->create_input_cdr_data_block (sz +
01646 ACE_CDR::MAX_ALIGNMENT);
01647
01648 TAO_Queued_Data *qd =
01649 TAO_Queued_Data::make_queued_data (
01650 this->orb_core_->transport_message_buffer_allocator (),
01651 this->orb_core_->input_cdr_msgblock_allocator (),
01652 db);
01653
01654 if (qd == 0)
01655 {
01656 if (TAO_debug_level > 0)
01657 {
01658 ACE_ERROR ((LM_ERROR,
01659 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01660 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01661 }
01662 db->release ();
01663 return 0;
01664 }
01665
01666 return qd;
01667 }
01668
01669 size_t
01670 TAO_GIOP_Message_Base::header_length (void) const
01671 {
01672 return TAO_GIOP_MESSAGE_HEADER_LEN;
01673 }
01674
01675 size_t
01676 TAO_GIOP_Message_Base::fragment_header_length (
01677 const TAO_GIOP_Message_Version& giop_version) const
01678 {
01679
01680 TAO_GIOP_Message_Generator_Parser *generator_parser =
01681 this->get_parser (giop_version);
01682
01683 return generator_parser->fragment_header_length ();
01684 }
01685
01686 int
01687 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
01688 CORBA::ULong &request_id) const
01689 {
01690
01691 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01692 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01693 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705 ACE_Message_Block::Message_Flags flg = 0;
01706 ACE_Data_Block *db = 0;
01707
01708
01709 flg = qd->msg_block ()->self_flags ();
01710
01711 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
01712 {
01713
01714 db = qd->msg_block ()->data_block ();
01715 }
01716 else
01717 {
01718
01719
01720 db = qd->msg_block ()->data_block ()->duplicate ();
01721 }
01722
01723 TAO_InputCDR input_cdr (db,
01724 flg,
01725 rd_pos,
01726 wr_pos,
01727 qd->byte_order (),
01728 qd->giop_version ().major_version (),
01729 qd->giop_version ().minor_version (),
01730 this->orb_core_);
01731
01732 if (qd->giop_version ().major == 1 &&
01733 (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01734 {
01735 switch (qd->msg_type ())
01736 {
01737 case GIOP::Request:
01738 case GIOP::Reply:
01739 {
01740 IOP::ServiceContextList service_context;
01741
01742 if ((input_cdr >> service_context)
01743 && (input_cdr >> request_id))
01744 {
01745 return 0;
01746 }
01747 }
01748 break;
01749 case GIOP::CancelRequest:
01750 case GIOP::LocateRequest:
01751 case GIOP::LocateReply:
01752 {
01753 if ((input_cdr >> request_id))
01754 {
01755 return 0;
01756 }
01757 }
01758 break;
01759 default:
01760 break;
01761 }
01762 }
01763 else
01764 {
01765 switch (qd->msg_type ())
01766 {
01767 case GIOP::Request:
01768 case GIOP::Reply:
01769 case GIOP::Fragment:
01770 case GIOP::CancelRequest:
01771 case GIOP::LocateRequest:
01772 case GIOP::LocateReply:
01773 {
01774
01775
01776
01777
01778 if ((input_cdr >> request_id))
01779 {
01780 return 0;
01781 }
01782 }
01783 break;
01784 default:
01785 break;
01786 }
01787 }
01788
01789 return -1;
01790 }
01791
01792
01793 int
01794 TAO_GIOP_Message_Base::consolidate_fragmented_message (
01795 TAO_Queued_Data * qd,
01796 TAO_Queued_Data *& msg)
01797 {
01798 TAO::Incoming_Message_Stack reverse_stack;
01799
01800 TAO_Queued_Data *tail = 0;
01801 TAO_Queued_Data *head = 0;
01802
01803
01804
01805
01806
01807
01808 if (qd == 0)
01809 {
01810 return -1;
01811 }
01812
01813 if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01814 {
01815 TAO_Queued_Data::release (qd);
01816 return -1;
01817 }
01818
01819
01820 if (qd->more_fragments ())
01821 {
01822 this->fragment_stack_.push (qd);
01823
01824 msg = 0;
01825 return 1;
01826 }
01827
01828 tail = qd;
01829
01830
01831
01832 size_t const header_adjustment =
01833 this->header_length () +
01834 this->fragment_header_length (tail->giop_version ().major_version ());
01835
01836 if (tail->msg_block ()->length () < header_adjustment)
01837 {
01838
01839 TAO_Queued_Data::release (qd);
01840 return -1;
01841 }
01842
01843
01844 if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01845 {
01846
01847
01848 while (this->fragment_stack_.pop (head) != -1)
01849 {
01850 if (head->more_fragments () &&
01851 head->giop_version ().major_version () == 1 &&
01852 head->giop_version ().minor_version () == 1 &&
01853 head->msg_block ()->length () >= header_adjustment)
01854 {
01855
01856 tail->msg_block ()->rd_ptr(header_adjustment);
01857
01858 head->msg_block ()->cont (tail->msg_block ());
01859
01860 tail->msg_block (0);
01861
01862 TAO_Queued_Data::release (tail);
01863
01864 tail = head;
01865 }
01866 else
01867 {
01868 reverse_stack.push (head);
01869 }
01870 }
01871 }
01872 else
01873 {
01874
01875
01876 CORBA::ULong tmp_request_id = 0;
01877 if (this->parse_request_id (tail, tmp_request_id) == -1)
01878 {
01879 return -1;
01880 }
01881
01882 CORBA::ULong const request_id = tmp_request_id;
01883
01884 while (this->fragment_stack_.pop (head) != -1)
01885 {
01886 CORBA::ULong head_request_id = 0;
01887 int parse_status = 0;
01888
01889 if (head->more_fragments () &&
01890 head->giop_version ().major_version () >= 1 &&
01891 head->giop_version ().minor_version () >= 2 &&
01892 head->msg_block ()->length () >= header_adjustment &&
01893 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01894 request_id == head_request_id)
01895 {
01896
01897 tail->msg_block ()->rd_ptr(header_adjustment);
01898
01899 head->msg_block ()->cont (tail->msg_block ());
01900
01901 tail->msg_block (0);
01902
01903 TAO_Queued_Data::release (tail);
01904
01905 tail = head;
01906 }
01907 else
01908 {
01909 if (parse_status == -1)
01910 {
01911 TAO_Queued_Data::release (head);
01912 return -1;
01913 }
01914
01915 reverse_stack.push (head);
01916 }
01917 }
01918 }
01919
01920
01921 while (reverse_stack.pop (head) != -1)
01922 {
01923 this->fragment_stack_.push (head);
01924 }
01925
01926 if (tail->consolidate () == -1)
01927 {
01928
01929 TAO_Queued_Data::release (tail);
01930 return -1;
01931 }
01932
01933
01934 msg = tail;
01935
01936 return 0;
01937 }
01938
01939
01940 int
01941 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
01942 {
01943
01944
01945
01946 TAO::Incoming_Message_Stack reverse_stack;
01947
01948 CORBA::ULong cancel_request_id;
01949
01950 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01951 {
01952 return -1;
01953 }
01954
01955 TAO_Queued_Data *head = 0;
01956
01957
01958 while (this->fragment_stack_.pop (head) != -1)
01959 {
01960 reverse_stack.push (head);
01961 }
01962
01963 bool discard_all_GIOP11_messages = false;
01964
01965
01966
01967
01968
01969
01970
01971 while (reverse_stack.pop (head) != -1)
01972 {
01973 CORBA::ULong head_request_id;
01974
01975 if (head->giop_version ().major_version () == 1 &&
01976 head->giop_version ().minor_version () <= 1 &&
01977 head->msg_type () != GIOP::Fragment &&
01978 this->parse_request_id (head, head_request_id) >= 0 &&
01979 cancel_request_id == head_request_id)
01980 {
01981 TAO_Queued_Data::release (head);
01982 discard_all_GIOP11_messages = true;
01983 }
01984 else if (head->giop_version ().major_version () == 1 &&
01985 head->giop_version ().minor_version () <= 1 &&
01986 discard_all_GIOP11_messages)
01987 {
01988 TAO_Queued_Data::release (head);
01989 }
01990 else if (head->giop_version ().major_version () >= 1 &&
01991 head->giop_version ().minor_version () >= 2 &&
01992 this->parse_request_id (head, head_request_id) >= 0 &&
01993 cancel_request_id == head_request_id)
01994 {
01995 TAO_Queued_Data::release (head);
01996 }
01997 else
01998 {
01999 this->fragment_stack_.push (head);
02000 }
02001 }
02002
02003 return 0;
02004 }
02005
02006 TAO_GIOP_Fragmentation_Strategy *
02007 TAO_GIOP_Message_Base::fragmentation_strategy (void)
02008 {
02009 return this->fragmentation_strategy_.get ();
02010 }
02011
02012 void
02013 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
02014 {
02015 CORBA::Octet * const buf =
02016 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
02017
02018 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
02019 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
02020
02021
02022 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
02023
02024
02025 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
02026
02027
02028
02029
02030 if (!(major <= 1 && minor == 0))
02031 ACE_SET_BITS (flags, msg.more_fragments () << 1);
02032 }
02033
02034 TAO_END_VERSIONED_NAMESPACE_DECL