GIOP_Message_Base.cpp

Go to the documentation of this file.
00001 // $Id: GIOP_Message_Base.cpp 81663 2008-05-09 12:43:47Z johnnyw $
00002 
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 #include "ace/Min_Max.h"
00016 
00017 /*
00018  * Hook to add additional include files during specializations.
00019  */
00020 //@@ GIOP_MESSAGE_BASE_INCLUDE_ADD_HOOK
00021 
00022 ACE_RCSID (tao,
00023            GIOP_Message_Base,
00024            "$Id: GIOP_Message_Base.cpp 81663 2008-05-09 12:43:47Z johnnyw $")
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
00029                                               TAO_Transport *transport,
00030                                               size_t input_cdr_size)
00031   : orb_core_ (orb_core)
00032   , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033   , out_stream_ (0,
00034                  input_cdr_size,
00035                  TAO_ENCAP_BYTE_ORDER,
00036                  orb_core->output_cdr_buffer_allocator (),
00037                  orb_core->output_cdr_dblock_allocator (),
00038                  orb_core->output_cdr_msgblock_allocator (),
00039                  orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040                  fragmentation_strategy_.get (),
00041                  TAO_DEF_GIOP_MAJOR,
00042                  TAO_DEF_GIOP_MINOR)
00043 {
00044 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00045   const int nibbles = 2 * sizeof (size_t);
00046   char hex_string[nibbles + 1];
00047   ACE_OS::sprintf (hex_string,
00048                    "%8.8X",
00049                    transport->id ());
00050   hex_string[nibbles] = '\0';
00051   ACE_CString monitor_name ("OutputCDR_");
00052   monitor_name += hex_string;
00053   this->out_stream_.register_monitor (monitor_name.c_str ());
00054 #endif /* TAO_HAS_MONITOR_POINTS==1 */
00055 }
00056 
00057 
00058 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00059 {
00060 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00061   this->out_stream_.unregister_monitor ();
00062 #endif /* TAO_HAS_MONITOR_POINTS==1 */
00063 }
00064 
00065 void
00066 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
00067 {
00068   // Set the giop version of the out stream
00069   this->out_stream_.set_version (major, minor);
00070 }
00071 
00072 TAO_OutputCDR &
00073 TAO_GIOP_Message_Base::out_stream (void)
00074 {
00075   return this->out_stream_;
00076 }
00077 
00078 int
00079 TAO_GIOP_Message_Base::generate_request_header (
00080     TAO_Operation_Details &op,
00081     TAO_Target_Specification &spec,
00082     TAO_OutputCDR &cdr)
00083 {
00084   // Get a parser for us
00085   TAO_GIOP_Message_Version giop_version;
00086 
00087   cdr.get_version (giop_version);
00088 
00089   // Write the GIOP header first
00090   if (!this->write_protocol_header (GIOP::Request, giop_version, cdr))
00091     {
00092       if (TAO_debug_level)
00093         {
00094           ACE_ERROR ((LM_ERROR,
00095                       ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00096         }
00097 
00098       return -1;
00099     }
00100 
00101   // Get the parser we need to use
00102   TAO_GIOP_Message_Generator_Parser *generator_parser =
00103     this->get_parser (giop_version);
00104 
00105   // Now call the implementation for the rest of the header
00106   if (!generator_parser->write_request_header (op, spec, cdr))
00107     {
00108       if (TAO_debug_level)
00109         ACE_ERROR ((LM_ERROR,
00110                     ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00111 
00112       return -1;
00113     }
00114 
00115   return 0;
00116 }
00117 
00118 int
00119 TAO_GIOP_Message_Base::generate_locate_request_header (
00120     TAO_Operation_Details &op,
00121     TAO_Target_Specification &spec,
00122     TAO_OutputCDR &cdr)
00123 {
00124   TAO_GIOP_Message_Version giop_version;
00125 
00126   cdr.get_version (giop_version);
00127 
00128   // Get the parser we need to use
00129   TAO_GIOP_Message_Generator_Parser *generator_parser =
00130     this->get_parser (giop_version);
00131 
00132   // Write the GIOP header first
00133   if (!this->write_protocol_header (GIOP::LocateRequest, giop_version, cdr))
00134     {
00135       if (TAO_debug_level)
00136         ACE_ERROR ((LM_ERROR,
00137                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00138 
00139       return -1;
00140     }
00141 
00142   // Now call the implementation for the rest of the header
00143   if (!generator_parser->write_locate_request_header
00144       (op.request_id (), spec, cdr))
00145     {
00146       if (TAO_debug_level)
00147         ACE_ERROR ((LM_ERROR,
00148                     ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00149 
00150 
00151       return -1;
00152 
00153     }
00154 
00155   return 0;
00156 }
00157 
00158 int
00159 TAO_GIOP_Message_Base::generate_reply_header (
00160     TAO_OutputCDR &cdr,
00161     TAO_Pluggable_Reply_Params_Base &params)
00162 {
00163   TAO_GIOP_Message_Version giop_version;
00164 
00165   cdr.get_version (giop_version);
00166 
00167   // Write the GIOP header first
00168   if (!this->write_protocol_header (GIOP::Reply, giop_version, cdr))
00169     {
00170       if (TAO_debug_level)
00171         ACE_ERROR ((LM_ERROR,
00172                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00173 
00174       return -1;
00175     }
00176 
00177   try
00178     {
00179       // Get the parser we need to use
00180       TAO_GIOP_Message_Generator_Parser *generator_parser =
00181         this->get_parser (giop_version);
00182 
00183       // Now call the implementation for the rest of the header
00184       if (!generator_parser->write_reply_header (cdr, params))
00185         {
00186           if (TAO_debug_level > 4)
00187             ACE_ERROR ((LM_ERROR,
00188                         ACE_TEXT ("(%P|%t) Error in writing reply ")
00189                         ACE_TEXT ("header\n")));
00190 
00191           return -1;
00192         }
00193     }
00194   catch (const ::CORBA::Exception& ex)
00195     {
00196       if (TAO_debug_level > 4)
00197         ex._tao_print_exception (
00198           ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00199 
00200       return -1;
00201     }
00202 
00203   return 0;
00204 }
00205 
00206 int
00207 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00208                                                  CORBA::ULong request_id)
00209 {
00210   TAO_GIOP_Message_Version giop_version;
00211 
00212   cdr.get_version (giop_version);
00213 
00214   // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
00215   // supports them in 1.2 or better since GIOP 1.1 fragments do not
00216   // have a fragment message header.
00217   if (giop_version.major == 1 && giop_version.minor < 2)
00218     return -1;
00219 
00220   // Get the parser we need to use
00221   TAO_GIOP_Message_Generator_Parser *generator_parser =
00222     this->get_parser (giop_version);
00223 
00224   // Write the GIOP header first
00225   if (!this->write_protocol_header (GIOP::Fragment, giop_version, cdr)
00226       || !generator_parser->write_fragment_header (cdr, request_id))
00227     {
00228       if (TAO_debug_level)
00229         ACE_ERROR ((LM_ERROR,
00230                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00231 
00232       return -1;
00233     }
00234 
00235   return 0;
00236 }
00237 
00238 int
00239 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00240 {
00241   // Ptr to first buffer.
00242   char *buf = const_cast <char*> (stream.buffer ());
00243 
00244   this->set_giop_flags (stream);
00245 
00246   // Length of all buffers.
00247   size_t const total_len = stream.total_length ();
00248 
00249   // NOTE: Here would also be a fine place to calculate a digital
00250   // signature for the message and place it into a preallocated slot
00251   // in the "ServiceContext".  Similarly, this is a good spot to
00252   // encrypt messages (or just the message bodies) if that's needed in
00253   // this particular environment and that isn't handled by the
00254   // networking infrastructure (e.g., IPSEC).
00255 
00256   CORBA::ULong bodylen = static_cast <CORBA::ULong>
00257                            (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00258 
00259 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00260   *(reinterpret_cast <CORBA::ULong *> (buf +
00261                          TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00262 #else
00263   if (!stream.do_byte_swap ())
00264     *(reinterpret_cast <CORBA::ULong *>
00265                            (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00266   else
00267     ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00268                      buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00269 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00270 
00271   if (TAO_debug_level >= 5)
00272     {
00273       // Check whether the output cdr stream is build up of multiple
00274       // messageblocks. If so, consolidate them to one block that can be
00275       // dumped
00276       ACE_Message_Block* consolidated_block = 0;
00277       if (stream.begin()->cont () != 0)
00278         {
00279           ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00280           ACE_CDR::consolidate (consolidated_block, stream.begin ());
00281           buf = (char *) (consolidated_block->rd_ptr ());
00282         }
00283       ///
00284       this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00285 
00286       //
00287       delete consolidated_block;
00288       consolidated_block = 0;
00289       //
00290     }
00291 
00292   return 0;
00293 }
00294 
00295 int
00296 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
00297                                            size_t &mesg_length)
00298 {
00299   if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00300     {
00301       qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00302 
00303       return 0; /* incomplete header */
00304     }
00305   else
00306     {
00307       TAO_GIOP_Message_State state;
00308 
00309       if (state.parse_message_header (*(qd.msg_block ())) == -1)
00310         {
00311           return -1;
00312         }
00313 
00314       size_t const message_size = state.message_size (); /* Header + Payload */
00315 
00316       if (message_size > qd.msg_block ()->length ())
00317         {
00318           qd.missing_data (message_size - qd.msg_block ()->length ());
00319         }
00320       else
00321         {
00322           qd.missing_data (0);
00323         }
00324 
00325       /* init out-parameters */
00326       qd.state (state);
00327       mesg_length = message_size;
00328 
00329       return 1; /* complete header */
00330     }
00331 }
00332 
00333 int
00334 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00335                                              TAO_Queued_Data *&qd)
00336 {
00337   if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00338     {
00339       if (incoming.length () > 0)
00340         {
00341           // Optimize memory usage, we dont know actual message size
00342           // so far, but allocate enough space to hold small GIOP
00343           // messages. This way we avoid expensive "grow" operation
00344           // for small messages.
00345           size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00346 
00347           // Make a node which has at least message block of the size
00348           // of MESSAGE_HEADER_LEN.
00349           size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00350                                            default_buf_size);
00351 
00352           // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
00353 
00354           qd = this->make_queued_data (buf_size);
00355 
00356           if (qd == 0)
00357             {
00358               if (TAO_debug_level > 0)
00359                 {
00360                   ACE_ERROR((LM_ERROR,
00361                              ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00362                              ACE_TEXT ("out of memory\n")));
00363                 }
00364               return -1;
00365             }
00366 
00367           qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00368 
00369           incoming.rd_ptr (incoming.length ()); // consume all available data
00370 
00371           qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00372         }
00373       else
00374         {
00375           // handle not initialized variables
00376           qd = 0;  // reset
00377         }
00378 
00379       return 0;
00380     }
00381 
00382   TAO_GIOP_Message_State state;
00383   if (state.parse_message_header (incoming) == -1)
00384     {
00385       return -1;
00386     }
00387 
00388   size_t copying_len = state.message_size ();
00389 
00390   qd = this->make_queued_data (copying_len);
00391 
00392   if (qd == 0)
00393     {
00394       if (TAO_debug_level > 0)
00395         {
00396           ACE_ERROR ((LM_ERROR,
00397                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00398                       ACE_TEXT ("out of memory\n")));
00399         }
00400       return -1;
00401     }
00402 
00403   if (copying_len > incoming.length ())
00404     {
00405       qd->missing_data (copying_len - incoming.length ());
00406       copying_len = incoming.length ();
00407     }
00408   else
00409     {
00410       qd->missing_data (0);
00411     }
00412 
00413   qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00414 
00415   incoming.rd_ptr (copying_len);
00416   qd->state (state);
00417 
00418   return 1;
00419 }
00420 
00421 int
00422 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00423                                          ACE_Message_Block &incoming)
00424 {
00425   // Look to see whether we had atleast parsed the GIOP header ...
00426   if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00427     {
00428       // The data length that has been stuck in there during the last
00429       // read ....
00430       size_t const len = qd->msg_block ()->length ();
00431 
00432       // paranoid check
00433       if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00434         {
00435           // inconsistency - this code should have parsed the header
00436           // so far
00437           return -1;
00438         }
00439 
00440       // We know that we would have space for
00441       // TAO_GIOP_MESSAGE_HEADER_LEN here.  So copy that much of data
00442       // from the <incoming> into the message block in <qd>
00443       size_t const available = incoming.length ();
00444       size_t const desired   = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00445       size_t const n_copy    = ace_min (available, desired);
00446 
00447       // paranoid check, but would cause endless looping
00448       if (n_copy == 0)
00449         {
00450           return -1;
00451         }
00452 
00453       if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00454         {
00455            return -1;
00456         }
00457 
00458       // Move the rd_ptr () in the incoming message block..
00459       incoming.rd_ptr (n_copy);
00460 
00461       // verify sufficient data to parse GIOP header
00462       if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00463         {
00464           return 0; /* continue */
00465         }
00466 
00467       TAO_GIOP_Message_State state;
00468 
00469       // Parse the message header now...
00470       if (state.parse_message_header (*qd->msg_block ()) == -1)
00471         {
00472           if (TAO_debug_level > 0)
00473             {
00474               ACE_ERROR ((LM_ERROR,
00475                   ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00476                   ACE_TEXT ("error parsing header\n") ));
00477             }
00478           return -1;
00479         }
00480       // Now grow the message block so that we can copy the rest of
00481       // the data, the message_block must be able to hold complete message
00482        if (ACE_CDR::grow (qd->msg_block (),
00483                           state.message_size ()) == -1)  /* GIOP_Header + Payload */
00484          {
00485            // on mem-error get rid of context silently, try to avoid
00486            // system calls that might allocate additional memory
00487            return -1;
00488          }
00489 
00490       // Copy the pay load..
00491       // Calculate the bytes that needs to be copied in the queue...
00492       size_t copy_len = state.payload_size ();
00493 
00494       // If the data that needs to be copied is more than that is
00495       // available to us ..
00496       if (copy_len > incoming.length ())
00497         {
00498           // Calculate the missing data..
00499           qd->missing_data (copy_len - incoming.length ());
00500 
00501           // Set the actual possible copy_len that is available...
00502           copy_len = incoming.length ();
00503         }
00504       else
00505         {
00506           qd->missing_data (0);
00507         }
00508 
00509       // ..now we are set to copy the right amount of data to the
00510       // node..
00511       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00512         {
00513           return -1;
00514         }
00515 
00516       // Set the <rd_ptr> of the <incoming>..
00517       incoming.rd_ptr (copy_len);
00518 
00519       // Get the other details...
00520       qd->state (state);
00521     }
00522   else
00523     {
00524       // @todo: Need to abstract this out to a seperate method...
00525       size_t copy_len = qd->missing_data ();
00526 
00527       if (copy_len > incoming.length ())
00528         {
00529           // Calculate the missing data..
00530           qd->missing_data (copy_len - incoming.length ());
00531 
00532           // Set the actual possible copy_len that is available...
00533           copy_len = incoming.length ();
00534         }
00535 
00536       // paranoid check for endless-event-looping
00537       if (copy_len == 0)
00538         {
00539           return -1;
00540         }
00541 
00542       // Copy the right amount of data in to the node...
00543       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00544         {
00545           return -1;
00546         }
00547 
00548       // Set the <rd_ptr> of the <incoming>..
00549       qd->msg_block ()->rd_ptr (copy_len);
00550 
00551     }
00552 
00553   return 0;
00554 }
00555 
00556 int
00557 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00558                                                 TAO_Queued_Data *qd)
00559 {
00560   // Set the upcall thread
00561   this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00562 
00563   // Get the parser we need to use
00564   TAO_GIOP_Message_Generator_Parser *generator_parser =
00565     this->get_parser (qd->giop_version ());
00566 
00567   // A buffer that we will use to initialise the CDR stream.  Since we're
00568   // allocating the buffer on the stack, we may as well allocate the data
00569   // block on the stack too and avoid an allocation inside the message
00570   // block of the CDR.
00571 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00572   char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00573 #else
00574   char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00575 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00576   ACE_Data_Block out_db (sizeof (repbuf),
00577                          ACE_Message_Block::MB_DATA,
00578                          repbuf,
00579                          this->orb_core_->input_cdr_buffer_allocator (),
00580                          0,
00581                          ACE_Message_Block::DONT_DELETE,
00582                          this->orb_core_->input_cdr_dblock_allocator ());
00583 
00584   // Initialize an output CDR on the stack
00585   // NOTE: Don't jump to a conclusion as to why we are using the
00586   // input_cdr and hence the  global pool here. These pools will move
00587   // to the lanes anyway at some point of time. Further, it would have
00588   // been awesome to have this in TSS. But for some reason the cloning
00589   // that happens when the ORB gets flow controlled while writing a
00590   // reply is messing things up. We crash horribly. Doing this adds a
00591   // lock, we need to set things like this -- put stuff in TSS here
00592   // and transfer to global memory when we get flow controlled. We
00593   // need to work on the message block to get it right!
00594   TAO_OutputCDR output (&out_db,
00595                         TAO_ENCAP_BYTE_ORDER,
00596                         this->orb_core_->input_cdr_msgblock_allocator (),
00597                         this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00598                         this->fragmentation_strategy_.get (),
00599                         qd->giop_version ().major_version (),
00600                         qd->giop_version ().minor_version ());
00601 
00602   // Get the read and write positions before we steal data.
00603   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00604   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00605   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00606 
00607   if (TAO_debug_level >= 5)
00608     {
00609       this->dump_msg ("recv",
00610                       reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00611                       qd->msg_block ()->length ());
00612     }
00613 
00614   // Create a input CDR stream. We do the following
00615   //  1 - If the incoming message block has a data block with a flag
00616   //      DONT_DELETE  (for the data block) we create an input CDR
00617   //      stream the same way.
00618   //  2 - If the incoming message block had a datablock from heap just
00619   //      use it by duplicating it and make the flag 0.
00620   // NOTE: We use the same data block in which we read the message and
00621   // we pass it on to the higher layers of the ORB. So we dont to any
00622   // copies at all here. The same is also done in the higher layers.
00623 
00624   ACE_Message_Block::Message_Flags flg = 0;
00625   ACE_Data_Block *db = 0;
00626 
00627   // Get the flag in the message block
00628   flg = qd->msg_block ()->self_flags ();
00629 
00630   if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00631     {
00632       // Use the same datablock
00633       db = qd->msg_block ()->data_block ();
00634     }
00635   else
00636     {
00637       // Use a duplicated datablock as the datablock has come off the
00638       // heap.
00639       db = qd->msg_block ()->data_block ()->duplicate ();
00640     }
00641 
00642   TAO_InputCDR input_cdr (db,
00643                           flg,
00644                           rd_pos,
00645                           wr_pos,
00646                           qd->byte_order (),
00647                           qd->giop_version ().major_version (),
00648                           qd->giop_version ().minor_version (),
00649                           this->orb_core_);
00650 
00651   transport->assign_translators(&input_cdr,&output);
00652 
00653   // We know we have some request message. Check whether it is a
00654   // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
00655 
00656   // Once we send the InputCDR stream we need to just forget about
00657   // the stream and never touch that again for anything. We basically
00658   // loose ownership of the data_block.
00659 
00660   switch (qd->msg_type ())
00661     {
00662     case GIOP::Request:
00663       // Should be taken care by the state specific invocations. They
00664       // could raise an exception or write things in the output CDR
00665       // stream
00666       return this->process_request (transport,
00667                                     input_cdr,
00668                                     output,
00669                                     generator_parser);
00670 
00671     case GIOP::LocateRequest:
00672       return this->process_locate_request (transport,
00673                                            input_cdr,
00674                                            output,
00675                                            generator_parser);
00676     default:
00677       return -1;
00678     }
00679 }
00680 
00681 int
00682 TAO_GIOP_Message_Base::process_reply_message (
00683     TAO_Pluggable_Reply_Params &params,
00684     TAO_Queued_Data *qd)
00685 {
00686   // Get the parser we need to use
00687   TAO_GIOP_Message_Generator_Parser *generator_parser =
00688     this->get_parser (qd->giop_version ());
00689 
00690   // Get the read and write positions before we steal data.
00691   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00692   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00693   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00694 
00695   if (TAO_debug_level >= 5)
00696     {
00697       this->dump_msg ("recv",
00698                       reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00699                       qd->msg_block ()->length ());
00700     }
00701 
00702 
00703   // Create a empty buffer on stack
00704   // NOTE: We use the same data block in which we read the message and
00705   // we pass it on to the higher layers of the ORB. So we dont to any
00706   // copies at all here.
00707   TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
00708                           ACE_Message_Block::DONT_DELETE,
00709                           rd_pos,
00710                           wr_pos,
00711                           qd->byte_order (),
00712                           qd->giop_version ().major_version (),
00713                           qd->giop_version ().minor_version (),
00714                           this->orb_core_);
00715 
00716   // We know we have some reply message. Check whether it is a
00717   // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
00718 
00719   // Once we send the InputCDR stream we need to just forget about
00720   // the stream and never touch that again for anything. We basically
00721   // loose ownership of the data_block.
00722   int retval = 0;
00723 
00724   switch (qd->msg_type ())
00725     {
00726     case GIOP::Reply:
00727       // Should be taken care by the state specific parsing
00728       retval = generator_parser->parse_reply (input_cdr, params);
00729       break;
00730     case GIOP::LocateReply:
00731       retval = generator_parser->parse_locate_reply (input_cdr, params);
00732       break;
00733     default:
00734       retval = -1;
00735     }
00736 
00737   if (retval == -1)
00738     return retval;
00739 
00740   params.input_cdr_ = &input_cdr;
00741   params.transport_->assign_translators (params.input_cdr_, 0);
00742 
00743   retval = params.transport_->tms ()->dispatch_reply (params);
00744 
00745   if (retval == -1)
00746     {
00747       // Something really critical happened, we will forget about
00748       // every reply on this connection.
00749       if (TAO_debug_level > 0)
00750         ACE_ERROR ((LM_ERROR,
00751                     ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00752                     ACE_TEXT ("dispatch reply failed\n"),
00753                     params.transport_->id ()));
00754     }
00755 
00756   return retval;
00757 }
00758 
00759 int
00760 TAO_GIOP_Message_Base::generate_exception_reply (
00761     TAO_OutputCDR &cdr,
00762     TAO_Pluggable_Reply_Params_Base &params,
00763     const CORBA::Exception &x)
00764 {
00765   // A new try/catch block, but if something goes wrong now we have no
00766   // hope, just abort.
00767 
00768   try
00769     {
00770       // Make the GIOP & reply header.
00771       this->generate_reply_header (cdr, params);
00772       x._tao_encode (cdr);
00773     }
00774   catch (const ::CORBA::Exception&)
00775     {
00776       // Now we know that while handling the error an other error
00777       // happened -> no hope, close connection.
00778 
00779       // Close the handle.
00780       if (TAO_debug_level > 0)
00781         ACE_DEBUG ((LM_DEBUG,
00782                   ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00783                   ACE_TEXT ("generate_exception_reply ()\n")));
00784       return -1;
00785     }
00786 
00787   return 0;
00788 }
00789 
00790 int
00791 TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type,
00792                                               const TAO_GIOP_Message_Version &version,
00793                                               TAO_OutputCDR &msg)
00794 {
00795   // Reset the message type
00796   msg.reset ();
00797 
00798   CORBA::Octet header[12] =
00799     {
00800       // The following works on non-ASCII platforms, such as MVS (which
00801       // uses EBCDIC).
00802       0x47, // 'G'
00803       0x49, // 'I'
00804       0x4f, // 'O'
00805       0x50  // 'P'
00806     };
00807 
00808   header[4] = version.major;
00809   header[5] = version.minor;
00810 
00811   // "flags" octet, i.e. header[6] will be set up later when message
00812   // is formatted by the transport.
00813 
00814   header[7] = static_cast <CORBA::Octet> (type);  // Message type
00815 
00816   static ACE_CDR::ULong const header_size =
00817     sizeof (header) / sizeof (header[0]);
00818 
00819   // Fragmentation should not occur at this point since there are only
00820   // 12 bytes in the stream, and fragmentation may only occur when
00821   // the stream length >= 16.
00822   msg.write_octet_array (header, header_size);
00823 
00824   return msg.good_bit ();
00825 }
00826 
00827 int
00828 TAO_GIOP_Message_Base::process_request (
00829   TAO_Transport * transport,
00830   TAO_InputCDR & cdr,
00831   TAO_OutputCDR & output,
00832   TAO_GIOP_Message_Generator_Parser * parser)
00833 {
00834   // This will extract the request header, set <response_required>
00835   // and <sync_with_server> as appropriate.
00836   TAO_ServerRequest request (this,
00837                              cdr,
00838                              output,
00839                              transport,
00840                              this->orb_core_);
00841 
00842   CORBA::ULong request_id = 0;
00843   CORBA::Boolean response_required = false;
00844   int parse_error = 0;
00845 
00846   try
00847     {
00848       parse_error = parser->parse_request_header (request);
00849 
00850       // Throw an exception if the
00851       if (parse_error != 0)
00852         throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00853 
00854       TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00855       if (csm)
00856         {
00857           csm->process_service_context (request);
00858           transport->assign_translators (&cdr, &output);
00859         }
00860 
00861       request_id = request.request_id ();
00862 
00863       response_required = request.response_expected ();
00864 
00865       CORBA::Object_var forward_to;
00866 
00867       /*
00868        * Hook to specialize request processing within TAO
00869        * This hook will be replaced by specialized request
00870        * processing implementation.
00871        */
00872 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
00873 
00874       // Do this before the reply is sent.
00875       this->orb_core_->request_dispatcher ()->dispatch (
00876           this->orb_core_,
00877           request,
00878           forward_to);
00879 
00880 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END
00881 
00882       if (request.is_forwarded ())
00883         {
00884           CORBA::Boolean const permanent_forward_condition =
00885               this->orb_core_->is_permanent_forward_condition
00886               (forward_to.in (),
00887                request.request_service_context ());
00888 
00889           // We should forward to another object...
00890           TAO_Pluggable_Reply_Params_Base reply_params;
00891           reply_params.request_id_ = request_id;
00892           reply_params.reply_status (
00893               permanent_forward_condition
00894               ? GIOP::LOCATION_FORWARD_PERM
00895               : GIOP::LOCATION_FORWARD);
00896           reply_params.svc_ctx_.length (0);
00897 
00898           // Send back the reply service context.
00899           reply_params.service_context_notowned (
00900             &request.reply_service_info ());
00901 
00902           output.message_attributes (request_id,
00903                                      0,
00904                                      TAO_Transport::TAO_REPLY,
00905                                      0);
00906 
00907           // Make the GIOP header and Reply header
00908           this->generate_reply_header (output, reply_params);
00909 
00910           if (!(output << forward_to.in ()))
00911             {
00912               if (TAO_debug_level > 0)
00913                 ACE_ERROR ((LM_ERROR,
00914                             ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
00915                             ACE_TEXT ("forward reference.\n")));
00916 
00917               return -1;
00918             }
00919 
00920           output.more_fragments (false);
00921 
00922           int result = transport->send_message (output,
00923                                                 0,
00924                                                 TAO_Transport::TAO_REPLY);
00925           if (result == -1)
00926             {
00927               if (TAO_debug_level > 0)
00928                 {
00929                   // No exception but some kind of error, yet a
00930                   // response is required.
00931                   ACE_ERROR ((LM_ERROR,
00932                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00933                               ACE_TEXT ("cannot send reply\n"),
00934                               ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
00935                 }
00936             }
00937           return result;
00938         }
00939     }
00940   // Only CORBA exceptions are caught here.
00941   catch ( ::CORBA::Exception& ex)
00942     {
00943       int result = 0;
00944 
00945       if (response_required)
00946         {
00947           result = this->send_reply_exception (transport,
00948                                                output,
00949                                                request_id,
00950                                                &request.reply_service_info (),
00951                                                &ex);
00952           if (result == -1)
00953             {
00954               if (TAO_debug_level > 0)
00955                 {
00956                   ACE_ERROR ((LM_ERROR,
00957                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00958                               ACE_TEXT ("cannot send exception\n"),
00959                               ACE_TEXT ("process_connector_request ()")));
00960 
00961                   ex._tao_print_exception (
00962                     "TAO_GIOP_Message_Base::process_request[1]");
00963                 }
00964             }
00965 
00966         }
00967       else if (TAO_debug_level > 0)
00968         {
00969           // It is unfortunate that an exception (probably a system
00970           // exception) was thrown by the upcall code (even by the
00971           // user) when the client was not expecting a response.
00972           // However, in this case, we cannot close the connection
00973           // down, since it really isn't the client's fault.
00974 
00975           ACE_ERROR ((LM_ERROR,
00976                       ACE_TEXT ("(%P|%t) exception thrown ")
00977                       ACE_TEXT ("but client is not waiting a response\n")));
00978 
00979           ex._tao_print_exception (
00980             "TAO_GIOP_Message_Base::process_request[2]");
00981         }
00982 
00983       return result;
00984     }
00985   catch (...)
00986     {
00987       // @@ TODO some c++ exception or another, but what do we do with
00988       //    it?
00989       // We are supposed to map it into a CORBA::UNKNOWN exception.
00990       // BTW, this cannot be detected if using the <env> mapping.  If
00991       // we have native exceptions but no support for them in the ORB
00992       // we should still be able to catch it.  If we don't have native
00993       // exceptions it couldn't have been raised in the first place!
00994       int result = 0;
00995 
00996       if (response_required)
00997         {
00998           CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
00999                                     (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01000                                     CORBA::COMPLETED_MAYBE);
01001 
01002           if (this->send_reply_exception (transport,
01003                                           output,
01004                                           request_id,
01005                                           &request.reply_service_info (),
01006                                           &exception) == -1
01007               && TAO_debug_level > 0)
01008             {
01009               ACE_ERROR ((LM_ERROR,
01010                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01011                           ACE_TEXT ("%p: ")
01012                           ACE_TEXT ("cannot send exception\n"),
01013                           ACE_TEXT ("process_request ()")));
01014               exception._tao_print_exception (
01015                 "TAO_GIOP_Message_Base::process_request[3]");
01016             }
01017         }
01018       else if (TAO_debug_level > 0)
01019         {
01020           // It is unfotunate that an exception (probably a system
01021           // exception) was thrown by the upcall code (even by the
01022           // user) when the client was not expecting a response.
01023           // However, in this case, we cannot close the connection
01024           // down, since it really isn't the client's fault.
01025           ACE_ERROR ((LM_ERROR,
01026                       ACE_TEXT ("TAO (%P|%t) exception thrown ")
01027                       ACE_TEXT ("but client is not waiting a response\n")));
01028         }
01029 
01030       return result;
01031     }
01032 
01033   return 0;
01034 }
01035 
01036 
01037 int
01038 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01039                                                TAO_InputCDR &input,
01040                                                TAO_OutputCDR &output,
01041                                                TAO_GIOP_Message_Generator_Parser *parser)
01042 {
01043   // This will extract the request header, set <response_required> as
01044   // appropriate.
01045   TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01046 
01047   TAO_GIOP_Locate_Status_Msg status_info;
01048 
01049   // Defaulting.
01050   status_info.status = GIOP::UNKNOWN_OBJECT;
01051 
01052   CORBA::Boolean response_required = true;
01053 
01054   try
01055     {
01056       int parse_error = parser->parse_locate_header (locate_request);
01057 
01058       if (parse_error != 0)
01059         {
01060           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01061         }
01062 
01063       TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01064                               locate_request.object_key ().length (),
01065                               locate_request.object_key ().get_buffer (),
01066                               0);
01067 
01068       // Set it to an error state
01069       parse_error = 1;
01070       CORBA::ULong req_id = locate_request.request_id ();
01071 
01072       // We will send the reply. The ServerRequest class need not send
01073       // the reply
01074       CORBA::Boolean deferred_reply = true;
01075       TAO_ServerRequest server_request (this,
01076                                         req_id,
01077                                         response_required,
01078                                         deferred_reply,
01079                                         tmp_key,
01080                                         "_non_existent",
01081                                         output,
01082                                         transport,
01083                                         this->orb_core_,
01084                                         parse_error);
01085 
01086       if (parse_error != 0)
01087         {
01088           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01089         }
01090 
01091       CORBA::Object_var forward_to;
01092 
01093       this->orb_core_->request_dispatcher ()->dispatch (
01094           this->orb_core_,
01095           server_request,
01096           forward_to);
01097 
01098       if (server_request.is_forwarded ())
01099         {
01100           status_info.status = GIOP::OBJECT_FORWARD;
01101           status_info.forward_location_var = forward_to;
01102           if (TAO_debug_level > 0)
01103             ACE_DEBUG ((LM_DEBUG,
01104                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01105                         ACE_TEXT ("called: forwarding\n")));
01106         }
01107       else if (server_request.reply_status () == GIOP::NO_EXCEPTION)
01108         {
01109           // We got no exception, so the object is here.
01110           status_info.status = GIOP::OBJECT_HERE;
01111           if (TAO_debug_level > 0)
01112             ACE_DEBUG ((LM_DEBUG,
01113                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01114                         ACE_TEXT ("found\n")));
01115         }
01116       else
01117         {
01118           // Normal exception, so the object is not here
01119           status_info.status = GIOP::UNKNOWN_OBJECT;
01120           ACE_DEBUG ((LM_DEBUG,
01121                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01122                       ACE_TEXT ("not here\n")));
01123         }
01124     }
01125 
01126   catch (const ::CORBA::Exception&)
01127     {
01128       // Normal exception, so the object is not here
01129       status_info.status = GIOP::UNKNOWN_OBJECT;
01130       if (TAO_debug_level > 0)
01131         ACE_DEBUG ((LM_DEBUG,
01132                     ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01133                     ACE_TEXT ("CORBA exception raised\n")));
01134     }
01135   catch (...)
01136     {
01137       // Normal exception, so the object is not here
01138       status_info.status = GIOP::UNKNOWN_OBJECT;
01139       if (TAO_debug_level > 0)
01140         ACE_DEBUG ((LM_DEBUG,
01141                     ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01142                     ACE_TEXT ("C++ exception raised\n")));
01143     }
01144 
01145   return this->make_send_locate_reply (transport,
01146                                        locate_request,
01147                                        status_info,
01148                                        output,
01149                                        parser);
01150 }
01151 
01152 int
01153 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01154                                                TAO_GIOP_Locate_Request_Header &request,
01155                                                TAO_GIOP_Locate_Status_Msg &status_info,
01156                                                TAO_OutputCDR &output,
01157                                                TAO_GIOP_Message_Generator_Parser *parser)
01158 {
01159   TAO_GIOP_Message_Version giop_version;
01160   output.get_version (giop_version);
01161 
01162   // Note here we are making the Locate reply header which is *QUITE*
01163   // different from the reply header made by the make_reply () call..
01164   // Make the GIOP message header
01165   this->write_protocol_header (GIOP::LocateReply, giop_version, output);
01166 
01167   // This writes the header & body
01168   parser->write_locate_reply_mesg (output,
01169                                    request.request_id (),
01170                                    status_info);
01171 
01172   output.more_fragments (false);
01173 
01174   // Send the message
01175   int const result = transport->send_message (output,
01176                                               0,
01177                                               TAO_Transport::TAO_REPLY);
01178 
01179   // Print out message if there is an error
01180   if (result == -1)
01181     {
01182       if (TAO_debug_level > 0)
01183         {
01184           ACE_ERROR ((LM_ERROR,
01185                       ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01186                       ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01187         }
01188     }
01189 
01190   return result;
01191 }
01192 
01193 // Send an "I can't understand you" message -- again, the message is
01194 // prefabricated for simplicity.  This implies abortive disconnect (at
01195 // the application level, if not at the level of TCP).
01196 //
01197 // NOTE that IIOP will still benefit from TCP's orderly disconnect.
01198 int
01199 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01200 {
01201   const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01202   {
01203     // The following works on non-ASCII platforms, such as MVS (which
01204     // uses EBCDIC).
01205     0x47, // 'G'
01206     0x49, // 'I'
01207     0x4f, // 'O'
01208     0x50, // 'P'
01209     (CORBA::Octet) 1, // Use the lowest GIOP version
01210     (CORBA::Octet) 0,
01211     TAO_ENCAP_BYTE_ORDER,
01212     GIOP::MessageError,
01213     0, 0, 0, 0
01214   };
01215 
01216   if (TAO_debug_level >= 5)
01217     {
01218       this->dump_msg ("send_error",
01219                       reinterpret_cast <const u_char *> (error_message),
01220                       TAO_GIOP_MESSAGE_HEADER_LEN);
01221     }
01222 
01223   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01224                              ACE_Message_Block::MB_DATA,
01225                              error_message,
01226                              0,
01227                              0,
01228                              ACE_Message_Block::DONT_DELETE,
01229                              0);
01230   ACE_Message_Block message_block(&data_block,
01231                                   ACE_Message_Block::DONT_DELETE);
01232   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01233 
01234   size_t bt;
01235   int const result = transport->send_message_block_chain (&message_block, bt);
01236   if (result == -1)
01237     {
01238       if (TAO_debug_level > 0)
01239         ACE_DEBUG ((LM_DEBUG,
01240                     ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01241                     transport->id ()));
01242     }
01243 
01244   return result;
01245 }
01246 
01247 TAO_GIOP_Message_Generator_Parser*
01248 TAO_GIOP_Message_Base::get_parser (
01249     const TAO_GIOP_Message_Version &version) const
01250 {
01251   switch (version.major)
01252     {
01253     case 1:
01254       switch (version.minor)
01255         {
01256         case 0:
01257           return
01258             const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01259                                      &this->tao_giop_impl_.tao_giop_10);
01260           break;
01261         case 1:
01262           return
01263             const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01264                                      &this->tao_giop_impl_.tao_giop_11);
01265           break;
01266         case 2:
01267           return
01268             const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01269                                      &this->tao_giop_impl_.tao_giop_12);
01270           break;
01271         default:
01272           throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01273           break;
01274         }
01275       break;
01276     default:
01277       throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
01278       break;
01279     }
01280 }
01281 
01282 // Server sends an "I'm shutting down now, any requests you've sent me
01283 // can be retried" message to the server.  The message is prefab, for
01284 // simplicity.
01285 //
01286 // NOTE: this is IIOP-specific though it doesn't look like it is.  It
01287 // relies on a TCP-ism: orderly disconnect, which doesn't exist in all
01288 // transport protocols.  Versions of GIOP atop some transport that's
01289 // lacking orderly disconnect must define some transport-specific
01290 // handshaking (e.g. the XNS/SPP handshake convention) in order to
01291 // know that the same transport semantics are provided when shutdown
01292 // is begun with messages "in flight". (IIOP doesn't report false
01293 // errors in the case of "clean shutdown", because it relies on
01294 // orderly disconnect as provided by TCP.  This quality of service is
01295 // required to write robust distributed systems.)
01296 
01297 void
01298 TAO_GIOP_Message_Base::
01299   send_close_connection (const TAO_GIOP_Message_Version &version,
01300                          TAO_Transport *transport)
01301 {
01302   // static CORBA::Octet
01303   // I hate  this in every method. Till the time I figure out a way
01304   // around  I will have them here hanging around.
01305   const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01306   {
01307     // The following works on non-ASCII platforms, such as MVS (which
01308     // uses EBCDIC).
01309     0x47, // 'G'
01310     0x49, // 'I'
01311     0x4f, // 'O'
01312     0x50, // 'P'
01313     version.major,
01314     version.minor,
01315     TAO_ENCAP_BYTE_ORDER,
01316     GIOP::CloseConnection,
01317     0, 0, 0, 0
01318   };
01319 
01320   // It's important that we use a reliable shutdown after we send this
01321   // message, so we know it's received.
01322   //
01323   // @@ should recv and discard queued data for portability; note
01324   // that this won't block (long) since we never set SO_LINGER
01325 
01326   if (TAO_debug_level >= 5)
01327     {
01328       this->dump_msg ("send_close_connection",
01329                       reinterpret_cast <const u_char *> (close_message),
01330                       TAO_GIOP_MESSAGE_HEADER_LEN);
01331     }
01332 
01333 #if 0
01334   // @@CJC I don't think we need this check b/c the transport's send()
01335   // will simply return -1.  However, I guess we could create something
01336   // like TAO_Tranport::is_closed() that returns whether the connection
01337   // is already closed.  The problem with that, however, is that it's
01338   // entirely possible that is_closed() could return TRUE, and then the
01339   // transport could get closed down btw. the time it gets called and the
01340   // time that the send actually occurs.
01341   ACE_HANDLE which = transport->handle ();
01342   if (which == ACE_INVALID_HANDLE)
01343     {
01344       if (TAO_debug_level > 0)
01345         ACE_DEBUG ((LM_DEBUG,
01346            ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01347            ACE_TEXT (" connection already closed\n")));
01348       return;
01349     }
01350 #endif
01351 
01352   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01353                              ACE_Message_Block::MB_DATA,
01354                              close_message,
01355                              0,
01356                              0,
01357                              ACE_Message_Block::DONT_DELETE,
01358                              0);
01359   ACE_Message_Block message_block(&data_block);
01360   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01361 
01362   size_t bt;
01363   int const result = transport->send_message_block_chain (&message_block, bt);
01364   if (result == -1)
01365     {
01366       if (TAO_debug_level > 0)
01367         ACE_ERROR ((LM_ERROR,
01368            ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01369            transport->id (), errno));
01370     }
01371 
01372   transport->close_connection ();
01373   ACE_DEBUG ((LM_DEBUG,
01374       ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01375       transport-> id ()));
01376 }
01377 
01378 
01379 int
01380 TAO_GIOP_Message_Base::send_reply_exception (
01381     TAO_Transport *transport,
01382     TAO_OutputCDR &output,
01383     CORBA::ULong request_id,
01384     IOP::ServiceContextList *svc_info,
01385     CORBA::Exception *x
01386   )
01387 {
01388   TAO_Pluggable_Reply_Params_Base reply_params;
01389   reply_params.request_id_ = request_id;
01390   reply_params.svc_ctx_.length (0);
01391 
01392   // We are going to send some data
01393   reply_params.argument_flag_ = true;
01394 
01395   // Send back the service context we received.  (RTCORBA relies on
01396   // this).
01397   reply_params.service_context_notowned (svc_info);
01398 
01399   if (CORBA::SystemException::_downcast (x) != 0)
01400     {
01401       reply_params.reply_status (GIOP::SYSTEM_EXCEPTION);
01402     }
01403   else
01404     {
01405       reply_params.reply_status (GIOP::USER_EXCEPTION);
01406     }
01407 
01408   if (this->generate_exception_reply (output, reply_params, *x) == -1)
01409     return -1;
01410 
01411   output.more_fragments (false);
01412 
01413   return transport->send_message (output, 0, TAO_Transport::TAO_REPLY);
01414 }
01415 
01416 void
01417 TAO_GIOP_Message_Base::dump_msg (const char *label,
01418                                  const u_char *ptr,
01419                                  size_t len)
01420 {
01421     static const char digits[] = "0123456789ABCD";
01422     static const char *names[] =
01423     {
01424       "Request",
01425       "Reply",
01426       "CancelRequest",
01427       "LocateRequest",
01428       "LocateReply",
01429       "CloseConnection",
01430       "MessageError",
01431       "Fragment"
01432     };
01433 
01434     // Message name.
01435     const char *message_name = "UNKNOWN MESSAGE";
01436     u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01437     if (slot < sizeof (names) / sizeof (names[0]))
01438       message_name = names[slot];
01439 
01440     // Byte order.
01441     int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01442 
01443     // Get the version info
01444     CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01445     CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01446 
01447     // request/reply id.
01448     CORBA::ULong tmp = 0;
01449     CORBA::ULong *id = &tmp;
01450     char *tmp_id = 0;
01451 
01452     if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Request ||
01453         ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Reply ||
01454         ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Fragment)
01455       {
01456         if (major == 1 && minor < 2)
01457           {
01458             // @@ Only works if ServiceContextList is empty....
01459             tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN  + 4);
01460           }
01461         else
01462           {
01463             tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01464           }
01465 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01466       if (byte_order == TAO_ENCAP_BYTE_ORDER)
01467         {
01468           id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01469         }
01470       else
01471         {
01472           ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01473         }
01474 #else
01475       id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01476 #endif /* ACE_DISABLE_SWAP_ON_READ */
01477 
01478       }
01479 
01480     // Print.
01481     ACE_DEBUG ((LM_DEBUG,
01482                 "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01483                 "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01484                 "Type %s[%u]\n",
01485                 ACE_TEXT_CHAR_TO_TCHAR (label),
01486                 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01487                 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01488                 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01489                 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01490                 ACE_TEXT_CHAR_TO_TCHAR(message_name),
01491                 *id));
01492 
01493     if (TAO_debug_level >= 10)
01494       ACE_HEX_DUMP ((LM_DEBUG,
01495                       (const char *) ptr,
01496                       len,
01497                       ACE_TEXT ("GIOP message")));
01498 }
01499 
01500 int
01501 TAO_GIOP_Message_Base::generate_locate_reply_header (
01502     TAO_OutputCDR & /*cdr*/,
01503     TAO_Pluggable_Reply_Params_Base & /*params*/)
01504 {
01505   return 0;
01506 }
01507 
01508 bool
01509 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) const
01510 {
01511   TAO_GIOP_Message_Version giop_version;
01512 
01513   msg.get_version (giop_version);
01514 
01515   // Get the parser we need to use
01516   TAO_GIOP_Message_Generator_Parser *generator_parser =
01517     this->get_parser (giop_version);
01518 
01519   // We dont really know.. So ask the generator and parser objects that
01520   // we know.
01521   // @@ TODO: Need to make this faster, instead of making virtual
01522   // call, try todo the check within this class
01523   return generator_parser->is_ready_for_bidirectional ();
01524 }
01525 
01526 
01527 TAO_Queued_Data *
01528 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01529 {
01530   // Make a datablock for the size requested + something. The
01531   // "something" is required because we are going to align the data
01532   // block in the message block. During alignment we could loose some
01533   // bytes. As we may not know how many bytes will be lost, we will
01534   // allocate ACE_CDR::MAX_ALIGNMENT extra.
01535   ACE_Data_Block *db =
01536     this->orb_core_->create_input_cdr_data_block (sz +
01537                                                   ACE_CDR::MAX_ALIGNMENT);
01538 
01539   TAO_Queued_Data *qd =
01540     TAO_Queued_Data::make_queued_data (
01541                  this->orb_core_->transport_message_buffer_allocator (),
01542                  this->orb_core_->input_cdr_msgblock_allocator (),
01543                  db);
01544 
01545   if (qd == 0)
01546     {
01547       if (TAO_debug_level > 0)
01548         {
01549           ACE_ERROR ((LM_ERROR,
01550             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01551             ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01552         }
01553       db->release ();
01554       return 0; // NULL pointer
01555     }
01556 
01557   return qd;
01558 }
01559 
01560 size_t
01561 TAO_GIOP_Message_Base::header_length (void) const
01562 {
01563   return TAO_GIOP_MESSAGE_HEADER_LEN;
01564 }
01565 
01566 size_t
01567 TAO_GIOP_Message_Base::fragment_header_length (
01568   const TAO_GIOP_Message_Version& giop_version) const
01569 {
01570   // Get the parser we need to use
01571   TAO_GIOP_Message_Generator_Parser *generator_parser =
01572     this->get_parser (giop_version);
01573 
01574   return generator_parser->fragment_header_length ();
01575 }
01576 
01577 int
01578 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
01579                                          CORBA::ULong &request_id) const
01580 {
01581   // Get the read and write positions before we steal data.
01582   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01583   size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01584   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01585 
01586   // Create a input CDR stream. We do the following
01587   //  1 - If the incoming message block has a data block with a flag
01588   //      DONT_DELETE  (for the data block) we create an input CDR
01589   //      stream the same way.
01590   //  2 - If the incoming message block had a datablock from heap just
01591   //      use it by duplicating it and make the flag 0.
01592   // NOTE: We use the same data block in which we read the message and
01593   // we pass it on to the higher layers of the ORB. So we dont to any
01594   // copies at all here. The same is also done in the higher layers.
01595 
01596   ACE_Message_Block::Message_Flags flg = 0;
01597   ACE_Data_Block *db = 0;
01598 
01599   // Get the flag in the message block
01600   flg = qd->msg_block ()->self_flags ();
01601 
01602   if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
01603     {
01604       // Use the same datablock
01605       db = qd->msg_block ()->data_block ();
01606     }
01607   else
01608     {
01609       // Use a duplicated datablock as the datablock has come off the
01610       // heap.
01611       db = qd->msg_block ()->data_block ()->duplicate ();
01612     }
01613 
01614   TAO_InputCDR input_cdr (db,
01615                           flg,
01616                           rd_pos,
01617                           wr_pos,
01618                           qd->byte_order (),
01619                           qd->giop_version ().major_version (),
01620                           qd->giop_version ().minor_version (),
01621                           this->orb_core_);
01622 
01623   if (qd->giop_version ().major == 1 &&
01624       (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01625     {
01626       switch (qd->msg_type ())
01627       {
01628         case GIOP::Request:
01629         case GIOP::Reply:
01630           {
01631             IOP::ServiceContextList service_context;
01632 
01633             if ((input_cdr >> service_context)
01634                 && (input_cdr >> request_id))
01635               {
01636                 return 0;
01637               }
01638           }
01639           break;
01640         case GIOP::CancelRequest:
01641         case GIOP::LocateRequest:
01642         case GIOP::LocateReply:
01643           {
01644             if ((input_cdr >> request_id))
01645               {
01646                 return 0;
01647               }
01648           }
01649           break;
01650         default:
01651           break;
01652       }
01653     }
01654   else
01655     {
01656       switch (qd->msg_type ())
01657       {
01658         case GIOP::Request:
01659         case GIOP::Reply:
01660         case GIOP::Fragment:
01661         case GIOP::CancelRequest:
01662         case GIOP::LocateRequest:
01663         case GIOP::LocateReply:
01664           {
01665             // Dealing with GIOP-1.2, the request-id is located directly
01666             // behind the GIOP-Header.  This is true for all message
01667             // types that might be sent in form of fragments or
01668             // cancel-requests.
01669             if ((input_cdr >> request_id))
01670               {
01671                 return 0;
01672               }
01673           }
01674           break;
01675         default:
01676           break;
01677       }
01678     }
01679 
01680   return -1;
01681 }
01682 
01683 /* @return -1 error, 0 ok, +1 outstanding fragments */
01684 int
01685 TAO_GIOP_Message_Base::consolidate_fragmented_message (
01686   TAO_Queued_Data * qd,
01687   TAO_Queued_Data *& msg)
01688 {
01689   TAO::Incoming_Message_Stack reverse_stack;
01690 
01691   TAO_Queued_Data *tail = 0;
01692   TAO_Queued_Data *head = 0;
01693 
01694   //
01695   // CONSOLIDATE FRAGMENTED MESSAGE
01696   //
01697 
01698   // check for error-condition
01699   if (qd == 0)
01700     {
01701       return -1;
01702     }
01703 
01704   if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01705     {
01706       TAO_Queued_Data::release (qd);
01707       return -1; // error: GIOP-1.0 does not support fragments
01708     }
01709 
01710   // If this is not the last fragment, push it onto stack for later processing
01711   if (qd->more_fragments ())
01712     {
01713       this->fragment_stack_.push (qd);
01714 
01715       msg = 0;   // no consolidated message available yet
01716       return 1;  // status: more messages expected.
01717     }
01718 
01719   tail = qd;  // init
01720 
01721   // Add the current message block to the end of the chain
01722   // after adjusting the read pointer to skip the header(s)
01723   size_t const header_adjustment =
01724     this->header_length () +
01725     this->fragment_header_length (tail->giop_version ().major_version ());
01726 
01727   if (tail->msg_block ()->length () < header_adjustment)
01728     {
01729       // buffer length not sufficient
01730       TAO_Queued_Data::release (qd);
01731       return -1;
01732     }
01733 
01734   // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
01735   if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01736     {
01737       // GIOP-1.1
01738 
01739       while (this->fragment_stack_.pop (head) != -1)
01740         {
01741           if (head->more_fragments () &&
01742               head->giop_version ().major_version () == 1 &&
01743               head->giop_version ().minor_version () == 1 &&
01744               head->msg_block ()->length () >= header_adjustment)
01745             {
01746               // adjust the read-pointer, skip the fragment header
01747               tail->msg_block ()->rd_ptr(header_adjustment);
01748 
01749               head->msg_block ()->cont (tail->msg_block ());
01750 
01751               tail->msg_block (0);
01752 
01753               TAO_Queued_Data::release (tail);
01754 
01755               tail = head;
01756             }
01757           else
01758             {
01759               reverse_stack.push (head);
01760             }
01761         }
01762     }
01763   else
01764     {
01765       // > GIOP-1.2
01766 
01767       CORBA::ULong tmp_request_id = 0;
01768       if (this->parse_request_id (tail, tmp_request_id) == -1)
01769         {
01770           return -1;
01771         }
01772 
01773       const CORBA::ULong request_id = tmp_request_id;
01774 
01775       while (this->fragment_stack_.pop (head) != -1)
01776         {
01777           CORBA::ULong head_request_id = 0;
01778           int parse_status = 0;
01779 
01780           if (head->more_fragments () &&
01781               head->giop_version ().major_version () >= 1 &&
01782               head->giop_version ().minor_version () >= 2 &&
01783               head->msg_block ()->length () >= header_adjustment &&
01784               (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01785               request_id == head_request_id)
01786             {
01787               // adjust the read-pointer, skip the fragment header
01788               tail->msg_block ()->rd_ptr(header_adjustment);
01789 
01790               head->msg_block ()->cont (tail->msg_block ());
01791 
01792               tail->msg_block (0);
01793 
01794               TAO_Queued_Data::release (tail);
01795 
01796               tail = head;
01797             }
01798           else
01799             {
01800               if (parse_status == -1)
01801                 {
01802                   TAO_Queued_Data::release (head);
01803                   return -1;
01804                 }
01805 
01806               reverse_stack.push (head);
01807             }
01808         }
01809     }
01810 
01811   // restore stack
01812   while (reverse_stack.pop (head) != -1)
01813     {
01814       this->fragment_stack_.push (head);
01815     }
01816 
01817   if (tail->consolidate () == -1)
01818     {
01819       // memory allocation failed
01820       TAO_Queued_Data::release (tail);
01821       return -1;
01822     }
01823 
01824   // set out value
01825   msg = tail;
01826 
01827   return 0;
01828 }
01829 
01830 
01831 int
01832 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
01833 {
01834   // We must extract the specific request-id from message-buffer
01835   // and remove all fragments from stack that match this request-id.
01836 
01837   TAO::Incoming_Message_Stack reverse_stack;
01838 
01839   CORBA::ULong cancel_request_id;
01840 
01841   if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01842     {
01843       return -1;
01844     }
01845 
01846   TAO_Queued_Data *head = 0;
01847 
01848   // Revert stack
01849   while (this->fragment_stack_.pop (head) != -1)
01850     {
01851       reverse_stack.push (head);
01852     }
01853 
01854   bool discard_all_GIOP11_messages = false;
01855 
01856   // Now we are able to process message in order they have arrived.
01857   // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
01858   // fragments belong to this message and must be discarded.
01859   // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
01860   // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
01861   // having encoded the request id will be discarded.
01862   while (reverse_stack.pop (head) != -1)
01863     {
01864       CORBA::ULong head_request_id;
01865 
01866       if (head->giop_version ().major_version () == 1 &&
01867           head->giop_version ().minor_version () <= 1 &&
01868           head->msg_type () != GIOP::Fragment && // GIOP11 fragment does not provide request id
01869           this->parse_request_id (head, head_request_id) >= 0 &&
01870           cancel_request_id == head_request_id)
01871         {
01872           TAO_Queued_Data::release (head);
01873           discard_all_GIOP11_messages = true;
01874         }
01875       else if (head->giop_version ().major_version () == 1 &&
01876                head->giop_version ().minor_version () <= 1 &&
01877                discard_all_GIOP11_messages)
01878         {
01879           TAO_Queued_Data::release (head);
01880         }
01881       else if (head->giop_version ().major_version () >= 1 &&
01882                head->giop_version ().minor_version () >= 2 &&
01883                this->parse_request_id (head, head_request_id) >= 0 &&
01884                cancel_request_id == head_request_id)
01885         {
01886           TAO_Queued_Data::release (head);
01887         }
01888       else
01889         {
01890           this->fragment_stack_.push (head);
01891         }
01892     }
01893 
01894   return 0;
01895 }
01896 
01897 TAO_GIOP_Fragmentation_Strategy *
01898 TAO_GIOP_Message_Base::fragmentation_strategy (void)
01899 {
01900   return this->fragmentation_strategy_.get ();
01901 }
01902 
01903 void
01904 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
01905 {
01906   CORBA::Octet * const buf =
01907     reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
01908 
01909   CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
01910   CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
01911 
01912   // Flags for the GIOP protocol header "flags" field.
01913   CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
01914 
01915   // Least significant bit:        Byte order
01916   ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
01917 
01918   // Second least significant bit: More fragments
01919   //
01920   // Only supported in GIOP 1.1 or better.
01921   if (!(major <= 1 && minor == 0))
01922     ACE_SET_BITS (flags, msg.more_fragments () << 1);
01923 }
01924 
01925 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:37:52 2010 for TAO by  doxygen 1.4.7