TAO_GIOP_Message_Base Class Reference

Definitions of the GIOP-specific stuff.

#include <GIOP_Message_Base.h>

Public Member Functions

 TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, TAO_Transport *transport, size_t input_cdr_size=ACE_CDR::DEFAULT_BUFSIZE)
 Constructor.
 ~TAO_GIOP_Message_Base (void)
 Destructor.
void init (CORBA::Octet major, CORBA::Octet minor)
int generate_request_header (TAO_Operation_Details &op, TAO_Target_Specification &spec, TAO_OutputCDR &cdr)
int generate_locate_request_header (TAO_Operation_Details &op, TAO_Target_Specification &spec, TAO_OutputCDR &cdr)
 Write the RequestHeader into the cdr stream.
int generate_reply_header (TAO_OutputCDR &cdr, TAO_Pluggable_Reply_Params_Base &params)
 Write the reply header.
int generate_fragment_header (TAO_OutputCDR &cdr, CORBA::ULong request_id)
int format_message (TAO_OutputCDR &cdr)
int parse_next_message (TAO_Queued_Data &qd, size_t &mesg_length)
int extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd)
int consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming)
 Check whether the node qd needs consolidation from incoming.
int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd)
int process_reply_message (TAO_Pluggable_Reply_Params &reply_info, TAO_Queued_Data *qd)
int generate_exception_reply (TAO_OutputCDR &cdr, TAO_Pluggable_Reply_Params_Base &params, const CORBA::Exception &x)
 Generate a reply message with the exception x.
size_t header_length (void) const
 Header length.
size_t fragment_header_length (const TAO_GIOP_Message_Version &giop_version) const
 The header length of a fragment.
TAO_OutputCDR & out_stream (void)
int consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg)
int discard_fragmented_message (const TAO_Queued_Data *cancel_request)
TAO_GIOP_Fragmentation_Strategy * fragmentation_strategy (void)
 Outgoing GIOP message fragmentation strategy.
bool is_ready_for_bidirectional (TAO_OutputCDR &msg) const

Protected Attributes

Outgoing GIOP Fragment Related Attributes
These attributes are only used when fragmenting outgoing GIOP requests and replies.

auto_ptr< TAO_GIOP_Fragmentation_Strategy > fragmentation_strategy_
TAO_OutputCDR out_stream_
 Buffer where the request is placed.

Private Member Functions

int process_request (TAO_Transport *transport, TAO_InputCDR &input, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *)
 Processes the GIOP_REQUEST messages.
int process_locate_request (TAO_Transport *transport, TAO_InputCDR &input, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *)
 Processes the GIOP_LOCATE_REQUEST messages.
TAO_GIOP_Message_Generator_Parser * get_parser (const TAO_GIOP_Message_Version &version) const
 Get the parser.
void dump_msg (const char *label, const u_char *ptr, size_t len)
 Print out a debug message.
int write_protocol_header (GIOP::MsgType t, const TAO_GIOP_Message_Version &version, TAO_OutputCDR &msg)
int make_send_locate_reply (TAO_Transport *transport, TAO_GIOP_Locate_Request_Header &request, TAO_GIOP_Locate_Status_Msg &status, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *)
int send_error (TAO_Transport *transport)
 Send error messages.
void send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport)
 Close a connection, first sending GIOP::CloseConnection.
int send_reply_exception (TAO_Transport *transport, TAO_OutputCDR &cdr, CORBA::ULong request_id, IOP::ServiceContextList *svc_info, CORBA::Exception *x)
int generate_locate_reply_header (TAO_OutputCDR &cdr, TAO_Pluggable_Reply_Params_Base &params)
 Write the locate reply header.
TAO_Queued_Data * make_queued_data (size_t sz)
int parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const
int parse_request_id (const TAO_InputCDR &cdr, CORBA::ULong &request_id) const
void set_giop_flags (TAO_OutputCDR &msg) const

Private Attributes

TAO_ORB_Core * orb_core_
 Cached ORB_Core pointer.
TAO_GIOP_Message_Generator_Parser_Impl tao_giop_impl_
 All the implementations of GIOP message generator and parsers.
TAO::Incoming_Message_Stack fragment_stack_

Detailed Description

Definitions of the GIOP-specific stuff.

This class holds the details that are common to all GIOP versions. Some of the details kept here may be moved elsewhere if they start to differ between versions.

Definition at line 51 of file GIOP_Message_Base.h.


Constructor & Destructor Documentation

TAO_GIOP_Message_Base::TAO_GIOP_Message_Base ( TAO_ORB_Core *  orb_core,
TAO_Transport *  transport,
size_t  input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE 
)

Constructor.

Definition at line 28 of file GIOP_Message_Base.cpp.

References ACE_String_Base< CHAR >::c_str(), and ACE_OS::sprintf().

00031   : orb_core_ (orb_core)
00032   , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033   , out_stream_ (0,
00034                  input_cdr_size,
00035                  TAO_ENCAP_BYTE_ORDER,
00036                  orb_core->output_cdr_buffer_allocator (),
00037                  orb_core->output_cdr_dblock_allocator (),
00038                  orb_core->output_cdr_msgblock_allocator (),
00039                  orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040                  fragmentation_strategy_.get (),
00041                  TAO_DEF_GIOP_MAJOR,
00042                  TAO_DEF_GIOP_MINOR)
00043 {
00044 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00045   const int nibbles = 2 * sizeof (size_t);
00046   char hex_string[nibbles + 1];
00047   ACE_OS::sprintf (hex_string,
00048                    "%8.8X",
00049                    transport->id ());
00050   hex_string[nibbles] = '\0';
00051   ACE_CString monitor_name ("OutputCDR_");
00052   monitor_name += hex_string;
00053   this->out_stream_.register_monitor (monitor_name.c_str ());
00054 #endif /* TAO_HAS_MONITOR_POINTS==1 */
00055 }

TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base ( void   ) 

Destructor.

Definition at line 58 of file GIOP_Message_Base.cpp.

References out_stream_.

00059 {
00060 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00061   this->out_stream_.unregister_monitor ();
00062 #endif /* TAO_HAS_MONITOR_POINTS==1 */
00063 }


Member Function Documentation

int TAO_GIOP_Message_Base::consolidate_fragmented_message ( TAO_Queued_Data *  qd,
TAO_Queued_Data *&  msg 
)

Consolidate a fragmented message with its associated fragments, which are stored within this class. If a reliable transport is used (like TCP), fragments are partially ordered on the stack, with the last fragment on top. If an unreliable transport is used (like UDP), fragments may arrive out of order and must be ordered before consolidation.

Returns:
0 on success, in which case msg points to the consolidated message; 1 if there are still fragments outstanding; -1 on error. In any case qd must be released by the method implementation.

Definition at line 1685 of file GIOP_Message_Base.cpp.

References TAO_Queued_Data::consolidate(), ACE_Message_Block::cont(), fragment_header_length(), TAO_Queued_Data::giop_version(), header_length(), ACE_Message_Block::length(), TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::major_version(), TAO_GIOP_Message_Version::minor, TAO_GIOP_Message_Version::minor_version(), TAO_Queued_Data::more_fragments(), TAO_Queued_Data::msg_block(), parse_request_id(), ACE_Message_Block::rd_ptr(), and TAO_Queued_Data::release().

01688 {
01689   TAO::Incoming_Message_Stack reverse_stack;
01690 
01691   TAO_Queued_Data *tail = 0;
01692   TAO_Queued_Data *head = 0;
01693 
01694   //
01695   // CONSOLIDATE FRAGMENTED MESSAGE
01696   //
01697 
01698   // check for error-condition
01699   if (qd == 0)
01700     {
01701       return -1;
01702     }
01703 
01704   if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01705     {
01706       TAO_Queued_Data::release (qd);
01707       return -1; // error: GIOP-1.0 does not support fragments
01708     }
01709 
01710   // If this is not the last fragment, push it onto stack for later processing
01711   if (qd->more_fragments ())
01712     {
01713       this->fragment_stack_.push (qd);
01714 
01715       msg = 0;   // no consolidated message available yet
01716       return 1;  // status: more messages expected.
01717     }
01718 
01719   tail = qd;  // init
01720 
01721   // Add the current message block to the end of the chain
01722   // after adjusting the read pointer to skip the header(s)
01723   size_t const header_adjustment =
01724     this->header_length () +
01725     this->fragment_header_length (tail->giop_version ().major_version ());
01726 
01727   if (tail->msg_block ()->length () < header_adjustment)
01728     {
01729       // buffer length not sufficient
01730       TAO_Queued_Data::release (qd);
01731       return -1;
01732     }
01733 
01734   // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
01735   if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01736     {
01737       // GIOP-1.1
01738 
01739       while (this->fragment_stack_.pop (head) != -1)
01740         {
01741           if (head->more_fragments () &&
01742               head->giop_version ().major_version () == 1 &&
01743               head->giop_version ().minor_version () == 1 &&
01744               head->msg_block ()->length () >= header_adjustment)
01745             {
01746               // adjust the read-pointer, skip the fragment header
01747               tail->msg_block ()->rd_ptr(header_adjustment);
01748 
01749               head->msg_block ()->cont (tail->msg_block ());
01750 
01751               tail->msg_block (0);
01752 
01753               TAO_Queued_Data::release (tail);
01754 
01755               tail = head;
01756             }
01757           else
01758             {
01759               reverse_stack.push (head);
01760             }
01761         }
01762     }
01763   else
01764     {
01765       // > GIOP-1.2
01766 
01767       CORBA::ULong tmp_request_id = 0;
01768       if (this->parse_request_id (tail, tmp_request_id) == -1)
01769         {
01770           return -1;
01771         }
01772 
01773       const CORBA::ULong request_id = tmp_request_id;
01774 
01775       while (this->fragment_stack_.pop (head) != -1)
01776         {
01777           CORBA::ULong head_request_id = 0;
01778           int parse_status = 0;
01779 
01780           if (head->more_fragments () &&
01781               head->giop_version ().major_version () >= 1 &&
01782               head->giop_version ().minor_version () >= 2 &&
01783               head->msg_block ()->length () >= header_adjustment &&
01784               (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01785               request_id == head_request_id)
01786             {
01787               // adjust the read-pointer, skip the fragment header
01788               tail->msg_block ()->rd_ptr(header_adjustment);
01789 
01790               head->msg_block ()->cont (tail->msg_block ());
01791 
01792               tail->msg_block (0);
01793 
01794               TAO_Queued_Data::release (tail);
01795 
01796               tail = head;
01797             }
01798           else
01799             {
01800               if (parse_status == -1)
01801                 {
01802                   TAO_Queued_Data::release (head);
01803                   return -1;
01804                 }
01805 
01806               reverse_stack.push (head);
01807             }
01808         }
01809     }
01810 
01811   // restore stack
01812   while (reverse_stack.pop (head) != -1)
01813     {
01814       this->fragment_stack_.push (head);
01815     }
01816 
01817   if (tail->consolidate () == -1)
01818     {
01819       // memory allocation failed
01820       TAO_Queued_Data::release (tail);
01821       return -1;
01822     }
01823 
01824   // set out value
01825   msg = tail;
01826 
01827   return 0;
01828 }
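
The stack-based matching performed above can be illustrated with a small standalone sketch. It does not use any TAO types: `Fragment`, `FragmentStack`, and `consolidate` are hypothetical names, a `std::vector` stands in for `TAO::Incoming_Message_Stack`, payloads are assumed to have their headers already stripped, and only the GIOP 1.2 style of matching by request id is modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for TAO_Queued_Data (payload only, headers stripped).
struct Fragment {
  std::uint32_t request_id;
  bool more_fragments;
  std::string payload;
};

// Hypothetical stand-in for the fragment stack; back() is the top.
using FragmentStack = std::vector<Fragment>;

// Returns 1 if more fragments are expected, 0 if `out` holds the whole message.
int consolidate(FragmentStack &stack, const Fragment &incoming, std::string &out) {
  if (incoming.more_fragments) {
    stack.push_back(incoming);  // not the last fragment: keep it for later
    return 1;
  }

  std::string message = incoming.payload;
  FragmentStack unrelated;      // plays the role of reverse_stack

  // Pop newest-first and prepend every fragment with a matching request id.
  while (!stack.empty()) {
    Fragment head = stack.back();
    stack.pop_back();
    if (head.request_id == incoming.request_id)
      message = head.payload + message;
    else
      unrelated.push_back(head);
  }

  // Restore fragments of other requests in their original order.
  while (!unrelated.empty()) {
    stack.push_back(unrelated.back());
    unrelated.pop_back();
  }
  assert(unrelated.empty());

  out = message;
  return 0;
}
```

In this sketch fragments of other requests survive the call untouched, which mirrors the "restore stack" step of the listing.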

int TAO_GIOP_Message_Base::consolidate_node ( TAO_Queued_Data *  qd,
ACE_Message_Block &  incoming 
)

Check whether the node qd needs consolidation from incoming.

Definition at line 422 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ace_min(), ACE_TEXT, ACE_Message_Block::copy(), ACE_CDR::grow(), ACE_Message_Block::length(), LM_ERROR, TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_GIOP_Message_State::parse_message_header(), TAO_GIOP_Message_State::payload_size(), ACE_Message_Block::rd_ptr(), TAO_Queued_Data::state(), TAO_debug_level, TAO_GIOP_MESSAGE_HEADER_LEN, and TAO_MISSING_DATA_UNDEFINED.

00424 {
00425   // Look to see whether we had at least parsed the GIOP header ...
00426   if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00427     {
00428       // The data length that has been stuck in there during the last
00429       // read ....
00430       size_t const len = qd->msg_block ()->length ();
00431 
00432       // paranoid check
00433       if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00434         {
00435           // inconsistency - this code should have parsed the header
00436           // so far
00437           return -1;
00438         }
00439 
00440       // We know that we would have space for
00441       // TAO_GIOP_MESSAGE_HEADER_LEN here.  So copy that much of data
00442       // from the <incoming> into the message block in <qd>
00443       size_t const available = incoming.length ();
00444       size_t const desired   = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00445       size_t const n_copy    = ace_min (available, desired);
00446 
00447       // paranoid check, but would cause endless looping
00448       if (n_copy == 0)
00449         {
00450           return -1;
00451         }
00452 
00453       if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00454         {
00455            return -1;
00456         }
00457 
00458       // Move the rd_ptr () in the incoming message block..
00459       incoming.rd_ptr (n_copy);
00460 
00461       // verify sufficient data to parse GIOP header
00462       if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00463         {
00464           return 0; /* continue */
00465         }
00466 
00467       TAO_GIOP_Message_State state;
00468 
00469       // Parse the message header now...
00470       if (state.parse_message_header (*qd->msg_block ()) == -1)
00471         {
00472           if (TAO_debug_level > 0)
00473             {
00474               ACE_ERROR ((LM_ERROR,
00475                   ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00476                   ACE_TEXT ("error parsing header\n") ));
00477             }
00478           return -1;
00479         }
00480       // Now grow the message block so that we can copy the rest of
00481       // the data, the message_block must be able to hold complete message
00482        if (ACE_CDR::grow (qd->msg_block (),
00483                           state.message_size ()) == -1)  /* GIOP_Header + Payload */
00484          {
00485            // on mem-error get rid of context silently, try to avoid
00486            // system calls that might allocate additional memory
00487            return -1;
00488          }
00489 
00490       // Copy the pay load..
00491       // Calculate the bytes that needs to be copied in the queue...
00492       size_t copy_len = state.payload_size ();
00493 
00494       // If the data that needs to be copied is more than that is
00495       // available to us ..
00496       if (copy_len > incoming.length ())
00497         {
00498           // Calculate the missing data..
00499           qd->missing_data (copy_len - incoming.length ());
00500 
00501           // Set the actual possible copy_len that is available...
00502           copy_len = incoming.length ();
00503         }
00504       else
00505         {
00506           qd->missing_data (0);
00507         }
00508 
00509       // ..now we are set to copy the right amount of data to the
00510       // node..
00511       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00512         {
00513           return -1;
00514         }
00515 
00516       // Set the <rd_ptr> of the <incoming>..
00517       incoming.rd_ptr (copy_len);
00518 
00519       // Get the other details...
00520       qd->state (state);
00521     }
00522   else
00523     {
00524       // @todo: Need to abstract this out to a separate method...
00525       size_t copy_len = qd->missing_data ();
00526 
00527       if (copy_len > incoming.length ())
00528         {
00529           // Calculate the missing data..
00530           qd->missing_data (copy_len - incoming.length ());
00531 
00532           // Set the actual possible copy_len that is available...
00533           copy_len = incoming.length ();
00534         }
00535 
00536       // paranoid check for endless-event-looping
00537       if (copy_len == 0)
00538         {
00539           return -1;
00540         }
00541 
00542       // Copy the right amount of data in to the node...
00543       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00544         {
00545           return -1;
00546         }
00547 
00548       // Set the <rd_ptr> of the <incoming>..
00549       qd->msg_block ()->rd_ptr (copy_len);
00550 
00551     }
00552 
00553   return 0;
00554 }
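
The header-completion step in the first branch above (copy `min(available, desired)` bytes, then advance the read position) can be sketched in isolation. This is a minimal illustration, not TAO code: `fill_header` and `kHeaderLen` are hypothetical names, `std::string` stands in for the message blocks, and an explicit `offset` stands in for the read pointer of `incoming`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// A GIOP message header is 12 bytes long.
constexpr std::size_t kHeaderLen = 12;

// Hypothetical helper: append bytes from `incoming` (starting at `offset`)
// to `node` until `node` holds a complete header. Returns the number of
// bytes consumed, or -1 for the inconsistent cases the listing rejects.
int fill_header(std::string &node, const std::string &incoming, std::size_t &offset) {
  assert(offset <= incoming.size());

  if (node.size() >= kHeaderLen)
    return -1;  // the header should already have been parsed

  std::size_t const available = incoming.size() - offset;
  std::size_t const desired = kHeaderLen - node.size();
  std::size_t const n_copy = std::min(available, desired);

  if (n_copy == 0)
    return -1;  // nothing to copy: bail out instead of looping forever

  node.append(incoming, offset, n_copy);
  offset += n_copy;  // advance the read position of the incoming data
  return static_cast<int>(n_copy);
}
```

A caller would parse the header only once `node.size()` reaches `kHeaderLen`; until then it simply waits for more input.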

int TAO_GIOP_Message_Base::discard_fragmented_message ( const TAO_Queued_Data *  cancel_request  ) 

Discard all fragments associated with the request-id encoded in cancel_request. This operation will never be called concurrently by multiple threads, nor concurrently with consolidate_fragmented_message.

Returns:
-1 on failure, 0 on success, 1 no fragment on stack relating to CancelRequest.

Definition at line 1832 of file GIOP_Message_Base.cpp.

References fragment_stack_, TAO_Queued_Data::giop_version(), TAO_GIOP_Message_Version::major_version(), TAO_GIOP_Message_Version::minor_version(), TAO_Queued_Data::msg_type(), parse_request_id(), and TAO_Queued_Data::release().

01833 {
01834   // We must extract the specific request-id from message-buffer
01835   // and remove all fragments from stack that match this request-id.
01836 
01837   TAO::Incoming_Message_Stack reverse_stack;
01838 
01839   CORBA::ULong cancel_request_id;
01840 
01841   if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01842     {
01843       return -1;
01844     }
01845 
01846   TAO_Queued_Data *head = 0;
01847 
01848   // Revert stack
01849   while (this->fragment_stack_.pop (head) != -1)
01850     {
01851       reverse_stack.push (head);
01852     }
01853 
01854   bool discard_all_GIOP11_messages = false;
01855 
01856   // Now we are able to process message in order they have arrived.
01857   // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
01858   // fragments belong to this message and must be discarded.
01859   // Note: GIOP-1.1 fragment headers don't have any request-id encoded. If the
01860   // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
01861   // having encoded the request id will be discarded.
01862   while (reverse_stack.pop (head) != -1)
01863     {
01864       CORBA::ULong head_request_id;
01865 
01866       if (head->giop_version ().major_version () == 1 &&
01867           head->giop_version ().minor_version () <= 1 &&
01868           head->msg_type () != GIOP::Fragment && // GIOP11 fragment does not provide request id
01869           this->parse_request_id (head, head_request_id) >= 0 &&
01870           cancel_request_id == head_request_id)
01871         {
01872           TAO_Queued_Data::release (head);
01873           discard_all_GIOP11_messages = true;
01874         }
01875       else if (head->giop_version ().major_version () == 1 &&
01876                head->giop_version ().minor_version () <= 1 &&
01877                discard_all_GIOP11_messages)
01878         {
01879           TAO_Queued_Data::release (head);
01880         }
01881       else if (head->giop_version ().major_version () >= 1 &&
01882                head->giop_version ().minor_version () >= 2 &&
01883                this->parse_request_id (head, head_request_id) >= 0 &&
01884                cancel_request_id == head_request_id)
01885         {
01886           TAO_Queued_Data::release (head);
01887         }
01888       else
01889         {
01890           this->fragment_stack_.push (head);
01891         }
01892     }
01893 
01894   return 0;
01895 }
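
The two-pass technique used above (reverse the stack, then filter while pushing back) can be shown with a simplified standalone sketch. `StackedFragment` and `discard_fragments` are hypothetical names, a `std::vector` stands in for `TAO::Incoming_Message_Stack`, and only matching by request id is modelled; the special handling of GIOP 1.1 fragments is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a queued fragment; only the request id matters here.
struct StackedFragment {
  std::uint32_t request_id;
};

// Removes every fragment whose id equals cancel_id and keeps the remaining
// fragments in their original order. Returns the number of discarded fragments.
std::size_t discard_fragments(std::vector<StackedFragment> &stack,
                              std::uint32_t cancel_id) {
  // First pass: reverse the stack so fragments can be visited in arrival order.
  std::vector<StackedFragment> reversed;
  while (!stack.empty()) {
    reversed.push_back(stack.back());
    stack.pop_back();
  }

  // Second pass: drop matching fragments, push everything else back.
  std::size_t discarded = 0;
  while (!reversed.empty()) {
    StackedFragment head = reversed.back();
    reversed.pop_back();
    if (head.request_id == cancel_id)
      ++discarded;
    else
      stack.push_back(head);
  }
  assert(reversed.empty());
  return discarded;
}
```

Unlike the listing, this sketch reports how many fragments were dropped, which makes the effect easy to check in a test.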

void TAO_GIOP_Message_Base::dump_msg ( const char *  label,
const u_char *  ptr,
size_t  len 
) [private]

Print out a debug message.

Definition at line 1417 of file GIOP_Message_Base.cpp.

References ACE_DEBUG, ACE_HEX_DUMP, ACE_TEXT, ACE_TEXT_CHAR_TO_TCHAR, LM_DEBUG, ACE_CDR::swap_4(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGE_FLAGS_OFFSET, TAO_GIOP_MESSAGE_HEADER_LEN, TAO_GIOP_MESSAGE_TYPE_OFFSET, TAO_GIOP_VERSION_MAJOR_OFFSET, and TAO_GIOP_VERSION_MINOR_OFFSET.

Referenced by format_message(), process_reply_message(), process_request_message(), send_close_connection(), and send_error().

01420 {
01421     static const char digits[] = "0123456789ABCD";
01422     static const char *names[] =
01423     {
01424       "Request",
01425       "Reply",
01426       "CancelRequest",
01427       "LocateRequest",
01428       "LocateReply",
01429       "CloseConnection",
01430       "MessageError",
01431       "Fragment"
01432     };
01433 
01434     // Message name.
01435     const char *message_name = "UNKNOWN MESSAGE";
01436     u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01437     if (slot < sizeof (names) / sizeof (names[0]))
01438       message_name = names[slot];
01439 
01440     // Byte order.
01441     int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01442 
01443     // Get the version info
01444     CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01445     CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01446 
01447     // request/reply id.
01448     CORBA::ULong tmp = 0;
01449     CORBA::ULong *id = &tmp;
01450     char *tmp_id = 0;
01451 
01452     if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Request ||
01453         ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Reply ||
01454         ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Fragment)
01455       {
01456         if (major == 1 && minor < 2)
01457           {
01458             // @@ Only works if ServiceContextList is empty....
01459             tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN  + 4);
01460           }
01461         else
01462           {
01463             tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01464           }
01465 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01466       if (byte_order == TAO_ENCAP_BYTE_ORDER)
01467         {
01468           id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01469         }
01470       else
01471         {
01472           ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01473         }
01474 #else
01475       id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01476 #endif /* ACE_DISABLE_SWAP_ON_READ */
01477 
01478       }
01479 
01480     // Print.
01481     ACE_DEBUG ((LM_DEBUG,
01482                 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01483                 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01484                 "Type %s[%u]\n",
01485                 ACE_TEXT_CHAR_TO_TCHAR (label),
01486                 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01487                 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01488                 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01489                 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01490                 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01491                 *id));
01492 
01493     if (TAO_debug_level >= 10)
01494       ACE_HEX_DUMP ((LM_DEBUG,
01495                       (const char *) ptr,
01496                       len,
01497                       ACE_TEXT ("GIOP message")));
01498 }
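
The fields that dump_msg reads come from the fixed 12-byte GIOP header: the magic "GIOP" in bytes 0 to 3, the major and minor version in bytes 4 and 5, the flags in byte 6, the message type in byte 7, and the message size in bytes 8 to 11. The sketch below decodes them from a raw buffer. `GiopHeaderInfo`, `decode_giop_header`, and `giop_message_name` are hypothetical names introduced for illustration; unlike the listing, the sketch assembles the size byte by byte rather than comparing against the host byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Fields of the fixed 12-byte GIOP header (hypothetical struct, not a TAO type).
struct GiopHeaderInfo {
  unsigned major = 0;
  unsigned minor = 0;
  bool little_endian = false;   // flags bit 0
  bool more_fragments = false;  // flags bit 1 (GIOP 1.1 and later)
  unsigned msg_type = 0;
  std::uint32_t body_size = 0;  // number of bytes following the header
};

// Returns false if the buffer is too short or lacks the "GIOP" magic.
bool decode_giop_header(const unsigned char *p, std::size_t len, GiopHeaderInfo &info) {
  assert(p != nullptr);
  if (len < 12 || p[0] != 'G' || p[1] != 'I' || p[2] != 'O' || p[3] != 'P')
    return false;

  info.major = p[4];
  info.minor = p[5];
  info.little_endian = (p[6] & 0x01) != 0;
  info.more_fragments = (p[6] & 0x02) != 0;
  info.msg_type = p[7];

  // Assemble the size starting from the most significant byte.
  std::uint32_t size = 0;
  for (std::size_t i = 0; i < 4; ++i) {
    std::size_t const idx = info.little_endian ? 11 - i : 8 + i;
    size = (size << 8) | p[idx];
  }
  info.body_size = size;
  return true;
}

// Same name table as in the listing above.
std::string giop_message_name(unsigned type) {
  static const char *const names[] = {
    "Request", "Reply", "CancelRequest", "LocateRequest",
    "LocateReply", "CloseConnection", "MessageError", "Fragment"
  };
  if (type < sizeof(names) / sizeof(names[0]))
    return names[type];
  return "UNKNOWN MESSAGE";
}
```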

int TAO_GIOP_Message_Base::extract_next_message ( ACE_Message_Block &  incoming,
TAO_Queued_Data *&  qd 
)

Extract the details of the next message from incoming into qd. Returns 0 if the message header could not be parsed completely, 1 if the message header could be parsed completely, and -1 on error.

Definition at line 334 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ace_max(), ACE_TEXT, ACE_Message_Block::copy(), ACE_CDR::DEFAULT_BUFSIZE, ACE_Message_Block::length(), LM_ERROR, make_queued_data(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_GIOP_Message_State::parse_message_header(), ACE_Message_Block::rd_ptr(), TAO_Queued_Data::state(), TAO_debug_level, TAO_GIOP_MESSAGE_HEADER_LEN, and TAO_MISSING_DATA_UNDEFINED.

Referenced by TAO_Transport::handle_input_parse_extra_messages().

00336 {
00337   if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00338     {
00339       if (incoming.length () > 0)
00340         {
00341           // Optimize memory usage, we don't know the actual message size
00342           // so far, but allocate enough space to hold small GIOP
00343           // messages. This way we avoid expensive "grow" operation
00344           // for small messages.
00345           size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00346 
00347           // Make a node which has at least message block of the size
00348           // of MESSAGE_HEADER_LEN.
00349           size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00350                                            default_buf_size);
00351 
00352           // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
00353 
00354           qd = this->make_queued_data (buf_size);
00355 
00356           if (qd == 0)
00357             {
00358               if (TAO_debug_level > 0)
00359                 {
00360                   ACE_ERROR((LM_ERROR,
00361                              ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00362                              ACE_TEXT ("out of memory\n")));
00363                 }
00364               return -1;
00365             }
00366 
00367           qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00368 
00369           incoming.rd_ptr (incoming.length ()); // consume all available data
00370 
00371           qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00372         }
00373       else
00374         {
00375           // handle not initialized variables
00376           qd = 0;  // reset
00377         }
00378 
00379       return 0;
00380     }
00381 
00382   TAO_GIOP_Message_State state;
00383   if (state.parse_message_header (incoming) == -1)
00384     {
00385       return -1;
00386     }
00387 
00388   size_t copying_len = state.message_size ();
00389 
00390   qd = this->make_queued_data (copying_len);
00391 
00392   if (qd == 0)
00393     {
00394       if (TAO_debug_level > 0)
00395         {
00396           ACE_ERROR ((LM_ERROR,
00397                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00398                       ACE_TEXT ("out of memory\n")));
00399         }
00400       return -1;
00401     }
00402 
00403   if (copying_len > incoming.length ())
00404     {
00405       qd->missing_data (copying_len - incoming.length ());
00406       copying_len = incoming.length ();
00407     }
00408   else
00409     {
00410       qd->missing_data (0);
00411     }
00412 
00413   qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00414 
00415   incoming.rd_ptr (copying_len);
00416   qd->state (state);
00417 
00418   return 1;
00419 }
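
The bookkeeping in the listing above boils down to three cases: too little data for a header, a header plus a partial message, and a header plus at least one complete message. The sketch below captures only that arithmetic. `ExtractPlan`, `plan_extract`, and `kMissingUndefined` are hypothetical names; in particular the sentinel value is an assumption and is not claimed to equal `TAO_MISSING_DATA_UNDEFINED`.

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t kHeaderLen = 12;  // length of a GIOP message header
// Hypothetical sentinel meaning "header not parsed yet, amount unknown".
constexpr std::size_t kMissingUndefined = static_cast<std::size_t>(-1);

struct ExtractPlan {
  int status;            // 0: header incomplete, 1: header parsed
  std::size_t copy_len;  // bytes to copy out of the incoming buffer
  std::size_t missing;   // bytes still needed to complete the message
};

// `available` is the number of readable bytes in the incoming buffer.
// `message_size` is header plus payload and is only meaningful when a
// complete header is available.
ExtractPlan plan_extract(std::size_t available, std::size_t message_size) {
  if (available < kHeaderLen)
    return {0, available, kMissingUndefined};  // consume what is there

  assert(message_size >= kHeaderLen);
  if (message_size > available)
    return {1, available, message_size - available};  // partial message

  return {1, message_size, 0};  // complete message, extra data stays behind
}
```

In the last case any bytes beyond `message_size` remain in the incoming buffer, so a caller can invoke the extraction again for the next message.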

int TAO_GIOP_Message_Base::format_message ( TAO_OutputCDR &  cdr  ) 

Format the message. As we have not written the message length in the header, we make use of this opportunity to insert it and format the message.

Definition at line 239 of file GIOP_Message_Base.cpp.

References ACE_NEW_RETURN, ACE_OutputCDR::begin(), ACE_OutputCDR::buffer(), ACE_CDR::consolidate(), ACE_Message_Block::cont(), ACE_OutputCDR::do_byte_swap(), dump_msg(), set_giop_flags(), ACE_CDR::swap_4(), TAO_debug_level, TAO_GIOP_MESSAGE_HEADER_LEN, TAO_GIOP_MESSAGE_SIZE_OFFSET, and ACE_OutputCDR::total_length().

00240 {
00241   // Ptr to first buffer.
00242   char *buf = const_cast <char*> (stream.buffer ());
00243 
00244   this->set_giop_flags (stream);
00245 
00246   // Length of all buffers.
00247   size_t const total_len = stream.total_length ();
00248 
00249   // NOTE: Here would also be a fine place to calculate a digital
00250   // signature for the message and place it into a preallocated slot
00251   // in the "ServiceContext".  Similarly, this is a good spot to
00252   // encrypt messages (or just the message bodies) if that's needed in
00253   // this particular environment and that isn't handled by the
00254   // networking infrastructure (e.g., IPSEC).
00255 
00256   CORBA::ULong bodylen = static_cast <CORBA::ULong>
00257                            (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00258 
00259 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00260   *(reinterpret_cast <CORBA::ULong *> (buf +
00261                          TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00262 #else
00263   if (!stream.do_byte_swap ())
00264     *(reinterpret_cast <CORBA::ULong *>
00265                            (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00266   else
00267     ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00268                      buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00269 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00270 
00271   if (TAO_debug_level >= 5)
00272     {
00273       // Check whether the output cdr stream is built up of multiple
00274       // message blocks. If so, consolidate them to one block that can be
00275       // dumped
00276       ACE_Message_Block* consolidated_block = 0;
00277       if (stream.begin()->cont () != 0)
00278         {
00279           ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00280           ACE_CDR::consolidate (consolidated_block, stream.begin ());
00281           buf = (char *) (consolidated_block->rd_ptr ());
00282         }
00283       ///
00284       this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00285 
00286       //
00287       delete consolidated_block;
00288       consolidated_block = 0;
00289       //
00290     }
00291 
00292   return 0;
00293 }
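
The central step of the listing above is back-patching the message size: once the whole message has been marshalled, the body length (total length minus the 12-byte header) is written into bytes 8 to 11 of the header. A minimal sketch of that step follows. `patch_message_size`, `kHeaderLen`, and `kSizeOffset` are hypothetical names, a `std::vector` stands in for the CDR buffer, and the bytes are written one at a time instead of through a pointer cast as the listing does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr std::size_t kHeaderLen = 12;  // length of a GIOP message header
constexpr std::size_t kSizeOffset = 8;  // offset of the message size field

// Writes the body length into the header's size field, in the byte order
// the message was marshalled in. Returns false if `msg` has no full header.
bool patch_message_size(std::vector<unsigned char> &msg, bool little_endian) {
  if (msg.size() < kHeaderLen)
    return false;

  std::uint32_t const body_len =
    static_cast<std::uint32_t>(msg.size() - kHeaderLen);

  for (std::size_t i = 0; i < 4; ++i) {
    // Little endian stores the least significant byte first.
    unsigned const shift =
      static_cast<unsigned>(little_endian ? 8 * i : 8 * (3 - i));
    msg[kSizeOffset + i] =
      static_cast<unsigned char>((body_len >> shift) & 0xFF);
  }
  return true;
}
```

Writing the bytes individually avoids alignment and aliasing concerns at the cost of a short loop; whether that trade-off matters depends on the platform.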

size_t TAO_GIOP_Message_Base::fragment_header_length ( const TAO_GIOP_Message_Version &  giop_version  )  const

The header length of a fragment.

Definition at line 1567 of file GIOP_Message_Base.cpp.

References TAO_GIOP_Message_Generator_Parser::fragment_header_length(), and get_parser().

Referenced by consolidate_fragmented_message().

01569 {
01570   // Get the parser we need to use
01571   TAO_GIOP_Message_Generator_Parser *generator_parser =
01572     this->get_parser (giop_version);
01573 
01574   return generator_parser->fragment_header_length ();
01575 }

TAO_GIOP_Fragmentation_Strategy * TAO_GIOP_Message_Base::fragmentation_strategy ( void   ) 

Outgoing GIOP message fragmentation strategy.

Definition at line 1898 of file GIOP_Message_Base.cpp.

References fragmentation_strategy_, and ACE_Auto_Basic_Ptr< X >::get().

Referenced by TAO_ServerRequest::send_cached_reply(), and TAO_ServerRequest::tao_send_reply_exception().

01899 {
01900   return this->fragmentation_strategy_.get ();
01901 }

int TAO_GIOP_Message_Base::generate_exception_reply ( TAO_OutputCDR &  cdr,
TAO_Pluggable_Reply_Params_Base &  params,
const CORBA::Exception &  x 
)

Generate a reply message with the exception x.

Definition at line 760 of file GIOP_Message_Base.cpp.

References CORBA::Exception::_tao_encode(), ACE_DEBUG, ACE_TEXT, generate_reply_header(), LM_DEBUG, and TAO_debug_level.

00764 {
00765   // A new try/catch block, but if something goes wrong now we have no
00766   // hope, just abort.
00767 
00768   try
00769     {
00770       // Make the GIOP & reply header.
00771       this->generate_reply_header (cdr, params);
00772       x._tao_encode (cdr);
00773     }
00774   catch (const ::CORBA::Exception&)
00775     {
00776       // Now we know that while handling the error another error
00777       // happened -> no hope, close connection.
00778 
00779       // Close the handle.
00780       if (TAO_debug_level > 0)
00781         ACE_DEBUG ((LM_DEBUG,
00782                   ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00783                   ACE_TEXT ("generate_exception_reply ()\n")));
00784       return -1;
00785     }
00786 
00787   return 0;
00788 }

int TAO_GIOP_Message_Base::generate_fragment_header ( TAO_OutputCDR & cdr,
CORBA::ULong  request_id
)

Definition at line 207 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, get_parser(), TAO_OutputCDR::get_version(), LM_ERROR, TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::minor, TAO_debug_level, and TAO_GIOP_Message_Generator_Parser::write_fragment_header().

00209 {
00210   TAO_GIOP_Message_Version giop_version;
00211 
00212   cdr.get_version (giop_version);
00213 
00214   // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
00215   // supports them in 1.2 or better since GIOP 1.1 fragments do not
00216   // have a fragment message header.
00217   if (giop_version.major == 1 && giop_version.minor < 2)
00218     return -1;
00219 
00220   // Get the parser we need to use
00221   TAO_GIOP_Message_Generator_Parser *generator_parser =
00222     this->get_parser (giop_version);
00223 
00224   // Write the GIOP header first
00225   if (!this->write_protocol_header (GIOP::Fragment, giop_version, cdr)
00226       || !generator_parser->write_fragment_header (cdr, request_id))
00227     {
00228       if (TAO_debug_level)
00229         ACE_ERROR ((LM_ERROR,
00230                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00231 
00232       return -1;
00233     }
00234 
00235   return 0;
00236 }
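
The version guard in the listing above can be read in isolation. The following is a minimal sketch only: GiopVersion and can_write_fragment_header are hypothetical names introduced for illustration and are not part of TAO. It shows which GIOP versions pass the check before any header is written.

```cpp
#include <cassert>

// Hypothetical stand-in for TAO_GIOP_Message_Version (not the real type).
struct GiopVersion {
  unsigned char major_ver;
  unsigned char minor_ver;
};

// Mirrors the guard in the listing: GIOP 1.0 and 1.1 are rejected because
// GIOP 1.1 fragments carry no fragment message header.
inline bool can_write_fragment_header(const GiopVersion &v) {
  return !(v.major_ver == 1 && v.minor_ver < 2);
}
```

With this guard a caller would skip fragment generation for GIOP 1.0 and 1.1 streams, which corresponds to the early "return -1" in the listing.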

int TAO_GIOP_Message_Base::generate_locate_reply_header ( TAO_OutputCDR & cdr,
TAO_Pluggable_Reply_Params_Base & params
) [private]

Write the locate reply header.

Definition at line 1501 of file GIOP_Message_Base.cpp.

01504 {
01505   return 0;
01506 }

int TAO_GIOP_Message_Base::generate_locate_request_header ( TAO_Operation_Details & op,
TAO_Target_Specification & spec,
TAO_OutputCDR & cdr
)

Write the LocateRequest header into the cdr stream.

Definition at line 119 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, get_parser(), TAO_OutputCDR::get_version(), LM_ERROR, TAO_Operation_Details::request_id(), TAO_debug_level, and TAO_GIOP_Message_Generator_Parser::write_locate_request_header().

00123 {
00124   TAO_GIOP_Message_Version giop_version;
00125 
00126   cdr.get_version (giop_version);
00127 
00128   // Get the parser we need to use
00129   TAO_GIOP_Message_Generator_Parser *generator_parser =
00130     this->get_parser (giop_version);
00131 
00132   // Write the GIOP header first
00133   if (!this->write_protocol_header (GIOP::LocateRequest, giop_version, cdr))
00134     {
00135       if (TAO_debug_level)
00136         ACE_ERROR ((LM_ERROR,
00137                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00138 
00139       return -1;
00140     }
00141 
00142   // Now call the implementation for the rest of the header
00143   if (!generator_parser->write_locate_request_header
00144       (op.request_id (), spec, cdr))
00145     {
00146       if (TAO_debug_level)
00147         ACE_ERROR ((LM_ERROR,
00148                     ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00149 
00150 
00151       return -1;
00152 
00153     }
00154 
00155   return 0;
00156 }

int TAO_GIOP_Message_Base::generate_reply_header ( TAO_OutputCDR & cdr,
TAO_Pluggable_Reply_Params_Base & params
)

Write the reply header.

Definition at line 159 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, get_parser(), TAO_OutputCDR::get_version(), LM_ERROR, TAO_debug_level, and TAO_GIOP_Message_Generator_Parser::write_reply_header().

Referenced by generate_exception_reply(), TAO_ServerRequest::init_reply(), process_request(), and TAO_ServerRequest::send_no_exception_reply().

00162 {
00163   TAO_GIOP_Message_Version giop_version;
00164 
00165   cdr.get_version (giop_version);
00166 
00167   // Write the GIOP header first
00168   if (!this->write_protocol_header (GIOP::Reply, giop_version, cdr))
00169     {
00170       if (TAO_debug_level)
00171         ACE_ERROR ((LM_ERROR,
00172                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00173 
00174       return -1;
00175     }
00176 
00177   try
00178     {
00179       // Get the parser we need to use
00180       TAO_GIOP_Message_Generator_Parser *generator_parser =
00181         this->get_parser (giop_version);
00182 
00183       // Now call the implementation for the rest of the header
00184       if (!generator_parser->write_reply_header (cdr, params))
00185         {
00186           if (TAO_debug_level > 4)
00187             ACE_ERROR ((LM_ERROR,
00188                         ACE_TEXT ("(%P|%t) Error in writing reply ")
00189                         ACE_TEXT ("header\n")));
00190 
00191           return -1;
00192         }
00193     }
00194   catch (const ::CORBA::Exception& ex)
00195     {
00196       if (TAO_debug_level > 4)
00197         ex._tao_print_exception (
00198           ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00199 
00200       return -1;
00201     }
00202 
00203   return 0;
00204 }

int TAO_GIOP_Message_Base::generate_request_header ( TAO_Operation_Details & op,
TAO_Target_Specification & spec,
TAO_OutputCDR & cdr
)

Write the RequestHeader into the cdr stream. The underlying implementation of the messaging should do the right thing.

Definition at line 79 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, get_parser(), TAO_OutputCDR::get_version(), LM_ERROR, TAO_debug_level, and TAO_GIOP_Message_Generator_Parser::write_request_header().

00083 {
00084   // Get a parser for us
00085   TAO_GIOP_Message_Version giop_version;
00086 
00087   cdr.get_version (giop_version);
00088 
00089   // Write the GIOP header first
00090   if (!this->write_protocol_header (GIOP::Request, giop_version, cdr))
00091     {
00092       if (TAO_debug_level)
00093         {
00094           ACE_ERROR ((LM_ERROR,
00095                       ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00096         }
00097 
00098       return -1;
00099     }
00100 
00101   // Get the parser we need to use
00102   TAO_GIOP_Message_Generator_Parser *generator_parser =
00103     this->get_parser (giop_version);
00104 
00105   // Now call the implementation for the rest of the header
00106   if (!generator_parser->write_request_header (op, spec, cdr))
00107     {
00108       if (TAO_debug_level)
00109         ACE_ERROR ((LM_ERROR,
00110                     ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00111 
00112       return -1;
00113     }
00114 
00115   return 0;
00116 }

TAO_GIOP_Message_Generator_Parser * TAO_GIOP_Message_Base::get_parser ( const TAO_GIOP_Message_Version & version  )  const [private]

Get the parser.

Definition at line 1248 of file GIOP_Message_Base.cpp.

References CORBA::COMPLETED_NO, TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::minor, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_10, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_11, TAO_GIOP_Message_Generator_Parser_Impl::tao_giop_12, and tao_giop_impl_.

Referenced by fragment_header_length(), generate_fragment_header(), generate_locate_request_header(), generate_reply_header(), generate_request_header(), is_ready_for_bidirectional(), process_reply_message(), and process_request_message().

01250 {
01251   switch (version.major)
01252     {
01253     case 1:
01254       switch (version.minor)
01255         {
01256         case 0:
01257           return
01258             const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01259                                      &this->tao_giop_impl_.tao_giop_10);
01260           break;
01261         case 1:
01262           return
01263             const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01264                                      &this->tao_giop_impl_.tao_giop_11);
01265           break;
01266         case 2:
01267           return
01268             const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01269                                      &this->tao_giop_impl_.tao_giop_12);
01270           break;
01271         default:
01272           throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01273           break;
01274         }
01275       break;
01276     default:
01277       throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01278       break;
01279     }
01280 }
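
The dispatch above selects one of three pre-built parser objects by GIOP minor version and throws for anything else. A minimal, self-contained sketch of the same pattern follows. Every name in it (GiopVersion, Parser, Parser10/11/12, MessageBase) is a hypothetical stand-in, and std::runtime_error replaces CORBA::INTERNAL so the example does not depend on TAO.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins; none of these are real TAO types.
struct GiopVersion {
  unsigned char major_ver;
  unsigned char minor_ver;
};

struct Parser {
  virtual ~Parser() = default;
  virtual int minor_version() const = 0;
};
struct Parser10 : Parser { int minor_version() const override { return 0; } };
struct Parser11 : Parser { int minor_version() const override { return 1; } };
struct Parser12 : Parser { int minor_version() const override { return 2; } };

class MessageBase {
public:
  // Returns the parser matching the version, or throws if unsupported.
  // The const_cast mirrors the listing: the parsers are members of a
  // const object but are handed out as non-const pointers.
  Parser *get_parser(const GiopVersion &v) const {
    if (v.major_ver != 1)
      throw std::runtime_error("unsupported GIOP major version");
    switch (v.minor_ver) {
      case 0: return const_cast<Parser10 *>(&giop_10_);
      case 1: return const_cast<Parser11 *>(&giop_11_);
      case 2: return const_cast<Parser12 *>(&giop_12_);
      default: throw std::runtime_error("unsupported GIOP minor version");
    }
  }

private:
  Parser10 giop_10_;
  Parser11 giop_11_;
  Parser12 giop_12_;
};
```

Because the parser objects live inside the message object, selection costs one switch and no allocation. Callers only pay for the virtual call on the returned parser.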

size_t TAO_GIOP_Message_Base::header_length ( void   )  const

Header length.

Definition at line 1561 of file GIOP_Message_Base.cpp.

References TAO_GIOP_MESSAGE_HEADER_LEN.

Referenced by TAO_Transport::allocate_partial_message_block(), consolidate_fragmented_message(), and TAO_Transport::handle_input_parse_data().

01562 {
01563   return TAO_GIOP_MESSAGE_HEADER_LEN;
01564 }

void TAO_GIOP_Message_Base::init ( CORBA::Octet  major,
CORBA::Octet  minor 
)

Initialize the underlying state object based on the major and minor revision numbers.

Definition at line 66 of file GIOP_Message_Base.cpp.

References out_stream_, and ACE_OutputCDR::set_version().

Referenced by TAO_Transport::messaging_init().

00067 {
00068   // Set the giop version of the out stream
00069   this->out_stream_.set_version (major, minor);
00070 }

bool TAO_GIOP_Message_Base::is_ready_for_bidirectional ( TAO_OutputCDR & msg  )  const

Is the messaging object ready for processing BiDirectional request/response?

Definition at line 1509 of file GIOP_Message_Base.cpp.

References get_parser(), TAO_OutputCDR::get_version(), and TAO_GIOP_Message_Generator_Parser::is_ready_for_bidirectional().

Referenced by TAO_IIOP_Transport::generate_request_header().

01510 {
01511   TAO_GIOP_Message_Version giop_version;
01512 
01513   msg.get_version (giop_version);
01514 
01515   // Get the parser we need to use
01516   TAO_GIOP_Message_Generator_Parser *generator_parser =
01517     this->get_parser (giop_version);
01518 
01519   // We don't really know, so ask the generator and parser objects that
01520   // we know.
01521   // @@ TODO: Need to make this faster; instead of making a virtual
01522   // call, try to do the check within this class
01523   return generator_parser->is_ready_for_bidirectional ();
01524 }

TAO_Queued_Data * TAO_GIOP_Message_Base::make_queued_data ( size_t  sz  )  [private]

Creates a new node for the queue with a message block in the node of size sz.

Definition at line 1528 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, TAO_ORB_Core::create_input_cdr_data_block(), TAO_ORB_Core::input_cdr_msgblock_allocator(), LM_ERROR, TAO_Queued_Data::make_queued_data(), ACE_CDR::MAX_ALIGNMENT, orb_core_, ACE_Data_Block::release(), and TAO_debug_level.

Referenced by extract_next_message().

01529 {
01530   // Make a datablock for the size requested + something. The
01531   // "something" is required because we are going to align the data
01532   // block in the message block. During alignment we could lose some
01533   // bytes. As we may not know how many bytes will be lost, we will
01534   // allocate ACE_CDR::MAX_ALIGNMENT extra.
01535   ACE_Data_Block *db =
01536     this->orb_core_->create_input_cdr_data_block (sz +
01537                                                   ACE_CDR::MAX_ALIGNMENT);
01538 
01539   TAO_Queued_Data *qd =
01540     TAO_Queued_Data::make_queued_data (
01541                  this->orb_core_->transport_message_buffer_allocator (),
01542                  this->orb_core_->input_cdr_msgblock_allocator (),
01543                  db);
01544 
01545   if (qd == 0)
01546     {
01547       if (TAO_debug_level > 0)
01548         {
01549           ACE_ERROR ((LM_ERROR,
01550             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01551             ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01552         }
01553       db->release ();
01554       return 0; // NULL pointer
01555     }
01556 
01557   return qd;
01558 }
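
The comment in the listing explains why the data block is over-allocated: aligning the read pointer inside the block can skip some leading bytes. A small sketch of that arithmetic follows. The constant kMaxAlignment = 8 and both helper functions are assumptions made for illustration; the real value comes from ACE_CDR::MAX_ALIGNMENT.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed alignment boundary for this sketch (the real constant is
// ACE_CDR::MAX_ALIGNMENT).
constexpr std::size_t kMaxAlignment = 8;

// Number of bytes skipped when 'addr' is rounded up to the next multiple
// of 'alignment'. The result is always smaller than 'alignment'.
inline std::size_t alignment_loss(std::uintptr_t addr,
                                  std::size_t alignment = kMaxAlignment) {
  assert(alignment != 0);
  const std::size_t rem = static_cast<std::size_t>(addr % alignment);
  return rem == 0 ? 0 : alignment - rem;
}

// Size to allocate so that 'requested' usable bytes remain after aligning.
inline std::size_t padded_size(std::size_t requested) {
  return requested + kMaxAlignment;
}
```

Since the loss is strictly less than the alignment boundary, adding the full boundary to the request always leaves at least the requested number of bytes usable, whatever address the allocator returns.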

int TAO_GIOP_Message_Base::make_send_locate_reply ( TAO_Transport * transport,
TAO_GIOP_Locate_Request_Header & request,
TAO_GIOP_Locate_Status_Msg & status,
TAO_OutputCDR & output,
TAO_GIOP_Message_Generator_Parser *
) [private]

Make a GIOP_LOCATEREPLY and hand that over to the transport so that it can be sent over the connection.

Note:
As of this writing, GIOP 1.1 and 1.2 appear to have similar headers, so both are implemented here until an unmanageable difference arises.

Definition at line 1153 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, TAO_OutputCDR::get_version(), LM_ERROR, TAO_OutputCDR::more_fragments(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_Transport::send_message(), TAO_debug_level, TAO_Transport::TAO_REPLY, TAO_GIOP_Message_Generator_Parser::write_locate_reply_mesg(), and write_protocol_header().

Referenced by process_locate_request().

01158 {
01159   TAO_GIOP_Message_Version giop_version;
01160   output.get_version (giop_version);
01161 
01162   // Note here we are making the Locate reply header which is *QUITE*
01163   // different from the reply header made by the make_reply () call..
01164   // Make the GIOP message header
01165   this->write_protocol_header (GIOP::LocateReply, giop_version, output);
01166 
01167   // This writes the header & body
01168   parser->write_locate_reply_mesg (output,
01169                                    request.request_id (),
01170                                    status_info);
01171 
01172   output.more_fragments (false);
01173 
01174   // Send the message
01175   int const result = transport->send_message (output,
01176                                               0,
01177                                               TAO_Transport::TAO_REPLY);
01178 
01179   // Print out message if there is an error
01180   if (result == -1)
01181     {
01182       if (TAO_debug_level > 0)
01183         {
01184           ACE_ERROR ((LM_ERROR,
01185                       ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01186                       ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01187         }
01188     }
01189 
01190   return result;
01191 }

TAO_OutputCDR & TAO_GIOP_Message_Base::out_stream ( void   ) 

Definition at line 73 of file GIOP_Message_Base.cpp.

References out_stream_.

Referenced by TAO_Transport::out_stream().

00074 {
00075   return this->out_stream_;
00076 }

int TAO_GIOP_Message_Base::parse_next_message ( TAO_Queued_Data & qd,
size_t &  mesg_length 
)

Parse the details of the next message from the incoming data and initialize the attributes of qd.

Return values:
0 if the message header could not be parsed completely.
1 if the message header was parsed completely.
-1 on error.

Definition at line 296 of file GIOP_Message_Base.cpp.

References ACE_Message_Block::length(), TAO_GIOP_Message_State::message_size(), TAO_Queued_Data::missing_data(), TAO_Queued_Data::msg_block(), TAO_GIOP_Message_State::parse_message_header(), TAO_Queued_Data::state(), TAO_GIOP_MESSAGE_HEADER_LEN, and TAO_MISSING_DATA_UNDEFINED.

00298 {
00299   if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00300     {
00301       qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00302 
00303       return 0; /* incomplete header */
00304     }
00305   else
00306     {
00307       TAO_GIOP_Message_State state;
00308 
00309       if (state.parse_message_header (*(qd.msg_block ())) == -1)
00310         {
00311           return -1;
00312         }
00313 
00314       size_t const message_size = state.message_size (); /* Header + Payload */
00315 
00316       if (message_size > qd.msg_block ()->length ())
00317         {
00318           qd.missing_data (message_size - qd.msg_block ()->length ());
00319         }
00320       else
00321         {
00322           qd.missing_data (0);
00323         }
00324 
00325       /* init out-parameters */
00326       qd.state (state);
00327       mesg_length = message_size;
00328 
00329       return 1; /* complete header */
00330     }
00331 }
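
The three return values and the missing-data computation above can be illustrated without TAO. The sketch below assumes the standard 12-byte GIOP header layout (magic "GIOP", two version octets, a flags octet whose lowest bit selects little-endian byte order in GIOP 1.1 and later, a message-type octet, and a 4-byte body size). ParseResult, HeaderInfo and parse_header are hypothetical names, not TAO API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

constexpr std::size_t kGiopHeaderLen = 12;  // fixed GIOP header size

enum class ParseResult { Incomplete = 0, Complete = 1, Error = -1 };

struct HeaderInfo {
  std::size_t message_size = 0;  // header + payload
  std::size_t missing = 0;       // bytes not yet received
};

inline ParseResult parse_header(const unsigned char *buf, std::size_t len,
                                HeaderInfo &out) {
  if (len < kGiopHeaderLen)
    return ParseResult::Incomplete;  // header not fully received yet
  if (std::memcmp(buf, "GIOP", 4) != 0)
    return ParseResult::Error;       // bad magic

  // Octet 6 carries the byte order (1 = little endian).
  const bool little_endian = (buf[6] & 0x01) != 0;
  const std::uint32_t b0 = buf[8], b1 = buf[9], b2 = buf[10], b3 = buf[11];
  const std::uint32_t payload =
      little_endian ? (b0 | (b1 << 8) | (b2 << 16) | (b3 << 24))
                    : ((b0 << 24) | (b1 << 16) | (b2 << 8) | b3);

  out.message_size = kGiopHeaderLen + payload;
  out.missing = out.message_size > len ? out.message_size - len : 0;
  return ParseResult::Complete;
}
```

As in the listing, a complete header does not imply a complete message: the caller still has to read the number of missing bytes before the message can be processed.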

int TAO_GIOP_Message_Base::parse_request_id ( const TAO_InputCDR & cdr,
CORBA::ULong & request_id
) const [private]

Parse GIOP request-id of TAO_InputCDR cdr.

Returns:
0 on success, otherwise -1

int TAO_GIOP_Message_Base::parse_request_id ( const TAO_Queued_Data * qd,
CORBA::ULong & request_id
) const [private]

Parse GIOP request-id of TAO_Queued_Data qd.

Returns:
0 on success, otherwise -1

Definition at line 1578 of file GIOP_Message_Base.cpp.

References ACE_BIT_ENABLED, ACE_Message_Block::base(), TAO_Queued_Data::byte_order(), ACE_Message_Block::data_block(), ACE_Message_Block::DONT_DELETE, ACE_Data_Block::duplicate(), TAO_Queued_Data::giop_version(), TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::major_version(), TAO_GIOP_Message_Version::minor, TAO_GIOP_Message_Version::minor_version(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), orb_core_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), TAO_GIOP_MESSAGE_HEADER_LEN, and ACE_Message_Block::wr_ptr().

Referenced by consolidate_fragmented_message(), and discard_fragmented_message().

01580 {
01581   // Get the read and write positions before we steal data.
01582   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01583   size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01584   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01585 
01586   // Create a input CDR stream. We do the following
01587   //  1 - If the incoming message block has a data block with a flag
01588   //      DONT_DELETE  (for the data block) we create an input CDR
01589   //      stream the same way.
01590   //  2 - If the incoming message block had a datablock from heap just
01591   //      use it by duplicating it and make the flag 0.
01592   // NOTE: We use the same data block in which we read the message and
01593   // we pass it on to the higher layers of the ORB. So we don't do any
01594   // copies at all here. The same is also done in the higher layers.
01595 
01596   ACE_Message_Block::Message_Flags flg = 0;
01597   ACE_Data_Block *db = 0;
01598 
01599   // Get the flag in the message block
01600   flg = qd->msg_block ()->self_flags ();
01601 
01602   if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
01603     {
01604       // Use the same datablock
01605       db = qd->msg_block ()->data_block ();
01606     }
01607   else
01608     {
01609       // Use a duplicated datablock as the datablock has come off the
01610       // heap.
01611       db = qd->msg_block ()->data_block ()->duplicate ();
01612     }
01613 
01614   TAO_InputCDR input_cdr (db,
01615                           flg,
01616                           rd_pos,
01617                           wr_pos,
01618                           qd->byte_order (),
01619                           qd->giop_version ().major_version (),
01620                           qd->giop_version ().minor_version (),
01621                           this->orb_core_);
01622 
01623   if (qd->giop_version ().major == 1 &&
01624       (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01625     {
01626       switch (qd->msg_type ())
01627       {
01628         case GIOP::Request:
01629         case GIOP::Reply:
01630           {
01631             IOP::ServiceContextList service_context;
01632 
01633             if ((input_cdr >> service_context)
01634                 && (input_cdr >> request_id))
01635               {
01636                 return 0;
01637               }
01638           }
01639           break;
01640         case GIOP::CancelRequest:
01641         case GIOP::LocateRequest:
01642         case GIOP::LocateReply:
01643           {
01644             if ((input_cdr >> request_id))
01645               {
01646                 return 0;
01647               }
01648           }
01649           break;
01650         default:
01651           break;
01652       }
01653     }
01654   else
01655     {
01656       switch (qd->msg_type ())
01657       {
01658         case GIOP::Request:
01659         case GIOP::Reply:
01660         case GIOP::Fragment:
01661         case GIOP::CancelRequest:
01662         case GIOP::LocateRequest:
01663         case GIOP::LocateReply:
01664           {
01665             // Dealing with GIOP-1.2, the request-id is located directly
01666             // behind the GIOP-Header.  This is true for all message
01667             // types that might be sent in form of fragments or
01668             // cancel-requests.
01669             if ((input_cdr >> request_id))
01670               {
01671                 return 0;
01672               }
01673           }
01674           break;
01675         default:
01676           break;
01677       }
01678     }
01679 
01680   return -1;
01681 }
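
The two switch statements above encode where the request id sits relative to the GIOP header for each version and message type. The sketch below restates that decision table as a pure function. MsgType, RequestIdLocation and locate_request_id are hypothetical names used only for illustration; the enumerator values follow the GIOP message-type numbering.

```cpp
#include <cassert>

enum class MsgType {
  Request = 0, Reply = 1, CancelRequest = 2, LocateRequest = 3,
  LocateReply = 4, CloseConnection = 5, MessageError = 6, Fragment = 7
};

enum class RequestIdLocation {
  AfterServiceContext,  // a service context list must be read first
  DirectlyAfterHeader,  // the id is the first field after the header
  None                  // no request id can be extracted
};

inline RequestIdLocation locate_request_id(unsigned ver_major,
                                           unsigned ver_minor,
                                           MsgType type) {
  // Same test as the listing: GIOP 1.0 and 1.1 take the first branch.
  const bool pre_12 = ver_major == 1 && (ver_minor == 0 || ver_minor == 1);
  switch (type) {
    case MsgType::Request:
    case MsgType::Reply:
      return pre_12 ? RequestIdLocation::AfterServiceContext
                    : RequestIdLocation::DirectlyAfterHeader;
    case MsgType::CancelRequest:
    case MsgType::LocateRequest:
    case MsgType::LocateReply:
      return RequestIdLocation::DirectlyAfterHeader;
    case MsgType::Fragment:
      // The first branch of the listing has no Fragment case.
      return pre_12 ? RequestIdLocation::None
                    : RequestIdLocation::DirectlyAfterHeader;
    default:
      return RequestIdLocation::None;
  }
}
```

The None result corresponds to the paths in the listing that fall through to "return -1".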

int TAO_GIOP_Message_Base::process_locate_request ( TAO_Transport * transport,
TAO_InputCDR & input,
TAO_OutputCDR & output,
TAO_GIOP_Message_Generator_Parser *
) [private]

Processes the GIOP_LOCATE_REQUEST messages.

Definition at line 1038 of file GIOP_Message_Base.cpp.

References ACE_DEBUG, ACE_TEXT, CORBA::COMPLETED_NO, TAO_Request_Dispatcher::dispatch(), TAO_GIOP_Locate_Status_Msg::forward_location_var, TAO_ServerRequest::is_forwarded(), LM_DEBUG, make_send_locate_reply(), CORBA::NO_EXCEPTION, TAO_GIOP_Locate_Request_Header::object_key(), orb_core_, TAO_Transport::orb_core_, TAO_InputCDR::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_locate_header(), TAO_ServerRequest::reply_status(), TAO_ORB_Core::request_dispatcher(), TAO_GIOP_Locate_Request_Header::request_id(), TAO_GIOP_Locate_Status_Msg::status, and TAO_debug_level.

Referenced by process_request_message().

01042 {
01043   // This will extract the request header, set <response_required> as
01044   // appropriate.
01045   TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01046 
01047   TAO_GIOP_Locate_Status_Msg status_info;
01048 
01049   // Defaulting.
01050   status_info.status = GIOP::UNKNOWN_OBJECT;
01051 
01052   CORBA::Boolean response_required = true;
01053 
01054   try
01055     {
01056       int parse_error = parser->parse_locate_header (locate_request);
01057 
01058       if (parse_error != 0)
01059         {
01060           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01061         }
01062 
01063       TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01064                               locate_request.object_key ().length (),
01065                               locate_request.object_key ().get_buffer (),
01066                               0);
01067 
01068       // Set it to an error state
01069       parse_error = 1;
01070       CORBA::ULong req_id = locate_request.request_id ();
01071 
01072       // We will send the reply. The ServerRequest class need not send
01073       // the reply
01074       CORBA::Boolean deferred_reply = true;
01075       TAO_ServerRequest server_request (this,
01076                                         req_id,
01077                                         response_required,
01078                                         deferred_reply,
01079                                         tmp_key,
01080                                         "_non_existent",
01081                                         output,
01082                                         transport,
01083                                         this->orb_core_,
01084                                         parse_error);
01085 
01086       if (parse_error != 0)
01087         {
01088           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01089         }
01090 
01091       CORBA::Object_var forward_to;
01092 
01093       this->orb_core_->request_dispatcher ()->dispatch (
01094           this->orb_core_,
01095           server_request,
01096           forward_to);
01097 
01098       if (server_request.is_forwarded ())
01099         {
01100           status_info.status = GIOP::OBJECT_FORWARD;
01101           status_info.forward_location_var = forward_to;
01102           if (TAO_debug_level > 0)
01103             ACE_DEBUG ((LM_DEBUG,
01104                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01105                         ACE_TEXT ("called: forwarding\n")));
01106         }
01107       else if (server_request.reply_status () == GIOP::NO_EXCEPTION)
01108         {
01109           // We got no exception, so the object is here.
01110           status_info.status = GIOP::OBJECT_HERE;
01111           if (TAO_debug_level > 0)
01112             ACE_DEBUG ((LM_DEBUG,
01113                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01114                         ACE_TEXT ("found\n")));
01115         }
01116       else
01117         {
01118           // Normal exception, so the object is not here
01119           status_info.status = GIOP::UNKNOWN_OBJECT;
01120           ACE_DEBUG ((LM_DEBUG,
01121                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01122                       ACE_TEXT ("not here\n")));
01123         }
01124     }
01125 
01126   catch (const ::CORBA::Exception&)
01127     {
01128       // Normal exception, so the object is not here
01129       status_info.status = GIOP::UNKNOWN_OBJECT;
01130       if (TAO_debug_level > 0)
01131         ACE_DEBUG ((LM_DEBUG,
01132                     ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01133                     ACE_TEXT ("CORBA exception raised\n")));
01134     }
01135   catch (...)
01136     {
01137       // Normal exception, so the object is not here
01138       status_info.status = GIOP::UNKNOWN_OBJECT;
01139       if (TAO_debug_level > 0)
01140         ACE_DEBUG ((LM_DEBUG,
01141                     ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01142                     ACE_TEXT ("C++ exception raised\n")));
01143     }
01144 
01145   return this->make_send_locate_reply (transport,
01146                                        locate_request,
01147                                        status_info,
01148                                        output,
01149                                        parser);
01150 }

int TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params & reply_info,
TAO_Queued_Data * qd
)

Parse the reply message that we received and return the reply information through reply_info.

Definition at line 682 of file GIOP_Message_Base.cpp.

References ACE_ERROR, ACE_TEXT, TAO_Transport::assign_translators(), ACE_Message_Block::base(), TAO_Queued_Data::byte_order(), ACE_Message_Block::data_block(), TAO_Transport_Mux_Strategy::dispatch_reply(), ACE_Message_Block::DONT_DELETE, dump_msg(), get_parser(), TAO_Queued_Data::giop_version(), TAO_Transport::id(), TAO_Pluggable_Reply_Params::input_cdr_, ACE_Message_Block::length(), LM_ERROR, TAO_GIOP_Message_Version::major_version(), TAO_GIOP_Message_Version::minor_version(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), orb_core_, TAO_GIOP_Message_Generator_Parser::parse_locate_reply(), TAO_GIOP_Message_Generator_Parser::parse_reply(), ACE_Message_Block::rd_ptr(), TAO_debug_level, TAO_GIOP_MESSAGE_HEADER_LEN, TAO_Transport::tms(), TAO_Pluggable_Reply_Params::transport_, and ACE_Message_Block::wr_ptr().

00685 {
00686   // Get the parser we need to use
00687   TAO_GIOP_Message_Generator_Parser *generator_parser =
00688     this->get_parser (qd->giop_version ());
00689 
00690   // Get the read and write positions before we steal data.
00691   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00692   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00693   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00694 
00695   if (TAO_debug_level >= 5)
00696     {
00697       this->dump_msg ("recv",
00698                       reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00699                       qd->msg_block ()->length ());
00700     }
00701 
00702 
00703   // Create an empty buffer on the stack
00704   // NOTE: We use the same data block in which we read the message and
00705   // we pass it on to the higher layers of the ORB. So we don't do any
00706   // copies at all here.
00707   TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
00708                           ACE_Message_Block::DONT_DELETE,
00709                           rd_pos,
00710                           wr_pos,
00711                           qd->byte_order (),
00712                           qd->giop_version ().major_version (),
00713                           qd->giop_version ().minor_version (),
00714                           this->orb_core_);
00715 
00716   // We know we have some reply message. Check whether it is a
00717   // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
00718 
00719   // Once we send the InputCDR stream we need to just forget about
00720   // the stream and never touch that again for anything. We basically
00721   // lose ownership of the data_block.
00722   int retval = 0;
00723 
00724   switch (qd->msg_type ())
00725     {
00726     case GIOP::Reply:
00727       // Should be taken care by the state specific parsing
00728       retval = generator_parser->parse_reply (input_cdr, params);
00729       break;
00730     case GIOP::LocateReply:
00731       retval = generator_parser->parse_locate_reply (input_cdr, params);
00732       break;
00733     default:
00734       retval = -1;
00735     }
00736 
00737   if (retval == -1)
00738     return retval;
00739 
00740   params.input_cdr_ = &input_cdr;
00741   params.transport_->assign_translators (params.input_cdr_, 0);
00742 
00743   retval = params.transport_->tms ()->dispatch_reply (params);
00744 
00745   if (retval == -1)
00746     {
00747       // Something really critical happened, we will forget about
00748       // every reply on this connection.
00749       if (TAO_debug_level > 0)
00750         ACE_ERROR ((LM_ERROR,
00751                     ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00752                     ACE_TEXT ("dispatch reply failed\n"),
00753                     params.transport_->id ()));
00754     }
00755 
00756   return retval;
00757 }

int TAO_GIOP_Message_Base::process_request ( TAO_Transport * transport,
TAO_InputCDR & input,
TAO_OutputCDR & output,
TAO_GIOP_Message_Generator_Parser *
) [private]

Processes the GIOP_REQUEST messages.

Definition at line 828 of file GIOP_Message_Base.cpp.

References CORBA::SystemException::_tao_minor_code(), CORBA::Exception::_tao_print_exception(), ACE_ERROR, ACE_TEXT, TAO_Transport::assign_translators(), TAO_ORB_Core::codeset_manager(), CORBA::COMPLETED_MAYBE, CORBA::COMPLETED_NO, TAO_Request_Dispatcher::dispatch(), generate_reply_header(), TAO_Pseudo_Var_T< T >::in(), TAO_ServerRequest::is_forwarded(), LM_ERROR, TAO_OutputCDR::message_attributes(), TAO_OutputCDR::more_fragments(), TAO_ServerRequest::orb_core(), orb_core_, TAO_Transport::orb_core_, TAO_GIOP_Message_Generator_Parser::parse_request_header(), TAO_Codeset_Manager::process_service_context(), TAO_ServerRequest::reply_service_info(), TAO_Pluggable_Reply_Params_Base::reply_status(), TAO_ORB_Core::request_dispatcher(), TAO_ServerRequest::request_id(), TAO_Pluggable_Reply_Params_Base::request_id_, TAO_ServerRequest::request_service_context(), TAO_ServerRequest::response_expected(), TAO_Transport::send_message(), send_reply_exception(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, TAO_debug_level, TAO_Transport::TAO_REPLY, and TAO_UNHANDLED_SERVER_CXX_EXCEPTION.

00833 {
00834   // This will extract the request header, set <response_required>
00835   // and <sync_with_server> as appropriate.
00836   TAO_ServerRequest request (this,
00837                              cdr,
00838                              output,
00839                              transport,
00840                              this->orb_core_);
00841 
00842   CORBA::ULong request_id = 0;
00843   CORBA::Boolean response_required = false;
00844   int parse_error = 0;
00845 
00846   try
00847     {
00848       parse_error = parser->parse_request_header (request);
00849 
00850       // Throw an exception if the request header could not be parsed.
00851       if (parse_error != 0)
00852         throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00853 
00854       TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00855       if (csm)
00856         {
00857           csm->process_service_context (request);
00858           transport->assign_translators (&cdr, &output);
00859         }
00860 
00861       request_id = request.request_id ();
00862 
00863       response_required = request.response_expected ();
00864 
00865       CORBA::Object_var forward_to;
00866 
00867       /*
00868        * Hook to specialize request processing within TAO
00869        * This hook will be replaced by specialized request
00870        * processing implementation.
00871        */
00872 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
00873 
00874       // Do this before the reply is sent.
00875       this->orb_core_->request_dispatcher ()->dispatch (
00876           this->orb_core_,
00877           request,
00878           forward_to);
00879 
00880 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END
00881 
00882       if (request.is_forwarded ())
00883         {
00884           CORBA::Boolean const permanent_forward_condition =
00885               this->orb_core_->is_permanent_forward_condition
00886               (forward_to.in (),
00887                request.request_service_context ());
00888 
00889           // We should forward to another object...
00890           TAO_Pluggable_Reply_Params_Base reply_params;
00891           reply_params.request_id_ = request_id;
00892           reply_params.reply_status (
00893               permanent_forward_condition
00894               ? GIOP::LOCATION_FORWARD_PERM
00895               : GIOP::LOCATION_FORWARD);
00896           reply_params.svc_ctx_.length (0);
00897 
00898           // Send back the reply service context.
00899           reply_params.service_context_notowned (
00900             &request.reply_service_info ());
00901 
00902           output.message_attributes (request_id,
00903                                      0,
00904                                      TAO_Transport::TAO_REPLY,
00905                                      0);
00906 
00907           // Make the GIOP header and Reply header
00908           this->generate_reply_header (output, reply_params);
00909 
00910           if (!(output << forward_to.in ()))
00911             {
00912               if (TAO_debug_level > 0)
00913                 ACE_ERROR ((LM_ERROR,
00914                             ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
00915                             ACE_TEXT ("forward reference.\n")));
00916 
00917               return -1;
00918             }
00919 
00920           output.more_fragments (false);
00921 
00922           int result = transport->send_message (output,
00923                                                 0,
00924                                                 TAO_Transport::TAO_REPLY);
00925           if (result == -1)
00926             {
00927               if (TAO_debug_level > 0)
00928                 {
00929                   // No exception but some kind of error, yet a
00930                   // response is required.
00931                   ACE_ERROR ((LM_ERROR,
00932                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00933                               ACE_TEXT ("cannot send reply\n"),
00934                               ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
00935                 }
00936             }
00937           return result;
00938         }
00939     }
00940   // Only CORBA exceptions are caught here.
00941   catch ( ::CORBA::Exception& ex)
00942     {
00943       int result = 0;
00944 
00945       if (response_required)
00946         {
00947           result = this->send_reply_exception (transport,
00948                                                output,
00949                                                request_id,
00950                                                &request.reply_service_info (),
00951                                                &ex);
00952           if (result == -1)
00953             {
00954               if (TAO_debug_level > 0)
00955                 {
00956                   ACE_ERROR ((LM_ERROR,
00957                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00958                               ACE_TEXT ("cannot send exception\n"),
00959                               ACE_TEXT ("process_connector_request ()")));
00960 
00961                   ex._tao_print_exception (
00962                     "TAO_GIOP_Message_Base::process_request[1]");
00963                 }
00964             }
00965 
00966         }
00967       else if (TAO_debug_level > 0)
00968         {
00969           // It is unfortunate that an exception (probably a system
00970           // exception) was thrown by the upcall code (even by the
00971           // user) when the client was not expecting a response.
00972           // However, in this case, we cannot close the connection
00973           // down, since it really isn't the client's fault.
00974 
00975           ACE_ERROR ((LM_ERROR,
00976                       ACE_TEXT ("(%P|%t) exception thrown ")
00977                       ACE_TEXT ("but client is not waiting a response\n")));
00978 
00979           ex._tao_print_exception (
00980             "TAO_GIOP_Message_Base::process_request[2]");
00981         }
00982 
00983       return result;
00984     }
00985   catch (...)
00986     {
00987       // @@ TODO some c++ exception or another, but what do we do with
00988       //    it?
00989       // We are supposed to map it into a CORBA::UNKNOWN exception.
00990       // BTW, this cannot be detected if using the <env> mapping.  If
00991       // we have native exceptions but no support for them in the ORB
00992       // we should still be able to catch it.  If we don't have native
00993       // exceptions it couldn't have been raised in the first place!
00994       int result = 0;
00995 
00996       if (response_required)
00997         {
00998           CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
00999                                     (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01000                                     CORBA::COMPLETED_MAYBE);
01001 
01002           if (this->send_reply_exception (transport,
01003                                           output,
01004                                           request_id,
01005                                           &request.reply_service_info (),
01006                                           &exception) == -1
01007               && TAO_debug_level > 0)
01008             {
01009               ACE_ERROR ((LM_ERROR,
01010                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01011                           ACE_TEXT ("%p: ")
01012                           ACE_TEXT ("cannot send exception\n"),
01013                           ACE_TEXT ("process_request ()")));
01014               exception._tao_print_exception (
01015                 "TAO_GIOP_Message_Base::process_request[3]");
01016             }
01017         }
01018       else if (TAO_debug_level > 0)
01019         {
01020           // It is unfortunate that an exception (probably a system
01021           // exception) was thrown by the upcall code (even by the
01022           // user) when the client was not expecting a response.
01023           // However, in this case, we cannot close the connection
01024           // down, since it really isn't the client's fault.
01025           ACE_ERROR ((LM_ERROR,
01026                       ACE_TEXT ("TAO (%P|%t) exception thrown ")
01027                       ACE_TEXT ("but client is not waiting a response\n")));
01028         }
01029 
01030       return result;
01031     }
01032 
01033   return 0;
01034 }
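
The error handling in process_request() follows one pattern: a failure is reported to the client only when a response is expected, and a non-CORBA C++ exception is first mapped to an UNKNOWN system exception. The following is a minimal standalone sketch of that decision, not TAO code. `SketchException`, `ErrorReply`, and `handle_upcall` are hypothetical names introduced here for illustration.

```cpp
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for CORBA::Exception; not a TAO type.
struct SketchException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Which error reply, if any, the sketch would send to the client.
enum class ErrorReply { None, Exception, Unknown };

// Run the upcall and map failures the way process_request() does:
// known exceptions are reported as-is, anything else becomes UNKNOWN,
// and nothing is sent when the client expects no response.
ErrorReply handle_upcall (const std::function<void ()> &upcall,
                          bool response_required)
{
  try
    {
      upcall ();
    }
  catch (const SketchException &)
    {
      return response_required ? ErrorReply::Exception : ErrorReply::None;
    }
  catch (...)
    {
      return response_required ? ErrorReply::Unknown : ErrorReply::None;
    }
  return ErrorReply::None;  // success: no error reply is needed
}
```

In the real method the two "send" branches call send_reply_exception(), and the no-response branches only log when TAO_debug_level is greater than zero.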

int TAO_GIOP_Message_Base::process_request_message ( TAO_Transport * transport,
TAO_Queued_Data * qd 
)

Process the request message that we have received on the connection.

Definition at line 557 of file GIOP_Message_Base.cpp.

References ACE_BIT_ENABLED, TAO_Transport::assign_translators(), ACE_Message_Block::base(), TAO_Queued_Data::byte_order(), TAO_ORB_Parameters::cdr_memcpy_tradeoff(), ACE_Message_Block::data_block(), ACE_CDR::DEFAULT_BUFSIZE, ACE_Message_Block::DONT_DELETE, dump_msg(), ACE_Data_Block::duplicate(), fragmentation_strategy_, ACE_Auto_Basic_Ptr< X >::get(), get_parser(), TAO_Queued_Data::giop_version(), TAO_ORB_Core::input_cdr_dblock_allocator(), ACE_Message_Block::length(), TAO_ORB_Core::lf_strategy(), TAO_GIOP_Message_Version::major_version(), ACE_Message_Block::MB_DATA, TAO_GIOP_Message_Version::minor_version(), TAO_Queued_Data::msg_block(), TAO_Queued_Data::msg_type(), orb_core_, TAO_ORB_Core::orb_params(), process_locate_request(), ACE_Message_Block::rd_ptr(), ACE_Message_Block::self_flags(), TAO_LF_Strategy::set_upcall_thread(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGE_HEADER_LEN, and ACE_Message_Block::wr_ptr().

00559 {
00560   // Set the upcall thread
00561   this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00562 
00563   // Get the parser we need to use
00564   TAO_GIOP_Message_Generator_Parser *generator_parser =
00565     this->get_parser (qd->giop_version ());
00566 
00567   // A buffer that we will use to initialise the CDR stream.  Since we're
00568   // allocating the buffer on the stack, we may as well allocate the data
00569   // block on the stack too and avoid an allocation inside the message
00570   // block of the CDR.
00571 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00572   char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00573 #else
00574   char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00575 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00576   ACE_Data_Block out_db (sizeof (repbuf),
00577                          ACE_Message_Block::MB_DATA,
00578                          repbuf,
00579                          this->orb_core_->input_cdr_buffer_allocator (),
00580                          0,
00581                          ACE_Message_Block::DONT_DELETE,
00582                          this->orb_core_->input_cdr_dblock_allocator ());
00583 
00584   // Initialize an output CDR on the stack
00585   // NOTE: Don't jump to a conclusion as to why we are using the
00586   // input_cdr and hence the  global pool here. These pools will move
00587   // to the lanes anyway at some point of time. Further, it would have
00588   // been awesome to have this in TSS. But for some reason the cloning
00589   // that happens when the ORB gets flow controlled while writing a
00590   // reply is messing things up. We crash horribly. Doing this adds a
00591   // lock, we need to set things like this -- put stuff in TSS here
00592   // and transfer to global memory when we get flow controlled. We
00593   // need to work on the message block to get it right!
00594   TAO_OutputCDR output (&out_db,
00595                         TAO_ENCAP_BYTE_ORDER,
00596                         this->orb_core_->input_cdr_msgblock_allocator (),
00597                         this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00598                         this->fragmentation_strategy_.get (),
00599                         qd->giop_version ().major_version (),
00600                         qd->giop_version ().minor_version ());
00601 
00602   // Get the read and write positions before we steal data.
00603   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00604   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00605   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00606 
00607   if (TAO_debug_level >= 5)
00608     {
00609       this->dump_msg ("recv",
00610                       reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00611                       qd->msg_block ()->length ());
00612     }
00613 
00614   // Create an input CDR stream. We do the following:
00615   //  1 - If the incoming message block has a data block with a flag
00616   //      DONT_DELETE  (for the data block) we create an input CDR
00617   //      stream the same way.
00618   //  2 - If the incoming message block had a datablock from heap just
00619   //      use it by duplicating it and make the flag 0.
00620   // NOTE: We use the same data block in which we read the message and
00621   // we pass it on to the higher layers of the ORB. So we don't make any
00622   // copies at all here. The same is also done in the higher layers.
00623 
00624   ACE_Message_Block::Message_Flags flg = 0;
00625   ACE_Data_Block *db = 0;
00626 
00627   // Get the flag in the message block
00628   flg = qd->msg_block ()->self_flags ();
00629 
00630   if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00631     {
00632       // Use the same datablock
00633       db = qd->msg_block ()->data_block ();
00634     }
00635   else
00636     {
00637       // Use a duplicated datablock as the datablock has come off the
00638       // heap.
00639       db = qd->msg_block ()->data_block ()->duplicate ();
00640     }
00641 
00642   TAO_InputCDR input_cdr (db,
00643                           flg,
00644                           rd_pos,
00645                           wr_pos,
00646                           qd->byte_order (),
00647                           qd->giop_version ().major_version (),
00648                           qd->giop_version ().minor_version (),
00649                           this->orb_core_);
00650 
00651   transport->assign_translators(&input_cdr,&output);
00652 
00653   // We know we have some request message. Check whether it is a
00654   // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
00655 
00656   // Once we send the InputCDR stream we need to just forget about
00657   // the stream and never touch that again for anything. We basically
00658   // lose ownership of the data_block.
00659 
00660   switch (qd->msg_type ())
00661     {
00662     case GIOP::Request:
00663       // Should be taken care of by the state specific invocations. They
00664       // could raise an exception or write things in the output CDR
00665       // stream
00666       return this->process_request (transport,
00667                                     input_cdr,
00668                                     output,
00669                                     generator_parser);
00670 
00671     case GIOP::LocateRequest:
00672       return this->process_locate_request (transport,
00673                                            input_cdr,
00674                                            output,
00675                                            generator_parser);
00676     default:
00677       return -1;
00678     }
00679 }
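
The read and write positions computed in process_request_message() are plain offsets from the base of the message buffer, with the read offset advanced past the 12-byte GIOP header so that the input CDR stream starts at the message body. A minimal sketch of that arithmetic, assuming a flat character buffer; `PayloadRange`, `payload_range`, and `kGiopHeaderLen` are hypothetical names, not TAO identifiers.

```cpp
#include <cstddef>

// GIOP message header length in bytes (12, as in the listings here).
constexpr std::size_t kGiopHeaderLen = 12;

// Hypothetical view of a received message: offsets into one buffer.
struct PayloadRange {
  std::size_t begin;  // first byte after the GIOP header
  std::size_t end;    // one past the last byte written
};

// Mirror of the rd_pos/wr_pos arithmetic: both are offsets from the
// buffer base, and the read position is advanced past the header.
PayloadRange payload_range (const char *base, const char *rd_ptr,
                            const char *wr_ptr)
{
  std::size_t rd_pos = static_cast<std::size_t> (rd_ptr - base);
  std::size_t const wr_pos = static_cast<std::size_t> (wr_ptr - base);
  rd_pos += kGiopHeaderLen;
  return PayloadRange { rd_pos, wr_pos };
}
```

For a message that starts 4 bytes into the buffer and ends at offset 40, the body occupies offsets 16 up to (but not including) 40.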

void TAO_GIOP_Message_Base::send_close_connection ( const TAO_GIOP_Message_Version & version,
TAO_Transport * transport 
) [private]

Close a connection, first sending GIOP::CloseConnection.

Definition at line 1299 of file GIOP_Message_Base.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT, TAO_Transport::close_connection(), ACE_Message_Block::DONT_DELETE, dump_msg(), TAO_Transport::id(), LM_DEBUG, LM_ERROR, TAO_GIOP_Message_Version::major, ACE_Message_Block::MB_DATA, TAO_GIOP_Message_Version::minor, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGE_HEADER_LEN, and ACE_Message_Block::wr_ptr().

01301 {
01302   // static CORBA::Octet
01303   // I hate  this in every method. Till the time I figure out a way
01304   // around  I will have them here hanging around.
01305   const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01306   {
01307     // The following works on non-ASCII platforms, such as MVS (which
01308     // uses EBCDIC).
01309     0x47, // 'G'
01310     0x49, // 'I'
01311     0x4f, // 'O'
01312     0x50, // 'P'
01313     version.major,
01314     version.minor,
01315     TAO_ENCAP_BYTE_ORDER,
01316     GIOP::CloseConnection,
01317     0, 0, 0, 0
01318   };
01319 
01320   // It's important that we use a reliable shutdown after we send this
01321   // message, so we know it's received.
01322   //
01323   // @@ should recv and discard queued data for portability; note
01324   // that this won't block (long) since we never set SO_LINGER
01325 
01326   if (TAO_debug_level >= 5)
01327     {
01328       this->dump_msg ("send_close_connection",
01329                       reinterpret_cast <const u_char *> (close_message),
01330                       TAO_GIOP_MESSAGE_HEADER_LEN);
01331     }
01332 
01333 #if 0
01334   // @@CJC I don't think we need this check b/c the transport's send()
01335   // will simply return -1.  However, I guess we could create something
01336   // like TAO_Transport::is_closed() that returns whether the connection
01337   // is already closed.  The problem with that, however, is that it's
01338   // entirely possible that is_closed() could return TRUE, and then the
01339   // transport could get closed down btw. the time it gets called and the
01340   // time that the send actually occurs.
01341   ACE_HANDLE which = transport->handle ();
01342   if (which == ACE_INVALID_HANDLE)
01343     {
01344       if (TAO_debug_level > 0)
01345         ACE_DEBUG ((LM_DEBUG,
01346            ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01347            ACE_TEXT (" connection already closed\n")));
01348       return;
01349     }
01350 #endif
01351 
01352   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01353                              ACE_Message_Block::MB_DATA,
01354                              close_message,
01355                              0,
01356                              0,
01357                              ACE_Message_Block::DONT_DELETE,
01358                              0);
01359   ACE_Message_Block message_block(&data_block);
01360   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01361 
01362   size_t bt;
01363   int const result = transport->send_message_block_chain (&message_block, bt);
01364   if (result == -1)
01365     {
01366       if (TAO_debug_level > 0)
01367         ACE_ERROR ((LM_ERROR,
01368            ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01369            transport->id (), errno));
01370     }
01371 
01372   transport->close_connection ();
01373   ACE_DEBUG ((LM_DEBUG,
01374       ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01375       transport-> id ()));
01376 }
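
A CloseConnection message consists of the 12-byte GIOP header alone, with a zero message size. The sketch below builds such a body-less message as a byte array. `make_bodyless_message` and `GiopMsgType` are hypothetical helpers written for this illustration; the message type codes are the ones defined by the GIOP specification.

```cpp
#include <array>
#include <cstdint>

// GIOP message type codes from the GIOP specification.
enum GiopMsgType : std::uint8_t {
  Request = 0, Reply = 1, CancelRequest = 2, LocateRequest = 3,
  LocateReply = 4, CloseConnection = 5, MessageError = 6, Fragment = 7
};

// Build a body-less 12-byte GIOP message (hypothetical helper).
// byte_order_flag: 0 = big-endian sender, 1 = little-endian sender.
std::array<std::uint8_t, 12> make_bodyless_message (std::uint8_t major,
                                                    std::uint8_t minor,
                                                    std::uint8_t byte_order_flag,
                                                    GiopMsgType type)
{
  return {{
    0x47, 0x49, 0x4f, 0x50,   // "GIOP" magic, spelled as ASCII codes
    major, minor,             // protocol version
    byte_order_flag,          // byte order / flags octet
    static_cast<std::uint8_t> (type),
    0, 0, 0, 0                // message size: no body follows
  }};
}
```

Calling `make_bodyless_message (1, 2, 1, CloseConnection)` yields the same layout as `close_message` above for GIOP 1.2 on a little-endian host. With version 1.0 and `MessageError` it corresponds to the array used by send_error().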

int TAO_GIOP_Message_Base::send_error ( TAO_Transport * transport  )  [private]

Send error messages.

Definition at line 1199 of file GIOP_Message_Base.cpp.

References ACE_DEBUG, ACE_TEXT, ACE_Message_Block::DONT_DELETE, dump_msg(), TAO_Transport::id(), LM_DEBUG, ACE_Message_Block::MB_DATA, TAO_Transport::send_message_block_chain(), TAO_debug_level, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGE_HEADER_LEN, and ACE_Message_Block::wr_ptr().

01200 {
01201   const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01202   {
01203     // The following works on non-ASCII platforms, such as MVS (which
01204     // uses EBCDIC).
01205     0x47, // 'G'
01206     0x49, // 'I'
01207     0x4f, // 'O'
01208     0x50, // 'P'
01209     (CORBA::Octet) 1, // Use the lowest GIOP version
01210     (CORBA::Octet) 0,
01211     TAO_ENCAP_BYTE_ORDER,
01212     GIOP::MessageError,
01213     0, 0, 0, 0
01214   };
01215 
01216   if (TAO_debug_level >= 5)
01217     {
01218       this->dump_msg ("send_error",
01219                       reinterpret_cast <const u_char *> (error_message),
01220                       TAO_GIOP_MESSAGE_HEADER_LEN);
01221     }
01222 
01223   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01224                              ACE_Message_Block::MB_DATA,
01225                              error_message,
01226                              0,
01227                              0,
01228                              ACE_Message_Block::DONT_DELETE,
01229                              0);
01230   ACE_Message_Block message_block(&data_block,
01231                                   ACE_Message_Block::DONT_DELETE);
01232   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01233 
01234   size_t bt;
01235   int const result = transport->send_message_block_chain (&message_block, bt);
01236   if (result == -1)
01237     {
01238       if (TAO_debug_level > 0)
01239         ACE_DEBUG ((LM_DEBUG,
01240                     ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01241                     transport->id ()));
01242     }
01243 
01244   return result;
01245 }

int TAO_GIOP_Message_Base::send_reply_exception ( TAO_Transport * transport,
TAO_OutputCDR & cdr,
CORBA::ULong  request_id,
IOP::ServiceContextList *  svc_info,
CORBA::Exception * x 
) [private]

Sends an exception reply through transport for a request that resulted in some kind of exception.

Definition at line 1380 of file GIOP_Message_Base.cpp.

References CORBA::SystemException::_downcast(), TAO_Pluggable_Reply_Params_Base::argument_flag_, TAO_OutputCDR::more_fragments(), TAO_Pluggable_Reply_Params_Base::reply_status(), TAO_Pluggable_Reply_Params_Base::request_id_, TAO_Transport::send_message(), TAO_Pluggable_Reply_Params_Base::service_context_notowned(), TAO_Pluggable_Reply_Params_Base::svc_ctx_, CORBA::SYSTEM_EXCEPTION, TAO_Transport::TAO_REPLY, and CORBA::USER_EXCEPTION.

Referenced by process_request().

01387 {
01388   TAO_Pluggable_Reply_Params_Base reply_params;
01389   reply_params.request_id_ = request_id;
01390   reply_params.svc_ctx_.length (0);
01391 
01392   // We are going to send some data
01393   reply_params.argument_flag_ = true;
01394 
01395   // Send back the service context we received.  (RTCORBA relies on
01396   // this).
01397   reply_params.service_context_notowned (svc_info);
01398 
01399   if (CORBA::SystemException::_downcast (x) != 0)
01400     {
01401       reply_params.reply_status (GIOP::SYSTEM_EXCEPTION);
01402     }
01403   else
01404     {
01405       reply_params.reply_status (GIOP::USER_EXCEPTION);
01406     }
01407 
01408   if (this->generate_exception_reply (output, reply_params, *x) == -1)
01409     return -1;
01410 
01411   output.more_fragments (false);
01412 
01413   return transport->send_message (output, 0, TAO_Transport::TAO_REPLY);
01414 }
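
The reply status in send_reply_exception() is chosen by attempting a downcast to the system-exception type: success selects SYSTEM_EXCEPTION, anything else is reported as USER_EXCEPTION. A minimal sketch of that classification using standard `dynamic_cast` in place of `CORBA::SystemException::_downcast()`; the types `BaseEx`, `SystemEx`, `UserEx` and the function `classify` are hypothetical stand-ins.

```cpp
// Hypothetical hierarchy standing in for CORBA::Exception,
// CORBA::SystemException and a user-defined exception.
struct BaseEx { virtual ~BaseEx () = default; };
struct SystemEx : BaseEx {};
struct UserEx : BaseEx {};

enum class ReplyStatus { SystemException, UserException };

// A successful downcast to the system-exception type selects
// SystemException; everything else is reported as UserException.
ReplyStatus classify (const BaseEx *x)
{
  return dynamic_cast<const SystemEx *> (x) != nullptr
    ? ReplyStatus::SystemException
    : ReplyStatus::UserException;
}
```

The two-way split works because every exception that is not a system exception is, by elimination, treated as a user exception.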

void TAO_GIOP_Message_Base::set_giop_flags ( TAO_OutputCDR & msg  )  const [private]

Note:
It is assumed that the GIOP message header is the first thing marshaled into the output CDR stream msg.

Definition at line 1904 of file GIOP_Message_Base.cpp.

References ACE_SET_BITS, ACE_OutputCDR::buffer(), ACE_OutputCDR::do_byte_swap(), TAO_OutputCDR::more_fragments(), TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGE_FLAGS_OFFSET, TAO_GIOP_VERSION_MAJOR_OFFSET, and TAO_GIOP_VERSION_MINOR_OFFSET.

Referenced by format_message().

01905 {
01906   CORBA::Octet * const buf =
01907     reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
01908 
01909   CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
01910   CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
01911 
01912   // Flags for the GIOP protocol header "flags" field.
01913   CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
01914 
01915   // Least significant bit:        Byte order
01916   ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
01917 
01918   // Second least significant bit: More fragments
01919   //
01920   // Only supported in GIOP 1.1 or better.
01921   if (!(major <= 1 && minor == 0))
01922     ACE_SET_BITS (flags, msg.more_fragments () << 1);
01923 }
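
The flags octet packs two facts: bit 0 is the byte order, and bit 1 signals that more fragments follow, which is only set for GIOP versions later than 1.0. The sketch below computes the octet from scratch. It is a simplified illustration: `giop_flags` is a hypothetical function, and it takes the byte order as a boolean rather than deriving it from `TAO_ENCAP_BYTE_ORDER` and `do_byte_swap()` as the method above does.

```cpp
#include <cstdint>

// Compute the GIOP flags octet (hypothetical helper, not TAO API).
// Bit 0 carries the byte order; bit 1 ("more fragments") is only
// meaningful from GIOP 1.1 onwards, so it is left clear for 1.0.
std::uint8_t giop_flags (std::uint8_t major, std::uint8_t minor,
                         bool little_endian, bool more_fragments)
{
  std::uint8_t flags = 0;
  if (little_endian)
    flags |= 0x01;
  // Same version test as in set_giop_flags().
  bool const is_giop_1_0 = (major <= 1 && minor == 0);
  if (!is_giop_1_0 && more_fragments)
    flags |= 0x02;
  return flags;
}
```

A GIOP 1.0 message therefore never carries the fragment bit, even if the caller requests it.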

int TAO_GIOP_Message_Base::write_protocol_header ( GIOP::MsgType  t,
const TAO_GIOP_Message_Version & version,
TAO_OutputCDR & msg 
) [private]

Writes the GIOP header into msg.

Note:
If the GIOP header happens to change in the future, we can push this method into the generator_parser classes.

Definition at line 791 of file GIOP_Message_Base.cpp.

References ACE_OutputCDR::good_bit(), TAO_GIOP_Message_Version::major, TAO_GIOP_Message_Version::minor, ACE_OutputCDR::reset(), and ACE_OutputCDR::write_octet_array().

Referenced by make_send_locate_reply().

00794 {
00795   // Reset the message type
00796   msg.reset ();
00797 
00798   CORBA::Octet header[12] =
00799     {
00800       // The following works on non-ASCII platforms, such as MVS (which
00801       // uses EBCDIC).
00802       0x47, // 'G'
00803       0x49, // 'I'
00804       0x4f, // 'O'
00805       0x50  // 'P'
00806     };
00807 
00808   header[4] = version.major;
00809   header[5] = version.minor;
00810 
00811   // "flags" octet, i.e. header[6] will be set up later when message
00812   // is formatted by the transport.
00813 
00814   header[7] = static_cast <CORBA::Octet> (type);  // Message type
00815 
00816   static ACE_CDR::ULong const header_size =
00817     sizeof (header) / sizeof (header[0]);
00818 
00819   // Fragmentation should not occur at this point since there are only
00820   // 12 bytes in the stream, and fragmentation may only occur when
00821   // the stream length >= 16.
00822   msg.write_octet_array (header, header_size);
00823 
00824   return msg.good_bit ();
00825 }
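
write_protocol_header() fills in only part of the header: the magic, the version, and the message type. The flags octet and the 4-byte message size start out as zero because the array initializer lists only four elements and the rest are zero-filled. A minimal sketch of that deferred layout, using a hypothetical `start_header` function and a plain byte array instead of a CDR stream:

```cpp
#include <array>
#include <cstdint>

// Hypothetical helper mirroring the partial header setup: only the
// magic, version and message type are filled in. The flags octet
// (index 6) and the message size (indices 8..11) stay zero because
// aggregate initialization zero-fills the elements not listed.
std::array<std::uint8_t, 12> start_header (std::uint8_t major,
                                           std::uint8_t minor,
                                           std::uint8_t msg_type)
{
  std::array<std::uint8_t, 12> header = {{ 0x47, 0x49, 0x4f, 0x50 }};
  header[4] = major;
  header[5] = minor;
  header[7] = msg_type;
  return header;
}
```

The remaining fields are filled in at a later stage; in this class, format_message() is the caller that invokes set_giop_flags() for the flags octet.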


Member Data Documentation

TAO::Incoming_Message_Stack TAO_GIOP_Message_Base::fragment_stack_ [private]

All fragments being received are stored on a stack in reverse order, with the last one received on top.

Definition at line 245 of file GIOP_Message_Base.h.

Referenced by discard_fragmented_message().

auto_ptr<TAO_GIOP_Fragmentation_Strategy> TAO_GIOP_Message_Base::fragmentation_strategy_ [protected]

Strategy that sends data currently marshaled into this TAO_OutputCDR stream if necessary.

Definition at line 257 of file GIOP_Message_Base.h.

Referenced by fragmentation_strategy(), and process_request_message().

TAO_ORB_Core* TAO_GIOP_Message_Base::orb_core_ [private]

Cached ORB_Core pointer...

Definition at line 238 of file GIOP_Message_Base.h.

Referenced by make_queued_data(), parse_request_id(), process_locate_request(), process_reply_message(), process_request(), and process_request_message().

TAO_OutputCDR TAO_GIOP_Message_Base::out_stream_ [protected]

Buffer where the request is placed.

Definition at line 260 of file GIOP_Message_Base.h.

Referenced by init(), out_stream(), and ~TAO_GIOP_Message_Base().

TAO_GIOP_Message_Generator_Parser_Impl TAO_GIOP_Message_Base::tao_giop_impl_ [private]

All the implementations of GIOP message generator and parsers.

Definition at line 241 of file GIOP_Message_Base.h.

Referenced by get_parser().


The documentation for this class was generated from the following files:
GIOP_Message_Base.h
GIOP_Message_Base.cpp
Generated on Tue Feb 2 17:39:30 2010 for TAO by  doxygen 1.4.7