#include <GIOP_Message_Base.h>
Inheritance diagram for TAO_GIOP_Message_Base:
This class holds the details that are common to all GIOP versions. Some of the details held here may be moved elsewhere if they start to differ between versions.
Definition at line 51 of file GIOP_Message_Base.h.
|
Constructor.
Definition at line 27 of file GIOP_Message_Base.cpp. References TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, and TAO_ENCAP_BYTE_ORDER.
00030 : orb_core_ (orb_core) 00031 , message_state_ () 00032 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport)) 00033 , out_stream_ (0, 00034 input_cdr_size, 00035 TAO_ENCAP_BYTE_ORDER, 00036 orb_core->output_cdr_buffer_allocator (), 00037 orb_core->output_cdr_dblock_allocator (), 00038 orb_core->output_cdr_msgblock_allocator (), 00039 orb_core->orb_params ()->cdr_memcpy_tradeoff (), 00040 fragmentation_strategy_.get (), 00041 TAO_DEF_GIOP_MAJOR, 00042 TAO_DEF_GIOP_MINOR) 00043 { 00044 } |
|
Destructor.
Definition at line 47 of file GIOP_Message_Base.cpp.
00048 { 00049 } |
|
Consolidate a fragmented message with its associated fragments, which are stored within this class. If a reliable transport (such as TCP) is used, fragments are partially ordered on the stack, with the last fragment on top. Otherwise, if an unreliable transport (such as UDP) is used, fragments may arrive out of order and must be ordered before consolidation.
Implements TAO_Pluggable_Messaging. Definition at line 1910 of file GIOP_Message_Base.cpp. References TAO_Queued_Data::consolidate(), ACE_Message_Block::cont(), fragment_header_length(), fragment_stack_, header_length(), ACE_Message_Block::length(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, parse_request_id(), TAO::Incoming_Message_Stack::pop(), TAO::Incoming_Message_Stack::push(), ACE_Message_Block::rd_ptr(), and TAO_Queued_Data::release().
01911 { 01912 TAO::Incoming_Message_Stack reverse_stack; 01913 01914 TAO_Queued_Data *tail = 0; 01915 TAO_Queued_Data *head = 0; 01916 01917 // 01918 // CONSOLIDATE FRAGMENTED MESSAGE 01919 // 01920 01921 // check for error-condition 01922 if (qd == 0) 01923 { 01924 return -1; 01925 } 01926 01927 if (qd->major_version_ == 1 && qd->minor_version_ == 0) 01928 { 01929 TAO_Queued_Data::release (qd); 01930 return -1; // error: GIOP-1.0 does not support fragments 01931 } 01932 01933 // If this is not the last fragment, push it onto stack for later processing 01934 if (qd->more_fragments_) 01935 { 01936 this->fragment_stack_.push (qd); 01937 01938 msg = 0; // no consolidated message available yet 01939 return 1; // status: more messages expected. 01940 } 01941 01942 tail = qd; // init 01943 01944 // Add the current message block to the end of the chain 01945 // after adjusting the read pointer to skip the header(s) 01946 const size_t header_adjustment = 01947 this->header_length () + 01948 this->fragment_header_length (tail->major_version_, 01949 tail->minor_version_); 01950 01951 if (tail->msg_block_->length () < header_adjustment) 01952 { 01953 // buffer length not sufficient 01954 TAO_Queued_Data::release (qd); 01955 return -1; 01956 } 01957 01958 // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2 01959 if (tail->major_version_ == 1 && tail->minor_version_ == 1) 01960 { 01961 // GIOP-1.1 01962 01963 while (this->fragment_stack_.pop (head) != -1) 01964 { 01965 if (head->more_fragments_ && 01966 head->major_version_ == 1 && 01967 head->minor_version_ == 1 && 01968 head->msg_block_->length () >= header_adjustment) 01969 { 01970 // adjust the read-pointer, skip the fragment header 01971 tail->msg_block_->rd_ptr(header_adjustment); 01972 01973 head->msg_block_->cont (tail->msg_block_); 01974 01975 tail->msg_block_ = 0; 01976 01977 TAO_Queued_Data::release (tail); 01978 01979 tail = head; 01980 } 01981 else 01982 { 01983 reverse_stack.push (head); 01984 
} 01985 } 01986 } 01987 else 01988 { 01989 // > GIOP-1.2 01990 01991 CORBA::ULong tmp_request_id = 0; 01992 if (this->parse_request_id (tail, tmp_request_id) == -1) 01993 { 01994 return -1; 01995 } 01996 01997 const CORBA::ULong request_id = tmp_request_id; 01998 01999 while (this->fragment_stack_.pop (head) != -1) 02000 { 02001 CORBA::ULong head_request_id = 0; 02002 int parse_status = 0; 02003 02004 if (head->more_fragments_ && 02005 head->major_version_ >= 1 && 02006 head->minor_version_ >= 2 && 02007 head->msg_block_->length () >= header_adjustment && 02008 (parse_status = this->parse_request_id (head, head_request_id)) != -1 && 02009 request_id == head_request_id) 02010 { 02011 // adjust the read-pointer, skip the fragment header 02012 tail->msg_block_->rd_ptr(header_adjustment); 02013 02014 head->msg_block_->cont (tail->msg_block_); 02015 02016 tail->msg_block_ = 0; 02017 02018 TAO_Queued_Data::release (tail); 02019 02020 tail = head; 02021 } 02022 else 02023 { 02024 if (parse_status == -1) 02025 { 02026 TAO_Queued_Data::release (head); 02027 return -1; 02028 } 02029 02030 reverse_stack.push (head); 02031 } 02032 } 02033 } 02034 02035 // restore stack 02036 while (reverse_stack.pop (head) != -1) 02037 { 02038 this->fragment_stack_.push (head); 02039 } 02040 02041 if (tail->consolidate () == -1) 02042 { 02043 // memory allocation failed 02044 TAO_Queued_Data::release (tail); 02045 return -1; 02046 } 02047 02048 // set out value 02049 msg = tail; 02050 02051 return 0; 02052 } |
|
Check whether the node qd needs consolidation from incoming.
Implements TAO_Pluggable_Messaging. Definition at line 507 of file GIOP_Message_Base.cpp. References ACE_ERROR, ace_min(), ACE_TEXT, ACE_Message_Block::copy(), ACE_CDR::grow(), init_queued_data(), ACE_Message_Block::length(), LM_ERROR, TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO_GIOP_Message_State::parse_message_header(), TAO_GIOP_Message_State::payload_size(), ACE_Message_Block::rd_ptr(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.
00509 { 00510 // Look to see whether we had atleast parsed the GIOP header ... 00511 if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED) 00512 { 00513 // The data length that has been stuck in there during the last 00514 // read .... 00515 size_t const len = 00516 qd->msg_block_->length (); 00517 00518 // paranoid check 00519 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN) 00520 { 00521 // inconsistency - this code should have parsed the header 00522 // so far 00523 return -1; 00524 } 00525 00526 // We know that we would have space for 00527 // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data 00528 // from the <incoming> into the message block in <qd> 00529 size_t const available = incoming.length (); 00530 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len; 00531 size_t const n_copy = ace_min (available, desired); 00532 00533 // paranoid check, but would cause endless looping 00534 if (n_copy == 0) 00535 { 00536 return -1; 00537 } 00538 00539 if (qd->msg_block_->copy (incoming.rd_ptr (), 00540 n_copy) == -1) 00541 { 00542 return -1; 00543 } 00544 00545 // Move the rd_ptr () in the incoming message block.. 00546 incoming.rd_ptr (n_copy); 00547 00548 // verify sufficient data to parse GIOP header 00549 if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN) 00550 { 00551 return 0; /* continue */ 00552 } 00553 00554 TAO_GIOP_Message_State state; 00555 00556 // Parse the message header now... 
00557 if (state.parse_message_header (*qd->msg_block_) == -1) 00558 { 00559 if (TAO_debug_level > 0) 00560 { 00561 ACE_ERROR ((LM_ERROR, 00562 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ") 00563 ACE_TEXT ("error parsing header\n") )); 00564 } 00565 return -1; 00566 } 00567 // Now grow the message block so that we can copy the rest of 00568 // the data, the message_block must be able to hold complete message 00569 if (ACE_CDR::grow (qd->msg_block_, 00570 state.message_size ()) == -1) /* GIOP_Header + Payload */ 00571 { 00572 // on mem-error get rid of context silently, try to avoid 00573 // system calls that might allocate additional memory 00574 return -1; 00575 } 00576 00577 // Copy the pay load.. 00578 // Calculate the bytes that needs to be copied in the queue... 00579 size_t copy_len = state.payload_size (); 00580 00581 // If the data that needs to be copied is more than that is 00582 // available to us .. 00583 if (copy_len > incoming.length ()) 00584 { 00585 // Calculate the missing data.. 00586 qd->missing_data_ = copy_len - incoming.length (); 00587 00588 // Set the actual possible copy_len that is available... 00589 copy_len = incoming.length (); 00590 } 00591 else 00592 { 00593 qd->missing_data_ = 0; 00594 } 00595 00596 // ..now we are set to copy the right amount of data to the 00597 // node.. 00598 if (qd->msg_block_->copy (incoming.rd_ptr (), 00599 copy_len) == -1) 00600 { 00601 return -1; 00602 } 00603 00604 // Set the <rd_ptr> of the <incoming>.. 00605 incoming.rd_ptr (copy_len); 00606 00607 // Get the other details... 00608 this->init_queued_data (qd, state); 00609 } 00610 else 00611 { 00612 // @@todo: Need to abstract this out to a seperate method... 00613 size_t copy_len = qd->missing_data_; 00614 00615 if (copy_len > incoming.length ()) 00616 { 00617 // Calculate the missing data.. 00618 qd->missing_data_ = copy_len - incoming.length (); 00619 00620 // Set the actual possible copy_len that is available... 
00621 copy_len = incoming.length (); 00622 } 00623 00624 // paranoid check for endless-event-looping 00625 if (copy_len == 0) 00626 { 00627 return -1; 00628 } 00629 00630 // Copy the right amount of data in to the node... 00631 // node.. 00632 if (qd->msg_block_->copy (incoming.rd_ptr (), 00633 copy_len) == -1) 00634 { 00635 return -1; 00636 } 00637 00638 // Set the <rd_ptr> of the <incoming>.. 00639 qd->msg_block_->rd_ptr (copy_len); 00640 00641 } 00642 00643 return 0; 00644 } |
|
Discard all fragments associated with the request-id encoded in cancel_request. This operation will never be called concurrently by multiple threads, nor concurrently with consolidate_fragmented_message.
Implements TAO_Pluggable_Messaging. Definition at line 2056 of file GIOP_Message_Base.cpp. References fragment_stack_, TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_type_, parse_request_id(), TAO::Incoming_Message_Stack::pop(), TAO::Incoming_Message_Stack::push(), TAO_Queued_Data::release(), and TAO_PLUGGABLE_MESSAGE_FRAGMENT.
02057 { 02058 // We must extract the specific request-id from message-buffer 02059 // and remove all fragments from stack that match this request-id. 02060 02061 TAO::Incoming_Message_Stack reverse_stack; 02062 02063 CORBA::ULong cancel_request_id; 02064 02065 if (this->parse_request_id (cancel_request, cancel_request_id) == -1) 02066 { 02067 return -1; 02068 } 02069 02070 TAO_Queued_Data *head = 0; 02071 02072 // Revert stack 02073 while (this->fragment_stack_.pop (head) != -1) 02074 { 02075 reverse_stack.push (head); 02076 } 02077 02078 bool discard_all_GIOP11_messages = false; 02079 02080 // Now we are able to process message in order they have arrived. 02081 // If the cancel_request_id matches to GIOP-1.1 message, all succeeding 02082 // fragments belong to this message and must be discarded. 02083 // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the 02084 // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments 02085 // having encoded the request id will be discarded. 
02086 while (reverse_stack.pop (head) != -1) 02087 { 02088 CORBA::ULong head_request_id; 02089 02090 if (head->major_version_ == 1 && 02091 head->minor_version_ <= 1 && 02092 head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id 02093 this->parse_request_id (head, head_request_id) >= 0 && 02094 cancel_request_id == head_request_id) 02095 { 02096 TAO_Queued_Data::release (head); 02097 discard_all_GIOP11_messages = true; 02098 } 02099 else if (head->major_version_ == 1 && 02100 head->minor_version_ <= 1 && 02101 discard_all_GIOP11_messages) 02102 { 02103 TAO_Queued_Data::release (head); 02104 } 02105 else if (head->major_version_ >= 1 && 02106 head->minor_version_ >= 2 && 02107 this->parse_request_id (head, head_request_id) >= 0 && 02108 cancel_request_id == head_request_id) 02109 { 02110 TAO_Queued_Data::release (head); 02111 } 02112 else 02113 { 02114 this->fragment_stack_.push (head); 02115 } 02116 } 02117 02118 return 0; 02119 } |
|
Print out a debug message.
Definition at line 1559 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_HEX_DUMP, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, names, ACE_CDR::swap_4(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_FRAGMENT, TAO_GIOP_REPLY, and TAO_GIOP_REQUEST. Referenced by format_message(), process_reply_message(), process_request_message(), send_close_connection(), and send_error().
01562 { 01563 01564 if (TAO_debug_level >= 5) 01565 { 01566 static const char digits[] = "0123456789ABCD"; 01567 static const char *names[] = 01568 { 01569 "Request", 01570 "Reply", 01571 "CancelRequest", 01572 "LocateRequest", 01573 "LocateReply", 01574 "CloseConnection", 01575 "MessageError", 01576 "Fragment" 01577 }; 01578 01579 // Message name. 01580 const char *message_name = "UNKNOWN MESSAGE"; 01581 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET]; 01582 if (slot < sizeof (names) / sizeof (names[0])) 01583 message_name = names[slot]; 01584 01585 // Byte order. 01586 int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01; 01587 01588 // Get the version info 01589 CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]; 01590 CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET]; 01591 01592 // request/reply id. 01593 CORBA::ULong tmp = 0; 01594 CORBA::ULong *id = &tmp; 01595 char *tmp_id = 0; 01596 01597 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST || 01598 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY || 01599 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT) 01600 { 01601 if (major == 1 && minor < 2) 01602 { 01603 // @@ Only works if ServiceContextList is empty.... 01604 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4); 01605 } 01606 else 01607 { 01608 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN); 01609 } 01610 #if !defined (ACE_DISABLE_SWAP_ON_READ) 01611 if (byte_order == TAO_ENCAP_BYTE_ORDER) 01612 { 01613 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); 01614 } 01615 else 01616 { 01617 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id)); 01618 } 01619 #else 01620 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); 01621 #endif /* ACE_DISABLE_SWAP_ON_READ */ 01622 01623 } 01624 01625 // Print. 
01626 ACE_DEBUG ((LM_DEBUG, 01627 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " 01628 "%s GIOP v%c.%c msg, %d data bytes, %s endian, " 01629 "Type %s[%u]\n", 01630 ACE_TEXT_CHAR_TO_TCHAR (label), 01631 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], 01632 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], 01633 len - TAO_GIOP_MESSAGE_HEADER_LEN , 01634 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), 01635 ACE_TEXT_CHAR_TO_TCHAR(message_name), 01636 *id)); 01637 01638 if (TAO_debug_level >= 10) 01639 ACE_HEX_DUMP ((LM_DEBUG, 01640 (const char *) ptr, 01641 len, 01642 ACE_TEXT ("GIOP message"))); 01643 } 01644 } |
|
Extract the details of the next message from incoming and return them through qd. Returns 0 if the message header could not be parsed completely, 1 if the message header could be parsed completely, and -1 on error. Implements TAO_Pluggable_Messaging. Definition at line 417 of file GIOP_Message_Base.cpp. References ACE_ERROR, ace_max(), ACE_TEXT, ACE_Message_Block::copy(), init_queued_data(), ACE_Message_Block::length(), LM_ERROR, make_queued_data(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_Queued_Data::msg_block_, TAO_GIOP_Message_State::parse_message_header(), ACE_Message_Block::rd_ptr(), TAO_debug_level, and TAO_MISSING_DATA_UNDEFINED.
00419 { 00420 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) 00421 { 00422 if (incoming.length () > 0) 00423 { 00424 // Optimize memory usage, we dont know actual message size 00425 // so far, but allocate enough space to hold small GIOP 00426 // messages. This way we avoid expensive "grow" operation 00427 // for small messages. 00428 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE; 00429 00430 // Make a node which has at least message block of the size 00431 // of MESSAGE_HEADER_LEN. 00432 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN, 00433 default_buf_size); 00434 00435 // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN 00436 00437 qd = this->make_queued_data (buf_size); 00438 00439 if (qd == 0) 00440 { 00441 if (TAO_debug_level > 0) 00442 { 00443 ACE_ERROR((LM_ERROR, 00444 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") 00445 ACE_TEXT ("out of memory\n"))); 00446 } 00447 return -1; 00448 } 00449 00450 qd->msg_block_->copy (incoming.rd_ptr (), 00451 incoming.length ()); 00452 00453 incoming.rd_ptr (incoming.length ()); // consume all available data 00454 00455 qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED; 00456 } 00457 else 00458 { 00459 // handle not initialized variables 00460 qd = 0; // reset 00461 } 00462 00463 return 0; 00464 } 00465 00466 TAO_GIOP_Message_State state; 00467 if (state.parse_message_header (incoming) == -1) 00468 { 00469 return -1; 00470 } 00471 00472 size_t copying_len = state.message_size (); 00473 00474 qd = this->make_queued_data (copying_len); 00475 00476 if (qd == 0) 00477 { 00478 if (TAO_debug_level > 0) 00479 { 00480 ACE_ERROR ((LM_ERROR, 00481 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") 00482 ACE_TEXT ("out of memory\n"))); 00483 } 00484 return -1; 00485 } 00486 00487 if (copying_len > incoming.length ()) 00488 { 00489 qd->missing_data_ = copying_len - incoming.length (); 00490 copying_len = incoming.length (); 00491 } 00492 else 00493 { 00494 
qd->missing_data_ = 0; 00495 } 00496 00497 qd->msg_block_->copy (incoming.rd_ptr (), 00498 copying_len); 00499 00500 incoming.rd_ptr (copying_len); 00501 this->init_queued_data (qd, state); 00502 00503 return 1; 00504 } |
|
Format the message. As the message length has not yet been written into the header, we take this opportunity to insert it and format the message. Implements TAO_Pluggable_Messaging. Definition at line 267 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::begin(), ACE_OutputCDR::buffer(), ACE_CDR::consolidate(), ACE_Message_Block::cont(), ACE_OutputCDR::do_byte_swap(), dump_msg(), ACE_Message_Block::rd_ptr(), set_giop_flags(), ACE_CDR::swap_4(), TAO_debug_level, and ACE_OutputCDR::total_length().
00268 { 00269 // Ptr to first buffer. 00270 char * buf = (char *) stream.buffer (); 00271 00272 this->set_giop_flags (stream); 00273 00274 // Length of all buffers. 00275 size_t const total_len = stream.total_length (); 00276 00277 // NOTE: Here would also be a fine place to calculate a digital 00278 // signature for the message and place it into a preallocated slot 00279 // in the "ServiceContext". Similarly, this is a good spot to 00280 // encrypt messages (or just the message bodies) if that's needed in 00281 // this particular environment and that isn't handled by the 00282 // networking infrastructure (e.g., IPSEC). 00283 00284 CORBA::ULong const bodylen = static_cast <CORBA::ULong> 00285 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); 00286 00287 #if !defined (ACE_ENABLE_SWAP_ON_WRITE) 00288 *(reinterpret_cast <CORBA::ULong *> (buf + 00289 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen; 00290 #else 00291 if (!stream.do_byte_swap ()) 00292 *(reinterpret_cast <CORBA::ULong *> 00293 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen; 00294 else 00295 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen), 00296 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET); 00297 #endif /* ACE_ENABLE_SWAP_ON_WRITE */ 00298 00299 if (TAO_debug_level > 2) 00300 { 00301 // Check whether the output cdr stream is build up of multiple 00302 // messageblocks. If so, consolidate them to one block that can be 00303 // dumped 00304 ACE_Message_Block* consolidated_block = 0; 00305 if (stream.begin()->cont () != 0) 00306 { 00307 consolidated_block = new ACE_Message_Block; 00308 ACE_CDR::consolidate (consolidated_block, stream.begin ()); 00309 buf = (char *) (consolidated_block->rd_ptr ()); 00310 } 00311 /// 00312 this->dump_msg ("send", 00313 reinterpret_cast <u_char *> (buf), 00314 total_len); 00315 00316 // 00317 delete consolidated_block; 00318 consolidated_block = 0; 00319 // 00320 } 00321 00322 return 0; 00323 } |
|
The header length of a fragment.
Implements TAO_Pluggable_Messaging. Definition at line 1769 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_Generator_Parser::fragment_header_length(), and set_state(). Referenced by consolidate_fragmented_message().
01771 { 01772 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 01773 01774 // Get the state information that we need to use 01775 this->set_state (major, 01776 minor, 01777 generator_parser); 01778 01779 return generator_parser->fragment_header_length (); 01780 } |
|
Outgoing GIOP message fragmentation strategy.
Implements TAO_Pluggable_Messaging. Definition at line 2122 of file GIOP_Message_Base.cpp. References ACE_Auto_Basic_Ptr< X >::get().
02123 { 02124 return this->fragmentation_strategy_.get (); 02125 } |
|
Generate a reply message with the exception ex.
Implements TAO_Pluggable_Messaging. Definition at line 855 of file GIOP_Message_Base.cpp. References CORBA::Exception::_tao_encode(), ACE_CATCH, ACE_CHECK_RETURN, ACE_DEBUG, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, generate_reply_header(), LM_DEBUG, and TAO_debug_level. Referenced by send_reply_exception().
00860 { 00861 // A new try/catch block, but if something goes wrong now we have no 00862 // hope, just abort. 00863 ACE_DECLARE_NEW_CORBA_ENV; 00864 00865 ACE_TRY 00866 { 00867 // Make the GIOP & reply header. 00868 this->generate_reply_header (cdr, 00869 params); 00870 x._tao_encode (cdr 00871 ACE_ENV_ARG_PARAMETER); 00872 ACE_TRY_CHECK; 00873 } 00874 ACE_CATCH (CORBA::Exception, ex) 00875 { 00876 // Now we know that while handling the error an other error 00877 // happened -> no hope, close connection. 00878 00879 // Close the handle. 00880 if (TAO_debug_level > 0) 00881 ACE_DEBUG ((LM_DEBUG, 00882 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") 00883 ACE_TEXT ("generate_exception_reply ()\n"))); 00884 return -1; 00885 } 00886 ACE_ENDTRY; 00887 ACE_CHECK_RETURN (-1); 00888 00889 return 0; 00890 } |
|
Implements TAO_Pluggable_Messaging. Definition at line 231 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_FRAGMENT, TAO_GIOP_Message_Generator_Parser::write_fragment_header(), and write_protocol_header().
00233 { 00234 // Get a parser for us 00235 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00236 00237 CORBA::Octet major, minor; 00238 00239 cdr.get_version (major, minor); 00240 00241 // GIOP fragments are supported in GIOP 1.1 and better, but TAO only 00242 // supports them in 1.2 or better since GIOP 1.1 fragments do not 00243 // have a fragment message header. 00244 if (major == 1 && minor < 2) 00245 return -1; 00246 00247 // Get the state information that we need to use 00248 this->set_state (major, 00249 minor, 00250 generator_parser); 00251 00252 // Write the GIOP header first 00253 if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr) 00254 || !generator_parser->write_fragment_header (cdr, request_id)) 00255 { 00256 if (TAO_debug_level) 00257 ACE_ERROR ((LM_ERROR, 00258 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); 00259 00260 return -1; 00261 } 00262 00263 return 0; 00264 } |
|
Write the locate reply header.
Implements TAO_Pluggable_Messaging. Definition at line 1647 of file GIOP_Message_Base.cpp.
01650 {
01651 return 0;
01652 }
|
|
Write the LocateRequest header into the cdr stream.
Implements TAO_Pluggable_Messaging. Definition at line 121 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, TAO_Operation_Details::request_id(), set_state(), TAO_debug_level, TAO_GIOP_LOCATEREQUEST, TAO_GIOP_Message_Generator_Parser::write_locate_request_header(), and write_protocol_header().
00126 { 00127 // Get a parser for us 00128 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00129 00130 CORBA::Octet major, minor; 00131 00132 cdr.get_version (major, minor); 00133 00134 // Get the state information that we need to use 00135 this->set_state (major, 00136 minor, 00137 generator_parser); 00138 00139 // Write the GIOP header first 00140 if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, 00141 cdr)) 00142 { 00143 if (TAO_debug_level) 00144 ACE_ERROR ((LM_ERROR, 00145 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); 00146 00147 return -1; 00148 } 00149 00150 // Now call the implementation for the rest of the header 00151 if (!generator_parser->write_locate_request_header 00152 (op.request_id (), 00153 spec, 00154 cdr)) 00155 { 00156 if (TAO_debug_level) 00157 ACE_ERROR ((LM_ERROR, 00158 ACE_TEXT ("(%P|%t) Error in writing locate request header \n"))); 00159 00160 00161 return -1; 00162 00163 } 00164 00165 return 0; 00166 } |
|
Write the reply header.
Implements TAO_Pluggable_Messaging. Definition at line 169 of file GIOP_Message_Base.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHANY, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_PRINT_EXCEPTION, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_REPLY, write_protocol_header(), and TAO_GIOP_Message_Generator_Parser::write_reply_header(). Referenced by generate_exception_reply(), and process_request().
00173 { 00174 // Get a parser for us 00175 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00176 00177 CORBA::Octet major, minor; 00178 00179 cdr.get_version (major, minor); 00180 00181 // Get the state information that we need to use 00182 this->set_state (major, 00183 minor, 00184 generator_parser); 00185 00186 // Write the GIOP header first 00187 if (!this->write_protocol_header (TAO_GIOP_REPLY, 00188 cdr)) 00189 { 00190 if (TAO_debug_level) 00191 ACE_ERROR ((LM_ERROR, 00192 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); 00193 00194 return -1; 00195 } 00196 00197 ACE_DECLARE_NEW_CORBA_ENV; 00198 ACE_TRY 00199 { 00200 // Now call the implementation for the rest of the header 00201 int const result = 00202 generator_parser->write_reply_header (cdr, 00203 params 00204 ACE_ENV_ARG_PARAMETER); 00205 ACE_TRY_CHECK; 00206 00207 if (!result) 00208 { 00209 if (TAO_debug_level > 4) 00210 ACE_ERROR ((LM_ERROR, 00211 ACE_TEXT ("(%P|%t) Error in writing reply ") 00212 ACE_TEXT ("header\n"))); 00213 00214 return -1; 00215 } 00216 } 00217 ACE_CATCHANY 00218 { 00219 if (TAO_debug_level > 4) 00220 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, 00221 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header")); 00222 00223 return -1; 00224 } 00225 ACE_ENDTRY; 00226 00227 return 0; 00228 } |
|
Write the RequestHeader into the cdr stream. The underlying implementation of the messaging should do the right thing. Implements TAO_Pluggable_Messaging. Definition at line 74 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_OutputCDR::get_version(), LM_ERROR, set_state(), TAO_debug_level, TAO_GIOP_REQUEST, write_protocol_header(), and TAO_GIOP_Message_Generator_Parser::write_request_header().
00079 { 00080 // Get a parser for us 00081 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00082 00083 CORBA::Octet major, minor; 00084 00085 cdr.get_version (major, minor); 00086 00087 // Get the state information that we need to use 00088 this->set_state (major, 00089 minor, 00090 generator_parser); 00091 00092 // Write the GIOP header first 00093 if (!this->write_protocol_header (TAO_GIOP_REQUEST, 00094 cdr)) 00095 { 00096 if (TAO_debug_level) 00097 { 00098 ACE_ERROR ((LM_ERROR, 00099 ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); 00100 } 00101 00102 return -1; 00103 } 00104 00105 // Now call the implementation for the rest of the header 00106 if (!generator_parser->write_request_header (op, 00107 spec, 00108 cdr)) 00109 { 00110 if (TAO_debug_level) 00111 ACE_ERROR ((LM_ERROR, 00112 ACE_TEXT ("(%P|%t) Error in writing request header \n"))); 00113 00114 return -1; 00115 } 00116 00117 return 0; 00118 } |
|
Header length.
Implements TAO_Pluggable_Messaging. Definition at line 1763 of file GIOP_Message_Base.cpp. Referenced by consolidate_fragmented_message().
01764 {
01765 return TAO_GIOP_MESSAGE_HEADER_LEN;
01766 }
|
|
Initialize the underlying state object based on the major and minor revision numbers. Implements TAO_Pluggable_Messaging. Definition at line 53 of file GIOP_Message_Base.cpp. References out_stream_, and ACE_OutputCDR::set_version().
00055 { 00056 // Set the giop version of the out stream 00057 this->out_stream_.set_version (major, 00058 minor); 00059 } |
|
Initialize the TAO_Queued_Data from the relevant portions of a GIOP_Message_State. Definition at line 1783 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_State::byte_order_, TAO_Queued_Data::byte_order_, TAO_GIOP_Message_State::giop_version_, TAO_GIOP_Message_Version::major, TAO_Queued_Data::major_version_, message_type(), TAO_GIOP_Message_Version::minor, TAO_Queued_Data::minor_version_, TAO_GIOP_Message_State::more_fragments_, TAO_Queued_Data::more_fragments_, and TAO_Queued_Data::msg_type_. Referenced by consolidate_node(), extract_next_message(), and parse_next_message().
01786 { 01787 qd->byte_order_ = state.byte_order_; 01788 qd->major_version_ = state.giop_version_.major; 01789 qd->minor_version_ = state.giop_version_.minor; 01790 qd->more_fragments_ = state.more_fragments_; 01791 qd->msg_type_ = this->message_type (state); 01792 } |
|
Is the messaging object ready for processing BiDirectional request/response? Implements TAO_Pluggable_Messaging. Definition at line 1655 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::get_version(), TAO_GIOP_Message_Generator_Parser::is_ready_for_bidirectional(), and set_state().
01656 { 01657 // Get a parser for us 01658 TAO_GIOP_Message_Generator_Parser *parser = 0; 01659 01660 CORBA::Octet major, minor = 0; 01661 01662 msg.get_version (major, minor); 01663 01664 // Get the state information that we need to use 01665 this->set_state (major, 01666 minor, 01667 parser); 01668 01669 // We dont really know.. So ask the generator and parser objects that 01670 // we know. 01671 // @@ TODO: Need to make this faster, instead of making virtual 01672 // call, try todo the check within this class 01673 return parser->is_ready_for_bidirectional (); 01674 } |
|
Creates a new node for the queue with a message block in the node of size sz. Definition at line 1678 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_ORB_Core::create_input_cdr_data_block(), ACE_Message_Block::duplicate(), TAO_ORB_Core::input_cdr_msgblock_allocator(), LM_DEBUG, LM_ERROR, TAO_Queued_Data::make_queued_data(), ACE_CDR::mb_align(), TAO_Queued_Data::msg_block_, ACE_Data_Block::release(), TAO_Queued_Data::release(), and TAO_debug_level. Referenced by extract_next_message().
01679 { 01680 // Get a node for the queue.. 01681 TAO_Queued_Data *qd = 01682 TAO_Queued_Data::make_queued_data ( 01683 this->orb_core_->transport_message_buffer_allocator ()); 01684 01685 if (qd == 0) 01686 { 01687 if (TAO_debug_level > 0) 01688 { 01689 ACE_ERROR ((LM_ERROR, 01690 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") 01691 ACE_TEXT ("our of memory, failed to allocate queued data object\n"))); 01692 } 01693 return 0; // NULL pointer 01694 } 01695 01696 // @@todo: We have a similar method in Transport.cpp. Need to see how 01697 // we can factor them out.. 01698 // Make a datablock for the size requested + something. The 01699 // "something" is required because we are going to align the data 01700 // block in the message block. During alignment we could loose some 01701 // bytes. As we may not know how many bytes will be lost, we will 01702 // allocate ACE_CDR::MAX_ALIGNMENT extra. 01703 ACE_Data_Block *db = 01704 this->orb_core_->create_input_cdr_data_block (sz + 01705 ACE_CDR::MAX_ALIGNMENT); 01706 01707 if (db == 0) 01708 { 01709 TAO_Queued_Data::release (qd); 01710 01711 if (TAO_debug_level > 0) 01712 { 01713 ACE_ERROR ((LM_ERROR, 01714 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") 01715 ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"), 01716 sz)); 01717 } 01718 return 0; // NULL pointer 01719 } 01720 01721 ACE_Allocator *alloc = 01722 this->orb_core_->input_cdr_msgblock_allocator (); 01723 01724 if (alloc == 0) 01725 { 01726 if (TAO_debug_level >= 8) 01727 { 01728 ACE_DEBUG ((LM_DEBUG, 01729 ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,") 01730 ACE_TEXT (" no ACE_Allocator defined\n"))); 01731 } 01732 } 01733 01734 01735 ACE_Message_Block mb (db, 01736 0, 01737 alloc); 01738 01739 ACE_Message_Block *new_mb = mb.duplicate (); 01740 01741 if (new_mb == 0) 01742 { 01743 TAO_Queued_Data::release (qd); 01744 db->release(); 01745 01746 if (TAO_debug_level > 0) 01747 { 
01748 ACE_ERROR ((LM_ERROR, 01749 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") 01750 ACE_TEXT ("out of memory, failed to allocate message block\n"))); 01751 } 01752 return 0; 01753 } 01754 01755 ACE_CDR::mb_align (new_mb); 01756 01757 qd->msg_block_ = new_mb; 01758 01759 return qd; 01760 } |
|
Make a GIOP_LOCATEREPLY and hand that over to the transport so that it can be sent over the connection.
Definition at line 1293 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, LM_ERROR, TAO_OutputCDR::more_fragments(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_Transport::send_message(), TAO_debug_level, TAO_GIOP_LOCATEREPLY, TAO_GIOP_Message_Generator_Parser::write_locate_reply_mesg(), and write_protocol_header(). Referenced by process_locate_request().
01298 { 01299 // Note here we are making the Locate reply header which is *QUITE* 01300 // different from the reply header made by the make_reply () call.. 01301 // Make the GIOP message header 01302 this->write_protocol_header (TAO_GIOP_LOCATEREPLY, 01303 output); 01304 01305 // This writes the header & body 01306 parser->write_locate_reply_mesg (output, 01307 request.request_id (), 01308 status_info); 01309 01310 output.more_fragments (false); 01311 01312 // Send the message 01313 int result = transport->send_message (output, 01314 0, 01315 TAO_Transport::TAO_REPLY); 01316 01317 // Print out message if there is an error 01318 if (result == -1) 01319 { 01320 if (TAO_debug_level > 0) 01321 { 01322 ACE_ERROR ((LM_ERROR, 01323 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"), 01324 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply"))); 01325 } 01326 } 01327 01328 return result; 01329 } |
|
|
Accessor for the output CDR stream.
Implements TAO_Pluggable_Messaging. Definition at line 62 of file GIOP_Message_Base.cpp. References out_stream_.
00063 { 00064 return this->out_stream_; 00065 } |
|
Definition at line 369 of file GIOP_Message_Base.cpp. References message_state_, TAO_GIOP_Message_State::parse_message_header(), and TAO_GIOP_Message_State::reset().
00370 { 00371 this->message_state_.reset (); 00372 00373 return this->message_state_.parse_message_header (incoming); 00374 } |
|
Parses the details of the next message from incoming and initializes the attributes of qd.
Implements TAO_Pluggable_Messaging. Definition at line 377 of file GIOP_Message_Base.cpp. References init_queued_data(), ACE_Message_Block::length(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data_, TAO_GIOP_Message_State::parse_message_header(), TAO_GIOP_Message_State::payload_size(), and TAO_MISSING_DATA_UNDEFINED.
00380 { 00381 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) 00382 { 00383 qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED; 00384 00385 return 0; /* incomplete header */ 00386 } 00387 else 00388 { 00389 TAO_GIOP_Message_State state; 00390 00391 if (state.parse_message_header (incoming) == -1) 00392 { 00393 return -1; 00394 } 00395 00396 const size_t message_size = state.message_size (); /* Header + Payload */ 00397 00398 if (message_size > incoming.length ()) 00399 { 00400 qd.missing_data_ = message_size - incoming.length (); 00401 } 00402 else 00403 { 00404 qd.missing_data_ = 0; 00405 } 00406 00407 /* init out-parameters */ 00408 this->init_queued_data (&qd, state); 00409 mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN 00410 + state.payload_size (); 00411 00412 return 1; /* complete header */ 00413 } 00414 } |
|
Parse GIOP request-id of TAO_InputCDR cdr.
|
|
Parse GIOP request-id of TAO_Queued_Data qd
Definition at line 1795 of file GIOP_Message_Base.cpp. References ACE_BIT_ENABLED, ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, ACE_Message_Block::data_block(), ACE_Data_Block::duplicate(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), set_state(), TAO_PLUGGABLE_MESSAGE_CANCELREQUEST, TAO_PLUGGABLE_MESSAGE_FRAGMENT, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, and ACE_Message_Block::wr_ptr(). Referenced by consolidate_fragmented_message(), and discard_fragmented_message().
01796 { 01797 // Get a parser for us 01798 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 01799 01800 // Get the state information that we need to use 01801 this->set_state (qd->major_version_, 01802 qd->minor_version_, 01803 generator_parser); 01804 01805 // Get the read and write positions before we steal data. 01806 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); 01807 size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); 01808 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; 01809 01810 // Create a input CDR stream. We do the following 01811 // 1 - If the incoming message block has a data block with a flag 01812 // DONT_DELETE (for the data block) we create an input CDR 01813 // stream the same way. 01814 // 2 - If the incoming message block had a datablock from heap just 01815 // use it by duplicating it and make the flag 0. 01816 // NOTE: We use the same data block in which we read the message and 01817 // we pass it on to the higher layers of the ORB. So we dont to any 01818 // copies at all here. The same is also done in the higher layers. 01819 01820 ACE_Message_Block::Message_Flags flg = 0; 01821 ACE_Data_Block *db = 0; 01822 01823 // Get the flag in the message block 01824 flg = qd->msg_block_->self_flags (); 01825 01826 if (ACE_BIT_ENABLED (flg, 01827 ACE_Message_Block::DONT_DELETE)) 01828 { 01829 // Use the same datablock 01830 db = qd->msg_block_->data_block (); 01831 } 01832 else 01833 { 01834 // Use a duplicated datablock as the datablock has come off the 01835 // heap. 
01836 db = qd->msg_block_->data_block ()->duplicate (); 01837 } 01838 01839 01840 TAO_InputCDR input_cdr (db, 01841 flg, 01842 rd_pos, 01843 wr_pos, 01844 qd->byte_order_, 01845 qd->major_version_, 01846 qd->minor_version_, 01847 this->orb_core_); 01848 01849 if (qd->major_version_ >= 1 && 01850 (qd->minor_version_ == 0 || qd->minor_version_ == 1)) 01851 { 01852 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || 01853 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY) 01854 { 01855 IOP::ServiceContextList service_context; 01856 01857 if ( ! (input_cdr >> service_context && 01858 input_cdr >> request_id) ) 01859 { 01860 return -1; 01861 } 01862 01863 return 0; 01864 } 01865 else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || 01866 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || 01867 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) 01868 { 01869 if ( ! (input_cdr >> request_id) ) 01870 { 01871 return -1; 01872 } 01873 01874 return 0; 01875 } 01876 else 01877 { 01878 return -1; 01879 } 01880 } 01881 else 01882 { 01883 if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || 01884 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY || 01885 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT || 01886 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || 01887 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || 01888 qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) 01889 { 01890 // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header. 01891 // This is true for all message types that might be sent in form of fragments or cancel-requests. 01892 if ( ! (input_cdr >> request_id) ) 01893 { 01894 return -1; 01895 } 01896 01897 return 0; 01898 } 01899 else 01900 { 01901 return -1; 01902 } 01903 } 01904 01905 return -1; 01906 } |
|
Processes the GIOP_LOCATE_REQUEST messages.
Definition at line 1156 of file GIOP_Message_Base.cpp. References ACE_CATCHALL, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_TRY_THROW, TAO_Request_Dispatcher::dispatch(), TAO_ServerRequest::exception_type(), TAO_ServerRequest::forward_location(), TAO_GIOP_Locate_Status_Msg::forward_location_var, TAO::unbounded_value_sequence< T >::get_buffer(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), TAO::unbounded_value_sequence< T >::length(), LM_DEBUG, make_send_locate_reply(), TAO_GIOP_Locate_Request_Header::object_key(), CORBA::Object_var, TAO_Transport::orb_core_, TAO_InputCDR::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_locate_header(), TAO_ORB_Core::request_dispatcher(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_GIOP_Locate_Status_Msg::status, TAO_debug_level, TAO_GIOP_NO_EXCEPTION, TAO_GIOP_OBJECT_FORWARD, TAO_GIOP_OBJECT_HERE, and TAO_GIOP_UNKNOWN_OBJECT. Referenced by process_request_message().
01160 { 01161 // This will extract the request header, set <response_required> as 01162 // appropriate. 01163 TAO_GIOP_Locate_Request_Header locate_request (input, 01164 this->orb_core_); 01165 01166 TAO_GIOP_Locate_Status_Msg status_info; 01167 01168 // Defaulting. 01169 status_info.status = TAO_GIOP_UNKNOWN_OBJECT; 01170 01171 CORBA::Boolean response_required = 1; 01172 01173 ACE_DECLARE_NEW_CORBA_ENV; 01174 ACE_TRY 01175 { 01176 int parse_error = 01177 parser->parse_locate_header (locate_request); 01178 01179 if (parse_error != 0) 01180 { 01181 ACE_TRY_THROW (CORBA::MARSHAL (0, 01182 CORBA::COMPLETED_NO)); 01183 } 01184 01185 TAO::ObjectKey tmp_key (locate_request.object_key ().length (), 01186 locate_request.object_key ().length (), 01187 locate_request.object_key ().get_buffer (), 01188 0); 01189 01190 // Set it to an error state 01191 parse_error = 1; 01192 CORBA::ULong req_id = locate_request.request_id (); 01193 01194 // We will send the reply. The ServerRequest class need not send 01195 // the reply 01196 CORBA::Boolean deferred_reply = true; 01197 TAO_ServerRequest server_request (this, 01198 req_id, 01199 response_required, 01200 deferred_reply, 01201 tmp_key, 01202 "_non_existent", 01203 output, 01204 transport, 01205 this->orb_core_, 01206 parse_error); 01207 01208 if (parse_error != 0) 01209 { 01210 ACE_TRY_THROW (CORBA::MARSHAL (0, 01211 CORBA::COMPLETED_NO)); 01212 } 01213 01214 CORBA::Object_var forward_to; 01215 01216 this->orb_core_->request_dispatcher ()->dispatch ( 01217 this->orb_core_, 01218 server_request, 01219 forward_to 01220 ACE_ENV_ARG_PARAMETER); 01221 ACE_TRY_CHECK; 01222 01223 if (!CORBA::is_nil (forward_to.in ())) 01224 { 01225 status_info.status = TAO_GIOP_OBJECT_FORWARD; 01226 status_info.forward_location_var = forward_to; 01227 if (TAO_debug_level > 0) 01228 ACE_DEBUG ((LM_DEBUG, 01229 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") 01230 ACE_TEXT ("called: forwarding\n"))); 01231 } 01232 else if 
(server_request.exception_type () == TAO_GIOP_NO_EXCEPTION) 01233 { 01234 // We got no exception, so the object is here. 01235 status_info.status = TAO_GIOP_OBJECT_HERE; 01236 if (TAO_debug_level > 0) 01237 ACE_DEBUG ((LM_DEBUG, 01238 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") 01239 ACE_TEXT ("found\n"))); 01240 } 01241 else 01242 { 01243 status_info.forward_location_var = server_request.forward_location (); 01244 01245 if (!CORBA::is_nil (status_info.forward_location_var.in ())) 01246 { 01247 status_info.status = TAO_GIOP_OBJECT_FORWARD; 01248 ACE_DEBUG ((LM_DEBUG, 01249 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") 01250 ACE_TEXT ("forwarding\n"))); 01251 } 01252 else 01253 { 01254 // Normal exception, so the object is not here 01255 status_info.status = TAO_GIOP_UNKNOWN_OBJECT; 01256 ACE_DEBUG ((LM_DEBUG, 01257 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") 01258 ACE_TEXT ("not here\n"))); 01259 } 01260 } 01261 } 01262 01263 ACE_CATCHANY 01264 { 01265 // Normal exception, so the object is not here 01266 status_info.status = TAO_GIOP_UNKNOWN_OBJECT; 01267 if (TAO_debug_level > 0) 01268 ACE_DEBUG ((LM_DEBUG, 01269 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") 01270 ACE_TEXT ("CORBA exception raised\n"))); 01271 } 01272 #if defined (TAO_HAS_EXCEPTIONS) 01273 ACE_CATCHALL 01274 { 01275 // Normal exception, so the object is not here 01276 status_info.status = TAO_GIOP_UNKNOWN_OBJECT; 01277 if (TAO_debug_level > 0) 01278 ACE_DEBUG ((LM_DEBUG, 01279 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ") 01280 ACE_TEXT ("C++ exception raised\n"))); 01281 } 01282 #endif /* TAO_HAS_EXCEPTIONS */ 01283 ACE_ENDTRY; 01284 01285 return this->make_send_locate_reply (transport, 01286 locate_request, 01287 status_info, 01288 output, 01289 parser); 01290 } |
|
Parse the reply message that we received and return the reply information through reply_info. Implements TAO_Pluggable_Messaging. Definition at line 770 of file GIOP_Message_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, ACE_Message_Block::data_block(), TAO_Transport_Mux_Strategy::dispatch_reply(), dump_msg(), TAO_Transport::id(), TAO_Pluggable_Reply_Params::input_cdr_, ACE_Message_Block::length(), LM_ERROR, TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, TAO_GIOP_Message_Generator_Parser::parse_locate_reply(), TAO_GIOP_Message_Generator_Parser::parse_reply(), ACE_Message_Block::rd_ptr(), set_state(), TAO_debug_level, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_Transport::tms(), TAO_Pluggable_Reply_Params::transport_, and ACE_Message_Block::wr_ptr().
00773 { 00774 // Get a parser for us 00775 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00776 00777 // Get the state information that we need to use 00778 this->set_state (qd->major_version_, 00779 qd->minor_version_, 00780 generator_parser); 00781 00782 // Get the read and write positions before we steal data. 00783 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); 00784 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); 00785 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; 00786 00787 if (TAO_debug_level > 0) 00788 this->dump_msg ("recv", 00789 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()), 00790 qd->msg_block_->length ()); 00791 00792 00793 // Create a empty buffer on stack 00794 // NOTE: We use the same data block in which we read the message and 00795 // we pass it on to the higher layers of the ORB. So we dont to any 00796 // copies at all here. 00797 TAO_InputCDR input_cdr (qd->msg_block_->data_block (), 00798 ACE_Message_Block::DONT_DELETE, 00799 rd_pos, 00800 wr_pos, 00801 qd->byte_order_, 00802 qd->major_version_, 00803 qd->minor_version_, 00804 this->orb_core_); 00805 00806 // We know we have some reply message. Check whether it is a 00807 // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. 00808 00809 // Once we send the InputCDR stream we need to just forget about 00810 // the stream and never touch that again for anything. We basically 00811 // loose ownership of the data_block. 
00812 int retval = 0; 00813 00814 switch (qd->msg_type_) 00815 { 00816 case TAO_PLUGGABLE_MESSAGE_REPLY: 00817 // Should be taken care by the state specific parsing 00818 retval = 00819 generator_parser->parse_reply (input_cdr, 00820 params); 00821 00822 break; 00823 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: 00824 retval = 00825 generator_parser->parse_locate_reply (input_cdr, 00826 params); 00827 break; 00828 default: 00829 retval = -1; 00830 } 00831 00832 if (retval == -1) 00833 return retval; 00834 00835 params.input_cdr_ = &input_cdr; 00836 00837 retval = 00838 params.transport_->tms ()->dispatch_reply (params); 00839 00840 if (retval == -1) 00841 { 00842 // Something really critical happened, we will forget about 00843 // every reply on this connection. 00844 if (TAO_debug_level > 0) 00845 ACE_ERROR ((LM_ERROR, 00846 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ") 00847 ACE_TEXT ("dispatch reply failed\n"), 00848 params.transport_->id ())); 00849 } 00850 00851 return retval; 00852 } |
|
Processes the GIOP_REQUEST messages.
Definition at line 933 of file GIOP_Message_Base.cpp. References ACE_ANY_EXCEPTION, ACE_CATCHALL, ACE_CATCHANY, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_PRINT_EXCEPTION, ACE_TEXT, ACE_TRY, ACE_TRY_CHECK, ACE_TRY_THROW, TAO_Transport::assign_translators(), TAO_ORB_Core::codeset_manager(), TAO_Request_Dispatcher::dispatch(), generate_reply_header(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), TAO_ORB_Core::is_permanent_forward_condition(), TAO::unbounded_value_sequence< ServiceContext >::length(), LM_ERROR, TAO_OutputCDR::message_attributes(), TAO_OutputCDR::more_fragments(), CORBA::Object_var, TAO_ServerRequest::orb_core(), TAO_Transport::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_request_header(), TAO_Codeset_Manager::process_service_context(), TAO_ServerRequest::reply_service_info(), TAO_Pluggable_Reply_Params_Base::reply_status_, TAO_ORB_Core::request_dispatcher(), TAO_ServerRequest::request_id(), TAO_Pluggable_Reply_Params_Base::request_id_, TAO_ServerRequest::request_service_context(), TAO_ServerRequest::response_expected(), TAO_Transport::send_message(), send_reply_exception(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, TAO_debug_level, TAO_GIOP_LOCATION_FORWARD, TAO_GIOP_LOCATION_FORWARD_PERM, and TAO_UNHANDLED_SERVER_CXX_EXCEPTION. Referenced by process_request_message().
00937 { 00938 // This will extract the request header, set <response_required> 00939 // and <sync_with_server> as appropriate. 00940 TAO_ServerRequest request (this, 00941 cdr, 00942 output, 00943 transport, 00944 this->orb_core_); 00945 00946 CORBA::ULong request_id = 0; 00947 CORBA::Boolean response_required = false; 00948 00949 int parse_error = 0; 00950 00951 ACE_DECLARE_NEW_CORBA_ENV; 00952 ACE_TRY 00953 { 00954 parse_error = 00955 parser->parse_request_header (request); 00956 00957 TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); 00958 if (csm) 00959 { 00960 csm->process_service_context(request); 00961 transport->assign_translators(&cdr,&output); 00962 } 00963 00964 // Throw an exception if the 00965 if (parse_error != 0) 00966 ACE_TRY_THROW (CORBA::MARSHAL (0, 00967 CORBA::COMPLETED_NO)); 00968 request_id = request.request_id (); 00969 00970 response_required = request.response_expected (); 00971 00972 CORBA::Object_var forward_to; 00973 00974 /* 00975 * Hook to specialize request processing within TAO 00976 * This hook will be replaced by specialized request 00977 * processing implementation. 00978 */ 00979 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START 00980 00981 // Do this before the reply is sent. 00982 this->orb_core_->request_dispatcher ()->dispatch ( 00983 this->orb_core_, 00984 request, 00985 forward_to 00986 ACE_ENV_ARG_PARAMETER); 00987 ACE_TRY_CHECK; 00988 00989 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END 00990 00991 if (!CORBA::is_nil (forward_to.in ())) 00992 { 00993 const CORBA::Boolean permanent_forward_condition = 00994 this->orb_core_->is_permanent_forward_condition 00995 (forward_to.in (), 00996 request.request_service_context ()); 00997 00998 // We should forward to another object... 00999 TAO_Pluggable_Reply_Params_Base reply_params; 01000 reply_params.request_id_ = request_id; 01001 reply_params.reply_status_ = 01002 permanent_forward_condition 01003 ? 
TAO_GIOP_LOCATION_FORWARD_PERM 01004 : TAO_GIOP_LOCATION_FORWARD; 01005 reply_params.svc_ctx_.length (0); 01006 01007 // Send back the reply service context. 01008 reply_params.service_context_notowned (&request.reply_service_info ()); 01009 01010 output.message_attributes (request_id, 01011 0, 01012 TAO_Transport::TAO_REPLY, 01013 0); 01014 01015 // Make the GIOP header and Reply header 01016 this->generate_reply_header (output, 01017 reply_params); 01018 01019 if (!(output << forward_to.in ())) 01020 { 01021 if (TAO_debug_level > 0) 01022 ACE_ERROR ((LM_ERROR, 01023 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ") 01024 ACE_TEXT ("forward reference.\n"))); 01025 01026 return -1; 01027 } 01028 01029 output.more_fragments (false); 01030 01031 int result = transport->send_message (output, 01032 0, 01033 TAO_Transport::TAO_REPLY); 01034 if (result == -1) 01035 { 01036 if (TAO_debug_level > 0) 01037 { 01038 // No exception but some kind of error, yet a 01039 // response is required. 01040 ACE_ERROR ((LM_ERROR, 01041 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") 01042 ACE_TEXT ("cannot send reply\n"), 01043 ACE_TEXT ("TAO_GIOP_Message_Base::process_request"))); 01044 } 01045 } 01046 return result; 01047 } 01048 } 01049 // Only CORBA exceptions are caught here. 
01050 ACE_CATCHANY 01051 { 01052 int result = 0; 01053 01054 if (response_required) 01055 { 01056 result = this->send_reply_exception (transport, 01057 output, 01058 request_id, 01059 &request.reply_service_info (), 01060 &ACE_ANY_EXCEPTION); 01061 if (result == -1) 01062 { 01063 if (TAO_debug_level > 0) 01064 { 01065 ACE_ERROR ((LM_ERROR, 01066 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") 01067 ACE_TEXT ("cannot send exception\n"), 01068 ACE_TEXT ("process_connector_request ()"))); 01069 01070 ACE_PRINT_EXCEPTION ( 01071 ACE_ANY_EXCEPTION, 01072 "TAO_GIOP_Message_Base::process_request[1]"); 01073 } 01074 } 01075 01076 } 01077 else if (TAO_debug_level > 0) 01078 { 01079 // It is unfortunate that an exception (probably a system 01080 // exception) was thrown by the upcall code (even by the 01081 // user) when the client was not expecting a response. 01082 // However, in this case, we cannot close the connection 01083 // down, since it really isn't the client's fault. 01084 01085 ACE_ERROR ((LM_ERROR, 01086 ACE_TEXT ("(%P|%t) exception thrown ") 01087 ACE_TEXT ("but client is not waiting a response\n"))); 01088 01089 ACE_PRINT_EXCEPTION ( 01090 ACE_ANY_EXCEPTION, 01091 "TAO_GIOP_Message_Base::process_request[2]"); 01092 } 01093 01094 return result; 01095 } 01096 #if defined (TAO_HAS_EXCEPTIONS) 01097 ACE_CATCHALL 01098 { 01099 // @@ TODO some c++ exception or another, but what do we do with 01100 // it? 01101 // We are supposed to map it into a CORBA::UNKNOWN exception. 01102 // BTW, this cannot be detected if using the <env> mapping. If 01103 // we have native exceptions but no support for them in the ORB 01104 // we should still be able to catch it. If we don't have native 01105 // exceptions it couldn't have been raised in the first place! 
01106 int result = 0; 01107 01108 if (response_required) 01109 { 01110 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code 01111 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), 01112 CORBA::COMPLETED_MAYBE); 01113 01114 result = this->send_reply_exception (transport, 01115 output, 01116 request_id, 01117 &request.reply_service_info (), 01118 &exception); 01119 if (result == -1) 01120 { 01121 if (TAO_debug_level > 0) 01122 { 01123 ACE_ERROR ((LM_ERROR, 01124 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ") 01125 ACE_TEXT ("%p: ") 01126 ACE_TEXT ("cannot send exception\n"), 01127 ACE_TEXT ("process_request ()"))); 01128 ACE_PRINT_EXCEPTION ( 01129 exception, 01130 "TAO_GIOP_Message_Base::process_request[3]"); 01131 } 01132 } 01133 } 01134 else if (TAO_debug_level > 0) 01135 { 01136 // It is unfotunate that an exception (probably a system 01137 // exception) was thrown by the upcall code (even by the 01138 // user) when the client was not expecting a response. 01139 // However, in this case, we cannot close the connection 01140 // down, since it really isn't the client's fault. 01141 ACE_ERROR ((LM_ERROR, 01142 ACE_TEXT ("(%P|%t|%N|%l) exception thrown ") 01143 ACE_TEXT ("but client is not waiting a response\n"))); 01144 } 01145 01146 return result; 01147 } 01148 #endif /* TAO_HAS_EXCEPTIONS */ 01149 ACE_ENDTRY; 01150 01151 return 0; 01152 } |
|
Process the request message that we have received on the connection. Implements TAO_Pluggable_Messaging. Definition at line 647 of file GIOP_Message_Base.cpp. References ACE_BIT_ENABLED, TAO_Transport::assign_translators(), ACE_Message_Block::base(), TAO_Queued_Data::byte_order_, TAO_ORB_Parameters::cdr_memcpy_tradeoff(), ACE_Message_Block::data_block(), dump_msg(), ACE_Data_Block::duplicate(), ACE_Auto_Basic_Ptr< X >::get(), TAO_ORB_Core::input_cdr_dblock_allocator(), TAO_ORB_Core::input_cdr_msgblock_allocator(), ACE_Message_Block::length(), TAO_ORB_Core::lf_strategy(), TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::msg_block_, TAO_Queued_Data::msg_type_, TAO_ORB_Core::orb_params(), process_locate_request(), process_request(), ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), set_state(), TAO_LF_Strategy::set_upcall_thread(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_REQUEST, and ACE_Message_Block::wr_ptr().
00649 { 00650 // Set the upcall thread 00651 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ()); 00652 00653 // Get a parser for us 00654 TAO_GIOP_Message_Generator_Parser *generator_parser = 0; 00655 00656 // Get the state information that we need to use 00657 this->set_state (qd->major_version_, 00658 qd->minor_version_, 00659 generator_parser); 00660 00661 // A buffer that we will use to initialise the CDR stream 00662 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) 00663 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 }; 00664 #else 00665 char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; 00666 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ 00667 00668 // Initialize an output CDR on the stack 00669 // NOTE: Don't jump to a conclusion as to why we are using the 00670 // input_cdr and hence the global pool here. These pools will move 00671 // to the lanes anyway at some point of time. Further, it would have 00672 // been awesome to have this in TSS. But for some reason the cloning 00673 // that happens when the ORB gets flow controlled while writing a 00674 // reply is messing things up. We crash horribly. Doing this adds a 00675 // lock, we need to set things like this -- put stuff in TSS here 00676 // and transfer to global memory when we get flow controlled. We 00677 // need to work on the message block to get it right! 00678 TAO_OutputCDR output (repbuf, 00679 sizeof repbuf, 00680 TAO_ENCAP_BYTE_ORDER, 00681 this->orb_core_->input_cdr_buffer_allocator (), 00682 this->orb_core_->input_cdr_dblock_allocator (), 00683 this->orb_core_->input_cdr_msgblock_allocator (), 00684 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), 00685 this->fragmentation_strategy_.get (), 00686 qd->major_version_, 00687 qd->minor_version_); 00688 00689 // Get the read and write positions before we steal data. 
00690 size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); 00691 size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); 00692 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; 00693 00694 if (TAO_debug_level > 0) 00695 this->dump_msg ("recv", 00696 reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()), 00697 qd->msg_block_->length ()); 00698 00699 00700 // Create a input CDR stream. We do the following 00701 // 1 - If the incoming message block has a data block with a flag 00702 // DONT_DELETE (for the data block) we create an input CDR 00703 // stream the same way. 00704 // 2 - If the incoming message block had a datablock from heap just 00705 // use it by duplicating it and make the flag 0. 00706 // NOTE: We use the same data block in which we read the message and 00707 // we pass it on to the higher layers of the ORB. So we dont to any 00708 // copies at all here. The same is also done in the higher layers. 00709 00710 ACE_Message_Block::Message_Flags flg = 0; 00711 ACE_Data_Block *db = 0; 00712 00713 // Get the flag in the message block 00714 flg = qd->msg_block_->self_flags (); 00715 00716 if (ACE_BIT_ENABLED (flg, 00717 ACE_Message_Block::DONT_DELETE)) 00718 { 00719 // Use the same datablock 00720 db = qd->msg_block_->data_block (); 00721 } 00722 else 00723 { 00724 // Use a duplicated datablock as the datablock has come off the 00725 // heap. 00726 db = qd->msg_block_->data_block ()->duplicate (); 00727 } 00728 00729 00730 TAO_InputCDR input_cdr (db, 00731 flg, 00732 rd_pos, 00733 wr_pos, 00734 qd->byte_order_, 00735 qd->major_version_, 00736 qd->minor_version_, 00737 this->orb_core_); 00738 00739 transport->assign_translators(&input_cdr,&output); 00740 00741 // We know we have some request message. Check whether it is a 00742 // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. 00743 00744 // Once we send the InputCDR stream we need to just forget about 00745 // the stream and never touch that again for anything. 
We basically 00746 // loose ownership of the data_block. 00747 00748 switch (qd->msg_type_) 00749 { 00750 case TAO_PLUGGABLE_MESSAGE_REQUEST: 00751 // Should be taken care by the state specific invocations. They 00752 // could raise an exception or write things in the output CDR 00753 // stream 00754 return this->process_request (transport, 00755 input_cdr, 00756 output, 00757 generator_parser); 00758 00759 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: 00760 return this->process_locate_request (transport, 00761 input_cdr, 00762 output, 00763 generator_parser); 00764 default: 00765 return -1; 00766 } 00767 } |
|
Reset the messaging object.
Implements TAO_Pluggable_Messaging. Definition at line 68 of file GIOP_Message_Base.cpp.
00069 {
00070 // no-op
00071 }
|
|
Close a connection, first sending GIOP::CloseConnection.
Definition at line 1439 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Transport::close_connection(), dump_msg(), TAO_Transport::id(), LM_DEBUG, LM_ERROR, TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::minor, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_CLOSECONNECTION, and ACE_Message_Block::wr_ptr().
01442 { 01443 01444 // static CORBA::Octet 01445 // I hate this in every method. Till the time I figure out a way 01446 // around I will have them here hanging around. 01447 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] = 01448 { 01449 // The following works on non-ASCII platforms, such as MVS (which 01450 // uses EBCDIC). 01451 0x47, // 'G' 01452 0x49, // 'I' 01453 0x4f, // 'O' 01454 0x50, // 'P' 01455 version.major, 01456 version.minor, 01457 TAO_ENCAP_BYTE_ORDER, 01458 TAO_GIOP_CLOSECONNECTION, 01459 0, 0, 0, 0 01460 }; 01461 01462 // It's important that we use a reliable shutdown after we send this 01463 // message, so we know it's received. 01464 // 01465 // @@ should recv and discard queued data for portability; note 01466 // that this won't block (long) since we never set SO_LINGER 01467 01468 this->dump_msg ("send_close_connection", 01469 (const u_char *) close_message, 01470 TAO_GIOP_MESSAGE_HEADER_LEN); 01471 01472 #if 0 01473 // @@CJC I don't think we need this check b/c the transport's send() 01474 // will simply return -1. However, I guess we could create something 01475 // like TAO_Tranport::is_closed() that returns whether the connection 01476 // is already closed. The problem with that, however, is that it's 01477 // entirely possible that is_closed() could return TRUE, and then the 01478 // transport could get closed down btw. the time it gets called and the 01479 // time that the send actually occurs. 
01480 ACE_HANDLE which = transport->handle (); 01481 if (which == ACE_INVALID_HANDLE) 01482 { 01483 if (TAO_debug_level > 0) 01484 ACE_DEBUG ((LM_DEBUG, 01485 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -") 01486 ACE_TEXT (" connection already closed\n"))); 01487 return; 01488 } 01489 #endif 01490 01491 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, 01492 ACE_Message_Block::MB_DATA, 01493 close_message, 01494 0, 01495 0, 01496 ACE_Message_Block::DONT_DELETE, 01497 0); 01498 ACE_Message_Block message_block(&data_block); 01499 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); 01500 01501 size_t bt; 01502 int result = transport->send_message_block_chain (&message_block, bt); 01503 if (result == -1) 01504 { 01505 if (TAO_debug_level > 0) 01506 ACE_ERROR ((LM_ERROR, 01507 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"), 01508 transport->id (), errno)); 01509 } 01510 01511 transport->close_connection (); 01512 ACE_DEBUG ((LM_DEBUG, 01513 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"), 01514 transport-> id ())); 01515 01516 } |
|
Send error messages.
Definition at line 1337 of file GIOP_Message_Base.cpp. References ACE_DEBUG, ACE_TEXT, dump_msg(), TAO_Transport::id(), LM_DEBUG, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGERROR, and ACE_Message_Block::wr_ptr().
01338 { 01339 const char 01340 error_message [TAO_GIOP_MESSAGE_HEADER_LEN] = 01341 { 01342 // The following works on non-ASCII platforms, such as MVS (which 01343 // uses EBCDIC). 01344 0x47, // 'G' 01345 0x49, // 'I' 01346 0x4f, // 'O' 01347 0x50, // 'P' 01348 (CORBA::Octet) 1, // Use the lowest GIOP version 01349 (CORBA::Octet) 0, 01350 TAO_ENCAP_BYTE_ORDER, 01351 TAO_GIOP_MESSAGERROR, 01352 0, 0, 0, 0 01353 }; 01354 01355 // @@ Q: How does this works with GIOP lite? 01356 // A: It doesn't 01357 01358 this->dump_msg ("send_error", 01359 (const u_char *) error_message, 01360 TAO_GIOP_MESSAGE_HEADER_LEN); 01361 01362 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, 01363 ACE_Message_Block::MB_DATA, 01364 error_message, 01365 0, 01366 0, 01367 ACE_Message_Block::DONT_DELETE, 01368 0); 01369 ACE_Message_Block message_block(&data_block, 01370 ACE_Message_Block::DONT_DELETE); 01371 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); 01372 01373 size_t bt; 01374 int result = transport->send_message_block_chain (&message_block, bt); 01375 if (result == -1) 01376 { 01377 if (TAO_debug_level > 0) 01378 ACE_DEBUG ((LM_DEBUG, 01379 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"), 01380 transport->id ())); 01381 } 01382 01383 return result; 01384 } |
|
We must send a LocateReply through transport; this request resulted in some kind of exception. Definition at line 1520 of file GIOP_Message_Base.cpp. References CORBA::SystemException::_downcast(), TAO_Pluggable_Reply_Params_Base::argument_flag_, generate_exception_reply(), TAO::unbounded_value_sequence< ServiceContext >::length(), TAO_OutputCDR::more_fragments(), TAO_Pluggable_Reply_Params_Base::reply_status_, TAO_Pluggable_Reply_Params_Base::request_id_, TAO_Transport::send_message(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, TAO_GIOP_SYSTEM_EXCEPTION, and TAO_GIOP_USER_EXCEPTION. Referenced by process_request().
01527 { 01528 TAO_Pluggable_Reply_Params_Base reply_params; 01529 reply_params.request_id_ = request_id; 01530 reply_params.svc_ctx_.length (0); 01531 01532 // We are going to send some data 01533 reply_params.argument_flag_ = 1; 01534 01535 // Send back the service context we received. (RTCORBA relies on 01536 // this). 01537 reply_params.service_context_notowned (svc_info); 01538 01539 reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION; 01540 01541 if (CORBA::SystemException::_downcast (x) != 0) 01542 { 01543 reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION; 01544 } 01545 01546 if (this->generate_exception_reply (output, 01547 reply_params, 01548 *x) == -1) 01549 return -1; 01550 01551 output.more_fragments (false); 01552 01553 return transport->send_message (output, 01554 0, 01555 TAO_Transport::TAO_REPLY); 01556 } |
|
Definition at line 2128 of file GIOP_Message_Base.cpp. References ACE_SET_BITS, ACE_OutputCDR::buffer(), ACE_OutputCDR::do_byte_swap(), TAO_OutputCDR::more_fragments(), and TAO_ENCAP_BYTE_ORDER. Referenced by format_message().
02129 { 02130 CORBA::Octet * const buf = 02131 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ())); 02132 02133 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; 02134 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; 02135 02136 // Flags for the GIOP protocol header "flags" field. 02137 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; 02138 02139 // Least significant bit: Byte order 02140 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ()); 02141 02142 // Second least significant bit: More fragments 02143 // 02144 // Only supported in GIOP 1.1 or better. 02145 if (!(major <= 1 && minor == 0)) 02146 ACE_SET_BITS (flags, msg.more_fragments () << 1); 02147 } |
|
Set the state.
Definition at line 1387 of file GIOP_Message_Base.cpp. References TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_10, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_11, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_12, and tao_giop_impl_. Referenced by fragment_header_length(), generate_fragment_header(), generate_locate_request_header(), generate_reply_header(), generate_request_header(), is_ready_for_bidirectional(), parse_request_id(), process_reply_message(), and process_request_message().
01391 { 01392 switch (def_major) 01393 { 01394 case 1: 01395 switch (def_minor) 01396 { 01397 case 0: 01398 gen_parser = 01399 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> ( 01400 &this->tao_giop_impl_.tao_giop_10); 01401 break; 01402 case 1: 01403 gen_parser = 01404 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> ( 01405 &this->tao_giop_impl_.tao_giop_11); 01406 break; 01407 case 2: 01408 gen_parser = 01409 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> ( 01410 &this->tao_giop_impl_.tao_giop_12); 01411 break; 01412 default: 01413 break; 01414 } 01415 break; 01416 default: 01417 break; 01418 } 01419 } |
|
Writes the GIOP header into msg.
Definition at line 893 of file GIOP_Message_Base.cpp. References ACE_OutputCDR::get_version(), ACE_OutputCDR::good_bit(), CORBA::Octet, ACE_OutputCDR::reset(), TAO_GIOP_Message_Type, CORBA::ULong, and ACE_OutputCDR::write_octet_array(). Referenced by generate_fragment_header(), generate_locate_request_header(), generate_reply_header(), generate_request_header(), and make_send_locate_reply().
00895 { 00896 // Reset the message type 00897 msg.reset (); 00898 00899 CORBA::Octet header[12] = 00900 { 00901 // The following works on non-ASCII platforms, such as MVS (which 00902 // uses EBCDIC). 00903 0x47, // 'G' 00904 0x49, // 'I' 00905 0x4f, // 'O' 00906 0x50 // 'P' 00907 }; 00908 00909 CORBA::Octet major, minor = 0; 00910 00911 (void) msg.get_version (major, minor); 00912 00913 header[4] = major; 00914 header[5] = minor; 00915 00916 // "flags" octet, i.e. header[6] will be set up later when message 00917 // is formatted by the transport. 00918 00919 header[7] = CORBA::Octet (type); // Message type 00920 00921 static ACE_CDR::ULong const header_size = 00922 sizeof (header) / sizeof (header[0]); 00923 00924 // Fragmentation should not occur at this point since there are only 00925 // 12 bytes in the stream, and fragmentation may only occur when 00926 // the stream length >= 16. 00927 msg.write_octet_array (header, header_size); 00928 00929 return msg.good_bit (); 00930 } |
|
All fragments being received are stored on a stack in reverse order, with the last fragment on top. Definition at line 285 of file GIOP_Message_Base.h. Referenced by consolidate_fragmented_message(), and discard_fragmented_message(). |
|
Strategy that sends data currently marshaled into this TAO_OutputCDR stream if necessary. Definition at line 297 of file GIOP_Message_Base.h. |
|
The message handler object that reads and parses the incoming messages. Definition at line 278 of file GIOP_Message_Base.h. Referenced by parse_incoming_messages(). |
|
Cached ORB_Core pointer...
Definition at line 274 of file GIOP_Message_Base.h. |
|
Buffer where the request is placed.
Definition at line 300 of file GIOP_Message_Base.h. Referenced by init(), and out_stream(). |
|
All the implementations of GIOP message generator and parsers.
Definition at line 281 of file GIOP_Message_Base.h. Referenced by set_state(). |