00001
00002
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015
00016
00017
00018
00019
00020
00021 ACE_RCSID (tao,
00022 GIOP_Message_Base,
00023 "GIOP_Message_Base.cpp,v 1.122 2006/05/30 13:41:29 jwillemsen Exp")
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
00028 TAO_Transport * transport,
00029 size_t input_cdr_size)
00030 : orb_core_ (orb_core)
00031 , message_state_ ()
00032 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033 , out_stream_ (0,
00034 input_cdr_size,
00035 TAO_ENCAP_BYTE_ORDER,
00036 orb_core->output_cdr_buffer_allocator (),
00037 orb_core->output_cdr_dblock_allocator (),
00038 orb_core->output_cdr_msgblock_allocator (),
00039 orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040 fragmentation_strategy_.get (),
00041 TAO_DEF_GIOP_MAJOR,
00042 TAO_DEF_GIOP_MINOR)
00043 {
00044 }
00045
00046
00047 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00048 {
00049 }
00050
00051
00052 void
00053 TAO_GIOP_Message_Base::init (CORBA::Octet major,
00054 CORBA::Octet minor)
00055 {
00056
00057 this->out_stream_.set_version (major,
00058 minor);
00059 }
00060
00061 TAO_OutputCDR &
00062 TAO_GIOP_Message_Base::out_stream (void)
00063 {
00064 return this->out_stream_;
00065 }
00066
00067 void
00068 TAO_GIOP_Message_Base::reset (void)
00069 {
00070
00071 }
00072
00073 int
00074 TAO_GIOP_Message_Base::generate_request_header (
00075 TAO_Operation_Details &op,
00076 TAO_Target_Specification &spec,
00077 TAO_OutputCDR &cdr
00078 )
00079 {
00080
00081 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00082
00083 CORBA::Octet major, minor;
00084
00085 cdr.get_version (major, minor);
00086
00087
00088 this->set_state (major,
00089 minor,
00090 generator_parser);
00091
00092
00093 if (!this->write_protocol_header (TAO_GIOP_REQUEST,
00094 cdr))
00095 {
00096 if (TAO_debug_level)
00097 {
00098 ACE_ERROR ((LM_ERROR,
00099 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00100 }
00101
00102 return -1;
00103 }
00104
00105
00106 if (!generator_parser->write_request_header (op,
00107 spec,
00108 cdr))
00109 {
00110 if (TAO_debug_level)
00111 ACE_ERROR ((LM_ERROR,
00112 ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00113
00114 return -1;
00115 }
00116
00117 return 0;
00118 }
00119
00120 int
00121 TAO_GIOP_Message_Base::generate_locate_request_header (
00122 TAO_Operation_Details &op,
00123 TAO_Target_Specification &spec,
00124 TAO_OutputCDR &cdr
00125 )
00126 {
00127
00128 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00129
00130 CORBA::Octet major, minor;
00131
00132 cdr.get_version (major, minor);
00133
00134
00135 this->set_state (major,
00136 minor,
00137 generator_parser);
00138
00139
00140 if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST,
00141 cdr))
00142 {
00143 if (TAO_debug_level)
00144 ACE_ERROR ((LM_ERROR,
00145 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00146
00147 return -1;
00148 }
00149
00150
00151 if (!generator_parser->write_locate_request_header
00152 (op.request_id (),
00153 spec,
00154 cdr))
00155 {
00156 if (TAO_debug_level)
00157 ACE_ERROR ((LM_ERROR,
00158 ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00159
00160
00161 return -1;
00162
00163 }
00164
00165 return 0;
00166 }
00167
00168 int
00169 TAO_GIOP_Message_Base::generate_reply_header (
00170 TAO_OutputCDR &cdr,
00171 TAO_Pluggable_Reply_Params_Base ¶ms
00172 )
00173 {
00174
00175 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00176
00177 CORBA::Octet major, minor;
00178
00179 cdr.get_version (major, minor);
00180
00181
00182 this->set_state (major,
00183 minor,
00184 generator_parser);
00185
00186
00187 if (!this->write_protocol_header (TAO_GIOP_REPLY,
00188 cdr))
00189 {
00190 if (TAO_debug_level)
00191 ACE_ERROR ((LM_ERROR,
00192 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00193
00194 return -1;
00195 }
00196
00197 ACE_DECLARE_NEW_CORBA_ENV;
00198 ACE_TRY
00199 {
00200
00201 int const result =
00202 generator_parser->write_reply_header (cdr,
00203 params
00204 ACE_ENV_ARG_PARAMETER);
00205 ACE_TRY_CHECK;
00206
00207 if (!result)
00208 {
00209 if (TAO_debug_level > 4)
00210 ACE_ERROR ((LM_ERROR,
00211 ACE_TEXT ("(%P|%t) Error in writing reply ")
00212 ACE_TEXT ("header\n")));
00213
00214 return -1;
00215 }
00216 }
00217 ACE_CATCHANY
00218 {
00219 if (TAO_debug_level > 4)
00220 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00221 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00222
00223 return -1;
00224 }
00225 ACE_ENDTRY;
00226
00227 return 0;
00228 }
00229
00230 int
00231 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00232 CORBA::ULong request_id)
00233 {
00234
00235 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00236
00237 CORBA::Octet major, minor;
00238
00239 cdr.get_version (major, minor);
00240
00241
00242
00243
00244 if (major == 1 && minor < 2)
00245 return -1;
00246
00247
00248 this->set_state (major,
00249 minor,
00250 generator_parser);
00251
00252
00253 if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
00254 || !generator_parser->write_fragment_header (cdr, request_id))
00255 {
00256 if (TAO_debug_level)
00257 ACE_ERROR ((LM_ERROR,
00258 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00259
00260 return -1;
00261 }
00262
00263 return 0;
00264 }
00265
00266 int
00267 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00268 {
00269
00270 char * buf = (char *) stream.buffer ();
00271
00272 this->set_giop_flags (stream);
00273
00274
00275 size_t const total_len = stream.total_length ();
00276
00277
00278
00279
00280
00281
00282
00283
00284 CORBA::ULong const bodylen = static_cast <CORBA::ULong>
00285 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00286
00287 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00288 *(reinterpret_cast <CORBA::ULong *> (buf +
00289 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00290 #else
00291 if (!stream.do_byte_swap ())
00292 *(reinterpret_cast <CORBA::ULong *>
00293 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00294 else
00295 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00296 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00297 #endif
00298
00299 if (TAO_debug_level > 2)
00300 {
00301
00302
00303
00304 ACE_Message_Block* consolidated_block = 0;
00305 if (stream.begin()->cont () != 0)
00306 {
00307 consolidated_block = new ACE_Message_Block;
00308 ACE_CDR::consolidate (consolidated_block, stream.begin ());
00309 buf = (char *) (consolidated_block->rd_ptr ());
00310 }
00311
00312 this->dump_msg ("send",
00313 reinterpret_cast <u_char *> (buf),
00314 total_len);
00315
00316
00317 delete consolidated_block;
00318 consolidated_block = 0;
00319
00320 }
00321
00322 return 0;
00323 }
00324
00325 TAO_Pluggable_Message_Type
00326 TAO_GIOP_Message_Base::message_type (
00327 const TAO_GIOP_Message_State &msg_state) const
00328 {
00329
00330
00331 switch (msg_state.message_type_)
00332 {
00333 case TAO_GIOP_REQUEST:
00334 return TAO_PLUGGABLE_MESSAGE_REQUEST;
00335 case TAO_GIOP_LOCATEREQUEST:
00336 return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST;
00337
00338 case TAO_GIOP_LOCATEREPLY:
00339 return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY;
00340
00341 case TAO_GIOP_REPLY:
00342 return TAO_PLUGGABLE_MESSAGE_REPLY;
00343
00344 case TAO_GIOP_CLOSECONNECTION:
00345 return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION;
00346
00347 case TAO_GIOP_FRAGMENT:
00348 return TAO_PLUGGABLE_MESSAGE_FRAGMENT;
00349
00350 case TAO_GIOP_MESSAGERROR:
00351 return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
00352
00353 case TAO_GIOP_CANCELREQUEST:
00354 return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST;
00355
00356 default:
00357 if (TAO_debug_level > 0)
00358 {
00359 ACE_ERROR ((LM_ERROR,
00360 ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ")
00361 ACE_TEXT ("wrong message.\n")));
00362 }
00363 }
00364
00365 return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
00366 }
00367
00368 int
00369 TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
00370 {
00371 this->message_state_.reset ();
00372
00373 return this->message_state_.parse_message_header (incoming);
00374 }
00375
00376 int
00377 TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming,
00378 TAO_Queued_Data &qd,
00379 size_t &mesg_length)
00380 {
00381 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00382 {
00383 qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00384
00385 return 0;
00386 }
00387 else
00388 {
00389 TAO_GIOP_Message_State state;
00390
00391 if (state.parse_message_header (incoming) == -1)
00392 {
00393 return -1;
00394 }
00395
00396 const size_t message_size = state.message_size ();
00397
00398 if (message_size > incoming.length ())
00399 {
00400 qd.missing_data_ = message_size - incoming.length ();
00401 }
00402 else
00403 {
00404 qd.missing_data_ = 0;
00405 }
00406
00407
00408 this->init_queued_data (&qd, state);
00409 mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN
00410 + state.payload_size ();
00411
00412 return 1;
00413 }
00414 }
00415
00416 int
00417 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00418 TAO_Queued_Data *&qd)
00419 {
00420 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00421 {
00422 if (incoming.length () > 0)
00423 {
00424
00425
00426
00427
00428 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00429
00430
00431
00432 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00433 default_buf_size);
00434
00435
00436
00437 qd = this->make_queued_data (buf_size);
00438
00439 if (qd == 0)
00440 {
00441 if (TAO_debug_level > 0)
00442 {
00443 ACE_ERROR((LM_ERROR,
00444 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00445 ACE_TEXT ("out of memory\n")));
00446 }
00447 return -1;
00448 }
00449
00450 qd->msg_block_->copy (incoming.rd_ptr (),
00451 incoming.length ());
00452
00453 incoming.rd_ptr (incoming.length ());
00454
00455 qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00456 }
00457 else
00458 {
00459
00460 qd = 0;
00461 }
00462
00463 return 0;
00464 }
00465
00466 TAO_GIOP_Message_State state;
00467 if (state.parse_message_header (incoming) == -1)
00468 {
00469 return -1;
00470 }
00471
00472 size_t copying_len = state.message_size ();
00473
00474 qd = this->make_queued_data (copying_len);
00475
00476 if (qd == 0)
00477 {
00478 if (TAO_debug_level > 0)
00479 {
00480 ACE_ERROR ((LM_ERROR,
00481 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00482 ACE_TEXT ("out of memory\n")));
00483 }
00484 return -1;
00485 }
00486
00487 if (copying_len > incoming.length ())
00488 {
00489 qd->missing_data_ = copying_len - incoming.length ();
00490 copying_len = incoming.length ();
00491 }
00492 else
00493 {
00494 qd->missing_data_ = 0;
00495 }
00496
00497 qd->msg_block_->copy (incoming.rd_ptr (),
00498 copying_len);
00499
00500 incoming.rd_ptr (copying_len);
00501 this->init_queued_data (qd, state);
00502
00503 return 1;
00504 }
00505
00506 int
00507 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00508 ACE_Message_Block &incoming)
00509 {
00510
00511 if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00512 {
00513
00514
00515 size_t const len =
00516 qd->msg_block_->length ();
00517
00518
00519 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00520 {
00521
00522
00523 return -1;
00524 }
00525
00526
00527
00528
00529 size_t const available = incoming.length ();
00530 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00531 size_t const n_copy = ace_min (available, desired);
00532
00533
00534 if (n_copy == 0)
00535 {
00536 return -1;
00537 }
00538
00539 if (qd->msg_block_->copy (incoming.rd_ptr (),
00540 n_copy) == -1)
00541 {
00542 return -1;
00543 }
00544
00545
00546 incoming.rd_ptr (n_copy);
00547
00548
00549 if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00550 {
00551 return 0;
00552 }
00553
00554 TAO_GIOP_Message_State state;
00555
00556
00557 if (state.parse_message_header (*qd->msg_block_) == -1)
00558 {
00559 if (TAO_debug_level > 0)
00560 {
00561 ACE_ERROR ((LM_ERROR,
00562 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00563 ACE_TEXT ("error parsing header\n") ));
00564 }
00565 return -1;
00566 }
00567
00568
00569 if (ACE_CDR::grow (qd->msg_block_,
00570 state.message_size ()) == -1)
00571 {
00572
00573
00574 return -1;
00575 }
00576
00577
00578
00579 size_t copy_len = state.payload_size ();
00580
00581
00582
00583 if (copy_len > incoming.length ())
00584 {
00585
00586 qd->missing_data_ = copy_len - incoming.length ();
00587
00588
00589 copy_len = incoming.length ();
00590 }
00591 else
00592 {
00593 qd->missing_data_ = 0;
00594 }
00595
00596
00597
00598 if (qd->msg_block_->copy (incoming.rd_ptr (),
00599 copy_len) == -1)
00600 {
00601 return -1;
00602 }
00603
00604
00605 incoming.rd_ptr (copy_len);
00606
00607
00608 this->init_queued_data (qd, state);
00609 }
00610 else
00611 {
00612
00613 size_t copy_len = qd->missing_data_;
00614
00615 if (copy_len > incoming.length ())
00616 {
00617
00618 qd->missing_data_ = copy_len - incoming.length ();
00619
00620
00621 copy_len = incoming.length ();
00622 }
00623
00624
00625 if (copy_len == 0)
00626 {
00627 return -1;
00628 }
00629
00630
00631
00632 if (qd->msg_block_->copy (incoming.rd_ptr (),
00633 copy_len) == -1)
00634 {
00635 return -1;
00636 }
00637
00638
00639 qd->msg_block_->rd_ptr (copy_len);
00640
00641 }
00642
00643 return 0;
00644 }
00645
00646 int
00647 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00648 TAO_Queued_Data *qd)
00649 {
00650
00651 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00652
00653
00654 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00655
00656
00657 this->set_state (qd->major_version_,
00658 qd->minor_version_,
00659 generator_parser);
00660
00661
00662 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00663 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00664 #else
00665 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00666 #endif
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678 TAO_OutputCDR output (repbuf,
00679 sizeof repbuf,
00680 TAO_ENCAP_BYTE_ORDER,
00681 this->orb_core_->input_cdr_buffer_allocator (),
00682 this->orb_core_->input_cdr_dblock_allocator (),
00683 this->orb_core_->input_cdr_msgblock_allocator (),
00684 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00685 this->fragmentation_strategy_.get (),
00686 qd->major_version_,
00687 qd->minor_version_);
00688
00689
00690 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00691 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00692 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00693
00694 if (TAO_debug_level > 0)
00695 this->dump_msg ("recv",
00696 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00697 qd->msg_block_->length ());
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710 ACE_Message_Block::Message_Flags flg = 0;
00711 ACE_Data_Block *db = 0;
00712
00713
00714 flg = qd->msg_block_->self_flags ();
00715
00716 if (ACE_BIT_ENABLED (flg,
00717 ACE_Message_Block::DONT_DELETE))
00718 {
00719
00720 db = qd->msg_block_->data_block ();
00721 }
00722 else
00723 {
00724
00725
00726 db = qd->msg_block_->data_block ()->duplicate ();
00727 }
00728
00729
00730 TAO_InputCDR input_cdr (db,
00731 flg,
00732 rd_pos,
00733 wr_pos,
00734 qd->byte_order_,
00735 qd->major_version_,
00736 qd->minor_version_,
00737 this->orb_core_);
00738
00739 transport->assign_translators(&input_cdr,&output);
00740
00741
00742
00743
00744
00745
00746
00747
00748 switch (qd->msg_type_)
00749 {
00750 case TAO_PLUGGABLE_MESSAGE_REQUEST:
00751
00752
00753
00754 return this->process_request (transport,
00755 input_cdr,
00756 output,
00757 generator_parser);
00758
00759 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
00760 return this->process_locate_request (transport,
00761 input_cdr,
00762 output,
00763 generator_parser);
00764 default:
00765 return -1;
00766 }
00767 }
00768
00769 int
00770 TAO_GIOP_Message_Base::process_reply_message (
00771 TAO_Pluggable_Reply_Params ¶ms,
00772 TAO_Queued_Data *qd)
00773 {
00774
00775 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00776
00777
00778 this->set_state (qd->major_version_,
00779 qd->minor_version_,
00780 generator_parser);
00781
00782
00783 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00784 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00785 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00786
00787 if (TAO_debug_level > 0)
00788 this->dump_msg ("recv",
00789 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00790 qd->msg_block_->length ());
00791
00792
00793
00794
00795
00796
00797 TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
00798 ACE_Message_Block::DONT_DELETE,
00799 rd_pos,
00800 wr_pos,
00801 qd->byte_order_,
00802 qd->major_version_,
00803 qd->minor_version_,
00804 this->orb_core_);
00805
00806
00807
00808
00809
00810
00811
00812 int retval = 0;
00813
00814 switch (qd->msg_type_)
00815 {
00816 case TAO_PLUGGABLE_MESSAGE_REPLY:
00817
00818 retval =
00819 generator_parser->parse_reply (input_cdr,
00820 params);
00821
00822 break;
00823 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
00824 retval =
00825 generator_parser->parse_locate_reply (input_cdr,
00826 params);
00827 break;
00828 default:
00829 retval = -1;
00830 }
00831
00832 if (retval == -1)
00833 return retval;
00834
00835 params.input_cdr_ = &input_cdr;
00836
00837 retval =
00838 params.transport_->tms ()->dispatch_reply (params);
00839
00840 if (retval == -1)
00841 {
00842
00843
00844 if (TAO_debug_level > 0)
00845 ACE_ERROR ((LM_ERROR,
00846 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ")
00847 ACE_TEXT ("dispatch reply failed\n"),
00848 params.transport_->id ()));
00849 }
00850
00851 return retval;
00852 }
00853
00854 int
00855 TAO_GIOP_Message_Base::generate_exception_reply (
00856 TAO_OutputCDR &cdr,
00857 TAO_Pluggable_Reply_Params_Base ¶ms,
00858 CORBA::Exception &x
00859 )
00860 {
00861
00862
00863 ACE_DECLARE_NEW_CORBA_ENV;
00864
00865 ACE_TRY
00866 {
00867
00868 this->generate_reply_header (cdr,
00869 params);
00870 x._tao_encode (cdr
00871 ACE_ENV_ARG_PARAMETER);
00872 ACE_TRY_CHECK;
00873 }
00874 ACE_CATCH (CORBA::Exception, ex)
00875 {
00876
00877
00878
00879
00880 if (TAO_debug_level > 0)
00881 ACE_DEBUG ((LM_DEBUG,
00882 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00883 ACE_TEXT ("generate_exception_reply ()\n")));
00884 return -1;
00885 }
00886 ACE_ENDTRY;
00887 ACE_CHECK_RETURN (-1);
00888
00889 return 0;
00890 }
00891
00892 int
00893 TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
00894 TAO_OutputCDR &msg)
00895 {
00896
00897 msg.reset ();
00898
00899 CORBA::Octet header[12] =
00900 {
00901
00902
00903 0x47,
00904 0x49,
00905 0x4f,
00906 0x50
00907 };
00908
00909 CORBA::Octet major, minor = 0;
00910
00911 (void) msg.get_version (major, minor);
00912
00913 header[4] = major;
00914 header[5] = minor;
00915
00916
00917
00918
00919 header[7] = CORBA::Octet (type);
00920
00921 static ACE_CDR::ULong const header_size =
00922 sizeof (header) / sizeof (header[0]);
00923
00924
00925
00926
00927 msg.write_octet_array (header, header_size);
00928
00929 return msg.good_bit ();
00930 }
00931
00932 int
00933 TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
00934 TAO_InputCDR &cdr,
00935 TAO_OutputCDR &output,
00936 TAO_GIOP_Message_Generator_Parser *parser)
00937 {
00938
00939
00940 TAO_ServerRequest request (this,
00941 cdr,
00942 output,
00943 transport,
00944 this->orb_core_);
00945
00946 CORBA::ULong request_id = 0;
00947 CORBA::Boolean response_required = false;
00948
00949 int parse_error = 0;
00950
00951 ACE_DECLARE_NEW_CORBA_ENV;
00952 ACE_TRY
00953 {
00954 parse_error =
00955 parser->parse_request_header (request);
00956
00957 TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
00958 if (csm)
00959 {
00960 csm->process_service_context(request);
00961 transport->assign_translators(&cdr,&output);
00962 }
00963
00964
00965 if (parse_error != 0)
00966 ACE_TRY_THROW (CORBA::MARSHAL (0,
00967 CORBA::COMPLETED_NO));
00968 request_id = request.request_id ();
00969
00970 response_required = request.response_expected ();
00971
00972 CORBA::Object_var forward_to;
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982 this->orb_core_->request_dispatcher ()->dispatch (
00983 this->orb_core_,
00984 request,
00985 forward_to
00986 ACE_ENV_ARG_PARAMETER);
00987 ACE_TRY_CHECK;
00988
00989
00990
00991 if (!CORBA::is_nil (forward_to.in ()))
00992 {
00993 const CORBA::Boolean permanent_forward_condition =
00994 this->orb_core_->is_permanent_forward_condition
00995 (forward_to.in (),
00996 request.request_service_context ());
00997
00998
00999 TAO_Pluggable_Reply_Params_Base reply_params;
01000 reply_params.request_id_ = request_id;
01001 reply_params.reply_status_ =
01002 permanent_forward_condition
01003 ? TAO_GIOP_LOCATION_FORWARD_PERM
01004 : TAO_GIOP_LOCATION_FORWARD;
01005 reply_params.svc_ctx_.length (0);
01006
01007
01008 reply_params.service_context_notowned (&request.reply_service_info ());
01009
01010 output.message_attributes (request_id,
01011 0,
01012 TAO_Transport::TAO_REPLY,
01013 0);
01014
01015
01016 this->generate_reply_header (output,
01017 reply_params);
01018
01019 if (!(output << forward_to.in ()))
01020 {
01021 if (TAO_debug_level > 0)
01022 ACE_ERROR ((LM_ERROR,
01023 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
01024 ACE_TEXT ("forward reference.\n")));
01025
01026 return -1;
01027 }
01028
01029 output.more_fragments (false);
01030
01031 int result = transport->send_message (output,
01032 0,
01033 TAO_Transport::TAO_REPLY);
01034 if (result == -1)
01035 {
01036 if (TAO_debug_level > 0)
01037 {
01038
01039
01040 ACE_ERROR ((LM_ERROR,
01041 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01042 ACE_TEXT ("cannot send reply\n"),
01043 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
01044 }
01045 }
01046 return result;
01047 }
01048 }
01049
01050 ACE_CATCHANY
01051 {
01052 int result = 0;
01053
01054 if (response_required)
01055 {
01056 result = this->send_reply_exception (transport,
01057 output,
01058 request_id,
01059 &request.reply_service_info (),
01060 &ACE_ANY_EXCEPTION);
01061 if (result == -1)
01062 {
01063 if (TAO_debug_level > 0)
01064 {
01065 ACE_ERROR ((LM_ERROR,
01066 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01067 ACE_TEXT ("cannot send exception\n"),
01068 ACE_TEXT ("process_connector_request ()")));
01069
01070 ACE_PRINT_EXCEPTION (
01071 ACE_ANY_EXCEPTION,
01072 "TAO_GIOP_Message_Base::process_request[1]");
01073 }
01074 }
01075
01076 }
01077 else if (TAO_debug_level > 0)
01078 {
01079
01080
01081
01082
01083
01084
01085 ACE_ERROR ((LM_ERROR,
01086 ACE_TEXT ("(%P|%t) exception thrown ")
01087 ACE_TEXT ("but client is not waiting a response\n")));
01088
01089 ACE_PRINT_EXCEPTION (
01090 ACE_ANY_EXCEPTION,
01091 "TAO_GIOP_Message_Base::process_request[2]");
01092 }
01093
01094 return result;
01095 }
01096 #if defined (TAO_HAS_EXCEPTIONS)
01097 ACE_CATCHALL
01098 {
01099
01100
01101
01102
01103
01104
01105
01106 int result = 0;
01107
01108 if (response_required)
01109 {
01110 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01111 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01112 CORBA::COMPLETED_MAYBE);
01113
01114 result = this->send_reply_exception (transport,
01115 output,
01116 request_id,
01117 &request.reply_service_info (),
01118 &exception);
01119 if (result == -1)
01120 {
01121 if (TAO_debug_level > 0)
01122 {
01123 ACE_ERROR ((LM_ERROR,
01124 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01125 ACE_TEXT ("%p: ")
01126 ACE_TEXT ("cannot send exception\n"),
01127 ACE_TEXT ("process_request ()")));
01128 ACE_PRINT_EXCEPTION (
01129 exception,
01130 "TAO_GIOP_Message_Base::process_request[3]");
01131 }
01132 }
01133 }
01134 else if (TAO_debug_level > 0)
01135 {
01136
01137
01138
01139
01140
01141 ACE_ERROR ((LM_ERROR,
01142 ACE_TEXT ("(%P|%t|%N|%l) exception thrown ")
01143 ACE_TEXT ("but client is not waiting a response\n")));
01144 }
01145
01146 return result;
01147 }
01148 #endif
01149 ACE_ENDTRY;
01150
01151 return 0;
01152 }
01153
01154
01155 int
01156 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01157 TAO_InputCDR &input,
01158 TAO_OutputCDR &output,
01159 TAO_GIOP_Message_Generator_Parser *parser)
01160 {
01161
01162
01163 TAO_GIOP_Locate_Request_Header locate_request (input,
01164 this->orb_core_);
01165
01166 TAO_GIOP_Locate_Status_Msg status_info;
01167
01168
01169 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01170
01171 CORBA::Boolean response_required = 1;
01172
01173 ACE_DECLARE_NEW_CORBA_ENV;
01174 ACE_TRY
01175 {
01176 int parse_error =
01177 parser->parse_locate_header (locate_request);
01178
01179 if (parse_error != 0)
01180 {
01181 ACE_TRY_THROW (CORBA::MARSHAL (0,
01182 CORBA::COMPLETED_NO));
01183 }
01184
01185 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01186 locate_request.object_key ().length (),
01187 locate_request.object_key ().get_buffer (),
01188 0);
01189
01190
01191 parse_error = 1;
01192 CORBA::ULong req_id = locate_request.request_id ();
01193
01194
01195
01196 CORBA::Boolean deferred_reply = true;
01197 TAO_ServerRequest server_request (this,
01198 req_id,
01199 response_required,
01200 deferred_reply,
01201 tmp_key,
01202 "_non_existent",
01203 output,
01204 transport,
01205 this->orb_core_,
01206 parse_error);
01207
01208 if (parse_error != 0)
01209 {
01210 ACE_TRY_THROW (CORBA::MARSHAL (0,
01211 CORBA::COMPLETED_NO));
01212 }
01213
01214 CORBA::Object_var forward_to;
01215
01216 this->orb_core_->request_dispatcher ()->dispatch (
01217 this->orb_core_,
01218 server_request,
01219 forward_to
01220 ACE_ENV_ARG_PARAMETER);
01221 ACE_TRY_CHECK;
01222
01223 if (!CORBA::is_nil (forward_to.in ()))
01224 {
01225 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01226 status_info.forward_location_var = forward_to;
01227 if (TAO_debug_level > 0)
01228 ACE_DEBUG ((LM_DEBUG,
01229 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01230 ACE_TEXT ("called: forwarding\n")));
01231 }
01232 else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
01233 {
01234
01235 status_info.status = TAO_GIOP_OBJECT_HERE;
01236 if (TAO_debug_level > 0)
01237 ACE_DEBUG ((LM_DEBUG,
01238 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01239 ACE_TEXT ("found\n")));
01240 }
01241 else
01242 {
01243 status_info.forward_location_var = server_request.forward_location ();
01244
01245 if (!CORBA::is_nil (status_info.forward_location_var.in ()))
01246 {
01247 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01248 ACE_DEBUG ((LM_DEBUG,
01249 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01250 ACE_TEXT ("forwarding\n")));
01251 }
01252 else
01253 {
01254
01255 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01256 ACE_DEBUG ((LM_DEBUG,
01257 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01258 ACE_TEXT ("not here\n")));
01259 }
01260 }
01261 }
01262
01263 ACE_CATCHANY
01264 {
01265
01266 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01267 if (TAO_debug_level > 0)
01268 ACE_DEBUG ((LM_DEBUG,
01269 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01270 ACE_TEXT ("CORBA exception raised\n")));
01271 }
01272 #if defined (TAO_HAS_EXCEPTIONS)
01273 ACE_CATCHALL
01274 {
01275
01276 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01277 if (TAO_debug_level > 0)
01278 ACE_DEBUG ((LM_DEBUG,
01279 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01280 ACE_TEXT ("C++ exception raised\n")));
01281 }
01282 #endif
01283 ACE_ENDTRY;
01284
01285 return this->make_send_locate_reply (transport,
01286 locate_request,
01287 status_info,
01288 output,
01289 parser);
01290 }
01291
01292 int
01293 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01294 TAO_GIOP_Locate_Request_Header &request,
01295 TAO_GIOP_Locate_Status_Msg &status_info,
01296 TAO_OutputCDR &output,
01297 TAO_GIOP_Message_Generator_Parser *parser)
01298 {
01299
01300
01301
01302 this->write_protocol_header (TAO_GIOP_LOCATEREPLY,
01303 output);
01304
01305
01306 parser->write_locate_reply_mesg (output,
01307 request.request_id (),
01308 status_info);
01309
01310 output.more_fragments (false);
01311
01312
01313 int result = transport->send_message (output,
01314 0,
01315 TAO_Transport::TAO_REPLY);
01316
01317
01318 if (result == -1)
01319 {
01320 if (TAO_debug_level > 0)
01321 {
01322 ACE_ERROR ((LM_ERROR,
01323 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01324 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01325 }
01326 }
01327
01328 return result;
01329 }
01330
01331
01332
01333
01334
01335
01336 int
01337 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01338 {
01339 const char
01340 error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01341 {
01342
01343
01344 0x47,
01345 0x49,
01346 0x4f,
01347 0x50,
01348 (CORBA::Octet) 1,
01349 (CORBA::Octet) 0,
01350 TAO_ENCAP_BYTE_ORDER,
01351 TAO_GIOP_MESSAGERROR,
01352 0, 0, 0, 0
01353 };
01354
01355
01356
01357
01358 this->dump_msg ("send_error",
01359 (const u_char *) error_message,
01360 TAO_GIOP_MESSAGE_HEADER_LEN);
01361
01362 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01363 ACE_Message_Block::MB_DATA,
01364 error_message,
01365 0,
01366 0,
01367 ACE_Message_Block::DONT_DELETE,
01368 0);
01369 ACE_Message_Block message_block(&data_block,
01370 ACE_Message_Block::DONT_DELETE);
01371 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01372
01373 size_t bt;
01374 int result = transport->send_message_block_chain (&message_block, bt);
01375 if (result == -1)
01376 {
01377 if (TAO_debug_level > 0)
01378 ACE_DEBUG ((LM_DEBUG,
01379 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01380 transport->id ()));
01381 }
01382
01383 return result;
01384 }
01385
01386 void
01387 TAO_GIOP_Message_Base::set_state (
01388 CORBA::Octet def_major,
01389 CORBA::Octet def_minor,
01390 TAO_GIOP_Message_Generator_Parser *&gen_parser) const
01391 {
01392 switch (def_major)
01393 {
01394 case 1:
01395 switch (def_minor)
01396 {
01397 case 0:
01398 gen_parser =
01399 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01400 &this->tao_giop_impl_.tao_giop_10);
01401 break;
01402 case 1:
01403 gen_parser =
01404 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01405 &this->tao_giop_impl_.tao_giop_11);
01406 break;
01407 case 2:
01408 gen_parser =
01409 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01410 &this->tao_giop_impl_.tao_giop_12);
01411 break;
01412 default:
01413 break;
01414 }
01415 break;
01416 default:
01417 break;
01418 }
01419 }
01420
01421
01422
01423
01424
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435
01436
01437 void
01438 TAO_GIOP_Message_Base::
01439 send_close_connection (const TAO_GIOP_Message_Version &version,
01440 TAO_Transport *transport,
01441 void *)
01442 {
01443
01444
01445
01446
01447 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01448 {
01449
01450
01451 0x47,
01452 0x49,
01453 0x4f,
01454 0x50,
01455 version.major,
01456 version.minor,
01457 TAO_ENCAP_BYTE_ORDER,
01458 TAO_GIOP_CLOSECONNECTION,
01459 0, 0, 0, 0
01460 };
01461
01462
01463
01464
01465
01466
01467
01468 this->dump_msg ("send_close_connection",
01469 (const u_char *) close_message,
01470 TAO_GIOP_MESSAGE_HEADER_LEN);
01471
01472 #if 0
01473
01474
01475
01476
01477
01478
01479
01480 ACE_HANDLE which = transport->handle ();
01481 if (which == ACE_INVALID_HANDLE)
01482 {
01483 if (TAO_debug_level > 0)
01484 ACE_DEBUG ((LM_DEBUG,
01485 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01486 ACE_TEXT (" connection already closed\n")));
01487 return;
01488 }
01489 #endif
01490
01491 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01492 ACE_Message_Block::MB_DATA,
01493 close_message,
01494 0,
01495 0,
01496 ACE_Message_Block::DONT_DELETE,
01497 0);
01498 ACE_Message_Block message_block(&data_block);
01499 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01500
01501 size_t bt;
01502 int result = transport->send_message_block_chain (&message_block, bt);
01503 if (result == -1)
01504 {
01505 if (TAO_debug_level > 0)
01506 ACE_ERROR ((LM_ERROR,
01507 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01508 transport->id (), errno));
01509 }
01510
01511 transport->close_connection ();
01512 ACE_DEBUG ((LM_DEBUG,
01513 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01514 transport-> id ()));
01515
01516 }
01517
01518
01519 int
01520 TAO_GIOP_Message_Base::send_reply_exception (
01521 TAO_Transport *transport,
01522 TAO_OutputCDR &output,
01523 CORBA::ULong request_id,
01524 IOP::ServiceContextList *svc_info,
01525 CORBA::Exception *x
01526 )
01527 {
01528 TAO_Pluggable_Reply_Params_Base reply_params;
01529 reply_params.request_id_ = request_id;
01530 reply_params.svc_ctx_.length (0);
01531
01532
01533 reply_params.argument_flag_ = 1;
01534
01535
01536
01537 reply_params.service_context_notowned (svc_info);
01538
01539 reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION;
01540
01541 if (CORBA::SystemException::_downcast (x) != 0)
01542 {
01543 reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION;
01544 }
01545
01546 if (this->generate_exception_reply (output,
01547 reply_params,
01548 *x) == -1)
01549 return -1;
01550
01551 output.more_fragments (false);
01552
01553 return transport->send_message (output,
01554 0,
01555 TAO_Transport::TAO_REPLY);
01556 }
01557
01558 void
01559 TAO_GIOP_Message_Base::dump_msg (const char *label,
01560 const u_char *ptr,
01561 size_t len)
01562 {
01563
01564 if (TAO_debug_level >= 5)
01565 {
01566 static const char digits[] = "0123456789ABCD";
01567 static const char *names[] =
01568 {
01569 "Request",
01570 "Reply",
01571 "CancelRequest",
01572 "LocateRequest",
01573 "LocateReply",
01574 "CloseConnection",
01575 "MessageError",
01576 "Fragment"
01577 };
01578
01579
01580 const char *message_name = "UNKNOWN MESSAGE";
01581 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01582 if (slot < sizeof (names) / sizeof (names[0]))
01583 message_name = names[slot];
01584
01585
01586 int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01587
01588
01589 CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01590 CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01591
01592
01593 CORBA::ULong tmp = 0;
01594 CORBA::ULong *id = &tmp;
01595 char *tmp_id = 0;
01596
01597 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
01598 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
01599 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
01600 {
01601 if (major == 1 && minor < 2)
01602 {
01603
01604 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
01605 }
01606 else
01607 {
01608 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01609 }
01610 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01611 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01612 {
01613 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01614 }
01615 else
01616 {
01617 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01618 }
01619 #else
01620 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01621 #endif
01622
01623 }
01624
01625
01626 ACE_DEBUG ((LM_DEBUG,
01627 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01628 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01629 "Type %s[%u]\n",
01630 ACE_TEXT_CHAR_TO_TCHAR (label),
01631 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01632 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01633 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01634 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01635 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01636 *id));
01637
01638 if (TAO_debug_level >= 10)
01639 ACE_HEX_DUMP ((LM_DEBUG,
01640 (const char *) ptr,
01641 len,
01642 ACE_TEXT ("GIOP message")));
01643 }
01644 }
01645
01646 int
01647 TAO_GIOP_Message_Base::generate_locate_reply_header (
01648 TAO_OutputCDR & ,
01649 TAO_Pluggable_Reply_Params_Base & )
01650 {
01651 return 0;
01652 }
01653
01654 int
01655 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
01656 {
01657
01658 TAO_GIOP_Message_Generator_Parser *parser = 0;
01659
01660 CORBA::Octet major, minor = 0;
01661
01662 msg.get_version (major, minor);
01663
01664
01665 this->set_state (major,
01666 minor,
01667 parser);
01668
01669
01670
01671
01672
01673 return parser->is_ready_for_bidirectional ();
01674 }
01675
01676
01677 TAO_Queued_Data *
01678 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01679 {
01680
01681 TAO_Queued_Data *qd =
01682 TAO_Queued_Data::make_queued_data (
01683 this->orb_core_->transport_message_buffer_allocator ());
01684
01685 if (qd == 0)
01686 {
01687 if (TAO_debug_level > 0)
01688 {
01689 ACE_ERROR ((LM_ERROR,
01690 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01691 ACE_TEXT ("our of memory, failed to allocate queued data object\n")));
01692 }
01693 return 0;
01694 }
01695
01696
01697
01698
01699
01700
01701
01702
01703 ACE_Data_Block *db =
01704 this->orb_core_->create_input_cdr_data_block (sz +
01705 ACE_CDR::MAX_ALIGNMENT);
01706
01707 if (db == 0)
01708 {
01709 TAO_Queued_Data::release (qd);
01710
01711 if (TAO_debug_level > 0)
01712 {
01713 ACE_ERROR ((LM_ERROR,
01714 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01715 ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"),
01716 sz));
01717 }
01718 return 0;
01719 }
01720
01721 ACE_Allocator *alloc =
01722 this->orb_core_->input_cdr_msgblock_allocator ();
01723
01724 if (alloc == 0)
01725 {
01726 if (TAO_debug_level >= 8)
01727 {
01728 ACE_DEBUG ((LM_DEBUG,
01729 ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,")
01730 ACE_TEXT (" no ACE_Allocator defined\n")));
01731 }
01732 }
01733
01734
01735 ACE_Message_Block mb (db,
01736 0,
01737 alloc);
01738
01739 ACE_Message_Block *new_mb = mb.duplicate ();
01740
01741 if (new_mb == 0)
01742 {
01743 TAO_Queued_Data::release (qd);
01744 db->release();
01745
01746 if (TAO_debug_level > 0)
01747 {
01748 ACE_ERROR ((LM_ERROR,
01749 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01750 ACE_TEXT ("out of memory, failed to allocate message block\n")));
01751 }
01752 return 0;
01753 }
01754
01755 ACE_CDR::mb_align (new_mb);
01756
01757 qd->msg_block_ = new_mb;
01758
01759 return qd;
01760 }
01761
01762 size_t
01763 TAO_GIOP_Message_Base::header_length (void) const
01764 {
01765 return TAO_GIOP_MESSAGE_HEADER_LEN;
01766 }
01767
01768 size_t
01769 TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major,
01770 CORBA::Octet minor) const
01771 {
01772 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01773
01774
01775 this->set_state (major,
01776 minor,
01777 generator_parser);
01778
01779 return generator_parser->fragment_header_length ();
01780 }
01781
01782 void
01783 TAO_GIOP_Message_Base::init_queued_data (
01784 TAO_Queued_Data* qd,
01785 const TAO_GIOP_Message_State& state) const
01786 {
01787 qd->byte_order_ = state.byte_order_;
01788 qd->major_version_ = state.giop_version_.major;
01789 qd->minor_version_ = state.giop_version_.minor;
01790 qd->more_fragments_ = state.more_fragments_;
01791 qd->msg_type_ = this->message_type (state);
01792 }
01793
01794 int
01795 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const
01796 {
01797
01798 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01799
01800
01801 this->set_state (qd->major_version_,
01802 qd->minor_version_,
01803 generator_parser);
01804
01805
01806 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
01807 size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
01808 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01809
01810
01811
01812
01813
01814
01815
01816
01817
01818
01819
01820 ACE_Message_Block::Message_Flags flg = 0;
01821 ACE_Data_Block *db = 0;
01822
01823
01824 flg = qd->msg_block_->self_flags ();
01825
01826 if (ACE_BIT_ENABLED (flg,
01827 ACE_Message_Block::DONT_DELETE))
01828 {
01829
01830 db = qd->msg_block_->data_block ();
01831 }
01832 else
01833 {
01834
01835
01836 db = qd->msg_block_->data_block ()->duplicate ();
01837 }
01838
01839
01840 TAO_InputCDR input_cdr (db,
01841 flg,
01842 rd_pos,
01843 wr_pos,
01844 qd->byte_order_,
01845 qd->major_version_,
01846 qd->minor_version_,
01847 this->orb_core_);
01848
01849 if (qd->major_version_ >= 1 &&
01850 (qd->minor_version_ == 0 || qd->minor_version_ == 1))
01851 {
01852 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01853 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY)
01854 {
01855 IOP::ServiceContextList service_context;
01856
01857 if ( ! (input_cdr >> service_context &&
01858 input_cdr >> request_id) )
01859 {
01860 return -1;
01861 }
01862
01863 return 0;
01864 }
01865 else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01866 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01867 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01868 {
01869 if ( ! (input_cdr >> request_id) )
01870 {
01871 return -1;
01872 }
01873
01874 return 0;
01875 }
01876 else
01877 {
01878 return -1;
01879 }
01880 }
01881 else
01882 {
01883 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01884 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY ||
01885 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT ||
01886 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01887 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01888 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01889 {
01890
01891
01892 if ( ! (input_cdr >> request_id) )
01893 {
01894 return -1;
01895 }
01896
01897 return 0;
01898 }
01899 else
01900 {
01901 return -1;
01902 }
01903 }
01904
01905 return -1;
01906 }
01907
01908
01909 int
01910 TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg)
01911 {
01912 TAO::Incoming_Message_Stack reverse_stack;
01913
01914 TAO_Queued_Data *tail = 0;
01915 TAO_Queued_Data *head = 0;
01916
01917
01918
01919
01920
01921
01922 if (qd == 0)
01923 {
01924 return -1;
01925 }
01926
01927 if (qd->major_version_ == 1 && qd->minor_version_ == 0)
01928 {
01929 TAO_Queued_Data::release (qd);
01930 return -1;
01931 }
01932
01933
01934 if (qd->more_fragments_)
01935 {
01936 this->fragment_stack_.push (qd);
01937
01938 msg = 0;
01939 return 1;
01940 }
01941
01942 tail = qd;
01943
01944
01945
01946 const size_t header_adjustment =
01947 this->header_length () +
01948 this->fragment_header_length (tail->major_version_,
01949 tail->minor_version_);
01950
01951 if (tail->msg_block_->length () < header_adjustment)
01952 {
01953
01954 TAO_Queued_Data::release (qd);
01955 return -1;
01956 }
01957
01958
01959 if (tail->major_version_ == 1 && tail->minor_version_ == 1)
01960 {
01961
01962
01963 while (this->fragment_stack_.pop (head) != -1)
01964 {
01965 if (head->more_fragments_ &&
01966 head->major_version_ == 1 &&
01967 head->minor_version_ == 1 &&
01968 head->msg_block_->length () >= header_adjustment)
01969 {
01970
01971 tail->msg_block_->rd_ptr(header_adjustment);
01972
01973 head->msg_block_->cont (tail->msg_block_);
01974
01975 tail->msg_block_ = 0;
01976
01977 TAO_Queued_Data::release (tail);
01978
01979 tail = head;
01980 }
01981 else
01982 {
01983 reverse_stack.push (head);
01984 }
01985 }
01986 }
01987 else
01988 {
01989
01990
01991 CORBA::ULong tmp_request_id = 0;
01992 if (this->parse_request_id (tail, tmp_request_id) == -1)
01993 {
01994 return -1;
01995 }
01996
01997 const CORBA::ULong request_id = tmp_request_id;
01998
01999 while (this->fragment_stack_.pop (head) != -1)
02000 {
02001 CORBA::ULong head_request_id = 0;
02002 int parse_status = 0;
02003
02004 if (head->more_fragments_ &&
02005 head->major_version_ >= 1 &&
02006 head->minor_version_ >= 2 &&
02007 head->msg_block_->length () >= header_adjustment &&
02008 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
02009 request_id == head_request_id)
02010 {
02011
02012 tail->msg_block_->rd_ptr(header_adjustment);
02013
02014 head->msg_block_->cont (tail->msg_block_);
02015
02016 tail->msg_block_ = 0;
02017
02018 TAO_Queued_Data::release (tail);
02019
02020 tail = head;
02021 }
02022 else
02023 {
02024 if (parse_status == -1)
02025 {
02026 TAO_Queued_Data::release (head);
02027 return -1;
02028 }
02029
02030 reverse_stack.push (head);
02031 }
02032 }
02033 }
02034
02035
02036 while (reverse_stack.pop (head) != -1)
02037 {
02038 this->fragment_stack_.push (head);
02039 }
02040
02041 if (tail->consolidate () == -1)
02042 {
02043
02044 TAO_Queued_Data::release (tail);
02045 return -1;
02046 }
02047
02048
02049 msg = tail;
02050
02051 return 0;
02052 }
02053
02054
02055 int
02056 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
02057 {
02058
02059
02060
02061 TAO::Incoming_Message_Stack reverse_stack;
02062
02063 CORBA::ULong cancel_request_id;
02064
02065 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
02066 {
02067 return -1;
02068 }
02069
02070 TAO_Queued_Data *head = 0;
02071
02072
02073 while (this->fragment_stack_.pop (head) != -1)
02074 {
02075 reverse_stack.push (head);
02076 }
02077
02078 bool discard_all_GIOP11_messages = false;
02079
02080
02081
02082
02083
02084
02085
02086 while (reverse_stack.pop (head) != -1)
02087 {
02088 CORBA::ULong head_request_id;
02089
02090 if (head->major_version_ == 1 &&
02091 head->minor_version_ <= 1 &&
02092 head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT &&
02093 this->parse_request_id (head, head_request_id) >= 0 &&
02094 cancel_request_id == head_request_id)
02095 {
02096 TAO_Queued_Data::release (head);
02097 discard_all_GIOP11_messages = true;
02098 }
02099 else if (head->major_version_ == 1 &&
02100 head->minor_version_ <= 1 &&
02101 discard_all_GIOP11_messages)
02102 {
02103 TAO_Queued_Data::release (head);
02104 }
02105 else if (head->major_version_ >= 1 &&
02106 head->minor_version_ >= 2 &&
02107 this->parse_request_id (head, head_request_id) >= 0 &&
02108 cancel_request_id == head_request_id)
02109 {
02110 TAO_Queued_Data::release (head);
02111 }
02112 else
02113 {
02114 this->fragment_stack_.push (head);
02115 }
02116 }
02117
02118 return 0;
02119 }
02120
02121 TAO_GIOP_Fragmentation_Strategy *
02122 TAO_GIOP_Message_Base::fragmentation_strategy (void)
02123 {
02124 return this->fragmentation_strategy_.get ();
02125 }
02126
02127 void
02128 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
02129 {
02130 CORBA::Octet * const buf =
02131 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
02132
02133 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
02134 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
02135
02136
02137 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
02138
02139
02140 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
02141
02142
02143
02144
02145 if (!(major <= 1 && minor == 0))
02146 ACE_SET_BITS (flags, msg.more_fragments () << 1);
02147 }
02148
02149 TAO_END_VERSIONED_NAMESPACE_DECL