GIOP_Message_Base.cpp

Go to the documentation of this file.
00001 // $Id: GIOP_Message_Base.cpp 79169 2007-08-02 08:41:23Z johnnyw $
00002 
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 #include "ace/Min_Max.h"
00016 
00017 /*
00018  * Hook to add additional include files during specializations.
00019  */
00020 //@@ GIOP_MESSAGE_BASE_INCLUDE_ADD_HOOK
00021 
00022 ACE_RCSID (tao,
00023            GIOP_Message_Base,
00024            "$Id: GIOP_Message_Base.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
00029                                               TAO_Transport *transport,
00030                                               size_t input_cdr_size)
00031   : orb_core_ (orb_core)
00032   , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033   , out_stream_ (0,
00034                  input_cdr_size,
00035                  TAO_ENCAP_BYTE_ORDER,
00036                  orb_core->output_cdr_buffer_allocator (),
00037                  orb_core->output_cdr_dblock_allocator (),
00038                  orb_core->output_cdr_msgblock_allocator (),
00039                  orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040                  fragmentation_strategy_.get (),
00041                  TAO_DEF_GIOP_MAJOR,
00042                  TAO_DEF_GIOP_MINOR)
00043 {
00044 }
00045 
00046 
00047 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00048 {
00049 }
00050 
00051 void
00052 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
00053 {
00054   // Set the giop version of the out stream
00055   this->out_stream_.set_version (major, minor);
00056 }
00057 
00058 TAO_OutputCDR &
00059 TAO_GIOP_Message_Base::out_stream (void)
00060 {
00061   return this->out_stream_;
00062 }
00063 
00064 void
00065 TAO_GIOP_Message_Base::reset (void)
00066 {
00067   // no-op
00068 }
00069 
00070 int
00071 TAO_GIOP_Message_Base::generate_request_header (
00072     TAO_Operation_Details &op,
00073     TAO_Target_Specification &spec,
00074     TAO_OutputCDR &cdr)
00075 {
00076   // Get a parser for us
00077   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00078   TAO_GIOP_Message_Version giop_version;
00079 
00080   cdr.get_version (giop_version);
00081 
00082   // Get the state information that we need to use
00083   this->set_state (giop_version, generator_parser);
00084 
00085   // Write the GIOP header first
00086   if (!this->write_protocol_header (TAO_GIOP_REQUEST, cdr))
00087     {
00088       if (TAO_debug_level)
00089         {
00090           ACE_ERROR ((LM_ERROR,
00091                       ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00092         }
00093 
00094       return -1;
00095     }
00096 
00097   // Now call the implementation for the rest of the header
00098   if (!generator_parser->write_request_header (op, spec, cdr))
00099     {
00100       if (TAO_debug_level)
00101         ACE_ERROR ((LM_ERROR,
00102                     ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00103 
00104       return -1;
00105     }
00106 
00107   return 0;
00108 }
00109 
00110 int
00111 TAO_GIOP_Message_Base::generate_locate_request_header (
00112     TAO_Operation_Details &op,
00113     TAO_Target_Specification &spec,
00114     TAO_OutputCDR &cdr)
00115 {
00116   // Get a parser for us
00117   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00118   TAO_GIOP_Message_Version giop_version;
00119 
00120   cdr.get_version (giop_version);
00121 
00122   // Get the state information that we need to use
00123   this->set_state (giop_version, generator_parser);
00124 
00125   // Write the GIOP header first
00126   if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, cdr))
00127     {
00128       if (TAO_debug_level)
00129         ACE_ERROR ((LM_ERROR,
00130                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00131 
00132       return -1;
00133     }
00134 
00135   // Now call the implementation for the rest of the header
00136   if (!generator_parser->write_locate_request_header
00137       (op.request_id (), spec, cdr))
00138     {
00139       if (TAO_debug_level)
00140         ACE_ERROR ((LM_ERROR,
00141                     ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00142 
00143 
00144       return -1;
00145 
00146     }
00147 
00148   return 0;
00149 }
00150 
00151 int
00152 TAO_GIOP_Message_Base::generate_reply_header (
00153     TAO_OutputCDR &cdr,
00154     TAO_Pluggable_Reply_Params_Base &params)
00155 {
00156   // Get a parser for us
00157   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00158   TAO_GIOP_Message_Version giop_version;
00159 
00160   cdr.get_version (giop_version);
00161 
00162   // Get the state information that we need to use
00163   this->set_state (giop_version, generator_parser);
00164 
00165   // Write the GIOP header first
00166   if (!this->write_protocol_header (TAO_GIOP_REPLY, cdr))
00167     {
00168       if (TAO_debug_level)
00169         ACE_ERROR ((LM_ERROR,
00170                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00171 
00172       return -1;
00173     }
00174 
00175   try
00176     {
00177       // Now call the implementation for the rest of the header
00178       int const result =
00179         generator_parser->write_reply_header (cdr, params);
00180 
00181       if (!result)
00182         {
00183           if (TAO_debug_level > 4)
00184             ACE_ERROR ((LM_ERROR,
00185                         ACE_TEXT ("(%P|%t) Error in writing reply ")
00186                         ACE_TEXT ("header\n")));
00187 
00188           return -1;
00189         }
00190     }
00191   catch (const ::CORBA::Exception& ex)
00192     {
00193       if (TAO_debug_level > 4)
00194         ex._tao_print_exception (
00195           ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00196 
00197       return -1;
00198     }
00199 
00200   return 0;
00201 }
00202 
00203 int
00204 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00205                                                  CORBA::ULong request_id)
00206 {
00207   // Get a parser for us
00208   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00209   TAO_GIOP_Message_Version giop_version;
00210 
00211   cdr.get_version (giop_version);
00212 
00213   // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
00214   // supports them in 1.2 or better since GIOP 1.1 fragments do not
00215   // have a fragment message header.
00216   if (giop_version.major == 1 && giop_version.minor < 2)
00217     return -1;
00218 
00219   // Get the state information that we need to use
00220   this->set_state (giop_version, generator_parser);
00221 
00222   // Write the GIOP header first
00223   if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
00224       || !generator_parser->write_fragment_header (cdr, request_id))
00225     {
00226       if (TAO_debug_level)
00227         ACE_ERROR ((LM_ERROR,
00228                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00229 
00230       return -1;
00231     }
00232 
00233   return 0;
00234 }
00235 
00236 int
00237 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00238 {
00239   // Ptr to first buffer.
00240   char * buf = (char *) stream.buffer ();
00241 
00242   this->set_giop_flags (stream);
00243 
00244   // Length of all buffers.
00245   size_t const total_len = stream.total_length ();
00246 
00247   // NOTE: Here would also be a fine place to calculate a digital
00248   // signature for the message and place it into a preallocated slot
00249   // in the "ServiceContext".  Similarly, this is a good spot to
00250   // encrypt messages (or just the message bodies) if that's needed in
00251   // this particular environment and that isn't handled by the
00252   // networking infrastructure (e.g., IPSEC).
00253 
00254   CORBA::ULong bodylen = static_cast <CORBA::ULong>
00255                            (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00256 
00257 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00258   *(reinterpret_cast <CORBA::ULong *> (buf +
00259                          TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00260 #else
00261   if (!stream.do_byte_swap ())
00262     *(reinterpret_cast <CORBA::ULong *>
00263                            (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00264   else
00265     ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00266                      buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00267 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00268 
00269   if (TAO_debug_level > 2)
00270     {
00271       // Check whether the output cdr stream is build up of multiple
00272       // messageblocks. If so, consolidate them to one block that can be
00273       // dumped
00274       ACE_Message_Block* consolidated_block = 0;
00275       if (stream.begin()->cont () != 0)
00276         {
00277           ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
00278           ACE_CDR::consolidate (consolidated_block, stream.begin ());
00279           buf = (char *) (consolidated_block->rd_ptr ());
00280         }
00281       ///
00282       this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
00283 
00284       //
00285       delete consolidated_block;
00286       consolidated_block = 0;
00287       //
00288     }
00289 
00290   return 0;
00291 }
00292 
00293 int
00294 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
00295                                            size_t &mesg_length)
00296 {
00297   if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00298     {
00299       qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
00300 
00301       return 0; /* incomplete header */
00302     }
00303   else
00304     {
00305       TAO_GIOP_Message_State state;
00306 
00307       if (state.parse_message_header (*(qd.msg_block ())) == -1)
00308         {
00309           return -1;
00310         }
00311 
00312       size_t const message_size = state.message_size (); /* Header + Payload */
00313 
00314       if (message_size > qd.msg_block ()->length ())
00315         {
00316           qd.missing_data (message_size - qd.msg_block ()->length ());
00317         }
00318       else
00319         {
00320           qd.missing_data (0);
00321         }
00322 
00323       /* init out-parameters */
00324       qd.set_state (state);
00325       mesg_length = message_size;
00326 
00327       return 1; /* complete header */
00328     }
00329 }
00330 
00331 int
00332 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00333                                              TAO_Queued_Data *&qd)
00334 {
00335   if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00336     {
00337       if (incoming.length () > 0)
00338         {
00339           // Optimize memory usage, we dont know actual message size
00340           // so far, but allocate enough space to hold small GIOP
00341           // messages. This way we avoid expensive "grow" operation
00342           // for small messages.
00343           size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00344 
00345           // Make a node which has at least message block of the size
00346           // of MESSAGE_HEADER_LEN.
00347           size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00348                                            default_buf_size);
00349 
00350           // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
00351 
00352           qd = this->make_queued_data (buf_size);
00353 
00354           if (qd == 0)
00355             {
00356               if (TAO_debug_level > 0)
00357                 {
00358                   ACE_ERROR((LM_ERROR,
00359                              ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00360                              ACE_TEXT ("out of memory\n")));
00361                 }
00362               return -1;
00363             }
00364 
00365           qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
00366 
00367           incoming.rd_ptr (incoming.length ()); // consume all available data
00368 
00369           qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
00370         }
00371       else
00372         {
00373           // handle not initialized variables
00374           qd = 0;  // reset
00375         }
00376 
00377       return 0;
00378     }
00379 
00380   TAO_GIOP_Message_State state;
00381   if (state.parse_message_header (incoming) == -1)
00382     {
00383       return -1;
00384     }
00385 
00386   size_t copying_len = state.message_size ();
00387 
00388   qd = this->make_queued_data (copying_len);
00389 
00390   if (qd == 0)
00391     {
00392       if (TAO_debug_level > 0)
00393         {
00394           ACE_ERROR ((LM_ERROR,
00395                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00396                       ACE_TEXT ("out of memory\n")));
00397         }
00398       return -1;
00399     }
00400 
00401   if (copying_len > incoming.length ())
00402     {
00403       qd->missing_data (copying_len - incoming.length ());
00404       copying_len = incoming.length ();
00405     }
00406   else
00407     {
00408       qd->missing_data (0);
00409     }
00410 
00411   qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
00412 
00413   incoming.rd_ptr (copying_len);
00414   qd->set_state (state);
00415 
00416   return 1;
00417 }
00418 
00419 int
00420 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00421                                          ACE_Message_Block &incoming)
00422 {
00423   // Look to see whether we had atleast parsed the GIOP header ...
00424   if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
00425     {
00426       // The data length that has been stuck in there during the last
00427       // read ....
00428       size_t const len = qd->msg_block ()->length ();
00429 
00430       // paranoid check
00431       if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00432         {
00433           // inconsistency - this code should have parsed the header
00434           // so far
00435           return -1;
00436         }
00437 
00438       // We know that we would have space for
00439       // TAO_GIOP_MESSAGE_HEADER_LEN here.  So copy that much of data
00440       // from the <incoming> into the message block in <qd>
00441       size_t const available = incoming.length ();
00442       size_t const desired   = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00443       size_t const n_copy    = ace_min (available, desired);
00444 
00445       // paranoid check, but would cause endless looping
00446       if (n_copy == 0)
00447         {
00448           return -1;
00449         }
00450 
00451       if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
00452         {
00453            return -1;
00454         }
00455 
00456       // Move the rd_ptr () in the incoming message block..
00457       incoming.rd_ptr (n_copy);
00458 
00459       // verify sufficient data to parse GIOP header
00460       if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00461         {
00462           return 0; /* continue */
00463         }
00464 
00465       TAO_GIOP_Message_State state;
00466 
00467       // Parse the message header now...
00468       if (state.parse_message_header (*qd->msg_block ()) == -1)
00469         {
00470           if (TAO_debug_level > 0)
00471             {
00472               ACE_ERROR ((LM_ERROR,
00473                   ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00474                   ACE_TEXT ("error parsing header\n") ));
00475             }
00476           return -1;
00477         }
00478       // Now grow the message block so that we can copy the rest of
00479       // the data, the message_block must be able to hold complete message
00480        if (ACE_CDR::grow (qd->msg_block (),
00481                           state.message_size ()) == -1)  /* GIOP_Header + Payload */
00482          {
00483            // on mem-error get rid of context silently, try to avoid
00484            // system calls that might allocate additional memory
00485            return -1;
00486          }
00487 
00488       // Copy the pay load..
00489       // Calculate the bytes that needs to be copied in the queue...
00490       size_t copy_len = state.payload_size ();
00491 
00492       // If the data that needs to be copied is more than that is
00493       // available to us ..
00494       if (copy_len > incoming.length ())
00495         {
00496           // Calculate the missing data..
00497           qd->missing_data (copy_len - incoming.length ());
00498 
00499           // Set the actual possible copy_len that is available...
00500           copy_len = incoming.length ();
00501         }
00502       else
00503         {
00504           qd->missing_data (0);
00505         }
00506 
00507       // ..now we are set to copy the right amount of data to the
00508       // node..
00509       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00510         {
00511           return -1;
00512         }
00513 
00514       // Set the <rd_ptr> of the <incoming>..
00515       incoming.rd_ptr (copy_len);
00516 
00517       // Get the other details...
00518       qd->set_state (state);
00519     }
00520   else
00521     {
00522       // @todo: Need to abstract this out to a seperate method...
00523       size_t copy_len = qd->missing_data ();
00524 
00525       if (copy_len > incoming.length ())
00526         {
00527           // Calculate the missing data..
00528           qd->missing_data (copy_len - incoming.length ());
00529 
00530           // Set the actual possible copy_len that is available...
00531           copy_len = incoming.length ();
00532         }
00533 
00534       // paranoid check for endless-event-looping
00535       if (copy_len == 0)
00536         {
00537           return -1;
00538         }
00539 
00540       // Copy the right amount of data in to the node...
00541       if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
00542         {
00543           return -1;
00544         }
00545 
00546       // Set the <rd_ptr> of the <incoming>..
00547       qd->msg_block ()->rd_ptr (copy_len);
00548 
00549     }
00550 
00551   return 0;
00552 }
00553 
00554 int
00555 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00556                                                 TAO_Queued_Data *qd)
00557 {
00558   // Set the upcall thread
00559   this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00560 
00561   // Get a parser for us
00562   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00563 
00564   // Get the state information that we need to use
00565   this->set_state (qd->giop_version (), generator_parser);
00566 
00567   // A buffer that we will use to initialise the CDR stream.  Since we're
00568   // allocating the buffer on the stack, we may as well allocate the data
00569   // block on the stack too and avoid an allocation inside the message
00570   // block of the CDR.
00571 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00572   char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00573 #else
00574   char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00575 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00576   ACE_Data_Block out_db (sizeof (repbuf),
00577                          ACE_Message_Block::MB_DATA,
00578                          repbuf,
00579                          this->orb_core_->input_cdr_buffer_allocator (),
00580                          0,
00581                          ACE_Message_Block::DONT_DELETE,
00582                          this->orb_core_->input_cdr_dblock_allocator ());
00583 
00584   // Initialize an output CDR on the stack
00585   // NOTE: Don't jump to a conclusion as to why we are using the
00586   // input_cdr and hence the  global pool here. These pools will move
00587   // to the lanes anyway at some point of time. Further, it would have
00588   // been awesome to have this in TSS. But for some reason the cloning
00589   // that happens when the ORB gets flow controlled while writing a
00590   // reply is messing things up. We crash horribly. Doing this adds a
00591   // lock, we need to set things like this -- put stuff in TSS here
00592   // and transfer to global memory when we get flow controlled. We
00593   // need to work on the message block to get it right!
00594   TAO_OutputCDR output (&out_db,
00595                         TAO_ENCAP_BYTE_ORDER,
00596                         this->orb_core_->input_cdr_msgblock_allocator (),
00597                         this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00598                         this->fragmentation_strategy_.get (),
00599                         qd->giop_version ().major_version (),
00600                         qd->giop_version ().minor_version ());
00601 
00602   // Get the read and write positions before we steal data.
00603   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00604   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00605   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00606 
00607   if (TAO_debug_level > 0)
00608     this->dump_msg ("recv",
00609                     reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00610                     qd->msg_block ()->length ());
00611 
00612 
00613   // Create a input CDR stream. We do the following
00614   //  1 - If the incoming message block has a data block with a flag
00615   //      DONT_DELETE  (for the data block) we create an input CDR
00616   //      stream the same way.
00617   //  2 - If the incoming message block had a datablock from heap just
00618   //      use it by duplicating it and make the flag 0.
00619   // NOTE: We use the same data block in which we read the message and
00620   // we pass it on to the higher layers of the ORB. So we dont to any
00621   // copies at all here. The same is also done in the higher layers.
00622 
00623   ACE_Message_Block::Message_Flags flg = 0;
00624   ACE_Data_Block *db = 0;
00625 
00626   // Get the flag in the message block
00627   flg = qd->msg_block ()->self_flags ();
00628 
00629   if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
00630     {
00631       // Use the same datablock
00632       db = qd->msg_block ()->data_block ();
00633     }
00634   else
00635     {
00636       // Use a duplicated datablock as the datablock has come off the
00637       // heap.
00638       db = qd->msg_block ()->data_block ()->duplicate ();
00639     }
00640 
00641   TAO_InputCDR input_cdr (db,
00642                           flg,
00643                           rd_pos,
00644                           wr_pos,
00645                           qd->byte_order (),
00646                           qd->giop_version ().major_version (),
00647                           qd->giop_version ().minor_version (),
00648                           this->orb_core_);
00649 
00650   transport->assign_translators(&input_cdr,&output);
00651 
00652   // We know we have some request message. Check whether it is a
00653   // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
00654 
00655   // Once we send the InputCDR stream we need to just forget about
00656   // the stream and never touch that again for anything. We basically
00657   // loose ownership of the data_block.
00658 
00659   switch (qd->msg_type ())
00660     {
00661     case TAO_PLUGGABLE_MESSAGE_REQUEST:
00662       // Should be taken care by the state specific invocations. They
00663       // could raise an exception or write things in the output CDR
00664       // stream
00665       return this->process_request (transport,
00666                                     input_cdr,
00667                                     output,
00668                                     generator_parser);
00669 
00670     case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
00671       return this->process_locate_request (transport,
00672                                            input_cdr,
00673                                            output,
00674                                            generator_parser);
00675     default:
00676       return -1;
00677     }
00678 }
00679 
00680 int
00681 TAO_GIOP_Message_Base::process_reply_message (
00682     TAO_Pluggable_Reply_Params &params,
00683     TAO_Queued_Data *qd)
00684 {
00685   // Get a parser for us
00686   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00687 
00688   // Get the state information that we need to use
00689   this->set_state (qd->giop_version (), generator_parser);
00690 
00691   // Get the read and write positions before we steal data.
00692   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
00693   size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
00694   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00695 
00696   if (TAO_debug_level > 0)
00697     this->dump_msg ("recv",
00698                     reinterpret_cast <u_char *> (qd->msg_block ()->rd_ptr ()),
00699                     qd->msg_block ()->length ());
00700 
00701 
00702   // Create a empty buffer on stack
00703   // NOTE: We use the same data block in which we read the message and
00704   // we pass it on to the higher layers of the ORB. So we dont to any
00705   // copies at all here.
00706   TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
00707                           ACE_Message_Block::DONT_DELETE,
00708                           rd_pos,
00709                           wr_pos,
00710                           qd->byte_order (),
00711                           qd->giop_version ().major_version (),
00712                           qd->giop_version ().minor_version (),
00713                           this->orb_core_);
00714 
00715   // We know we have some reply message. Check whether it is a
00716   // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
00717 
00718   // Once we send the InputCDR stream we need to just forget about
00719   // the stream and never touch that again for anything. We basically
00720   // loose ownership of the data_block.
00721   int retval = 0;
00722 
00723   switch (qd->msg_type ())
00724     {
00725     case TAO_PLUGGABLE_MESSAGE_REPLY:
00726       // Should be taken care by the state specific parsing
00727       retval = generator_parser->parse_reply (input_cdr, params);
00728 
00729       break;
00730     case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
00731       retval = generator_parser->parse_locate_reply (input_cdr, params);
00732       break;
00733     default:
00734       retval = -1;
00735     }
00736 
00737   if (retval == -1)
00738     return retval;
00739 
00740   params.input_cdr_ = &input_cdr;
00741   params.transport_->assign_translators (params.input_cdr_, 0);
00742 
00743   retval = params.transport_->tms ()->dispatch_reply (params);
00744 
00745   if (retval == -1)
00746     {
00747       // Something really critical happened, we will forget about
00748       // every reply on this connection.
00749       if (TAO_debug_level > 0)
00750         ACE_ERROR ((LM_ERROR,
00751                     ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
00752                     ACE_TEXT ("dispatch reply failed\n"),
00753                     params.transport_->id ()));
00754     }
00755 
00756   return retval;
00757 }
00758 
00759 int
00760 TAO_GIOP_Message_Base::generate_exception_reply (
00761     TAO_OutputCDR &cdr,
00762     TAO_Pluggable_Reply_Params_Base &params,
00763     const CORBA::Exception &x)
00764 {
00765   // A new try/catch block, but if something goes wrong now we have no
00766   // hope, just abort.
00767 
00768   try
00769     {
00770       // Make the GIOP & reply header.
00771       this->generate_reply_header (cdr, params);
00772       x._tao_encode (cdr);
00773     }
00774   catch (const ::CORBA::Exception&)
00775     {
00776       // Now we know that while handling the error an other error
00777       // happened -> no hope, close connection.
00778 
00779       // Close the handle.
00780       if (TAO_debug_level > 0)
00781         ACE_DEBUG ((LM_DEBUG,
00782                   ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00783                   ACE_TEXT ("generate_exception_reply ()\n")));
00784       return -1;
00785     }
00786 
00787   return 0;
00788 }
00789 
00790 int
00791 TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
00792                                               TAO_OutputCDR &msg)
00793 {
00794   // Reset the message type
00795   msg.reset ();
00796 
00797   CORBA::Octet header[12] =
00798     {
00799       // The following works on non-ASCII platforms, such as MVS (which
00800       // uses EBCDIC).
00801       0x47, // 'G'
00802       0x49, // 'I'
00803       0x4f, // 'O'
00804       0x50  // 'P'
00805     };
00806 
00807   TAO_GIOP_Message_Version giop_version;
00808 
00809   msg.get_version (giop_version);
00810 
00811   header[4] = giop_version.major;
00812   header[5] = giop_version.minor;
00813 
00814   // "flags" octet, i.e. header[6] will be set up later when message
00815   // is formatted by the transport.
00816 
00817   header[7] = CORBA::Octet (type);  // Message type
00818 
00819   static ACE_CDR::ULong const header_size =
00820     sizeof (header) / sizeof (header[0]);
00821 
00822   // Fragmentation should not occur at this point since there are only
00823   // 12 bytes in the stream, and fragmentation may only occur when
00824   // the stream length >= 16.
00825   msg.write_octet_array (header, header_size);
00826 
00827   return msg.good_bit ();
00828 }
00829 
00830 int
00831 TAO_GIOP_Message_Base::process_request (
00832   TAO_Transport * transport,
00833   TAO_InputCDR & cdr,
00834   TAO_OutputCDR & output,
00835   TAO_GIOP_Message_Generator_Parser * parser)
00836 {
00837   // This will extract the request header, set <response_required>
00838   // and <sync_with_server> as appropriate.
00839   TAO_ServerRequest request (this,
00840                              cdr,
00841                              output,
00842                              transport,
00843                              this->orb_core_);
00844 
00845   CORBA::ULong request_id = 0;
00846   CORBA::Boolean response_required = false;
00847   int parse_error = 0;
00848 
00849   try
00850     {
00851       parse_error = parser->parse_request_header (request);
00852 
00853       // Throw an exception if the
00854       if (parse_error != 0)
00855         throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
00856 
00857       TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
00858       if (csm)
00859         {
00860           csm->process_service_context (request);
00861           transport->assign_translators (&cdr, &output);
00862         }
00863 
00864       request_id = request.request_id ();
00865 
00866       response_required = request.response_expected ();
00867 
00868       CORBA::Object_var forward_to;
00869 
00870       /*
00871        * Hook to specialize request processing within TAO
00872        * This hook will be replaced by specialized request
00873        * processing implementation.
00874        */
00875 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
00876 
00877       // Do this before the reply is sent.
00878       this->orb_core_->request_dispatcher ()->dispatch (
00879           this->orb_core_,
00880           request,
00881           forward_to);
00882 
00883 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END
00884 
00885       if (!CORBA::is_nil (forward_to.in ()))
00886         {
00887           CORBA::Boolean const permanent_forward_condition =
00888               this->orb_core_->is_permanent_forward_condition
00889               (forward_to.in (),
00890                request.request_service_context ());
00891 
00892           // We should forward to another object...
00893           TAO_Pluggable_Reply_Params_Base reply_params;
00894           reply_params.request_id_ = request_id;
00895           reply_params.reply_status_ =
00896               permanent_forward_condition
00897               ? TAO_GIOP_LOCATION_FORWARD_PERM
00898               : TAO_GIOP_LOCATION_FORWARD;
00899           reply_params.svc_ctx_.length (0);
00900 
00901           // Send back the reply service context.
00902           reply_params.service_context_notowned (
00903             &request.reply_service_info ());
00904 
00905           output.message_attributes (request_id,
00906                                      0,
00907                                      TAO_Transport::TAO_REPLY,
00908                                      0);
00909 
00910           // Make the GIOP header and Reply header
00911           this->generate_reply_header (output, reply_params);
00912 
00913           if (!(output << forward_to.in ()))
00914             {
00915               if (TAO_debug_level > 0)
00916                 ACE_ERROR ((LM_ERROR,
00917                             ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
00918                             ACE_TEXT ("forward reference.\n")));
00919 
00920               return -1;
00921             }
00922 
00923           output.more_fragments (false);
00924 
00925           int result = transport->send_message (output,
00926                                                 0,
00927                                                 TAO_Transport::TAO_REPLY);
00928           if (result == -1)
00929             {
00930               if (TAO_debug_level > 0)
00931                 {
00932                   // No exception but some kind of error, yet a
00933                   // response is required.
00934                   ACE_ERROR ((LM_ERROR,
00935                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00936                               ACE_TEXT ("cannot send reply\n"),
00937                               ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
00938                 }
00939             }
00940           return result;
00941         }
00942     }
00943   // Only CORBA exceptions are caught here.
00944   catch ( ::CORBA::Exception& ex)
00945     {
00946       int result = 0;
00947 
00948       if (response_required)
00949         {
00950           result = this->send_reply_exception (transport,
00951                                                output,
00952                                                request_id,
00953                                                &request.reply_service_info (),
00954                                                &ex);
00955           if (result == -1)
00956             {
00957               if (TAO_debug_level > 0)
00958                 {
00959                   ACE_ERROR ((LM_ERROR,
00960                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
00961                               ACE_TEXT ("cannot send exception\n"),
00962                               ACE_TEXT ("process_connector_request ()")));
00963 
00964                   ex._tao_print_exception (
00965                     "TAO_GIOP_Message_Base::process_request[1]");
00966                 }
00967             }
00968 
00969         }
00970       else if (TAO_debug_level > 0)
00971         {
00972           // It is unfortunate that an exception (probably a system
00973           // exception) was thrown by the upcall code (even by the
00974           // user) when the client was not expecting a response.
00975           // However, in this case, we cannot close the connection
00976           // down, since it really isn't the client's fault.
00977 
00978           ACE_ERROR ((LM_ERROR,
00979                       ACE_TEXT ("(%P|%t) exception thrown ")
00980                       ACE_TEXT ("but client is not waiting a response\n")));
00981 
00982           ex._tao_print_exception (
00983             "TAO_GIOP_Message_Base::process_request[2]");
00984         }
00985 
00986       return result;
00987     }
00988   catch (...)
00989     {
00990       // @@ TODO some c++ exception or another, but what do we do with
00991       //    it?
00992       // We are supposed to map it into a CORBA::UNKNOWN exception.
00993       // BTW, this cannot be detected if using the <env> mapping.  If
00994       // we have native exceptions but no support for them in the ORB
00995       // we should still be able to catch it.  If we don't have native
00996       // exceptions it couldn't have been raised in the first place!
00997       int result = 0;
00998 
00999       if (response_required)
01000         {
01001           CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01002                                     (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01003                                     CORBA::COMPLETED_MAYBE);
01004 
01005           if (this->send_reply_exception (transport,
01006                                           output,
01007                                           request_id,
01008                                           &request.reply_service_info (),
01009                                           &exception) == -1
01010               && TAO_debug_level > 0)
01011             {
01012               ACE_ERROR ((LM_ERROR,
01013                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01014                           ACE_TEXT ("%p: ")
01015                           ACE_TEXT ("cannot send exception\n"),
01016                           ACE_TEXT ("process_request ()")));
01017               exception._tao_print_exception (
01018                 "TAO_GIOP_Message_Base::process_request[3]");
01019             }
01020         }
01021       else if (TAO_debug_level > 0)
01022         {
01023           // It is unfotunate that an exception (probably a system
01024           // exception) was thrown by the upcall code (even by the
01025           // user) when the client was not expecting a response.
01026           // However, in this case, we cannot close the connection
01027           // down, since it really isn't the client's fault.
01028           ACE_ERROR ((LM_ERROR,
01029                       ACE_TEXT ("TAO (%P|%t) exception thrown ")
01030                       ACE_TEXT ("but client is not waiting a response\n")));
01031         }
01032 
01033       return result;
01034     }
01035 
01036   return 0;
01037 }
01038 
01039 
01040 int
01041 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01042                                                TAO_InputCDR &input,
01043                                                TAO_OutputCDR &output,
01044                                                TAO_GIOP_Message_Generator_Parser *parser)
01045 {
01046   // This will extract the request header, set <response_required> as
01047   // appropriate.
01048   TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
01049 
01050   TAO_GIOP_Locate_Status_Msg status_info;
01051 
01052   // Defaulting.
01053   status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01054 
01055   CORBA::Boolean response_required = true;
01056 
01057   try
01058     {
01059       int parse_error = parser->parse_locate_header (locate_request);
01060 
01061       if (parse_error != 0)
01062         {
01063           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01064         }
01065 
01066       TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01067                               locate_request.object_key ().length (),
01068                               locate_request.object_key ().get_buffer (),
01069                               0);
01070 
01071       // Set it to an error state
01072       parse_error = 1;
01073       CORBA::ULong req_id = locate_request.request_id ();
01074 
01075       // We will send the reply. The ServerRequest class need not send
01076       // the reply
01077       CORBA::Boolean deferred_reply = true;
01078       TAO_ServerRequest server_request (this,
01079                                         req_id,
01080                                         response_required,
01081                                         deferred_reply,
01082                                         tmp_key,
01083                                         "_non_existent",
01084                                         output,
01085                                         transport,
01086                                         this->orb_core_,
01087                                         parse_error);
01088 
01089       if (parse_error != 0)
01090         {
01091           throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
01092         }
01093 
01094       CORBA::Object_var forward_to;
01095 
01096       this->orb_core_->request_dispatcher ()->dispatch (
01097           this->orb_core_,
01098           server_request,
01099           forward_to);
01100 
01101       if (!CORBA::is_nil (forward_to.in ()))
01102         {
01103           status_info.status = TAO_GIOP_OBJECT_FORWARD;
01104           status_info.forward_location_var = forward_to;
01105           if (TAO_debug_level > 0)
01106             ACE_DEBUG ((LM_DEBUG,
01107                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01108                         ACE_TEXT ("called: forwarding\n")));
01109         }
01110       else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
01111         {
01112           // We got no exception, so the object is here.
01113           status_info.status = TAO_GIOP_OBJECT_HERE;
01114           if (TAO_debug_level > 0)
01115             ACE_DEBUG ((LM_DEBUG,
01116                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01117                         ACE_TEXT ("found\n")));
01118         }
01119       else
01120         {
01121           status_info.forward_location_var = server_request.forward_location ();
01122 
01123           if (!CORBA::is_nil (status_info.forward_location_var.in ()))
01124             {
01125               status_info.status = TAO_GIOP_OBJECT_FORWARD;
01126               ACE_DEBUG ((LM_DEBUG,
01127                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01128                           ACE_TEXT ("forwarding\n")));
01129             }
01130           else
01131             {
01132               // Normal exception, so the object is not here
01133               status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01134               ACE_DEBUG ((LM_DEBUG,
01135                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01136                           ACE_TEXT ("not here\n")));
01137             }
01138         }
01139     }
01140 
01141   catch (const ::CORBA::Exception&)
01142     {
01143       // Normal exception, so the object is not here
01144       status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01145       if (TAO_debug_level > 0)
01146         ACE_DEBUG ((LM_DEBUG,
01147                     ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01148                     ACE_TEXT ("CORBA exception raised\n")));
01149     }
01150   catch (...)
01151     {
01152       // Normal exception, so the object is not here
01153       status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01154       if (TAO_debug_level > 0)
01155         ACE_DEBUG ((LM_DEBUG,
01156                     ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01157                     ACE_TEXT ("C++ exception raised\n")));
01158     }
01159 
01160   return this->make_send_locate_reply (transport,
01161                                        locate_request,
01162                                        status_info,
01163                                        output,
01164                                        parser);
01165 }
01166 
01167 int
01168 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01169                                                TAO_GIOP_Locate_Request_Header &request,
01170                                                TAO_GIOP_Locate_Status_Msg &status_info,
01171                                                TAO_OutputCDR &output,
01172                                                TAO_GIOP_Message_Generator_Parser *parser)
01173 {
01174   // Note here we are making the Locate reply header which is *QUITE*
01175   // different from the reply header made by the make_reply () call..
01176   // Make the GIOP message header
01177   this->write_protocol_header (TAO_GIOP_LOCATEREPLY, output);
01178 
01179   // This writes the header & body
01180   parser->write_locate_reply_mesg (output,
01181                                    request.request_id (),
01182                                    status_info);
01183 
01184   output.more_fragments (false);
01185 
01186   // Send the message
01187   int const result = transport->send_message (output,
01188                                               0,
01189                                               TAO_Transport::TAO_REPLY);
01190 
01191   // Print out message if there is an error
01192   if (result == -1)
01193     {
01194       if (TAO_debug_level > 0)
01195         {
01196           ACE_ERROR ((LM_ERROR,
01197                       ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01198                       ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01199         }
01200     }
01201 
01202   return result;
01203 }
01204 
01205 // Send an "I can't understand you" message -- again, the message is
01206 // prefabricated for simplicity.  This implies abortive disconnect (at
01207 // the application level, if not at the level of TCP).
01208 //
01209 // NOTE that IIOP will still benefit from TCP's orderly disconnect.
01210 int
01211 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01212 {
01213   const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01214   {
01215     // The following works on non-ASCII platforms, such as MVS (which
01216     // uses EBCDIC).
01217     0x47, // 'G'
01218     0x49, // 'I'
01219     0x4f, // 'O'
01220     0x50, // 'P'
01221     (CORBA::Octet) 1, // Use the lowest GIOP version
01222     (CORBA::Octet) 0,
01223     TAO_ENCAP_BYTE_ORDER,
01224     TAO_GIOP_MESSAGERROR,
01225     0, 0, 0, 0
01226   };
01227 
01228   this->dump_msg ("send_error",
01229                   (const u_char *) error_message,
01230                   TAO_GIOP_MESSAGE_HEADER_LEN);
01231 
01232   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01233                              ACE_Message_Block::MB_DATA,
01234                              error_message,
01235                              0,
01236                              0,
01237                              ACE_Message_Block::DONT_DELETE,
01238                              0);
01239   ACE_Message_Block message_block(&data_block,
01240                                   ACE_Message_Block::DONT_DELETE);
01241   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01242 
01243   size_t bt;
01244   int const result = transport->send_message_block_chain (&message_block, bt);
01245   if (result == -1)
01246     {
01247       if (TAO_debug_level > 0)
01248         ACE_DEBUG ((LM_DEBUG,
01249                     ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01250                     transport->id ()));
01251     }
01252 
01253   return result;
01254 }
01255 
01256 void
01257 TAO_GIOP_Message_Base::set_state (
01258     const TAO_GIOP_Message_Version &version,
01259     TAO_GIOP_Message_Generator_Parser *&gen_parser) const
01260 {
01261   switch (version.major)
01262     {
01263     case 1:
01264       switch (version.minor)
01265         {
01266         case 0:
01267           gen_parser =
01268             const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01269                                      &this->tao_giop_impl_.tao_giop_10);
01270           break;
01271         case 1:
01272           gen_parser =
01273             const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01274                                      &this->tao_giop_impl_.tao_giop_11);
01275           break;
01276         case 2:
01277           gen_parser =
01278             const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01279                                      &this->tao_giop_impl_.tao_giop_12);
01280           break;
01281         default:
01282           break;
01283         }
01284       break;
01285     default:
01286       break;
01287     }
01288 }
01289 
01290 
01291 // Server sends an "I'm shutting down now, any requests you've sent me
01292 // can be retried" message to the server.  The message is prefab, for
01293 // simplicity.
01294 //
01295 // NOTE: this is IIOP-specific though it doesn't look like it is.  It
01296 // relies on a TCP-ism: orderly disconnect, which doesn't exist in all
01297 // transport protocols.  Versions of GIOP atop some transport that's
01298 // lacking orderly disconnect must define some transport-specific
01299 // handshaking (e.g. the XNS/SPP handshake convention) in order to
01300 // know that the same transport semantics are provided when shutdown
01301 // is begun with messages "in flight". (IIOP doesn't report false
01302 // errors in the case of "clean shutdown", because it relies on
01303 // orderly disconnect as provided by TCP.  This quality of service is
01304 // required to write robust distributed systems.)
01305 
01306 void
01307 TAO_GIOP_Message_Base::
01308   send_close_connection (const TAO_GIOP_Message_Version &version,
01309                          TAO_Transport *transport,
01310                          void *)
01311 {
01312   // static CORBA::Octet
01313   // I hate  this in every method. Till the time I figure out a way
01314   // around  I will have them here hanging around.
01315   const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01316   {
01317     // The following works on non-ASCII platforms, such as MVS (which
01318     // uses EBCDIC).
01319     0x47, // 'G'
01320     0x49, // 'I'
01321     0x4f, // 'O'
01322     0x50, // 'P'
01323     version.major,
01324     version.minor,
01325     TAO_ENCAP_BYTE_ORDER,
01326     TAO_GIOP_CLOSECONNECTION,
01327     0, 0, 0, 0
01328   };
01329 
01330   // It's important that we use a reliable shutdown after we send this
01331   // message, so we know it's received.
01332   //
01333   // @@ should recv and discard queued data for portability; note
01334   // that this won't block (long) since we never set SO_LINGER
01335 
01336   this->dump_msg ("send_close_connection",
01337                   (const u_char *) close_message,
01338                   TAO_GIOP_MESSAGE_HEADER_LEN);
01339 
01340 #if 0
01341   // @@CJC I don't think we need this check b/c the transport's send()
01342   // will simply return -1.  However, I guess we could create something
01343   // like TAO_Tranport::is_closed() that returns whether the connection
01344   // is already closed.  The problem with that, however, is that it's
01345   // entirely possible that is_closed() could return TRUE, and then the
01346   // transport could get closed down btw. the time it gets called and the
01347   // time that the send actually occurs.
01348   ACE_HANDLE which = transport->handle ();
01349   if (which == ACE_INVALID_HANDLE)
01350     {
01351       if (TAO_debug_level > 0)
01352         ACE_DEBUG ((LM_DEBUG,
01353            ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01354            ACE_TEXT (" connection already closed\n")));
01355       return;
01356     }
01357 #endif
01358 
01359   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01360                              ACE_Message_Block::MB_DATA,
01361                              close_message,
01362                              0,
01363                              0,
01364                              ACE_Message_Block::DONT_DELETE,
01365                              0);
01366   ACE_Message_Block message_block(&data_block);
01367   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01368 
01369   size_t bt;
01370   int const result = transport->send_message_block_chain (&message_block, bt);
01371   if (result == -1)
01372     {
01373       if (TAO_debug_level > 0)
01374         ACE_ERROR ((LM_ERROR,
01375            ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01376            transport->id (), errno));
01377     }
01378 
01379   transport->close_connection ();
01380   ACE_DEBUG ((LM_DEBUG,
01381       ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01382       transport-> id ()));
01383 }
01384 
01385 
01386 int
01387 TAO_GIOP_Message_Base::send_reply_exception (
01388     TAO_Transport *transport,
01389     TAO_OutputCDR &output,
01390     CORBA::ULong request_id,
01391     IOP::ServiceContextList *svc_info,
01392     CORBA::Exception *x
01393   )
01394 {
01395   TAO_Pluggable_Reply_Params_Base reply_params;
01396   reply_params.request_id_ = request_id;
01397   reply_params.svc_ctx_.length (0);
01398 
01399   // We are going to send some data
01400   reply_params.argument_flag_ = true;
01401 
01402   // Send back the service context we received.  (RTCORBA relies on
01403   // this).
01404   reply_params.service_context_notowned (svc_info);
01405 
01406   reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION;
01407 
01408   if (CORBA::SystemException::_downcast (x) != 0)
01409     {
01410       reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION;
01411     }
01412 
01413   if (this->generate_exception_reply (output, reply_params, *x) == -1)
01414     return -1;
01415 
01416   output.more_fragments (false);
01417 
01418   return transport->send_message (output, 0, TAO_Transport::TAO_REPLY);
01419 }
01420 
01421 void
01422 TAO_GIOP_Message_Base::dump_msg (const char *label,
01423                                  const u_char *ptr,
01424                                  size_t len)
01425 {
01426 
01427   if (TAO_debug_level >= 5)
01428     {
01429       static const char digits[] = "0123456789ABCD";
01430       static const char *names[] =
01431       {
01432         "Request",
01433         "Reply",
01434         "CancelRequest",
01435         "LocateRequest",
01436         "LocateReply",
01437         "CloseConnection",
01438         "MessageError",
01439         "Fragment"
01440       };
01441 
01442       // Message name.
01443       const char *message_name = "UNKNOWN MESSAGE";
01444       u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01445       if (slot < sizeof (names) / sizeof (names[0]))
01446         message_name = names[slot];
01447 
01448       // Byte order.
01449       int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01450 
01451       // Get the version info
01452       CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01453       CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01454 
01455       // request/reply id.
01456       CORBA::ULong tmp = 0;
01457       CORBA::ULong *id = &tmp;
01458       char *tmp_id = 0;
01459 
01460       if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
01461           ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
01462           ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
01463         {
01464           if (major == 1 && minor < 2)
01465             {
01466               // @@ Only works if ServiceContextList is empty....
01467               tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN  + 4);
01468             }
01469           else
01470             {
01471               tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01472             }
01473 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01474         if (byte_order == TAO_ENCAP_BYTE_ORDER)
01475           {
01476             id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01477           }
01478         else
01479           {
01480             ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01481           }
01482 #else
01483         id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01484 #endif /* ACE_DISABLE_SWAP_ON_READ */
01485 
01486         }
01487 
01488       // Print.
01489       ACE_DEBUG ((LM_DEBUG,
01490                   "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01491                   "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01492                   "Type %s[%u]\n",
01493                   ACE_TEXT_CHAR_TO_TCHAR (label),
01494                   digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01495                   digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01496                   len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01497                   (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01498                   ACE_TEXT_CHAR_TO_TCHAR(message_name),
01499                   *id));
01500 
01501       if (TAO_debug_level >= 10)
01502         ACE_HEX_DUMP ((LM_DEBUG,
01503                        (const char *) ptr,
01504                        len,
01505                        ACE_TEXT ("GIOP message")));
01506     }
01507 }
01508 
01509 int
01510 TAO_GIOP_Message_Base::generate_locate_reply_header (
01511     TAO_OutputCDR & /*cdr*/,
01512     TAO_Pluggable_Reply_Params_Base & /*params*/)
01513 {
01514   return 0;
01515 }
01516 
01517 int
01518 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
01519 {
01520   // Get a parser for us
01521   TAO_GIOP_Message_Generator_Parser *parser = 0;
01522   TAO_GIOP_Message_Version giop_version;
01523 
01524   msg.get_version (giop_version);
01525 
01526   // Get the state information that we need to use
01527   this->set_state (giop_version, parser);
01528 
01529   // We dont really know.. So ask the generator and parser objects that
01530   // we know.
01531   // @@ TODO: Need to make this faster, instead of making virtual
01532   // call, try todo the check within this class
01533   return parser->is_ready_for_bidirectional ();
01534 }
01535 
01536 
01537 TAO_Queued_Data *
01538 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01539 {
01540   // Make a datablock for the size requested + something. The
01541   // "something" is required because we are going to align the data
01542   // block in the message block. During alignment we could loose some
01543   // bytes. As we may not know how many bytes will be lost, we will
01544   // allocate ACE_CDR::MAX_ALIGNMENT extra.
01545   ACE_Data_Block *db =
01546     this->orb_core_->create_input_cdr_data_block (sz +
01547                                                   ACE_CDR::MAX_ALIGNMENT);
01548 
01549   TAO_Queued_Data *qd =
01550     TAO_Queued_Data::make_queued_data (
01551                  this->orb_core_->transport_message_buffer_allocator (),
01552                  this->orb_core_->input_cdr_msgblock_allocator (),
01553                  db);
01554 
01555   if (qd == 0)
01556     {
01557       if (TAO_debug_level > 0)
01558         {
01559           ACE_ERROR ((LM_ERROR,
01560             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01561             ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
01562         }
01563       db->release ();
01564       return 0; // NULL pointer
01565     }
01566 
01567   return qd;
01568 }
01569 
01570 size_t
01571 TAO_GIOP_Message_Base::header_length (void) const
01572 {
01573   return TAO_GIOP_MESSAGE_HEADER_LEN;
01574 }
01575 
01576 size_t
01577 TAO_GIOP_Message_Base::fragment_header_length (
01578   const TAO_GIOP_Message_Version& giop_version) const
01579 {
01580   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01581 
01582   // Get the state information that we need to use
01583   this->set_state (giop_version, generator_parser);
01584 
01585   return generator_parser->fragment_header_length ();
01586 }
01587 
01588 int
01589 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
01590                                          CORBA::ULong &request_id) const
01591 {
01592   // Get a parser for us
01593   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01594 
01595   // Get the state information that we need to use
01596   this->set_state (qd->giop_version (), generator_parser);
01597 
01598   // Get the read and write positions before we steal data.
01599   size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
01600   size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
01601   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01602 
01603   // Create a input CDR stream. We do the following
01604   //  1 - If the incoming message block has a data block with a flag
01605   //      DONT_DELETE  (for the data block) we create an input CDR
01606   //      stream the same way.
01607   //  2 - If the incoming message block had a datablock from heap just
01608   //      use it by duplicating it and make the flag 0.
01609   // NOTE: We use the same data block in which we read the message and
01610   // we pass it on to the higher layers of the ORB. So we dont to any
01611   // copies at all here. The same is also done in the higher layers.
01612 
01613   ACE_Message_Block::Message_Flags flg = 0;
01614   ACE_Data_Block *db = 0;
01615 
01616   // Get the flag in the message block
01617   flg = qd->msg_block ()->self_flags ();
01618 
01619   if (ACE_BIT_ENABLED (flg,
01620                        ACE_Message_Block::DONT_DELETE))
01621     {
01622       // Use the same datablock
01623       db = qd->msg_block ()->data_block ();
01624     }
01625   else
01626     {
01627       // Use a duplicated datablock as the datablock has come off the
01628       // heap.
01629       db = qd->msg_block ()->data_block ()->duplicate ();
01630     }
01631 
01632   TAO_InputCDR input_cdr (db,
01633                           flg,
01634                           rd_pos,
01635                           wr_pos,
01636                           qd->byte_order (),
01637                           qd->giop_version ().major_version (),
01638                           qd->giop_version ().minor_version (),
01639                           this->orb_core_);
01640 
01641   if (qd->giop_version ().major == 1 &&
01642       (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
01643     {
01644       switch (qd->msg_type ())
01645       {
01646         case TAO_PLUGGABLE_MESSAGE_REQUEST:
01647         case TAO_PLUGGABLE_MESSAGE_REPLY:
01648           {
01649             IOP::ServiceContextList service_context;
01650 
01651             if ((input_cdr >> service_context)
01652                 && (input_cdr >> request_id))
01653               {
01654                 return 0;
01655               }
01656           }
01657           break;
01658         case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
01659         case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
01660         case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
01661           {
01662             if ((input_cdr >> request_id))
01663               {
01664                 return 0;
01665               }
01666           }
01667           break;
01668         default:
01669           break;
01670       }
01671     }
01672   else
01673     {
01674       switch (qd->msg_type ())
01675       {
01676         case TAO_PLUGGABLE_MESSAGE_REQUEST:
01677         case TAO_PLUGGABLE_MESSAGE_REPLY:
01678         case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
01679         case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
01680         case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
01681         case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
01682           {
01683             // Dealing with GIOP-1.2, the request-id is located directly
01684             // behind the GIOP-Header.  This is true for all message
01685             // types that might be sent in form of fragments or
01686             // cancel-requests.
01687             if ((input_cdr >> request_id))
01688               {
01689                 return 0;
01690               }
01691           }
01692           break;
01693         default:
01694           break;
01695       }
01696     }
01697 
01698   return -1;
01699 }
01700 
01701 /* @return -1 error, 0 ok, +1 outstanding fragments */
01702 int
01703 TAO_GIOP_Message_Base::consolidate_fragmented_message (
01704   TAO_Queued_Data * qd,
01705   TAO_Queued_Data *& msg)
01706 {
01707   TAO::Incoming_Message_Stack reverse_stack;
01708 
01709   TAO_Queued_Data *tail = 0;
01710   TAO_Queued_Data *head = 0;
01711 
01712   //
01713   // CONSOLIDATE FRAGMENTED MESSAGE
01714   //
01715 
01716   // check for error-condition
01717   if (qd == 0)
01718     {
01719       return -1;
01720     }
01721 
01722   if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
01723     {
01724       TAO_Queued_Data::release (qd);
01725       return -1; // error: GIOP-1.0 does not support fragments
01726     }
01727 
01728   // If this is not the last fragment, push it onto stack for later processing
01729   if (qd->more_fragments ())
01730     {
01731       this->fragment_stack_.push (qd);
01732 
01733       msg = 0;   // no consolidated message available yet
01734       return 1;  // status: more messages expected.
01735     }
01736 
01737   tail = qd;  // init
01738 
01739   // Add the current message block to the end of the chain
01740   // after adjusting the read pointer to skip the header(s)
01741   size_t const header_adjustment =
01742     this->header_length () +
01743     this->fragment_header_length (tail->giop_version ().major_version ());
01744 
01745   if (tail->msg_block ()->length () < header_adjustment)
01746     {
01747       // buffer length not sufficient
01748       TAO_Queued_Data::release (qd);
01749       return -1;
01750     }
01751 
01752   // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
01753   if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
01754     {
01755       // GIOP-1.1
01756 
01757       while (this->fragment_stack_.pop (head) != -1)
01758         {
01759           if (head->more_fragments () &&
01760               head->giop_version ().major_version () == 1 &&
01761               head->giop_version ().minor_version () == 1 &&
01762               head->msg_block ()->length () >= header_adjustment)
01763             {
01764               // adjust the read-pointer, skip the fragment header
01765               tail->msg_block ()->rd_ptr(header_adjustment);
01766 
01767               head->msg_block ()->cont (tail->msg_block ());
01768 
01769               tail->msg_block (0);
01770 
01771               TAO_Queued_Data::release (tail);
01772 
01773               tail = head;
01774             }
01775           else
01776             {
01777               reverse_stack.push (head);
01778             }
01779         }
01780     }
01781   else
01782     {
01783       // > GIOP-1.2
01784 
01785       CORBA::ULong tmp_request_id = 0;
01786       if (this->parse_request_id (tail, tmp_request_id) == -1)
01787         {
01788           return -1;
01789         }
01790 
01791       const CORBA::ULong request_id = tmp_request_id;
01792 
01793       while (this->fragment_stack_.pop (head) != -1)
01794         {
01795           CORBA::ULong head_request_id = 0;
01796           int parse_status = 0;
01797 
01798           if (head->more_fragments () &&
01799               head->giop_version ().major_version () >= 1 &&
01800               head->giop_version ().minor_version () >= 2 &&
01801               head->msg_block ()->length () >= header_adjustment &&
01802               (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
01803               request_id == head_request_id)
01804             {
01805               // adjust the read-pointer, skip the fragment header
01806               tail->msg_block ()->rd_ptr(header_adjustment);
01807 
01808               head->msg_block ()->cont (tail->msg_block ());
01809 
01810               tail->msg_block (0);
01811 
01812               TAO_Queued_Data::release (tail);
01813 
01814               tail = head;
01815             }
01816           else
01817             {
01818               if (parse_status == -1)
01819                 {
01820                   TAO_Queued_Data::release (head);
01821                   return -1;
01822                 }
01823 
01824               reverse_stack.push (head);
01825             }
01826         }
01827     }
01828 
01829   // restore stack
01830   while (reverse_stack.pop (head) != -1)
01831     {
01832       this->fragment_stack_.push (head);
01833     }
01834 
01835   if (tail->consolidate () == -1)
01836     {
01837       // memory allocation failed
01838       TAO_Queued_Data::release (tail);
01839       return -1;
01840     }
01841 
01842   // set out value
01843   msg = tail;
01844 
01845   return 0;
01846 }
01847 
01848 
01849 int
01850 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
01851 {
01852   // We must extract the specific request-id from message-buffer
01853   // and remove all fragments from stack that match this request-id.
01854 
01855   TAO::Incoming_Message_Stack reverse_stack;
01856 
01857   CORBA::ULong cancel_request_id;
01858 
01859   if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
01860     {
01861       return -1;
01862     }
01863 
01864   TAO_Queued_Data *head = 0;
01865 
01866   // Revert stack
01867   while (this->fragment_stack_.pop (head) != -1)
01868     {
01869       reverse_stack.push (head);
01870     }
01871 
01872   bool discard_all_GIOP11_messages = false;
01873 
01874   // Now we are able to process message in order they have arrived.
01875   // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
01876   // fragments belong to this message and must be discarded.
01877   // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
01878   // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
01879   // having encoded the request id will be discarded.
01880   while (reverse_stack.pop (head) != -1)
01881     {
01882       CORBA::ULong head_request_id;
01883 
01884       if (head->giop_version ().major_version () == 1 &&
01885           head->giop_version ().minor_version () <= 1 &&
01886           head->msg_type () != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id
01887           this->parse_request_id (head, head_request_id) >= 0 &&
01888           cancel_request_id == head_request_id)
01889         {
01890           TAO_Queued_Data::release (head);
01891           discard_all_GIOP11_messages = true;
01892         }
01893       else if (head->giop_version ().major_version () == 1 &&
01894                head->giop_version ().minor_version () <= 1 &&
01895                discard_all_GIOP11_messages)
01896         {
01897           TAO_Queued_Data::release (head);
01898         }
01899       else if (head->giop_version ().major_version () >= 1 &&
01900                head->giop_version ().minor_version () >= 2 &&
01901                this->parse_request_id (head, head_request_id) >= 0 &&
01902                cancel_request_id == head_request_id)
01903         {
01904           TAO_Queued_Data::release (head);
01905         }
01906       else
01907         {
01908           this->fragment_stack_.push (head);
01909         }
01910     }
01911 
01912   return 0;
01913 }
01914 
01915 TAO_GIOP_Fragmentation_Strategy *
01916 TAO_GIOP_Message_Base::fragmentation_strategy (void)
01917 {
01918   return this->fragmentation_strategy_.get ();
01919 }
01920 
01921 void
01922 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
01923 {
01924   CORBA::Octet * const buf =
01925     reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
01926 
01927   CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
01928   CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
01929 
01930   // Flags for the GIOP protocol header "flags" field.
01931   CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
01932 
01933   // Least significant bit:        Byte order
01934   ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
01935 
01936   // Second least significant bit: More fragments
01937   //
01938   // Only supported in GIOP 1.1 or better.
01939   if (!(major <= 1 && minor == 0))
01940     ACE_SET_BITS (flags, msg.more_fragments () << 1);
01941 }
01942 
01943 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 13:07:32 2008 for TAO by doxygen 1.3.6