#include <GIOP_Message_Base.h>
Inheritance diagram for TAO_GIOP_Message_Base:


This class holds the details that are common to all GIOP versions. Some of the details currently kept here may be moved elsewhere if they start to differ between versions.
Definition at line 51 of file GIOP_Message_Base.h.
|
||||||||||||||||
|
Constructor.
Definition at line 27 of file GIOP_Message_Base.cpp. References TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, and TAO_ENCAP_BYTE_ORDER.
00030 : orb_core_ (orb_core) 00031 , message_state_ () 00032 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport)) 00033 , out_stream_ (0, 00034 input_cdr_size, 00035 TAO_ENCAP_BYTE_ORDER, 00036 orb_core->output_cdr_buffer_allocator (), 00037 orb_core->output_cdr_dblock_allocator (), 00038 orb_core->output_cdr_msgblock_allocator (), 00039 orb_core->orb_params ()->cdr_memcpy_tradeoff (), 00040 fragmentation_strategy_.get (), 00041 TAO_DEF_GIOP_MAJOR, 00042 TAO_DEF_GIOP_MINOR) 00043 { 00044 } |
|
|
Dtor.
Definition at line 47 of file GIOP_Message_Base.cpp.
00048 {
00049 }
|
|
||||||||||||
|
Consolidate a fragmented message with its associated fragments, which are stored within this class. If a reliable transport (like TCP) is used, fragments are partially ordered on the stack, with the last fragment on top. Otherwise, if an unreliable transport (like UDP) is used, fragments may be out of order and must be ordered before consolidation.
Implements TAO_Pluggable_Messaging. Definition at line 1910 of file GIOP_Message_Base.cpp. References TAO_Queued_Data::consolidate(), ACE_Message_Block::cont(), fragment_header_length(), fragment_stack_, header_length(), ACE_Message_Block::length(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, parse_request_id(), TAO::Incoming_Message_Stack::pop(), TAO::Incoming_Message_Stack::push(), ACE_Message_Block::rd_ptr(), and TAO_Queued_Data::release().
01911 {
01912 TAO::Incoming_Message_Stack reverse_stack;
01913
01914 TAO_Queued_Data *tail = 0;
01915 TAO_Queued_Data *head = 0;
01916
01917 //
01918 // CONSOLIDATE FRAGMENTED MESSAGE
01919 //
01920
01921 // check for error-condition
01922 if (qd == 0)
01923 {
01924 return -1;
01925 }
01926
01927 if (qd->major_version_ == 1 && qd->minor_version_ == 0)
01928 {
01929 TAO_Queued_Data::release (qd);
01930 return -1; // error: GIOP-1.0 does not support fragments
01931 }
01932
01933 // If this is not the last fragment, push it onto stack for later processing
01934 if (qd->more_fragments_)
01935 {
01936 this->fragment_stack_.push (qd);
01937
01938 msg = 0; // no consolidated message available yet
01939 return 1; // status: more messages expected.
01940 }
01941
01942 tail = qd; // init
01943
01944 // Add the current message block to the end of the chain
01945 // after adjusting the read pointer to skip the header(s)
01946 const size_t header_adjustment =
01947 this->header_length () +
01948 this->fragment_header_length (tail->major_version_,
01949 tail->minor_version_);
01950
01951 if (tail->msg_block_->length () < header_adjustment)
01952 {
01953 // buffer length not sufficient
01954 TAO_Queued_Data::release (qd);
01955 return -1;
01956 }
01957
01958 // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
01959 if (tail->major_version_ == 1 && tail->minor_version_ == 1)
01960 {
01961 // GIOP-1.1
01962
01963 while (this->fragment_stack_.pop (head) != -1)
01964 {
01965 if (head->more_fragments_ &&
01966 head->major_version_ == 1 &&
01967 head->minor_version_ == 1 &&
01968 head->msg_block_->length () >= header_adjustment)
01969 {
01970 // adjust the read-pointer, skip the fragment header
01971 tail->msg_block_->rd_ptr(header_adjustment);
01972
01973 head->msg_block_->cont (tail->msg_block_);
01974
01975 tail->msg_block_ = 0;
01976
01977 TAO_Queued_Data::release (tail);
01978
01979 tail = head;
01980 }
01981 else
01982 {
01983 reverse_stack.push (head);
01984 }
01985 }
01986 }
01987 else
01988 {
01989 // > GIOP-1.2
01990
01991 CORBA::ULong tmp_request_id = 0;
01992 if (this->parse_request_id (tail, tmp_request_id) == -1)
01993 {
01994 return -1;
01995 }
01996
01997 const CORBA::ULong request_id = tmp_request_id;
01998
01999 while (this->fragment_stack_.pop (head) != -1)
02000 {
02001 CORBA::ULong head_request_id = 0;
02002 int parse_status = 0;
02003
02004 if (head->more_fragments_ &&
02005 head->major_version_ >= 1 &&
02006 head->minor_version_ >= 2 &&
02007 head->msg_block_->length () >= header_adjustment &&
02008 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
02009 request_id == head_request_id)
02010 {
02011 // adjust the read-pointer, skip the fragment header
02012 tail->msg_block_->rd_ptr(header_adjustment);
02013
02014 head->msg_block_->cont (tail->msg_block_);
02015
02016 tail->msg_block_ = 0;
02017
02018 TAO_Queued_Data::release (tail);
02019
02020 tail = head;
02021 }
02022 else
02023 {
02024 if (parse_status == -1)
02025 {
02026 TAO_Queued_Data::release (head);
02027 return -1;
02028 }
02029
02030 reverse_stack.push (head);
02031 }
02032 }
02033 }
02034
02035 // restore stack
02036 while (reverse_stack.pop (head) != -1)
02037 {
02038 this->fragment_stack_.push (head);
02039 }
02040
02041 if (tail->consolidate () == -1)
02042 {
02043 // memory allocation failed
02044 TAO_Queued_Data::release (tail);
02045 return -1;
02046 }
02047
02048 // set out value
02049 msg = tail;
02050
02051 return 0;
02052 }
|
|
||||||||||||
|
Check whether the node qd needs consolidation from incoming.
Implements TAO_Pluggable_Messaging. Definition at line 507 of file GIOP_Message_Base.cpp. References ACE_ERROR, ace_min(), ACE_TEXT, ACE_Message_Block::copy(), ACE_CDR::grow(), init_queued_data(), ACE_Message_Block::length(), LM_ERROR, TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO_GIOP_Message_State::parse_message_header(), TAO_GIOP_Message_State::payload_size(), ACE_Message_Block::rd_ptr(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.
00509 {
00510 // Look to see whether we had atleast parsed the GIOP header ...
00511 if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00512 {
00513 // The data length that has been stuck in there during the last
00514 // read ....
00515 size_t const len =
00516 qd->msg_block_->length ();
00517
00518 // paranoid check
00519 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00520 {
00521 // inconsistency - this code should have parsed the header
00522 // so far
00523 return -1;
00524 }
00525
00526 // We know that we would have space for
00527 // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
00528 // from the <incoming> into the message block in <qd>
00529 size_t const available = incoming.length ();
00530 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00531 size_t const n_copy = ace_min (available, desired);
00532
00533 // paranoid check, but would cause endless looping
00534 if (n_copy == 0)
00535 {
00536 return -1;
00537 }
00538
00539 if (qd->msg_block_->copy (incoming.rd_ptr (),
00540 n_copy) == -1)
00541 {
00542 return -1;
00543 }
00544
00545 // Move the rd_ptr () in the incoming message block..
00546 incoming.rd_ptr (n_copy);
00547
00548 // verify sufficient data to parse GIOP header
00549 if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00550 {
00551 return 0; /* continue */
00552 }
00553
00554 TAO_GIOP_Message_State state;
00555
00556 // Parse the message header now...
00557 if (state.parse_message_header (*qd->msg_block_) == -1)
00558 {
00559 if (TAO_debug_level > 0)
00560 {
00561 ACE_ERROR ((LM_ERROR,
00562 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00563 ACE_TEXT ("error parsing header\n") ));
00564 }
00565 return -1;
00566 }
00567 // Now grow the message block so that we can copy the rest of
00568 // the data, the message_block must be able to hold complete message
00569 if (ACE_CDR::grow (qd->msg_block_,
00570 state.message_size ()) == -1) /* GIOP_Header + Payload */
00571 {
00572 // on mem-error get rid of context silently, try to avoid
00573 // system calls that might allocate additional memory
00574 return -1;
00575 }
00576
00577 // Copy the pay load..
00578 // Calculate the bytes that needs to be copied in the queue...
00579 size_t copy_len = state.payload_size ();
00580
00581 // If the data that needs to be copied is more than that is
00582 // available to us ..
00583 if (copy_len > incoming.length ())
00584 {
00585 // Calculate the missing data..
00586 qd->missing_data_ = copy_len - incoming.length ();
00587
00588 // Set the actual possible copy_len that is available...
00589 copy_len = incoming.length ();
00590 }
00591 else
00592 {
00593 qd->missing_data_ = 0;
00594 }
00595
00596 // ..now we are set to copy the right amount of data to the
00597 // node..
00598 if (qd->msg_block_->copy (incoming.rd_ptr (),
00599 copy_len) == -1)
00600 {
00601 return -1;
00602 }
00603
00604 // Set the <rd_ptr> of the <incoming>..
00605 incoming.rd_ptr (copy_len);
00606
00607 // Get the other details...
00608 this->init_queued_data (qd, state);
00609 }
00610 else
00611 {
00612 // @@todo: Need to abstract this out to a seperate method...
00613 size_t copy_len = qd->missing_data_;
00614
00615 if (copy_len > incoming.length ())
00616 {
00617 // Calculate the missing data..
00618 qd->missing_data_ = copy_len - incoming.length ();
00619
00620 // Set the actual possible copy_len that is available...
00621 copy_len = incoming.length ();
00622 }
00623
00624 // paranoid check for endless-event-looping
00625 if (copy_len == 0)
00626 {
00627 return -1;
00628 }
00629
00630 // Copy the right amount of data in to the node...
00631 // node..
00632 if (qd->msg_block_->copy (incoming.rd_ptr (),
00633 copy_len) == -1)
00634 {
00635 return -1;
00636 }
00637
00638 // Set the <rd_ptr> of the <incoming>..
00639 qd->msg_block_->rd_ptr (copy_len);
00640
00641 }
00642
00643 return 0;
00644 }
|
|
|
Discard all fragments associated with the request-id encoded in cancel_request. This operation will never be called concurrently by multiple threads, nor concurrently with consolidate_fragmented_message.
Implements TAO_Pluggable_Messaging. Definition at line 2056 of file GIOP_Message_Base.cpp. References fragment_stack_, TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_type_, parse_request_id(), TAO::Incoming_Message_Stack::pop(), TAO::Incoming_Message_Stack::push(), TAO_Queued_Data::release(), and TAO_PLUGGABLE_MESSAGE_FRAGMENT.
02057 {
02058 // We must extract the specific request-id from message-buffer
02059 // and remove all fragments from stack that match this request-id.
02060
02061 TAO::Incoming_Message_Stack reverse_stack;
02062
02063 CORBA::ULong cancel_request_id;
02064
02065 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
02066 {
02067 return -1;
02068 }
02069
02070 TAO_Queued_Data *head = 0;
02071
02072 // Revert stack
02073 while (this->fragment_stack_.pop (head) != -1)
02074 {
02075 reverse_stack.push (head);
02076 }
02077
02078 bool discard_all_GIOP11_messages = false;
02079
02080 // Now we are able to process message in order they have arrived.
02081 // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
02082 // fragments belong to this message and must be discarded.
02083 // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
02084 // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
02085 // having encoded the request id will be discarded.
02086 while (reverse_stack.pop (head) != -1)
02087 {
02088 CORBA::ULong head_request_id;
02089
02090 if (head->major_version_ == 1 &&
02091 head->minor_version_ <= 1 &&
02092 head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id
02093 this->parse_request_id (head, head_request_id) >= 0 &&
02094 cancel_request_id == head_request_id)
02095 {
02096 TAO_Queued_Data::release (head);
02097 discard_all_GIOP11_messages = true;
02098 }
02099 else if (head->major_version_ == 1 &&
02100 head->minor_version_ <= 1 &&
02101 discard_all_GIOP11_messages)
02102 {
02103 TAO_Queued_Data::release (head);
02104 }
02105 else if (head->major_version_ >= 1 &&
02106 head->minor_version_ >= 2 &&
02107 this->parse_request_id (head, head_request_id) >= 0 &&
02108 cancel_request_id == head_request_id)
02109 {
02110 TAO_Queued_Data::release (head);
02111 }
02112 else
02113 {
02114 this->fragment_stack_.push (head);
02115 }
02116 }
02117
02118 return 0;
02119 }
|
|
||||||||||||||||
|
Print out a debug message.
Definition at line 1559 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_HEX_DUMP, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, names, ACE_CDR::swap_4(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_FRAGMENT, TAO_GIOP_REPLY, and TAO_GIOP_REQUEST. Referenced by format_message(), process_reply_message(), process_request_message(), send_close_connection(), and send_error().
01562 {
01563
01564 if (TAO_debug_level >= 5)
01565 {
01566 static const char digits[] = "0123456789ABCD";
01567 static const char *names[] =
01568 {
01569 "Request",
01570 "Reply",
01571 "CancelRequest",
01572 "LocateRequest",
01573 "LocateReply",
01574 "CloseConnection",
01575 "MessageError",
01576 "Fragment"
01577 };
01578
01579 // Message name.
01580 const char *message_name = "UNKNOWN MESSAGE";
01581 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01582 if (slot < sizeof (names) / sizeof (names[0]))
01583 message_name = names[slot];
01584
01585 // Byte order.
01586 int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01587
01588 // Get the version info
01589 CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01590 CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01591
01592 // request/reply id.
01593 CORBA::ULong tmp = 0;
01594 CORBA::ULong *id = &tmp;
01595 char *tmp_id = 0;
01596
01597 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
01598 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
01599 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
01600 {
01601 if (major == 1 && minor < 2)
01602 {
01603 // @@ Only works if ServiceContextList is empty....
01604 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
01605 }
01606 else
01607 {
01608 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01609 }
01610 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01611 if (byte_order == TAO_ENCAP_BYTE_ORDER)
01612 {
01613 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01614 }
01615 else
01616 {
01617 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01618 }
01619 #else
01620 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01621 #endif /* ACE_DISABLE_SWAP_ON_READ */
01622
01623 }
01624
01625 // Print.
01626 ACE_DEBUG ((LM_DEBUG,
01627 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01628 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01629 "Type %s[%u]\n",
01630 ACE_TEXT_CHAR_TO_TCHAR (label),
01631 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01632 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01633 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01634 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01635 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01636 *id));
01637
01638 if (TAO_debug_level >= 10)
01639 ACE_HEX_DUMP ((LM_DEBUG,
01640 (const char *) ptr,
01641 len,
01642 ACE_TEXT ("GIOP message")));
01643 }
01644 }
|
|
||||||||||||
|
Extract the details of the next message from incoming into qd. Returns 0 if the message header could not be parsed completely, 1 if the message header could be parsed completely, and -1 on error. Implements TAO_Pluggable_Messaging. Definition at line 417 of file GIOP_Message_Base.cpp. References ACE_ERROR, ace_max(), ACE_TEXT, ACE_Message_Block::copy(), init_queued_data(), ACE_Message_Block::length(), LM_ERROR, make_queued_data(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO_GIOP_Message_State::parse_message_header(), ACE_Message_Block::rd_ptr(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.
00419 {
00420 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00421 {
00422 if (incoming.length () > 0)
00423 {
00424 // Optimize memory usage, we dont know actual message size
00425 // so far, but allocate enough space to hold small GIOP
00426 // messages. This way we avoid expensive "grow" operation
00427 // for small messages.
00428 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00429
00430 // Make a node which has at least message block of the size
00431 // of MESSAGE_HEADER_LEN.
00432 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00433 default_buf_size);
00434
00435 // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
00436
00437 qd = this->make_queued_data (buf_size);
00438
00439 if (qd == 0)
00440 {
00441 if (TAO_debug_level > 0)
00442 {
00443 ACE_ERROR((LM_ERROR,
00444 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00445 ACE_TEXT ("out of memory\n")));
00446 }
00447 return -1;
00448 }
00449
00450 qd->msg_block_->copy (incoming.rd_ptr (),
00451 incoming.length ());
00452
00453 incoming.rd_ptr (incoming.length ()); // consume all available data
00454
00455 qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00456 }
00457 else
00458 {
00459 // handle not initialized variables
00460 qd = 0; // reset
00461 }
00462
00463 return 0;
00464 }
00465
00466 TAO_GIOP_Message_State state;
00467 if (state.parse_message_header (incoming) == -1)
00468 {
00469 return -1;
00470 }
00471
00472 size_t copying_len = state.message_size ();
00473
00474 qd = this->make_queued_data (copying_len);
00475
00476 if (qd == 0)
00477 {
00478 if (TAO_debug_level > 0)
00479 {
00480 ACE_ERROR ((LM_ERROR,
00481 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00482 ACE_TEXT ("out of memory\n")));
00483 }
00484 return -1;
00485 }
00486
00487 if (copying_len > incoming.length ())
00488 {
00489 qd->missing_data_ = copying_len - incoming.length ();
00490 copying_len = incoming.length ();
00491 }
00492 else
00493 {
00494 qd->missing_data_ = 0;
00495 }
00496
00497 qd->msg_block_->copy (incoming.rd_ptr (),
00498 copying_len);
00499
00500 incoming.rd_ptr (copying_len);
00501 this->init_queued_data (qd, state);
00502
00503 return 1;
00504 }
|
|
|
Format the message. The message length has not yet been written into the header, so we take this opportunity to insert it and finish formatting the message. Implements TAO_Pluggable_Messaging. Definition at line 267 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::begin(), ACE_OutputCDR::buffer(), ACE_CDR::consolidate(), ACE_Message_Block::cont(), ACE_OutputCDR::do_byte_swap(), dump_msg(), ACE_Message_Block::rd_ptr(), set_giop_flags(), ACE_CDR::swap_4(), TAO_debug_level, and ACE_OutputCDR::total_length().
00268 {
00269 // Ptr to first buffer.
00270 char * buf = (char *) stream.buffer ();
00271
00272 this->set_giop_flags (stream);
00273
00274 // Length of all buffers.
00275 size_t const total_len = stream.total_length ();
00276
00277 // NOTE: Here would also be a fine place to calculate a digital
00278 // signature for the message and place it into a preallocated slot
00279 // in the "ServiceContext". Similarly, this is a good spot to
00280 // encrypt messages (or just the message bodies) if that's needed in
00281 // this particular environment and that isn't handled by the
00282 // networking infrastructure (e.g., IPSEC).
00283
00284 CORBA::ULong const bodylen = static_cast <CORBA::ULong>
00285 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00286
00287 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00288 *(reinterpret_cast <CORBA::ULong *> (buf +
00289 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00290 #else
00291 if (!stream.do_byte_swap ())
00292 *(reinterpret_cast <CORBA::ULong *>
00293 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00294 else
00295 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00296 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00297 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00298
00299 if (TAO_debug_level > 2)
00300 {
00301 // Check whether the output cdr stream is build up of multiple
00302 // messageblocks. If so, consolidate them to one block that can be
00303 // dumped
00304 ACE_Message_Block* consolidated_block = 0;
00305 if (stream.begin()->cont () != 0)
00306 {
00307 consolidated_block = new ACE_Message_Block;
00308 ACE_CDR::consolidate (consolidated_block, stream.begin ());
00309 buf = (char *) (consolidated_block->rd_ptr ());
00310 }
00311 ///
00312 this->dump_msg ("send",
00313 reinterpret_cast <u_char *> (buf),
00314 total_len);
00315
00316 //
00317 delete consolidated_block;
00318 consolidated_block = 0;
00319 //
00320 }
00321
00322 return 0;
00323 }
|
|
||||||||||||
|
The header length of a fragment.
Implements TAO_Pluggable_Messaging. Definition at line 1769 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_Generator_Parser::fragment_header_length(), and set_state(). Referenced by consolidate_fragmented_message().
01771 {
01772 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01773
01774 // Get the state information that we need to use
01775 this->set_state (major,
01776 minor,
01777 generator_parser);
01778
01779 return generator_parser->fragment_header_length ();
01780 }
|
|
|
Outgoing GIOP message fragmentation strategy.
Implements TAO_Pluggable_Messaging. Definition at line 2122 of file GIOP_Message_Base.cpp. References ACE_Auto_Basic_Ptr< X >::get().
02123 {
02124 return this->fragmentation_strategy_.get ();
02125 }
|
|
||||||||||||||||
|
Generate a reply message with the exception ex.
Implements TAO_Pluggable_Messaging. Definition at line 855 of file GIOP_Message_Base.cpp. References CORBA::Exception::_tao_encode(), ACE_CATCH, ACE_CHECK_RETURN, ACE_DEBUG, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, generate_reply_header(), LM_DEBUG, and TAO_debug_level. Referenced by send_reply_exception().
00860 {
00861 // A new try/catch block, but if something goes wrong now we have no
00862 // hope, just abort.
00863 ACE_DECLARE_NEW_CORBA_ENV;
00864
00865 ACE_TRY
00866 {
00867 // Make the GIOP & reply header.
00868 this->generate_reply_header (cdr,
00869 params);
00870 x._tao_encode (cdr
00871 ACE_ENV_ARG_PARAMETER);
00872 ACE_TRY_CHECK;
00873 }
00874 ACE_CATCH (CORBA::Exception, ex)
00875 {
00876 // Now we know that while handling the error an other error
00877 // happened -> no hope, close connection.
00878
00879 // Close the handle.
00880 if (TAO_debug_level > 0)
00881 ACE_DEBUG ((LM_DEBUG,
00882 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00883 ACE_TEXT ("generate_exception_reply ()\n")));
00884 return -1;
00885 }
00886 ACE_ENDTRY;
00887 ACE_CHECK_RETURN (-1);
00888
00889 return 0;
00890 }
|
|
||||||||||||
|
Implements TAO_Pluggable_Messaging. Definition at line 231 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_FRAGMENT, TAO_GIOP_Message_Generator_Parser::write_fragment_header(), and write_protocol_header().
00233 {
00234 // Get a parser for us
00235 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00236
00237 CORBA::Octet major, minor;
00238
00239 cdr.get_version (major, minor);
00240
00241 // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
00242 // supports them in 1.2 or better since GIOP 1.1 fragments do not
00243 // have a fragment message header.
00244 if (major == 1 && minor < 2)
00245 return -1;
00246
00247 // Get the state information that we need to use
00248 this->set_state (major,
00249 minor,
00250 generator_parser);
00251
00252 // Write the GIOP header first
00253 if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
00254 || !generator_parser->write_fragment_header (cdr, request_id))
00255 {
00256 if (TAO_debug_level)
00257 ACE_ERROR ((LM_ERROR,
00258 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00259
00260 return -1;
00261 }
00262
00263 return 0;
00264 }
|
|
||||||||||||
|
Write the locate reply header.
Implements TAO_Pluggable_Messaging. Definition at line 1647 of file GIOP_Message_Base.cpp.
01650 {
01651 return 0;
01652 }
|
|
||||||||||||||||
|
Write the LocateRequest header into the cdr stream.
Implements TAO_Pluggable_Messaging. Definition at line 121 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, TAO_Operation_Details::request_id(), set_state(), TAO_debug_level, TAO_GIOP_LOCATEREQUEST, TAO_GIOP_Message_Generator_Parser::write_locate_request_header(), and write_protocol_header().
00126 {
00127 // Get a parser for us
00128 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00129
00130 CORBA::Octet major, minor;
00131
00132 cdr.get_version (major, minor);
00133
00134 // Get the state information that we need to use
00135 this->set_state (major,
00136 minor,
00137 generator_parser);
00138
00139 // Write the GIOP header first
00140 if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST,
00141 cdr))
00142 {
00143 if (TAO_debug_level)
00144 ACE_ERROR ((LM_ERROR,
00145 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00146
00147 return -1;
00148 }
00149
00150 // Now call the implementation for the rest of the header
00151 if (!generator_parser->write_locate_request_header
00152 (op.request_id (),
00153 spec,
00154 cdr))
00155 {
00156 if (TAO_debug_level)
00157 ACE_ERROR ((LM_ERROR,
00158 ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00159
00160
00161 return -1;
00162
00163 }
00164
00165 return 0;
00166 }
|
|
||||||||||||
|
Write the reply header.
Implements TAO_Pluggable_Messaging. Definition at line 169 of file GIOP_Message_Base.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_PRINT_EXCEPTION, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_REPLY, write_protocol_header(), and TAO_GIOP_Message_Generator_Parser::write_reply_header(). Referenced by generate_exception_reply(), and process_request().
00173 {
00174 // Get a parser for us
00175 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00176
00177 CORBA::Octet major, minor;
00178
00179 cdr.get_version (major, minor);
00180
00181 // Get the state information that we need to use
00182 this->set_state (major,
00183 minor,
00184 generator_parser);
00185
00186 // Write the GIOP header first
00187 if (!this->write_protocol_header (TAO_GIOP_REPLY,
00188 cdr))
00189 {
00190 if (TAO_debug_level)
00191 ACE_ERROR ((LM_ERROR,
00192 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00193
00194 return -1;
00195 }
00196
00197 ACE_DECLARE_NEW_CORBA_ENV;
00198 ACE_TRY
00199 {
00200 // Now call the implementation for the rest of the header
00201 int const result =
00202 generator_parser->write_reply_header (cdr,
00203 params
00204 ACE_ENV_ARG_PARAMETER);
00205 ACE_TRY_CHECK;
00206
00207 if (!result)
00208 {
00209 if (TAO_debug_level > 4)
00210 ACE_ERROR ((LM_ERROR,
00211 ACE_TEXT ("(%P|%t) Error in writing reply ")
00212 ACE_TEXT ("header\n")));
00213
00214 return -1;
00215 }
00216 }
00217 ACE_CATCHANY
00218 {
00219 if (TAO_debug_level > 4)
00220 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00221 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00222
00223 return -1;
00224 }
00225 ACE_ENDTRY;
00226
00227 return 0;
00228 }
|
|
||||||||||||||||
|
Write the RequestHeader into the cdr stream. The underlying implementation of the messaging should do the right thing. Implements TAO_Pluggable_Messaging. Definition at line 74 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_REQUEST, write_protocol_header(), and TAO_GIOP_Message_Generator_Parser::write_request_header().
00079 {
00080 // Get a parser for us
00081 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00082
00083 CORBA::Octet major, minor;
00084
00085 cdr.get_version (major, minor);
00086
00087 // Get the state information that we need to use
00088 this->set_state (major,
00089 minor,
00090 generator_parser);
00091
00092 // Write the GIOP header first
00093 if (!this->write_protocol_header (TAO_GIOP_REQUEST,
00094 cdr))
00095 {
00096 if (TAO_debug_level)
00097 {
00098 ACE_ERROR ((LM_ERROR,
00099 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00100 }
00101
00102 return -1;
00103 }
00104
00105 // Now call the implementation for the rest of the header
00106 if (!generator_parser->write_request_header (op,
00107 spec,
00108 cdr))
00109 {
00110 if (TAO_debug_level)
00111 ACE_ERROR ((LM_ERROR,
00112 ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00113
00114 return -1;
00115 }
00116
00117 return 0;
00118 }
|
|
|
Header length.
Implements TAO_Pluggable_Messaging. Definition at line 1763 of file GIOP_Message_Base.cpp. Referenced by consolidate_fragmented_message().
01764 {
01765 return TAO_GIOP_MESSAGE_HEADER_LEN;
01766 }
|
|
||||||||||||
|
Initialize the underlying state object based on the major and minor revision numbers. Implements TAO_Pluggable_Messaging. Definition at line 53 of file GIOP_Message_Base.cpp. References out_stream_, and ACE_OutputCDR::set_version().
00055 {
00056 // Set the giop version of the out stream
00057 this->out_stream_.set_version (major,
00058 minor);
00059 }
|
|
||||||||||||
|
Initialize the TAO_Queued_Data from the relevant portions of a GIOP_Message_State. Definition at line 1783 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_State::byte_order_, TAO_Queued_Data::byte_order_, TAO_GIOP_Message_State::giop_version_, TAO_GIOP_Message_Version::major, TAO_Queued_Data::major_version_, message_type(), TAO_GIOP_Message_Version::minor, TAO_Queued_Data::minor_version_, TAO_GIOP_Message_State::more_fragments_, TAO_Queued_Data::more_fragments_, and TAO_Queued_Data::msg_type_. Referenced by consolidate_node(), extract_next_message(), and parse_next_message().
01786 {
01787 qd->byte_order_ = state.byte_order_;
01788 qd->major_version_ = state.giop_version_.major;
01789 qd->minor_version_ = state.giop_version_.minor;
01790 qd->more_fragments_ = state.more_fragments_;
01791 qd->msg_type_ = this->message_type (state);
01792 }
|
|
|
Is the messaging object ready for processing BiDirectional request/response? Implements TAO_Pluggable_Messaging. Definition at line 1655 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::get_version(), TAO_GIOP_Message_Generator_Parser::is_ready_for_bidirectional(), and set_state().
01656 {
01657 // Get a parser for us
01658 TAO_GIOP_Message_Generator_Parser *parser = 0;
01659
01660 CORBA::Octet major, minor = 0;
01661
01662 msg.get_version (major, minor);
01663
01664 // Get the state information that we need to use
01665 this->set_state (major,
01666 minor,
01667 parser);
01668
01669 // We dont really know.. So ask the generator and parser objects that
01670 // we know.
01671 // @@ TODO: Need to make this faster, instead of making virtual
01672 // call, try todo the check within this class
01673 return parser->is_ready_for_bidirectional ();
01674 }
|
|
|
Creates a new node for the queue with a message block in the node of size sz. Definition at line 1678 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_ORB_Core::create_input_cdr_data_block(), ACE_Message_Block::duplicate(), TAO_ORB_Core::input_cdr_msgblock_allocator(), LM_DEBUG, LM_ERROR, TAO_Queued_Data::make_queued_data(), ACE_CDR::mb_align(), TAO_Queued_Data::msg_block_, ACE_Data_Block::release(), TAO_Queued_Data::release(), and TAO_debug_level. Referenced by extract_next_message().
// Builds a TAO_Queued_Data node whose msg_block_ is a freshly allocated,
// CDR-aligned message block backed by a data block of
// sz + ACE_CDR::MAX_ALIGNMENT bytes.
// Returns the node on success, or 0 if any of the three allocations
// (node, data block, message block) fails; on failure whatever was
// already allocated is released first.
01679 {
01680 // Get a node for the queue..
01681 TAO_Queued_Data *qd =
01682 TAO_Queued_Data::make_queued_data (
01683 this->orb_core_->transport_message_buffer_allocator ());
01684
01685 if (qd == 0)
01686 {
01687 if (TAO_debug_level > 0)
01688 {
01689 ACE_ERROR ((LM_ERROR,
01690 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01691 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01692 }
01693 return 0; // NULL pointer
01694 }
01695
01696 // @@todo: We have a similar method in Transport.cpp. Need to see how
01697 // we can factor them out..
01698 // Make a datablock for the size requested + something. The
01699 // "something" is required because we are going to align the data
01700 // block in the message block. During alignment we could lose some
01701 // bytes. As we may not know how many bytes will be lost, we will
01702 // allocate ACE_CDR::MAX_ALIGNMENT extra.
01703 ACE_Data_Block *db =
01704 this->orb_core_->create_input_cdr_data_block (sz +
01705 ACE_CDR::MAX_ALIGNMENT);
01706
01707 if (db == 0)
01708 {
01709 TAO_Queued_Data::release (qd);
01710
01711 if (TAO_debug_level > 0)
01712 {
01713 ACE_ERROR ((LM_ERROR,
01714 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01715 ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"),
01716 sz));
01717 }
01718 return 0; // NULL pointer
01719 }
01720
01721 ACE_Allocator *alloc =
01722 this->orb_core_->input_cdr_msgblock_allocator ();
01723
01724 if (alloc == 0) // only traced (at debug level >= 8); the null allocator is still passed to mb below
01725 {
01726 if (TAO_debug_level >= 8)
01727 {
01728 ACE_DEBUG ((LM_DEBUG,
01729 ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,")
01730 ACE_TEXT (" no ACE_Allocator defined\n")));
01731 }
01732 }
01733
01734
01735 ACE_Message_Block mb (db,
01736 0,
01737 alloc);
01738
01739 ACE_Message_Block *new_mb = mb.duplicate ();
01740
01741 if (new_mb == 0)
01742 {
01743 TAO_Queued_Data::release (qd);
01744 db->release(); // NOTE(review): the stack block mb was also built on db; confirm this release does not clash with mb's destructor
01745
01746 if (TAO_debug_level > 0)
01747 {
01748 ACE_ERROR ((LM_ERROR,
01749 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01750 ACE_TEXT ("out of memory, failed to allocate message block\n")));
01751 }
01752 return 0;
01753 }
01754
01755 ACE_CDR::mb_align (new_mb);
01756
01757 qd->msg_block_ = new_mb;
01758
01759 return qd;
01760 }
|
|
||||||||||||||||||||||||
|
Make a GIOP_LOCATEREPLY and hand that over to the transport so that it can be sent over the connection.
Definition at line 1293 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_OutputCDR::more_fragments(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_Transport::send_message(), TAO_debug_level, TAO_GIOP_LOCATEREPLY, TAO_GIOP_Message_Generator_Parser::write_locate_reply_mesg(), and write_protocol_header(). Referenced by process_locate_request().
// Writes a GIOP LocateReply into output (protocol header, then the reply
// header and body produced by parser for request.request_id () and
// status_info) and sends it through transport.
// Returns the value of transport->send_message (); a result of -1 is
// logged when TAO_debug_level > 0 and still returned to the caller.
01298 {
01299 // Note here we are making the Locate reply header which is *QUITE*
01300 // different from the reply header made by the make_reply () call..
01301 // Make the GIOP message header
01302 this->write_protocol_header (TAO_GIOP_LOCATEREPLY,
01303 output);
01304
01305 // This writes the header & body
01306 parser->write_locate_reply_mesg (output,
01307 request.request_id (),
01308 status_info);
01309
01310 output.more_fragments (false);
01311
01312 // Send the message
01313 int result = transport->send_message (output,
01314 0,
01315 TAO_Transport::TAO_REPLY);
01316
01317 // Print out message if there is an error
01318 if (result == -1)
01319 {
01320 if (TAO_debug_level > 0)
01321 {
01322 ACE_ERROR ((LM_ERROR,
01323 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01324 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01325 }
01326 }
01327
01328 return result;
01329 }
|
|
|
|
Accessor for the output CDR stream.
Implements TAO_Pluggable_Messaging. Definition at line 62 of file GIOP_Message_Base.cpp. References out_stream_.
// Returns the out_stream_ member, i.e. the output CDR stream held by
// this object.
00063 {
00064 return this->out_stream_;
00065 }
|
|
|
Definition at line 369 of file GIOP_Message_Base.cpp. References message_state_, TAO_GIOP_Message_State::parse_message_header(), and TAO_GIOP_Message_State::reset().
// Resets message_state_ and then returns the result of parsing the GIOP
// message header found in incoming into that state object.
00370 {
00371 this->message_state_.reset (); // discard whatever an earlier parse left behind
00372
00373 return this->message_state_.parse_message_header (incoming);
00374 }
|
|
||||||||||||||||
|
Parse the details of the next message from the incoming message block and initialize the attributes of qd.
Implements TAO_Pluggable_Messaging. Definition at line 377 of file GIOP_Message_Base.cpp. References init_queued_data(), ACE_Message_Block::length(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_GIOP_Message_State::parse_message_header(), TAO_GIOP_Message_State::payload_size(), and TAO_MISSING_DATA_UNDEFINED.
// Inspects incoming for a GIOP header and fills in qd and mesg_length.
// Returns 0 when fewer than TAO_GIOP_MESSAGE_HEADER_LEN bytes are
// available (qd.missing_data_ is set to TAO_MISSING_DATA_UNDEFINED),
// -1 when the header cannot be parsed, and 1 when the header was parsed:
// qd.missing_data_ then holds the number of bytes still missing (0 if
// none), qd is initialised from the parsed state, and mesg_length is the
// header length plus the payload size.
00380 {
00381 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00382 {
00383 qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00384
00385 return 0; /* incomplete header */
00386 }
00387 else
00388 {
00389 TAO_GIOP_Message_State state; // local state; the member message_state_ is not touched here
00390
00391 if (state.parse_message_header (incoming) == -1)
00392 {
00393 return -1;
00394 }
00395
00396 const size_t message_size = state.message_size (); /* Header + Payload */
00397
00398 if (message_size > incoming.length ())
00399 {
00400 qd.missing_data_ = message_size - incoming.length ();
00401 }
00402 else
00403 {
00404 qd.missing_data_ = 0;
00405 }
00406
00407 /* init out-parameters */
00408 this->init_queued_data (&qd, state);
00409 mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN
00410 + state.payload_size ();
00411
00412 return 1; /* complete header */
00413 }
00414 }
|
|
||||||||||||
|
Parse GIOP request-id of TAO_InputCDR cdr.
|
|
||||||||||||
|
Parse the GIOP request-id of TAO_Queued_Data qd.
Definition at line 1795 of file GIOP_Message_Base.cpp. References ACE_BIT_ENABLED, ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, ACE_Message_Block::data_block(), ACE_Data_Block::duplicate(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), set_state(), TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, and ACE_Message_Block::wr_ptr(). Referenced by consolidate_fragmented_message(), and discard_fragmented_message().
// Extracts the GIOP request-id of the message held in qd into request_id.
// An input CDR stream is positioned just past the GIOP header of
// qd->msg_block_, and the id is read from the place dictated by the
// version and message type stored in qd.
// Returns 0 when request_id was read, -1 when extraction fails or the
// message type is not one of those handled below.
01796 {
01797 // Get a parser for us
01798 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01799
01800 // Get the state information that we need to use
01801 // (generator_parser is not referenced again in this function)
01802 this->set_state (qd->major_version_,
01802 qd->minor_version_,
01803 generator_parser);
01804
01805 // Get the read and write positions before we steal data.
01806 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
01807 size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
01808 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01809
01810 // Create an input CDR stream. We do the following
01811 // 1 - If the incoming message block has a data block with a flag
01812 // DONT_DELETE (for the data block) we create an input CDR
01813 // stream the same way.
01814 // 2 - If the incoming message block had a datablock from heap just
01815 // use it by duplicating it and make the flag 0.
01816 // NOTE: We use the same data block in which we read the message and
01817 // we pass it on to the higher layers of the ORB. So we don't do any
01818 // copies at all here. The same is also done in the higher layers.
01819
01820 ACE_Message_Block::Message_Flags flg = 0;
01821 ACE_Data_Block *db = 0;
01822
01823 // Get the flag in the message block
01824 flg = qd->msg_block_->self_flags ();
01825
01826 if (ACE_BIT_ENABLED (flg,
01827 ACE_Message_Block::DONT_DELETE))
01828 {
01829 // Use the same datablock
01830 db = qd->msg_block_->data_block ();
01831 }
01832 else
01833 {
01834 // Use a duplicated datablock as the datablock has come off the
01835 // heap.
01836 db = qd->msg_block_->data_block ()->duplicate ();
01837 }
01838
01839
01840 TAO_InputCDR input_cdr (db,
01841 flg,
01842 rd_pos,
01843 wr_pos,
01844 qd->byte_order_,
01845 qd->major_version_,
01846 qd->minor_version_,
01847 this->orb_core_);
01848
01849 if (qd->major_version_ >= 1 &&
01850 (qd->minor_version_ == 0 || qd->minor_version_ == 1))
01851 {
01852 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01853 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY)
01854 {
01855 IOP::ServiceContextList service_context; // read first and discarded; the request-id is extracted right after it
01856
01857 if ( ! (input_cdr >> service_context &&
01858 input_cdr >> request_id) )
01859 {
01860 return -1;
01861 }
01862
01863 return 0;
01864 }
01865 else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01866 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01867 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01868 {
01869 if ( ! (input_cdr >> request_id) )
01870 {
01871 return -1;
01872 }
01873
01874 return 0;
01875 }
01876 else
01877 {
01878 return -1;
01879 }
01880 }
01881 else
01882 {
01883 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01884 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY ||
01885 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT ||
01886 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01887 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01888 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01889 {
01890 // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header.
01891 // This is true for all message types that might be sent in form of fragments or cancel-requests.
01892 if ( ! (input_cdr >> request_id) )
01893 {
01894 return -1;
01895 }
01896
01897 return 0;
01898 }
01899 else
01900 {
01901 return -1;
01902 }
01903 }
01904
01905 return -1; // not reached: every branch above returns
01906 }
|
|
||||||||||||||||||||
|
Processes the GIOP_LOCATE_REQUEST messages.
Definition at line 1156 of file GIOP_Message_Base.cpp. References ACE_CATCHALL, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_TRY_THROW, TAO_Request_Dispatcher::dispatch(), TAO_ServerRequest::exception_type(), TAO_ServerRequest::forward_location(), TAO_GIOP_Locate_Status_Msg::forward_location_var, TAO::unbounded_value_sequence< T >::get_buffer(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), TAO::unbounded_value_sequence< T >::length(), LM_DEBUG, make_send_locate_reply(), TAO_GIOP_Locate_Request_Header::object_key(), CORBA::Object_var, TAO_Transport::orb_core_, TAO_InputCDR::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_locate_header(), TAO_ORB_Core::request_dispatcher(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_GIOP_Locate_Status_Msg::status, TAO_debug_level, TAO_GIOP_NO_EXCEPTION, TAO_GIOP_OBJECT_FORWARD, TAO_GIOP_OBJECT_HERE, and TAO_GIOP_UNKNOWN_OBJECT. Referenced by process_request_message().
// Handles a GIOP LocateRequest: parses the locate header from input,
// dispatches a "_non_existent" server request for the object key, maps
// the outcome onto a locate status (OBJECT_HERE, OBJECT_FORWARD or
// UNKNOWN_OBJECT) and sends the LocateReply via make_send_locate_reply ().
// Any exception caught here leaves the status at UNKNOWN_OBJECT.
// Returns the result of make_send_locate_reply ().
01160 {
01161 // This will extract the request header, set <response_required> as
01162 // appropriate.
01163 TAO_GIOP_Locate_Request_Header locate_request (input,
01164 this->orb_core_);
01165
01166 TAO_GIOP_Locate_Status_Msg status_info;
01167
01168 // Defaulting.
01169 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01170
01171 CORBA::Boolean response_required = 1;
01172
01173 ACE_DECLARE_NEW_CORBA_ENV;
01174 ACE_TRY
01175 {
01176 int parse_error =
01177 parser->parse_locate_header (locate_request);
01178
01179 if (parse_error != 0)
01180 {
01181 ACE_TRY_THROW (CORBA::MARSHAL (0,
01182 CORBA::COMPLETED_NO));
01183 }
01184
01185 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01186 locate_request.object_key ().length (),
01187 locate_request.object_key ().get_buffer (),
01188 0);
01189
01190 // Set it to an error state; the TAO_ServerRequest constructor below
01191 // receives it as its last argument and it is re-checked afterwards.
01192 parse_error = 1;
01192 CORBA::ULong req_id = locate_request.request_id ();
01193
01194 // We will send the reply. The ServerRequest class need not send
01195 // the reply
01196 CORBA::Boolean deferred_reply = true;
01197 TAO_ServerRequest server_request (this,
01198 req_id,
01199 response_required,
01200 deferred_reply,
01201 tmp_key,
01202 "_non_existent",
01203 output,
01204 transport,
01205 this->orb_core_,
01206 parse_error);
01207
01208 if (parse_error != 0)
01209 {
01210 ACE_TRY_THROW (CORBA::MARSHAL (0,
01211 CORBA::COMPLETED_NO));
01212 }
01213
01214 CORBA::Object_var forward_to;
01215
01216 this->orb_core_->request_dispatcher ()->dispatch (
01217 this->orb_core_,
01218 server_request,
01219 forward_to
01220 ACE_ENV_ARG_PARAMETER);
01221 ACE_TRY_CHECK;
01222
01223 if (!CORBA::is_nil (forward_to.in ()))
01224 {
01225 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01226 status_info.forward_location_var = forward_to;
01227 if (TAO_debug_level > 0)
01228 ACE_DEBUG ((LM_DEBUG,
01229 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01230 ACE_TEXT ("called: forwarding\n")));
01231 }
01232 else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
01233 {
01234 // We got no exception, so the object is here.
01235 status_info.status = TAO_GIOP_OBJECT_HERE;
01236 if (TAO_debug_level > 0)
01237 ACE_DEBUG ((LM_DEBUG,
01238 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01239 ACE_TEXT ("found\n")));
01240 }
01241 else
01242 {
01243 status_info.forward_location_var = server_request.forward_location ();
01244
01245 if (!CORBA::is_nil (status_info.forward_location_var.in ()))
01246 { // NOTE(review): the trace below is not gated on TAO_debug_level, unlike the other traces in this function; confirm that is intended
01247 status_info.status = TAO_GIOP_OBJECT_FORWARD;
01248 ACE_DEBUG ((LM_DEBUG,
01249 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01250 ACE_TEXT ("forwarding\n")));
01251 }
01252 else
01253 { // NOTE(review): same as above, this trace is emitted regardless of TAO_debug_level
01254 // Normal exception, so the object is not here
01255 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01256 ACE_DEBUG ((LM_DEBUG,
01257 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01258 ACE_TEXT ("not here\n")));
01259 }
01260 }
01261 }
01262
01263 ACE_CATCHANY
01264 {
01265 // Normal exception, so the object is not here
01266 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01267 if (TAO_debug_level > 0)
01268 ACE_DEBUG ((LM_DEBUG,
01269 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01270 ACE_TEXT ("CORBA exception raised\n")));
01271 }
01272 #if defined (TAO_HAS_EXCEPTIONS)
01273 ACE_CATCHALL
01274 {
01275 // Normal exception, so the object is not here
01276 status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01277 if (TAO_debug_level > 0)
01278 ACE_DEBUG ((LM_DEBUG,
01279 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01280 ACE_TEXT ("C++ exception raised\n")));
01281 }
01282 #endif /* TAO_HAS_EXCEPTIONS */
01283 ACE_ENDTRY;
01284
01285 return this->make_send_locate_reply (transport,
01286 locate_request,
01287 status_info,
01288 output,
01289 parser);
01290 }
|
|
||||||||||||
|
Parse the reply message that we received and return the reply information through reply_info. Implements TAO_Pluggable_Messaging. Definition at line 770 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, ACE_Message_Block::data_block(), TAO_Transport_Mux_Strategy::dispatch_reply(), dump_msg(), TAO_Transport::id(), TAO_Pluggable_Reply_Params::input_cdr_, ACE_Message_Block::length(), LM_ERROR, TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, TAO_GIOP_Message_Generator_Parser::parse_locate_reply(), TAO_GIOP_Message_Generator_Parser::parse_reply(), ACE_Message_Block::rd_ptr(), set_state(), TAO_debug_level, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_Transport::tms(), TAO_Pluggable_Reply_Params::transport_, and ACE_Message_Block::wr_ptr().
// Parses the reply held in qd (a Reply or a LocateReply) into params using
// the version-specific parser, then hands params to the transport's
// mux strategy via dispatch_reply ().
// Returns -1 if the message type is neither kind of reply or if parsing
// fails; otherwise returns the result of dispatch_reply ().
00773 {
00774 // Get a parser for us
00775 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00776
00777 // Get the state information that we need to use
00778 this->set_state (qd->major_version_,
00779 qd->minor_version_,
00780 generator_parser);
00781
00782 // Get the read and write positions before we steal data.
00783 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00784 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00785 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00786
00787 if (TAO_debug_level > 0)
00788 this->dump_msg ("recv",
00789 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00790 qd->msg_block_->length ());
00791
00792
00793 // Create an input CDR stream on the stack.
00794 // NOTE: We use the same data block in which we read the message and
00795 // we pass it on to the higher layers of the ORB. So we don't do any
00796 // copies at all here.
00797 TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
00798 ACE_Message_Block::DONT_DELETE,
00799 rd_pos,
00800 wr_pos,
00801 qd->byte_order_,
00802 qd->major_version_,
00803 qd->minor_version_,
00804 this->orb_core_);
00805
00806 // We know we have some reply message. Check whether it is a
00807 // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
00808
00809 // Once we send the InputCDR stream we need to just forget about
00810 // the stream and never touch that again for anything. We basically
00811 // lose ownership of the data_block.
00812 int retval = 0;
00813
00814 switch (qd->msg_type_)
00815 {
00816 case TAO_PLUGGABLE_MESSAGE_REPLY:
00817 // Should be taken care by the state specific parsing
00818 retval =
00819 generator_parser->parse_reply (input_cdr,
00820 params);
00821
00822 break;
00823 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
00824 retval =
00825 generator_parser->parse_locate_reply (input_cdr,
00826 params);
00827 break;
00828 default:
00829 retval = -1;
00830 }
00831
00832 if (retval == -1)
00833 return retval;
00834
00835 params.input_cdr_ = &input_cdr; // address of a local: only meaningful until this function returns
00836
00837 retval =
00838 params.transport_->tms ()->dispatch_reply (params);
00839
00840 if (retval == -1)
00841 {
00842 // Something really critical happened, we will forget about
00843 // every reply on this connection.
00844 if (TAO_debug_level > 0)
00845 ACE_ERROR ((LM_ERROR,
00846 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ")
00847 ACE_TEXT ("dispatch reply failed\n"),
00848 params.transport_->id ()));
00849 }
00850
00851 return retval;
00852 }
|
|
||||||||||||||||||||
|
Processes the GIOP_REQUEST messages.
Definition at line 933 of file GIOP_Message_Base.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHALL, ACE_CATCHANY, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_PRINT_EXCEPTION, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_TRY_THROW, TAO_Transport::assign_translators(), TAO_ORB_Core::codeset_manager(), TAO_Request_Dispatcher::dispatch(), generate_reply_header(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), TAO_ORB_Core::is_permanent_forward_condition(), TAO::unbounded_value_sequence< ServiceContext >::length(), LM_ERROR, TAO_OutputCDR::message_attributes(), TAO_OutputCDR::more_fragments(), CORBA::Object_var, TAO_ServerRequest::orb_core(), TAO_Transport::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_request_header(), TAO_Codeset_Manager::process_service_context(), TAO_ServerRequest::reply_service_info(), TAO_Pluggable_Reply_Params_Base::reply_status_, TAO_ORB_Core::request_dispatcher(), TAO_ServerRequest::request_id(), TAO_Pluggable_Reply_Params_Base::request_id_, TAO_ServerRequest::request_service_context(), TAO_ServerRequest::response_expected(), TAO_Transport::send_message(), send_reply_exception(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, TAO_debug_level, TAO_GIOP_LOCATION_FORWARD, TAO_GIOP_LOCATION_FORWARD_PERM, and TAO_UNHANDLED_SERVER_CXX_EXCEPTION. Referenced by process_request_message().
// Handles a GIOP Request: parses the request header from cdr, dispatches
// the request through the ORB core's request dispatcher and, if the
// dispatcher hands back a forward reference, marshals and sends a
// LOCATION_FORWARD (or LOCATION_FORWARD_PERM) reply.
// Exceptions raised along the way are turned into an exception reply when
// the client expects a response; otherwise they are only logged when
// TAO_debug_level > 0.
// Returns -1 if the forward reference cannot be marshalled, the result of
// the send when a forward or exception reply is sent, and 0 otherwise.
00937 {
00938 // This will extract the request header, set <response_required>
00939 // and <sync_with_server> as appropriate.
00940 TAO_ServerRequest request (this,
00941 cdr,
00942 output,
00943 transport,
00944 this->orb_core_);
00945
00946 CORBA::ULong request_id = 0;
00947 CORBA::Boolean response_required = false;
00948
00949 int parse_error = 0;
00950
00951 ACE_DECLARE_NEW_CORBA_ENV;
00952 ACE_TRY
00953 {
00954 parse_error =
00955 parser->parse_request_header (request);
00956
00957 TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
00958 if (csm)
00959 {
00960 csm->process_service_context(request);
00961 transport->assign_translators(&cdr,&output);
00962 }
00963
00964 // Throw an exception if the request header could not be parsed.
00965 if (parse_error != 0)
00966 ACE_TRY_THROW (CORBA::MARSHAL (0,
00967 CORBA::COMPLETED_NO));
00968 request_id = request.request_id ();
00969
00970 response_required = request.response_expected ();
00971
00972 CORBA::Object_var forward_to;
00973
00974 /*
00975 * Hook to specialize request processing within TAO
00976 * This hook will be replaced by specialized request
00977 * processing implementation.
00978 */
00979 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
00980
00981 // Do this before the reply is sent.
00982 this->orb_core_->request_dispatcher ()->dispatch (
00983 this->orb_core_,
00984 request,
00985 forward_to
00986 ACE_ENV_ARG_PARAMETER);
00987 ACE_TRY_CHECK;
00988
00989 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END
00990
00991 if (!CORBA::is_nil (forward_to.in ()))
00992 {
00993 const CORBA::Boolean permanent_forward_condition =
00994 this->orb_core_->is_permanent_forward_condition
00995 (forward_to.in (),
00996 request.request_service_context ());
00997
00998 // We should forward to another object...
00999 TAO_Pluggable_Reply_Params_Base reply_params;
01000 reply_params.request_id_ = request_id;
01001 reply_params.reply_status_ =
01002 permanent_forward_condition
01003 ? TAO_GIOP_LOCATION_FORWARD_PERM
01004 : TAO_GIOP_LOCATION_FORWARD;
01005 reply_params.svc_ctx_.length (0);
01006
01007 // Send back the reply service context.
01008 reply_params.service_context_notowned (&request.reply_service_info ());
01009
01010 output.message_attributes (request_id,
01011 0,
01012 TAO_Transport::TAO_REPLY,
01013 0);
01014
01015 // Make the GIOP header and Reply header
01016 this->generate_reply_header (output,
01017 reply_params);
01018
01019 if (!(output << forward_to.in ()))
01020 {
01021 if (TAO_debug_level > 0)
01022 ACE_ERROR ((LM_ERROR,
01023 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
01024 ACE_TEXT ("forward reference.\n")));
01025
01026 return -1;
01027 }
01028
01029 output.more_fragments (false);
01030
01031 int result = transport->send_message (output,
01032 0,
01033 TAO_Transport::TAO_REPLY);
01034 if (result == -1)
01035 {
01036 if (TAO_debug_level > 0)
01037 {
01038 // No exception but some kind of error, yet a
01039 // response is required.
01040 ACE_ERROR ((LM_ERROR,
01041 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01042 ACE_TEXT ("cannot send reply\n"),
01043 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
01044 }
01045 }
01046 return result;
01047 }
01048 }
01049 // Only CORBA exceptions are caught here.
01050 ACE_CATCHANY
01051 {
01052 int result = 0;
01053
01054 if (response_required)
01055 {
01056 result = this->send_reply_exception (transport,
01057 output,
01058 request_id,
01059 &request.reply_service_info (),
01060 &ACE_ANY_EXCEPTION);
01061 if (result == -1)
01062 {
01063 if (TAO_debug_level > 0)
01064 {
01065 ACE_ERROR ((LM_ERROR,
01066 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01067 ACE_TEXT ("cannot send exception\n"),
01068 ACE_TEXT ("process_request ()")));
01069
01070 ACE_PRINT_EXCEPTION (
01071 ACE_ANY_EXCEPTION,
01072 "TAO_GIOP_Message_Base::process_request[1]");
01073 }
01074 }
01075
01076 }
01077 else if (TAO_debug_level > 0)
01078 {
01079 // It is unfortunate that an exception (probably a system
01080 // exception) was thrown by the upcall code (even by the
01081 // user) when the client was not expecting a response.
01082 // However, in this case, we cannot close the connection
01083 // down, since it really isn't the client's fault.
01084
01085 ACE_ERROR ((LM_ERROR,
01086 ACE_TEXT ("(%P|%t) exception thrown ")
01087 ACE_TEXT ("but client is not waiting a response\n")));
01088
01089 ACE_PRINT_EXCEPTION (
01090 ACE_ANY_EXCEPTION,
01091 "TAO_GIOP_Message_Base::process_request[2]");
01092 }
01093
01094 return result;
01095 }
01096 #if defined (TAO_HAS_EXCEPTIONS)
01097 ACE_CATCHALL
01098 {
01099 // @@ TODO some c++ exception or another, but what do we do with
01100 // it?
01101 // We are supposed to map it into a CORBA::UNKNOWN exception.
01102 // BTW, this cannot be detected if using the <env> mapping. If
01103 // we have native exceptions but no support for them in the ORB
01104 // we should still be able to catch it. If we don't have native
01105 // exceptions it couldn't have been raised in the first place!
01106 int result = 0;
01107
01108 if (response_required)
01109 {
01110 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01111 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01112 CORBA::COMPLETED_MAYBE);
01113
01114 result = this->send_reply_exception (transport,
01115 output,
01116 request_id,
01117 &request.reply_service_info (),
01118 &exception);
01119 if (result == -1)
01120 {
01121 if (TAO_debug_level > 0)
01122 {
01123 ACE_ERROR ((LM_ERROR,
01124 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01125 ACE_TEXT ("%p: ")
01126 ACE_TEXT ("cannot send exception\n"),
01127 ACE_TEXT ("process_request ()")));
01128 ACE_PRINT_EXCEPTION (
01129 exception,
01130 "TAO_GIOP_Message_Base::process_request[3]");
01131 }
01132 }
01133 }
01134 else if (TAO_debug_level > 0)
01135 {
01136 // It is unfortunate that an exception (probably a system
01137 // exception) was thrown by the upcall code (even by the
01138 // user) when the client was not expecting a response.
01139 // However, in this case, we cannot close the connection
01140 // down, since it really isn't the client's fault.
01141 ACE_ERROR ((LM_ERROR,
01142 ACE_TEXT ("(%P|%t|%N|%l) exception thrown ")
01143 ACE_TEXT ("but client is not waiting a response\n")));
01144 }
01145
01146 return result;
01147 }
01148 #endif /* TAO_HAS_EXCEPTIONS */
01149 ACE_ENDTRY;
01150
01151 return 0;
01152 }
|
|
||||||||||||
|
Process the request message that we have received on the connection. Implements TAO_Pluggable_Messaging. Definition at line 647 of file GIOP_Message_Base.cpp. References ACE_BIT_ENABLED, TAO_Transport::assign_translators(), ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, TAO_ORB_Parameters::cdr_memcpy_tradeoff(), ACE_Message_Block::data_block(), dump_msg(), ACE_Data_Block::duplicate(), ACE_Auto_Basic_Ptr< X >::get(), TAO_ORB_Core::input_cdr_dblock_allocator(), TAO_ORB_Core::input_cdr_msgblock_allocator(), ACE_Message_Block::length(), TAO_ORB_Core::lf_strategy(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, TAO_ORB_Core::orb_params(), process_locate_request(), process_request(), ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), set_state(), TAO_LF_Strategy::set_upcall_thread(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_REQUEST, and ACE_Message_Block::wr_ptr().
// Entry point for an incoming request-type message held in qd.
// Builds an output CDR stream over a stack buffer and an input CDR stream
// positioned just past the GIOP header of qd->msg_block_, then forwards
// to process_request () or process_locate_request () according to
// qd->msg_type_.
// Returns the result of that call, or -1 for any other message type.
00649 {
00650 // Set the upcall thread
00651 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00652
00653 // Get a parser for us
00654 TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00655
00656 // Get the state information that we need to use
00657 this->set_state (qd->major_version_,
00658 qd->minor_version_,
00659 generator_parser);
00660
00661 // A buffer that we will use to initialise the CDR stream
00662 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00663 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00664 #else
00665 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00666 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00667
00668 // Initialize an output CDR on the stack
00669 // NOTE: Don't jump to a conclusion as to why we are using the
00670 // input_cdr and hence the global pool here. These pools will move
00671 // to the lanes anyway at some point of time. Further, it would have
00672 // been awesome to have this in TSS. But for some reason the cloning
00673 // that happens when the ORB gets flow controlled while writing a
00674 // reply is messing things up. We crash horribly. Doing this adds a
00675 // lock, we need to set things like this -- put stuff in TSS here
00676 // and transfer to global memory when we get flow controlled. We
00677 // need to work on the message block to get it right!
00678 TAO_OutputCDR output (repbuf,
00679 sizeof repbuf,
00680 TAO_ENCAP_BYTE_ORDER,
00681 this->orb_core_->input_cdr_buffer_allocator (),
00682 this->orb_core_->input_cdr_dblock_allocator (),
00683 this->orb_core_->input_cdr_msgblock_allocator (),
00684 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00685 this->fragmentation_strategy_.get (),
00686 qd->major_version_,
00687 qd->minor_version_);
00688
00689 // Get the read and write positions before we steal data.
00690 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00691 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00692 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00693
00694 if (TAO_debug_level > 0)
00695 this->dump_msg ("recv",
00696 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00697 qd->msg_block_->length ());
00698
00699
00700 // Create an input CDR stream. We do the following
00701 // 1 - If the incoming message block has a data block with a flag
00702 // DONT_DELETE (for the data block) we create an input CDR
00703 // stream the same way.
00704 // 2 - If the incoming message block had a datablock from heap just
00705 // use it by duplicating it and make the flag 0.
00706 // NOTE: We use the same data block in which we read the message and
00707 // we pass it on to the higher layers of the ORB. So we don't do any
00708 // copies at all here. The same is also done in the higher layers.
00709
00710 ACE_Message_Block::Message_Flags flg = 0;
00711 ACE_Data_Block *db = 0;
00712
00713 // Get the flag in the message block
00714 flg = qd->msg_block_->self_flags ();
00715
00716 if (ACE_BIT_ENABLED (flg,
00717 ACE_Message_Block::DONT_DELETE))
00718 {
00719 // Use the same datablock
00720 db = qd->msg_block_->data_block ();
00721 }
00722 else
00723 {
00724 // Use a duplicated datablock as the datablock has come off the
00725 // heap.
00726 db = qd->msg_block_->data_block ()->duplicate ();
00727 }
00728
00729
00730 TAO_InputCDR input_cdr (db,
00731 flg,
00732 rd_pos,
00733 wr_pos,
00734 qd->byte_order_,
00735 qd->major_version_,
00736 qd->minor_version_,
00737 this->orb_core_);
00738
00739 transport->assign_translators(&input_cdr,&output);
00740
00741 // We know we have some request message. Check whether it is a
00742 // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
00743
00744 // Once we send the InputCDR stream we need to just forget about
00745 // the stream and never touch that again for anything. We basically
00746 // lose ownership of the data_block.
00747
00748 switch (qd->msg_type_)
00749 {
00750 case TAO_PLUGGABLE_MESSAGE_REQUEST:
00751 // Should be taken care by the state specific invocations. They
00752 // could raise an exception or write things in the output CDR
00753 // stream
00754 return this->process_request (transport,
00755 input_cdr,
00756 output,
00757 generator_parser);
00758
00759 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
00760 return this->process_locate_request (transport,
00761 input_cdr,
00762 output,
00763 generator_parser);
00764 default:
00765 return -1;
00766 }
00767 }
|
|
|
Reset the messaging object.
Implements TAO_Pluggable_Messaging. Definition at line 68 of file GIOP_Message_Base.cpp.
// Intentionally empty: this implementation of reset does nothing.
00069 {
00070 // no-op
00071 }
|
|
||||||||||||||||
|
Close a connection, first sending GIOP::CloseConnection.
Definition at line 1439 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Transport::close_connection(), dump_msg(), TAO_Transport::id(), LM_DEBUG, LM_ERROR, TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::minor, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_CLOSECONNECTION, and ACE_Message_Block::wr_ptr().
// Body of TAO_GIOP_Message_Base::send_close_connection (the signature is not
// part of this listing). Builds a header-only GIOP CloseConnection message
// for the given `version`, dumps it, sends it through `transport`, and then
// closes the connection whether or not the send succeeded.
01442 {
01443
01444 // static CORBA::Octet
01445 // I hate this in every method. Till the time I figure out a way
01446 // around I will have them here hanging around.
01447 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01448 {
01449 // The following works on non-ASCII platforms, such as MVS (which
01450 // uses EBCDIC).
01451 0x47, // 'G'
01452 0x49, // 'I'
01453 0x4f, // 'O'
01454 0x50, // 'P'
01455 version.major,
01456 version.minor,
01457 TAO_ENCAP_BYTE_ORDER,
01458 TAO_GIOP_CLOSECONNECTION,
01459 0, 0, 0, 0
01460 };
01461
01462 // It's important that we use a reliable shutdown after we send this
01463 // message, so we know it's received.
01464 //
01465 // @@ should recv and discard queued data for portability; note
01466 // that this won't block (long) since we never set SO_LINGER
01467
01468 this->dump_msg ("send_close_connection",
01469 (const u_char *) close_message,
01470 TAO_GIOP_MESSAGE_HEADER_LEN);
01471
01472 #if 0
01473 // @@CJC I don't think we need this check b/c the transport's send()
01474 // will simply return -1. However, I guess we could create something
01475 // like TAO_Transport::is_closed() that returns whether the connection
01476 // is already closed. The problem with that, however, is that it's
01477 // entirely possible that is_closed() could return TRUE, and then the
01478 // transport could get closed down between the time it gets called and the
01479 // time that the send actually occurs.
01480 ACE_HANDLE which = transport->handle ();
01481 if (which == ACE_INVALID_HANDLE)
01482 {
01483 if (TAO_debug_level > 0)
01484 ACE_DEBUG ((LM_DEBUG,
01485 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01486 ACE_TEXT (" connection already closed\n")));
01487 return;
01488 }
01489 #endif
01490
// Wrap the stack buffer in a data block / message block pair so it can be
// handed to the transport. DONT_DELETE presumably stops ACE from freeing
// memory it does not own -- TODO confirm against the ACE_Data_Block docs.
01491 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01492 ACE_Message_Block::MB_DATA,
01493 close_message,
01494 0,
01495 0,
01496 ACE_Message_Block::DONT_DELETE,
01497 0);
01498 ACE_Message_Block message_block(&data_block);
// Advance the write pointer so the whole header counts as message payload.
01499 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01500
// NOTE(review): bt is not initialized here; presumably
// send_message_block_chain() fills it in as an out-parameter -- confirm.
01501 size_t bt;
01502 int result = transport->send_message_block_chain (&message_block, bt);
// A failed send is only logged (when debugging is enabled); the connection
// is still closed below.
01503 if (result == -1)
01504 {
01505 if (TAO_debug_level > 0)
01506 ACE_ERROR ((LM_ERROR,
01507 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01508 transport->id (), errno));
01509 }
01510
01511 transport->close_connection ();
// NOTE(review): unlike the error log above, this message is not guarded by
// TAO_debug_level, and it prints transport->id () under the label "handle"
// with %d (the error log uses %u for the same value) -- confirm intended.
01512 ACE_DEBUG ((LM_DEBUG,
01513 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01514 transport-> id ()));
01515
01516 }
|
|
|
Send error messages.
Definition at line 1337 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_TEXT, dump_msg(), TAO_Transport::id(), LM_DEBUG, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGERROR, and ACE_Message_Block::wr_ptr().
// Body of send_error (the signature is not part of this listing). Builds a
// header-only GIOP MessageError message, dumps it, and sends it through
// `transport`. Returns the result of send_message_block_chain(); a result of
// -1 is additionally logged when TAO_debug_level > 0.
01338 {
01339 const char
01340 error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01341 {
01342 // The following works on non-ASCII platforms, such as MVS (which
01343 // uses EBCDIC).
01344 0x47, // 'G'
01345 0x49, // 'I'
01346 0x4f, // 'O'
01347 0x50, // 'P'
01348 (CORBA::Octet) 1, // Use the lowest GIOP version
01349 (CORBA::Octet) 0,
01350 TAO_ENCAP_BYTE_ORDER,
01351 TAO_GIOP_MESSAGERROR,
01352 0, 0, 0, 0
01353 };
01354
01355 // @@ Q: How does this work with GIOP lite?
01356 // A: It doesn't
01357
01358 this->dump_msg ("send_error",
01359 (const u_char *) error_message,
01360 TAO_GIOP_MESSAGE_HEADER_LEN);
01361
// Wrap the stack buffer so it can be handed to the transport. Both the data
// block and the message block are created with DONT_DELETE, presumably so
// ACE never tries to free these stack objects -- TODO confirm.
01362 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01363 ACE_Message_Block::MB_DATA,
01364 error_message,
01365 0,
01366 0,
01367 ACE_Message_Block::DONT_DELETE,
01368 0);
01369 ACE_Message_Block message_block(&data_block,
01370 ACE_Message_Block::DONT_DELETE);
// Advance the write pointer so the whole header counts as message payload.
01371 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01372
// NOTE(review): bt is not initialized here; presumably
// send_message_block_chain() fills it in as an out-parameter -- confirm.
01373 size_t bt;
01374 int result = transport->send_message_block_chain (&message_block, bt);
01375 if (result == -1)
01376 {
01377 if (TAO_debug_level > 0)
01378 ACE_DEBUG ((LM_DEBUG,
01379 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01380 transport->id ()));
01381 }
01382
// The send result is passed straight back to the caller.
01383 return result;
01384 }
|
|
||||||||||||||||||||||||
|
We must send an exception reply through the transport, because this request resulted in some kind of exception. Definition at line 1520 of file GIOP_Message_Base.cpp. References CORBA::SystemException::_downcast(), TAO_Pluggable_Reply_Params_Base::argument_flag_, generate_exception_reply(), TAO::unbounded_value_sequence< ServiceContext >::length(), TAO_OutputCDR::more_fragments(), TAO_Pluggable_Reply_Params_Base::reply_status_, TAO_Pluggable_Reply_Params_Base::request_id_, TAO_Transport::send_message(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, TAO_GIOP_SYSTEM_EXCEPTION, and TAO_GIOP_USER_EXCEPTION. Referenced by process_request().
// Function body (the signature is not part of this listing). Marshals the
// exception `x` into `output` as a reply to `request_id` and sends it through
// `transport`. Returns -1 if generate_exception_reply() fails; otherwise
// returns whatever transport->send_message() returns.
01527 {
01528 TAO_Pluggable_Reply_Params_Base reply_params;
01529 reply_params.request_id_ = request_id;
01530 reply_params.svc_ctx_.length (0);
01531
01532 // We are going to send some data
01533 reply_params.argument_flag_ = 1;
01534
01535 // Send back the service context we received. (RTCORBA relies on
01536 // this).
01537 reply_params.service_context_notowned (svc_info);
01538
// Default to a user exception; upgraded below if x is a system exception.
01539 reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION;
01540
01541 if (CORBA::SystemException::_downcast (x) != 0)
01542 {
01543 reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION;
01544 }
01545
01546 if (this->generate_exception_reply (output,
01547 reply_params,
01548 *x) == -1)
01549 return -1;
01550
// Clear the more-fragments indication on the stream before sending.
01551 output.more_fragments (false);
01552
01553 return transport->send_message (output,
01554 0,
01555 TAO_Transport::TAO_REPLY);
01556 }
|
|
|
Definition at line 2128 of file GIOP_Message_Base.cpp. References ACE_SET_BITS, ACE_OutputCDR::buffer(), ACE_OutputCDR::do_byte_swap(), TAO_OutputCDR::more_fragments(), and TAO_ENCAP_BYTE_ORDER. Referenced by format_message().
// Function body (the signature is not part of this listing). Patches the
// "flags" octet of the GIOP header already present at the start of msg's
// buffer: the byte-order bit and, for versions other than 1.0, the
// more-fragments bit.
02129 {
// msg.buffer () is cast to a mutable octet pointer so the header can be
// edited in place.
02130 CORBA::Octet * const buf =
02131 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
02132
// The version octets are read back from the header itself.
02133 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
02134 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
02135
02136 // Flags for the GIOP protocol header "flags" field.
02137 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
02138
02139 // Least significant bit: Byte order
// NOTE(review): ACE_SET_BITS presumably ORs the value into flags, so bits
// are only ever set here, never cleared -- confirm callers start from a
// zeroed flags octet.
02140 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
02141
02142 // Second least significant bit: More fragments
02143 //
02144 // Only supported in GIOP 1.1 or better.
02145 if (!(major <= 1 && minor == 0))
02146 ACE_SET_BITS (flags, msg.more_fragments () << 1);
02147 }
|
|
||||||||||||||||
|
Set the state.
Definition at line 1387 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_10, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_11, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_12, and tao_giop_impl_. Referenced by fragment_header_length(), generate_fragment_header(), generate_locate_request_header(), generate_reply_header(), generate_request_header(), is_ready_for_bidirectional(), parse_request_id(), process_reply_message(), and process_request_message().
// Body of set_state (the signature is not part of this listing). Points
// gen_parser at the generator/parser held in tao_giop_impl_ that matches GIOP
// version def_major.def_minor (1.0, 1.1 or 1.2). For any other version no
// assignment is made, so gen_parser keeps whatever value the caller passed in.
01391 {
01392 switch (def_major)
01393 {
01394 case 1:
01395 switch (def_minor)
01396 {
01397 case 0:
01398 gen_parser =
01399 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01400 &this->tao_giop_impl_.tao_giop_10);
01401 break;
01402 case 1:
01403 gen_parser =
01404 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01405 &this->tao_giop_impl_.tao_giop_11);
01406 break;
01407 case 2:
01408 gen_parser =
01409 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01410 &this->tao_giop_impl_.tao_giop_12);
01411 break;
01412 default:
// Unknown minor version: gen_parser is deliberately left untouched.
01413 break;
01414 }
01415 break;
01416 default:
// Unknown major version: gen_parser is deliberately left untouched.
01417 break;
01418 }
01419 }
|
|
||||||||||||
|
Writes the GIOP header into msg.
Definition at line 893 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::get_version(), ACE_OutputCDR::good_bit(), CORBA::Octet, ACE_OutputCDR::reset(), TAO_GIOP_Message_Type, CORBA::ULong, and ACE_OutputCDR::write_octet_array(). Referenced by generate_fragment_header(), generate_locate_request_header(), generate_reply_header(), generate_request_header(), and make_send_locate_reply().
// Function body (the signature is not part of this listing). Resets msg and
// writes a 12-octet GIOP header into it: the "GIOP" magic, the version taken
// from msg, and the message type. Returns msg.good_bit () after the write.
00895 {
00896 // Reset the output stream before the header is written
00897 msg.reset ();
00898
// Only the four magic octets are listed; the remaining eight elements are
// zero-initialized by aggregate initialization.
00899 CORBA::Octet header[12] =
00900 {
00901 // The following works on non-ASCII platforms, such as MVS (which
00902 // uses EBCDIC).
00903 0x47, // 'G'
00904 0x49, // 'I'
00905 0x4f, // 'O'
00906 0x50 // 'P'
00907 };
00908
// NOTE(review): only minor is initialized by this declaration; major relies
// on get_version () assigning it, and that call's result is discarded.
00909 CORBA::Octet major, minor = 0;
00910
00911 (void) msg.get_version (major, minor);
00912
00913 header[4] = major;
00914 header[5] = minor;
00915
00916 // "flags" octet, i.e. header[6] will be set up later when message
00917 // is formatted by the transport.
00918
00919 header[7] = CORBA::Octet (type); // Message type
00920
// Element count of header (12); header[8..11] stay zero in this function.
00921 static ACE_CDR::ULong const header_size =
00922 sizeof (header) / sizeof (header[0]);
00923
00924 // Fragmentation should not occur at this point since there are only
00925 // 12 bytes in the stream, and fragmentation may only occur when
00926 // the stream length >= 16.
00927 msg.write_octet_array (header, header_size);
00928
00929 return msg.good_bit ();
00930 }
|
|
|
All fragments being received are stored on a stack in reverse order, with the last fragment on top. Definition at line 285 of file GIOP_Message_Base.h. Referenced by consolidate_fragmented_message(), and discard_fragmented_message(). |
|
|
Strategy that sends data currently marshaled into this TAO_OutputCDR stream if necessary. Definition at line 297 of file GIOP_Message_Base.h. |
|
|
The message handler object that does the reading and parsing of the incoming messages. Definition at line 278 of file GIOP_Message_Base.h. Referenced by parse_incoming_messages(). |
|
|
Cached ORB_Core pointer...
Definition at line 274 of file GIOP_Message_Base.h. |
|
|
Buffer where the request is placed.
Definition at line 300 of file GIOP_Message_Base.h. Referenced by init(), and out_stream(). |
|
|
All the implementations of GIOP message generator and parsers.
Definition at line 281 of file GIOP_Message_Base.h. Referenced by set_state(). |
1.3.6