GIOP_Message_Base.cpp

Go to the documentation of this file.
00001 // GIOP_Message_Base.cpp,v 1.122 2006/05/30 13:41:29 jwillemsen Exp
00002 
00003 #include "tao/GIOP_Message_Base.h"
00004 #include "tao/operation_details.h"
00005 #include "tao/debug.h"
00006 #include "tao/ORB_Core.h"
00007 #include "tao/TAO_Server_Request.h"
00008 #include "tao/GIOP_Message_Locate_Header.h"
00009 #include "tao/Transport.h"
00010 #include "tao/Transport_Mux_Strategy.h"
00011 #include "tao/LF_Strategy.h"
00012 #include "tao/Request_Dispatcher.h"
00013 #include "tao/Codeset_Manager.h"
00014 #include "tao/SystemException.h"
00015 
00016 /*
00017  * Hook to add additional include files during specializations.
00018  */
00019 //@@ GIOP_MESSAGE_BASE_INCLUDE_ADD_HOOK
00020 
00021 ACE_RCSID (tao,
00022            GIOP_Message_Base,
00023            "GIOP_Message_Base.cpp,v 1.122 2006/05/30 13:41:29 jwillemsen Exp")
00024 
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
00028                                               TAO_Transport * transport,
00029                                               size_t input_cdr_size)
00030   : orb_core_ (orb_core)
00031   , message_state_ ()
00032   , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
00033   , out_stream_ (0,
00034                  input_cdr_size,
00035                  TAO_ENCAP_BYTE_ORDER,
00036                  orb_core->output_cdr_buffer_allocator (),
00037                  orb_core->output_cdr_dblock_allocator (),
00038                  orb_core->output_cdr_msgblock_allocator (),
00039                  orb_core->orb_params ()->cdr_memcpy_tradeoff (),
00040                  fragmentation_strategy_.get (),
00041                  TAO_DEF_GIOP_MAJOR,
00042                  TAO_DEF_GIOP_MINOR)
00043 {
00044 }
00045 
00046 
00047 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
00048 {
00049 }
00050 
00051 
00052 void
00053 TAO_GIOP_Message_Base::init (CORBA::Octet major,
00054                              CORBA::Octet minor)
00055 {
00056   // Set the giop version of the out stream
00057   this->out_stream_.set_version (major,
00058                                  minor);
00059 }
00060 
00061 TAO_OutputCDR &
00062 TAO_GIOP_Message_Base::out_stream (void)
00063 {
00064   return this->out_stream_;
00065 }
00066 
00067 void
00068 TAO_GIOP_Message_Base::reset (void)
00069 {
00070   // no-op
00071 }
00072 
00073 int
00074 TAO_GIOP_Message_Base::generate_request_header (
00075     TAO_Operation_Details &op,
00076     TAO_Target_Specification &spec,
00077     TAO_OutputCDR &cdr
00078   )
00079 {
00080   // Get a parser for us
00081   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00082 
00083   CORBA::Octet major, minor;
00084 
00085   cdr.get_version (major, minor);
00086 
00087   // Get the state information that we need to use
00088   this->set_state (major,
00089                    minor,
00090                    generator_parser);
00091 
00092   // Write the GIOP header first
00093   if (!this->write_protocol_header (TAO_GIOP_REQUEST,
00094                                     cdr))
00095     {
00096       if (TAO_debug_level)
00097         {
00098           ACE_ERROR ((LM_ERROR,
00099                       ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00100         }
00101 
00102       return -1;
00103     }
00104 
00105   // Now call the implementation for the rest of the header
00106   if (!generator_parser->write_request_header (op,
00107                                                spec,
00108                                                cdr))
00109     {
00110       if (TAO_debug_level)
00111         ACE_ERROR ((LM_ERROR,
00112                     ACE_TEXT ("(%P|%t) Error in writing request header \n")));
00113 
00114       return -1;
00115     }
00116 
00117   return 0;
00118 }
00119 
00120 int
00121 TAO_GIOP_Message_Base::generate_locate_request_header (
00122     TAO_Operation_Details &op,
00123     TAO_Target_Specification &spec,
00124     TAO_OutputCDR &cdr
00125  )
00126 {
00127   // Get a parser for us
00128   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00129 
00130   CORBA::Octet major, minor;
00131 
00132   cdr.get_version (major, minor);
00133 
00134   // Get the state information that we need to use
00135   this->set_state (major,
00136                    minor,
00137                    generator_parser);
00138 
00139   // Write the GIOP header first
00140   if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST,
00141                                     cdr))
00142     {
00143       if (TAO_debug_level)
00144         ACE_ERROR ((LM_ERROR,
00145                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00146 
00147       return -1;
00148     }
00149 
00150   // Now call the implementation for the rest of the header
00151   if (!generator_parser->write_locate_request_header
00152       (op.request_id (),
00153        spec,
00154        cdr))
00155     {
00156       if (TAO_debug_level)
00157         ACE_ERROR ((LM_ERROR,
00158                     ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
00159 
00160 
00161       return -1;
00162 
00163     }
00164 
00165   return 0;
00166 }
00167 
00168 int
00169 TAO_GIOP_Message_Base::generate_reply_header (
00170     TAO_OutputCDR &cdr,
00171     TAO_Pluggable_Reply_Params_Base &params
00172   )
00173 {
00174   // Get a parser for us
00175   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00176 
00177   CORBA::Octet major, minor;
00178 
00179   cdr.get_version (major, minor);
00180 
00181   // Get the state information that we need to use
00182   this->set_state (major,
00183                    minor,
00184                    generator_parser);
00185 
00186   // Write the GIOP header first
00187   if (!this->write_protocol_header (TAO_GIOP_REPLY,
00188                                     cdr))
00189     {
00190       if (TAO_debug_level)
00191         ACE_ERROR ((LM_ERROR,
00192                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00193 
00194       return -1;
00195     }
00196 
00197   ACE_DECLARE_NEW_CORBA_ENV;
00198   ACE_TRY
00199     {
00200       // Now call the implementation for the rest of the header
00201       int const result =
00202         generator_parser->write_reply_header (cdr,
00203                                               params
00204                                               ACE_ENV_ARG_PARAMETER);
00205       ACE_TRY_CHECK;
00206 
00207       if (!result)
00208         {
00209           if (TAO_debug_level > 4)
00210             ACE_ERROR ((LM_ERROR,
00211                         ACE_TEXT ("(%P|%t) Error in writing reply ")
00212                         ACE_TEXT ("header\n")));
00213 
00214           return -1;
00215         }
00216     }
00217   ACE_CATCHANY
00218     {
00219       if (TAO_debug_level > 4)
00220         ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00221                              ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
00222 
00223       return -1;
00224     }
00225   ACE_ENDTRY;
00226 
00227   return 0;
00228 }
00229 
00230 int
00231 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
00232                                                  CORBA::ULong request_id)
00233 {
00234   // Get a parser for us
00235   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00236 
00237   CORBA::Octet major, minor;
00238 
00239   cdr.get_version (major, minor);
00240 
00241   // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
00242   // supports them in 1.2 or better since GIOP 1.1 fragments do not
00243   // have a fragment message header.
00244   if (major == 1 && minor < 2)
00245     return -1;
00246 
00247   // Get the state information that we need to use
00248   this->set_state (major,
00249                    minor,
00250                    generator_parser);
00251 
00252   // Write the GIOP header first
00253   if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
00254       || !generator_parser->write_fragment_header (cdr, request_id))
00255     {
00256       if (TAO_debug_level)
00257         ACE_ERROR ((LM_ERROR,
00258                     ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
00259 
00260       return -1;
00261     }
00262 
00263   return 0;
00264 }
00265 
00266 int
00267 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
00268 {
00269   // Ptr to first buffer.
00270   char * buf = (char *) stream.buffer ();
00271 
00272   this->set_giop_flags (stream);
00273 
00274   // Length of all buffers.
00275   size_t const total_len = stream.total_length ();
00276 
00277   // NOTE: Here would also be a fine place to calculate a digital
00278   // signature for the message and place it into a preallocated slot
00279   // in the "ServiceContext".  Similarly, this is a good spot to
00280   // encrypt messages (or just the message bodies) if that's needed in
00281   // this particular environment and that isn't handled by the
00282   // networking infrastructure (e.g., IPSEC).
00283 
00284   CORBA::ULong const bodylen = static_cast <CORBA::ULong>
00285                            (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
00286 
00287 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
00288   *(reinterpret_cast <CORBA::ULong *> (buf +
00289                          TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00290 #else
00291   if (!stream.do_byte_swap ())
00292     *(reinterpret_cast <CORBA::ULong *>
00293                            (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
00294   else
00295     ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
00296                      buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
00297 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
00298 
00299   if (TAO_debug_level > 2)
00300     {
00301       // Check whether the output cdr stream is build up of multiple
00302       // messageblocks. If so, consolidate them to one block that can be
00303       // dumped
00304       ACE_Message_Block* consolidated_block = 0;
00305       if (stream.begin()->cont () != 0)
00306         {
00307           consolidated_block = new ACE_Message_Block;
00308           ACE_CDR::consolidate (consolidated_block, stream.begin ());
00309           buf = (char *) (consolidated_block->rd_ptr ());
00310         }
00311       ///
00312       this->dump_msg ("send",
00313                       reinterpret_cast <u_char *> (buf),
00314                       total_len);
00315 
00316       //
00317       delete consolidated_block;
00318       consolidated_block = 0;
00319       //
00320     }
00321 
00322   return 0;
00323 }
00324 
00325 TAO_Pluggable_Message_Type
00326 TAO_GIOP_Message_Base::message_type (
00327                          const TAO_GIOP_Message_State &msg_state) const
00328 {
00329   // Convert to the right type of Pluggable Messaging message type.
00330 
00331   switch (msg_state.message_type_)
00332     {
00333     case TAO_GIOP_REQUEST:
00334       return TAO_PLUGGABLE_MESSAGE_REQUEST;
00335     case TAO_GIOP_LOCATEREQUEST:
00336       return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST;
00337 
00338     case TAO_GIOP_LOCATEREPLY:
00339       return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY;
00340 
00341     case TAO_GIOP_REPLY:
00342       return TAO_PLUGGABLE_MESSAGE_REPLY;
00343 
00344     case TAO_GIOP_CLOSECONNECTION:
00345       return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION;
00346 
00347     case TAO_GIOP_FRAGMENT:
00348       return TAO_PLUGGABLE_MESSAGE_FRAGMENT;
00349 
00350     case TAO_GIOP_MESSAGERROR:
00351       return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
00352 
00353     case TAO_GIOP_CANCELREQUEST:
00354       return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST;
00355 
00356     default:
00357         if (TAO_debug_level > 0)
00358           {
00359         ACE_ERROR ((LM_ERROR,
00360                     ACE_TEXT ("TAO (%P|%t) %N:%l        message_type : ")
00361                     ACE_TEXT ("wrong message.\n")));
00362     }
00363     }
00364 
00365   return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
00366 }
00367 
00368 int
00369 TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
00370 {
00371   this->message_state_.reset ();
00372 
00373   return this->message_state_.parse_message_header (incoming);
00374 }
00375 
00376 int
00377 TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming,
00378                                            TAO_Queued_Data &qd,
00379                                            size_t &mesg_length)
00380 {
00381   if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00382     {
00383       qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00384 
00385       return 0; /* incomplete header */
00386     }
00387   else
00388     {
00389       TAO_GIOP_Message_State state;
00390 
00391       if (state.parse_message_header (incoming) == -1)
00392         {
00393           return -1;
00394         }
00395 
00396       const size_t message_size = state.message_size (); /* Header + Payload */
00397 
00398       if (message_size > incoming.length ())
00399         {
00400           qd.missing_data_ = message_size - incoming.length ();
00401         }
00402       else
00403         {
00404           qd.missing_data_ = 0;
00405         }
00406 
00407       /* init out-parameters */
00408       this->init_queued_data (&qd, state);
00409       mesg_length  = TAO_GIOP_MESSAGE_HEADER_LEN
00410                    + state.payload_size ();
00411 
00412       return 1; /* complete header */
00413     }
00414 }
00415 
00416 int
00417 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
00418                                              TAO_Queued_Data *&qd)
00419 {
00420   if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00421     {
00422       if (incoming.length () > 0)
00423         {
00424           // Optimize memory usage, we dont know actual message size
00425           // so far, but allocate enough space to hold small GIOP
00426           // messages. This way we avoid expensive "grow" operation
00427           // for small messages.
00428           size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
00429 
00430           // Make a node which has at least message block of the size
00431           // of MESSAGE_HEADER_LEN.
00432           size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
00433                                            default_buf_size);
00434 
00435           // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
00436 
00437           qd = this->make_queued_data (buf_size);
00438 
00439           if (qd == 0)
00440             {
00441               if (TAO_debug_level > 0)
00442                 {
00443                   ACE_ERROR((LM_ERROR,
00444                              ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00445                              ACE_TEXT ("out of memory\n")));
00446                 }
00447               return -1;
00448             }
00449 
00450           qd->msg_block_->copy (incoming.rd_ptr (),
00451                                 incoming.length ());
00452 
00453           incoming.rd_ptr (incoming.length ()); // consume all available data
00454 
00455           qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED;
00456         }
00457       else
00458         {
00459           // handle not initialized variables
00460           qd = 0;  // reset
00461         }
00462 
00463       return 0;
00464     }
00465 
00466   TAO_GIOP_Message_State state;
00467   if (state.parse_message_header (incoming) == -1)
00468     {
00469       return -1;
00470     }
00471 
00472   size_t copying_len = state.message_size ();
00473 
00474   qd = this->make_queued_data (copying_len);
00475 
00476   if (qd == 0)
00477     {
00478       if (TAO_debug_level > 0)
00479         {
00480           ACE_ERROR ((LM_ERROR,
00481                       ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
00482                       ACE_TEXT ("out of memory\n")));
00483         }
00484       return -1;
00485     }
00486 
00487   if (copying_len > incoming.length ())
00488     {
00489       qd->missing_data_ = copying_len - incoming.length ();
00490       copying_len = incoming.length ();
00491     }
00492   else
00493     {
00494       qd->missing_data_ = 0;
00495     }
00496 
00497   qd->msg_block_->copy (incoming.rd_ptr (),
00498                         copying_len);
00499 
00500   incoming.rd_ptr (copying_len);
00501   this->init_queued_data (qd, state);
00502 
00503   return 1;
00504 }
00505 
00506 int
00507 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
00508                                          ACE_Message_Block &incoming)
00509 {
00510   // Look to see whether we had atleast parsed the GIOP header ...
00511   if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00512     {
00513       // The data length that has been stuck in there during the last
00514       // read ....
00515       size_t const len =
00516         qd->msg_block_->length ();
00517 
00518       // paranoid check
00519       if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
00520         {
00521           // inconsistency - this code should have parsed the header
00522           // so far
00523           return -1;
00524         }
00525 
00526       // We know that we would have space for
00527       // TAO_GIOP_MESSAGE_HEADER_LEN here.  So copy that much of data
00528       // from the <incoming> into the message block in <qd>
00529       size_t const available = incoming.length ();
00530       size_t const desired   = TAO_GIOP_MESSAGE_HEADER_LEN - len;
00531       size_t const n_copy    = ace_min (available, desired);
00532 
00533       // paranoid check, but would cause endless looping
00534       if (n_copy == 0)
00535         {
00536           return -1;
00537         }
00538 
00539       if (qd->msg_block_->copy (incoming.rd_ptr (),
00540                                 n_copy) == -1)
00541         {
00542            return -1;
00543         }
00544 
00545       // Move the rd_ptr () in the incoming message block..
00546       incoming.rd_ptr (n_copy);
00547 
00548       // verify sufficient data to parse GIOP header
00549       if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
00550         {
00551           return 0; /* continue */
00552         }
00553 
00554       TAO_GIOP_Message_State state;
00555 
00556       // Parse the message header now...
00557       if (state.parse_message_header (*qd->msg_block_) == -1)
00558         {
00559           if (TAO_debug_level > 0)
00560             {
00561               ACE_ERROR ((LM_ERROR,
00562                   ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
00563                   ACE_TEXT ("error parsing header\n") ));
00564             }
00565           return -1;
00566         }
00567       // Now grow the message block so that we can copy the rest of
00568       // the data, the message_block must be able to hold complete message
00569        if (ACE_CDR::grow (qd->msg_block_,
00570                           state.message_size ()) == -1)  /* GIOP_Header + Payload */
00571          {
00572            // on mem-error get rid of context silently, try to avoid
00573            // system calls that might allocate additional memory
00574            return -1;
00575          }
00576 
00577       // Copy the pay load..
00578       // Calculate the bytes that needs to be copied in the queue...
00579       size_t copy_len = state.payload_size ();
00580 
00581       // If the data that needs to be copied is more than that is
00582       // available to us ..
00583       if (copy_len > incoming.length ())
00584         {
00585           // Calculate the missing data..
00586           qd->missing_data_ = copy_len - incoming.length ();
00587 
00588           // Set the actual possible copy_len that is available...
00589           copy_len = incoming.length ();
00590         }
00591       else
00592         {
00593           qd->missing_data_ = 0;
00594         }
00595 
00596       // ..now we are set to copy the right amount of data to the
00597       // node..
00598       if (qd->msg_block_->copy (incoming.rd_ptr (),
00599                                 copy_len) == -1)
00600         {
00601           return -1;
00602         }
00603 
00604       // Set the <rd_ptr> of the <incoming>..
00605       incoming.rd_ptr (copy_len);
00606 
00607       // Get the other details...
00608       this->init_queued_data (qd, state);
00609     }
00610   else
00611     {
00612       // @@todo: Need to abstract this out to a seperate method...
00613       size_t copy_len = qd->missing_data_;
00614 
00615       if (copy_len > incoming.length ())
00616         {
00617           // Calculate the missing data..
00618           qd->missing_data_ = copy_len - incoming.length ();
00619 
00620           // Set the actual possible copy_len that is available...
00621           copy_len = incoming.length ();
00622         }
00623 
00624       // paranoid check for  endless-event-looping
00625       if (copy_len == 0)
00626         {
00627           return -1;
00628         }
00629 
00630       // Copy the right amount of data in to the node...
00631       // node..
00632       if (qd->msg_block_->copy (incoming.rd_ptr (),
00633                                 copy_len) == -1)
00634         {
00635           return -1;
00636         }
00637 
00638       // Set the <rd_ptr> of the <incoming>..
00639       qd->msg_block_->rd_ptr (copy_len);
00640 
00641     }
00642 
00643   return 0;
00644 }
00645 
00646 int
00647 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
00648                                                 TAO_Queued_Data *qd)
00649 {
00650   // Set the upcall thread
00651   this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
00652 
00653   // Get a parser for us
00654   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00655 
00656   // Get the state information that we need to use
00657   this->set_state (qd->major_version_,
00658                    qd->minor_version_,
00659                    generator_parser);
00660 
00661   // A buffer that we will use to initialise the CDR stream
00662 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00663   char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
00664 #else
00665   char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
00666 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00667 
00668   // Initialize an output CDR on the stack
00669   // NOTE: Don't jump to a conclusion as to why we are using the
00670   // input_cdr and hence the  global pool here. These pools will move
00671   // to the lanes anyway at some point of time. Further, it would have
00672   // been awesome to have this in TSS. But for some reason the cloning
00673   // that happens when the ORB gets flow controlled while writing a
00674   // reply is messing things up. We crash horribly. Doing this adds a
00675   // lock, we need to set things like this -- put stuff in TSS here
00676   // and transfer to global memory when we get flow controlled. We
00677   // need to work on the message block to get it right!
00678   TAO_OutputCDR output (repbuf,
00679                         sizeof repbuf,
00680                         TAO_ENCAP_BYTE_ORDER,
00681                         this->orb_core_->input_cdr_buffer_allocator (),
00682                         this->orb_core_->input_cdr_dblock_allocator (),
00683                         this->orb_core_->input_cdr_msgblock_allocator (),
00684                         this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
00685                         this->fragmentation_strategy_.get (),
00686                         qd->major_version_,
00687                         qd->minor_version_);
00688 
00689   // Get the read and write positions before we steal data.
00690   size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00691   size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00692   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00693 
00694   if (TAO_debug_level > 0)
00695     this->dump_msg ("recv",
00696                     reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00697                     qd->msg_block_->length ());
00698 
00699 
00700   // Create a input CDR stream. We do the following
00701   //  1 - If the incoming message block has a data block with a flag
00702   //      DONT_DELETE  (for the data block) we create an input CDR
00703   //      stream the same way.
00704   //  2 - If the incoming message block had a datablock from heap just
00705   //      use it by duplicating it and make the flag 0.
00706   // NOTE: We use the same data block in which we read the message and
00707   // we pass it on to the higher layers of the ORB. So we dont to any
00708   // copies at all here. The same is also done in the higher layers.
00709 
00710   ACE_Message_Block::Message_Flags flg = 0;
00711   ACE_Data_Block *db = 0;
00712 
00713   // Get the flag in the message block
00714   flg = qd->msg_block_->self_flags ();
00715 
00716   if (ACE_BIT_ENABLED (flg,
00717                        ACE_Message_Block::DONT_DELETE))
00718     {
00719       // Use the same datablock
00720       db = qd->msg_block_->data_block ();
00721     }
00722   else
00723     {
00724       // Use a duplicated datablock as the datablock has come off the
00725       // heap.
00726       db = qd->msg_block_->data_block ()->duplicate ();
00727     }
00728 
00729 
00730   TAO_InputCDR input_cdr (db,
00731                           flg,
00732                           rd_pos,
00733                           wr_pos,
00734                           qd->byte_order_,
00735                           qd->major_version_,
00736                           qd->minor_version_,
00737                           this->orb_core_);
00738 
00739   transport->assign_translators(&input_cdr,&output);
00740 
00741   // We know we have some request message. Check whether it is a
00742   // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
00743 
00744   // Once we send the InputCDR stream we need to just forget about
00745   // the stream and never touch that again for anything. We basically
00746   // loose ownership of the data_block.
00747 
00748   switch (qd->msg_type_)
00749     {
00750     case TAO_PLUGGABLE_MESSAGE_REQUEST:
00751       // Should be taken care by the state specific invocations. They
00752       // could raise an exception or write things in the output CDR
00753       // stream
00754       return this->process_request (transport,
00755                                     input_cdr,
00756                                     output,
00757                                     generator_parser);
00758 
00759     case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
00760       return this->process_locate_request (transport,
00761                                            input_cdr,
00762                                            output,
00763                                            generator_parser);
00764     default:
00765       return -1;
00766     }
00767 }
00768 
00769 int
00770 TAO_GIOP_Message_Base::process_reply_message (
00771     TAO_Pluggable_Reply_Params &params,
00772     TAO_Queued_Data *qd)
00773 {
00774   // Get a parser for us
00775   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
00776 
00777   // Get the state information that we need to use
00778   this->set_state (qd->major_version_,
00779                    qd->minor_version_,
00780                    generator_parser);
00781 
00782   // Get the read and write positions before we steal data.
00783   size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
00784   size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
00785   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
00786 
00787   if (TAO_debug_level > 0)
00788     this->dump_msg ("recv",
00789                     reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()),
00790                     qd->msg_block_->length ());
00791 
00792 
00793   // Create a empty buffer on stack
00794   // NOTE: We use the same data block in which we read the message and
00795   // we pass it on to the higher layers of the ORB. So we dont to any
00796   // copies at all here.
00797   TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
00798                           ACE_Message_Block::DONT_DELETE,
00799                           rd_pos,
00800                           wr_pos,
00801                           qd->byte_order_,
00802                           qd->major_version_,
00803                           qd->minor_version_,
00804                           this->orb_core_);
00805 
00806   // We know we have some reply message. Check whether it is a
00807   // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
00808 
00809   // Once we send the InputCDR stream we need to just forget about
00810   // the stream and never touch that again for anything. We basically
00811   // loose ownership of the data_block.
00812   int retval = 0;
00813 
00814   switch (qd->msg_type_)
00815     {
00816     case TAO_PLUGGABLE_MESSAGE_REPLY:
00817       // Should be taken care by the state specific parsing
00818       retval =
00819         generator_parser->parse_reply (input_cdr,
00820                                        params);
00821 
00822       break;
00823     case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
00824       retval =
00825         generator_parser->parse_locate_reply (input_cdr,
00826                                               params);
00827       break;
00828     default:
00829       retval = -1;
00830     }
00831 
00832   if (retval == -1)
00833     return retval;
00834 
00835   params.input_cdr_ = &input_cdr;
00836 
00837   retval =
00838     params.transport_->tms ()->dispatch_reply (params);
00839 
00840   if (retval == -1)
00841     {
00842       // Something really critical happened, we will forget about
00843       // every reply on this connection.
00844       if (TAO_debug_level > 0)
00845         ACE_ERROR ((LM_ERROR,
00846                     ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ")
00847                     ACE_TEXT ("dispatch reply failed\n"),
00848                     params.transport_->id ()));
00849     }
00850 
00851   return retval;
00852 }
00853 
00854 int
00855 TAO_GIOP_Message_Base::generate_exception_reply (
00856     TAO_OutputCDR &cdr,
00857     TAO_Pluggable_Reply_Params_Base &params,
00858     CORBA::Exception &x
00859   )
00860 {
00861   // A new try/catch block, but if something goes wrong now we have no
00862   // hope, just abort.
00863   ACE_DECLARE_NEW_CORBA_ENV;
00864 
00865   ACE_TRY
00866     {
00867       // Make the GIOP & reply header.
00868       this->generate_reply_header (cdr,
00869                                    params);
00870       x._tao_encode (cdr
00871                      ACE_ENV_ARG_PARAMETER);
00872       ACE_TRY_CHECK;
00873     }
00874   ACE_CATCH (CORBA::Exception, ex)
00875     {
00876       // Now we know that while handling the error an other error
00877       // happened -> no hope, close connection.
00878 
00879       // Close the handle.
00880       if (TAO_debug_level > 0)
00881         ACE_DEBUG ((LM_DEBUG,
00882                   ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
00883                   ACE_TEXT ("generate_exception_reply ()\n")));
00884       return -1;
00885     }
00886   ACE_ENDTRY;
00887   ACE_CHECK_RETURN (-1);
00888 
00889   return 0;
00890 }
00891 
00892 int
00893 TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
00894                                               TAO_OutputCDR &msg)
00895 {
00896   // Reset the message type
00897   msg.reset ();
00898 
00899   CORBA::Octet header[12] =
00900     {
00901       // The following works on non-ASCII platforms, such as MVS (which
00902       // uses EBCDIC).
00903       0x47, // 'G'
00904       0x49, // 'I'
00905       0x4f, // 'O'
00906       0x50  // 'P'
00907     };
00908 
00909   CORBA::Octet major, minor = 0;
00910 
00911   (void) msg.get_version (major, minor);
00912 
00913   header[4] = major;
00914   header[5] = minor;
00915 
00916   // "flags" octet, i.e. header[6] will be set up later when message
00917   // is formatted by the transport.
00918 
00919   header[7] = CORBA::Octet (type);  // Message type
00920 
00921   static ACE_CDR::ULong const header_size =
00922     sizeof (header) / sizeof (header[0]);
00923 
00924   // Fragmentation should not occur at this point since there are only
00925   // 12 bytes in the stream, and fragmentation may only occur when
00926   // the stream length >= 16.
00927   msg.write_octet_array (header, header_size);
00928 
00929   return msg.good_bit ();
00930 }
00931 
00932 int
00933 TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
00934                                         TAO_InputCDR &cdr,
00935                                         TAO_OutputCDR &output,
00936                                         TAO_GIOP_Message_Generator_Parser *parser)
00937 {
00938   // This will extract the request header, set <response_required>
00939   // and <sync_with_server> as appropriate.
00940   TAO_ServerRequest request (this,
00941                              cdr,
00942                              output,
00943                              transport,
00944                              this->orb_core_);
00945 
00946   CORBA::ULong request_id = 0;
00947   CORBA::Boolean response_required = false;
00948 
00949   int parse_error = 0;
00950 
00951   ACE_DECLARE_NEW_CORBA_ENV;
00952   ACE_TRY
00953     {
00954       parse_error =
00955         parser->parse_request_header (request);
00956 
00957       TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
00958       if (csm)
00959         {
00960           csm->process_service_context(request);
00961           transport->assign_translators(&cdr,&output);
00962         }
00963 
00964       // Throw an exception if the
00965       if (parse_error != 0)
00966         ACE_TRY_THROW (CORBA::MARSHAL (0,
00967                                        CORBA::COMPLETED_NO));
00968       request_id = request.request_id ();
00969 
00970       response_required = request.response_expected ();
00971 
00972       CORBA::Object_var forward_to;
00973 
00974 /*
00975  * Hook to specialize request processing within TAO
00976  * This hook will be replaced by specialized request
00977  * processing implementation.
00978  */
00979 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
00980 
00981       // Do this before the reply is sent.
00982       this->orb_core_->request_dispatcher ()->dispatch (
00983           this->orb_core_,
00984           request,
00985           forward_to
00986           ACE_ENV_ARG_PARAMETER);
00987       ACE_TRY_CHECK;
00988 
00989 //@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END
00990 
00991       if (!CORBA::is_nil (forward_to.in ()))
00992         {
00993           const CORBA::Boolean permanent_forward_condition =
00994               this->orb_core_->is_permanent_forward_condition
00995               (forward_to.in (),
00996                request.request_service_context ());
00997 
00998           // We should forward to another object...
00999           TAO_Pluggable_Reply_Params_Base reply_params;
01000           reply_params.request_id_ = request_id;
01001           reply_params.reply_status_ =
01002               permanent_forward_condition
01003               ? TAO_GIOP_LOCATION_FORWARD_PERM
01004               : TAO_GIOP_LOCATION_FORWARD;
01005           reply_params.svc_ctx_.length (0);
01006 
01007           // Send back the reply service context.
01008           reply_params.service_context_notowned (&request.reply_service_info ());
01009 
01010           output.message_attributes (request_id,
01011                                      0,
01012                                      TAO_Transport::TAO_REPLY,
01013                                      0);
01014 
01015           // Make the GIOP header and Reply header
01016           this->generate_reply_header (output,
01017                                        reply_params);
01018 
01019           if (!(output << forward_to.in ()))
01020             {
01021               if (TAO_debug_level > 0)
01022                 ACE_ERROR ((LM_ERROR,
01023                             ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
01024                             ACE_TEXT ("forward reference.\n")));
01025 
01026               return -1;
01027             }
01028 
01029           output.more_fragments (false);
01030 
01031           int result = transport->send_message (output,
01032                                                 0,
01033                                                 TAO_Transport::TAO_REPLY);
01034           if (result == -1)
01035             {
01036               if (TAO_debug_level > 0)
01037                 {
01038                   // No exception but some kind of error, yet a
01039                   // response is required.
01040                   ACE_ERROR ((LM_ERROR,
01041                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01042                               ACE_TEXT ("cannot send reply\n"),
01043                               ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
01044                 }
01045             }
01046           return result;
01047         }
01048     }
01049   // Only CORBA exceptions are caught here.
01050   ACE_CATCHANY
01051     {
01052       int result = 0;
01053 
01054       if (response_required)
01055         {
01056           result = this->send_reply_exception (transport,
01057                                                output,
01058                                                request_id,
01059                                                &request.reply_service_info (),
01060                                                &ACE_ANY_EXCEPTION);
01061           if (result == -1)
01062             {
01063               if (TAO_debug_level > 0)
01064                 {
01065                   ACE_ERROR ((LM_ERROR,
01066                               ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
01067                               ACE_TEXT ("cannot send exception\n"),
01068                               ACE_TEXT ("process_connector_request ()")));
01069 
01070                   ACE_PRINT_EXCEPTION (
01071                       ACE_ANY_EXCEPTION,
01072                       "TAO_GIOP_Message_Base::process_request[1]");
01073                 }
01074             }
01075 
01076         }
01077       else if (TAO_debug_level > 0)
01078         {
01079           // It is unfortunate that an exception (probably a system
01080           // exception) was thrown by the upcall code (even by the
01081           // user) when the client was not expecting a response.
01082           // However, in this case, we cannot close the connection
01083           // down, since it really isn't the client's fault.
01084 
01085           ACE_ERROR ((LM_ERROR,
01086                       ACE_TEXT ("(%P|%t) exception thrown ")
01087                       ACE_TEXT ("but client is not waiting a response\n")));
01088 
01089           ACE_PRINT_EXCEPTION (
01090               ACE_ANY_EXCEPTION,
01091               "TAO_GIOP_Message_Base::process_request[2]");
01092         }
01093 
01094       return result;
01095     }
01096 #if defined (TAO_HAS_EXCEPTIONS)
01097   ACE_CATCHALL
01098     {
01099       // @@ TODO some c++ exception or another, but what do we do with
01100       //    it?
01101       // We are supposed to map it into a CORBA::UNKNOWN exception.
01102       // BTW, this cannot be detected if using the <env> mapping.  If
01103       // we have native exceptions but no support for them in the ORB
01104       // we should still be able to catch it.  If we don't have native
01105       // exceptions it couldn't have been raised in the first place!
01106       int result = 0;
01107 
01108       if (response_required)
01109         {
01110           CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
01111                                     (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
01112                                     CORBA::COMPLETED_MAYBE);
01113 
01114           result = this->send_reply_exception (transport,
01115                                                output,
01116                                                request_id,
01117                                                &request.reply_service_info (),
01118                                                &exception);
01119           if (result == -1)
01120             {
01121               if (TAO_debug_level > 0)
01122                 {
01123                   ACE_ERROR ((LM_ERROR,
01124                               ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
01125                               ACE_TEXT ("%p: ")
01126                               ACE_TEXT ("cannot send exception\n"),
01127                               ACE_TEXT ("process_request ()")));
01128                   ACE_PRINT_EXCEPTION (
01129                       exception,
01130                       "TAO_GIOP_Message_Base::process_request[3]");
01131                 }
01132             }
01133         }
01134       else if (TAO_debug_level > 0)
01135         {
01136           // It is unfotunate that an exception (probably a system
01137           // exception) was thrown by the upcall code (even by the
01138           // user) when the client was not expecting a response.
01139           // However, in this case, we cannot close the connection
01140           // down, since it really isn't the client's fault.
01141           ACE_ERROR ((LM_ERROR,
01142                       ACE_TEXT ("(%P|%t|%N|%l) exception thrown ")
01143                       ACE_TEXT ("but client is not waiting a response\n")));
01144         }
01145 
01146       return result;
01147     }
01148 #endif /* TAO_HAS_EXCEPTIONS */
01149   ACE_ENDTRY;
01150 
01151   return 0;
01152 }
01153 
01154 
01155 int
01156 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
01157                                                TAO_InputCDR &input,
01158                                                TAO_OutputCDR &output,
01159                                                TAO_GIOP_Message_Generator_Parser *parser)
01160 {
01161   // This will extract the request header, set <response_required> as
01162   // appropriate.
01163   TAO_GIOP_Locate_Request_Header locate_request (input,
01164                                                  this->orb_core_);
01165 
01166   TAO_GIOP_Locate_Status_Msg status_info;
01167 
01168   // Defaulting.
01169   status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01170 
01171   CORBA::Boolean response_required = 1;
01172 
01173   ACE_DECLARE_NEW_CORBA_ENV;
01174   ACE_TRY
01175     {
01176       int parse_error =
01177         parser->parse_locate_header (locate_request);
01178 
01179       if (parse_error != 0)
01180         {
01181           ACE_TRY_THROW (CORBA::MARSHAL (0,
01182                                          CORBA::COMPLETED_NO));
01183         }
01184 
01185       TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
01186                               locate_request.object_key ().length (),
01187                               locate_request.object_key ().get_buffer (),
01188                               0);
01189 
01190       // Set it to an error state
01191       parse_error = 1;
01192       CORBA::ULong req_id = locate_request.request_id ();
01193 
01194       // We will send the reply. The ServerRequest class need not send
01195       // the reply
01196       CORBA::Boolean deferred_reply = true;
01197       TAO_ServerRequest server_request (this,
01198                                         req_id,
01199                                         response_required,
01200                                         deferred_reply,
01201                                         tmp_key,
01202                                         "_non_existent",
01203                                         output,
01204                                         transport,
01205                                         this->orb_core_,
01206                                         parse_error);
01207 
01208       if (parse_error != 0)
01209         {
01210           ACE_TRY_THROW (CORBA::MARSHAL (0,
01211                                          CORBA::COMPLETED_NO));
01212         }
01213 
01214       CORBA::Object_var forward_to;
01215 
01216       this->orb_core_->request_dispatcher ()->dispatch (
01217           this->orb_core_,
01218           server_request,
01219           forward_to
01220            ACE_ENV_ARG_PARAMETER);
01221       ACE_TRY_CHECK;
01222 
01223       if (!CORBA::is_nil (forward_to.in ()))
01224         {
01225           status_info.status = TAO_GIOP_OBJECT_FORWARD;
01226           status_info.forward_location_var = forward_to;
01227           if (TAO_debug_level > 0)
01228             ACE_DEBUG ((LM_DEBUG,
01229                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01230                         ACE_TEXT ("called: forwarding\n")));
01231         }
01232       else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
01233         {
01234           // We got no exception, so the object is here.
01235           status_info.status = TAO_GIOP_OBJECT_HERE;
01236           if (TAO_debug_level > 0)
01237             ACE_DEBUG ((LM_DEBUG,
01238                         ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01239                         ACE_TEXT ("found\n")));
01240         }
01241       else
01242         {
01243           status_info.forward_location_var = server_request.forward_location ();
01244 
01245           if (!CORBA::is_nil (status_info.forward_location_var.in ()))
01246             {
01247               status_info.status = TAO_GIOP_OBJECT_FORWARD;
01248               ACE_DEBUG ((LM_DEBUG,
01249                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01250                           ACE_TEXT ("forwarding\n")));
01251             }
01252           else
01253             {
01254               // Normal exception, so the object is not here
01255               status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01256               ACE_DEBUG ((LM_DEBUG,
01257                           ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01258                           ACE_TEXT ("not here\n")));
01259             }
01260         }
01261     }
01262 
01263   ACE_CATCHANY
01264     {
01265       // Normal exception, so the object is not here
01266       status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01267       if (TAO_debug_level > 0)
01268         ACE_DEBUG ((LM_DEBUG,
01269                     ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
01270                     ACE_TEXT ("CORBA exception raised\n")));
01271     }
01272 #if defined (TAO_HAS_EXCEPTIONS)
01273   ACE_CATCHALL
01274     {
01275       // Normal exception, so the object is not here
01276       status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
01277       if (TAO_debug_level > 0)
01278         ACE_DEBUG ((LM_DEBUG,
01279                     ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
01280                     ACE_TEXT ("C++ exception raised\n")));
01281     }
01282 #endif /* TAO_HAS_EXCEPTIONS */
01283   ACE_ENDTRY;
01284 
01285   return this->make_send_locate_reply (transport,
01286                                        locate_request,
01287                                        status_info,
01288                                        output,
01289                                        parser);
01290 }
01291 
01292 int
01293 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
01294                                                TAO_GIOP_Locate_Request_Header &request,
01295                                                TAO_GIOP_Locate_Status_Msg &status_info,
01296                                                TAO_OutputCDR &output,
01297                                                TAO_GIOP_Message_Generator_Parser *parser)
01298 {
01299   // Note here we are making the Locate reply header which is *QUITE*
01300   // different from the reply header made by the make_reply () call..
01301   // Make the GIOP message header
01302   this->write_protocol_header (TAO_GIOP_LOCATEREPLY,
01303                                output);
01304 
01305   // This writes the header & body
01306   parser->write_locate_reply_mesg (output,
01307                                    request.request_id (),
01308                                    status_info);
01309 
01310   output.more_fragments (false);
01311 
01312   // Send the message
01313   int result = transport->send_message (output,
01314                                         0,
01315                                         TAO_Transport::TAO_REPLY);
01316 
01317   // Print out message if there is an error
01318   if (result == -1)
01319     {
01320       if (TAO_debug_level > 0)
01321         {
01322           ACE_ERROR ((LM_ERROR,
01323                       ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
01324                       ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
01325         }
01326     }
01327 
01328   return result;
01329 }
01330 
01331 // Send an "I can't understand you" message -- again, the message is
01332 // prefabricated for simplicity.  This implies abortive disconnect (at
01333 // the application level, if not at the level of TCP).
01334 //
01335 // NOTE that IIOP will still benefit from TCP's orderly disconnect.
01336 int
01337 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
01338 {
01339   const char
01340     error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01341   {
01342     // The following works on non-ASCII platforms, such as MVS (which
01343     // uses EBCDIC).
01344     0x47, // 'G'
01345     0x49, // 'I'
01346     0x4f, // 'O'
01347     0x50, // 'P'
01348     (CORBA::Octet) 1, // Use the lowest GIOP version
01349     (CORBA::Octet) 0,
01350     TAO_ENCAP_BYTE_ORDER,
01351     TAO_GIOP_MESSAGERROR,
01352     0, 0, 0, 0
01353   };
01354 
01355   // @@ Q: How does this works with GIOP lite?
01356   //    A: It doesn't
01357 
01358   this->dump_msg ("send_error",
01359                   (const u_char *) error_message,
01360                   TAO_GIOP_MESSAGE_HEADER_LEN);
01361 
01362   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01363                              ACE_Message_Block::MB_DATA,
01364                              error_message,
01365                              0,
01366                              0,
01367                              ACE_Message_Block::DONT_DELETE,
01368                              0);
01369   ACE_Message_Block message_block(&data_block,
01370                                   ACE_Message_Block::DONT_DELETE);
01371   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01372 
01373   size_t bt;
01374   int result = transport->send_message_block_chain (&message_block, bt);
01375   if (result == -1)
01376     {
01377       if (TAO_debug_level > 0)
01378         ACE_DEBUG ((LM_DEBUG,
01379                     ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
01380                     transport->id ()));
01381     }
01382 
01383   return result;
01384 }
01385 
01386 void
01387 TAO_GIOP_Message_Base::set_state (
01388     CORBA::Octet def_major,
01389     CORBA::Octet def_minor,
01390     TAO_GIOP_Message_Generator_Parser *&gen_parser) const
01391 {
01392   switch (def_major)
01393     {
01394     case 1:
01395       switch (def_minor)
01396         {
01397         case 0:
01398           gen_parser =
01399             const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
01400                                      &this->tao_giop_impl_.tao_giop_10);
01401           break;
01402         case 1:
01403           gen_parser =
01404             const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
01405                                      &this->tao_giop_impl_.tao_giop_11);
01406           break;
01407         case 2:
01408           gen_parser =
01409             const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
01410                                      &this->tao_giop_impl_.tao_giop_12);
01411           break;
01412         default:
01413           break;
01414         }
01415       break;
01416     default:
01417       break;
01418     }
01419 }
01420 
01421 
01422 // Server sends an "I'm shutting down now, any requests you've sent me
01423 // can be retried" message to the server.  The message is prefab, for
01424 // simplicity.
01425 //
01426 // NOTE: this is IIOP-specific though it doesn't look like it is.  It
01427 // relies on a TCP-ism: orderly disconnect, which doesn't exist in all
01428 // transport protocols.  Versions of GIOP atop some transport that's
01429 // lacking orderly disconnect must define some transport-specific
01430 // handshaking (e.g. the XNS/SPP handshake convention) in order to
01431 // know that the same transport semantics are provided when shutdown
01432 // is begun with messages "in flight". (IIOP doesn't report false
01433 // errors in the case of "clean shutdown", because it relies on
01434 // orderly disconnect as provided by TCP.  This quality of service is
01435 // required to write robust distributed systems.)
01436 
01437 void
01438 TAO_GIOP_Message_Base::
01439   send_close_connection (const TAO_GIOP_Message_Version &version,
01440                          TAO_Transport *transport,
01441                          void *)
01442 {
01443 
01444   // static CORBA::Octet
01445   // I hate  this in every method. Till the time I figure out a way
01446   // around  I will have them here hanging around.
01447   const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
01448   {
01449     // The following works on non-ASCII platforms, such as MVS (which
01450     // uses EBCDIC).
01451     0x47, // 'G'
01452     0x49, // 'I'
01453     0x4f, // 'O'
01454     0x50, // 'P'
01455     version.major,
01456     version.minor,
01457     TAO_ENCAP_BYTE_ORDER,
01458     TAO_GIOP_CLOSECONNECTION,
01459     0, 0, 0, 0
01460   };
01461 
01462   // It's important that we use a reliable shutdown after we send this
01463   // message, so we know it's received.
01464   //
01465   // @@ should recv and discard queued data for portability; note
01466   // that this won't block (long) since we never set SO_LINGER
01467 
01468   this->dump_msg ("send_close_connection",
01469                   (const u_char *) close_message,
01470                   TAO_GIOP_MESSAGE_HEADER_LEN);
01471 
01472 #if 0
01473   // @@CJC I don't think we need this check b/c the transport's send()
01474   // will simply return -1.  However, I guess we could create something
01475   // like TAO_Tranport::is_closed() that returns whether the connection
01476   // is already closed.  The problem with that, however, is that it's
01477   // entirely possible that is_closed() could return TRUE, and then the
01478   // transport could get closed down btw. the time it gets called and the
01479   // time that the send actually occurs.
01480   ACE_HANDLE which = transport->handle ();
01481   if (which == ACE_INVALID_HANDLE)
01482     {
01483       if (TAO_debug_level > 0)
01484         ACE_DEBUG ((LM_DEBUG,
01485            ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
01486            ACE_TEXT (" connection already closed\n")));
01487       return;
01488     }
01489 #endif
01490 
01491   ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
01492                              ACE_Message_Block::MB_DATA,
01493                              close_message,
01494                              0,
01495                              0,
01496                              ACE_Message_Block::DONT_DELETE,
01497                              0);
01498   ACE_Message_Block message_block(&data_block);
01499   message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
01500 
01501   size_t bt;
01502   int result = transport->send_message_block_chain (&message_block, bt);
01503   if (result == -1)
01504     {
01505       if (TAO_debug_level > 0)
01506         ACE_ERROR ((LM_ERROR,
01507            ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
01508            transport->id (), errno));
01509     }
01510 
01511   transport->close_connection ();
01512   ACE_DEBUG ((LM_DEBUG,
01513       ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
01514       transport-> id ()));
01515 
01516 }
01517 
01518 
01519 int
01520 TAO_GIOP_Message_Base::send_reply_exception (
01521     TAO_Transport *transport,
01522     TAO_OutputCDR &output,
01523     CORBA::ULong request_id,
01524     IOP::ServiceContextList *svc_info,
01525     CORBA::Exception *x
01526   )
01527 {
01528   TAO_Pluggable_Reply_Params_Base reply_params;
01529   reply_params.request_id_ = request_id;
01530   reply_params.svc_ctx_.length (0);
01531 
01532   // We are going to send some data
01533   reply_params.argument_flag_ = 1;
01534 
01535   // Send back the service context we received.  (RTCORBA relies on
01536   // this).
01537   reply_params.service_context_notowned (svc_info);
01538 
01539   reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION;
01540 
01541   if (CORBA::SystemException::_downcast (x) != 0)
01542     {
01543       reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION;
01544     }
01545 
01546   if (this->generate_exception_reply (output,
01547                                       reply_params,
01548                                       *x) == -1)
01549     return -1;
01550 
01551   output.more_fragments (false);
01552 
01553   return transport->send_message (output,
01554                                   0,
01555                                   TAO_Transport::TAO_REPLY);
01556 }
01557 
01558 void
01559 TAO_GIOP_Message_Base::dump_msg (const char *label,
01560                                  const u_char *ptr,
01561                                  size_t len)
01562 {
01563 
01564   if (TAO_debug_level >= 5)
01565     {
01566       static const char digits[] = "0123456789ABCD";
01567       static const char *names[] =
01568       {
01569         "Request",
01570         "Reply",
01571         "CancelRequest",
01572         "LocateRequest",
01573         "LocateReply",
01574         "CloseConnection",
01575         "MessageError",
01576         "Fragment"
01577       };
01578 
01579       // Message name.
01580       const char *message_name = "UNKNOWN MESSAGE";
01581       u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
01582       if (slot < sizeof (names) / sizeof (names[0]))
01583         message_name = names[slot];
01584 
01585       // Byte order.
01586       int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
01587 
01588       // Get the version info
01589       CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
01590       CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
01591 
01592       // request/reply id.
01593       CORBA::ULong tmp = 0;
01594       CORBA::ULong *id = &tmp;
01595       char *tmp_id = 0;
01596 
01597       if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
01598           ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
01599           ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
01600         {
01601           if (major == 1 && minor < 2)
01602             {
01603               // @@ Only works if ServiceContextList is empty....
01604               tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN  + 4);
01605             }
01606           else
01607             {
01608               tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
01609             }
01610 #if !defined (ACE_DISABLE_SWAP_ON_READ)
01611         if (byte_order == TAO_ENCAP_BYTE_ORDER)
01612           {
01613             id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01614           }
01615         else
01616           {
01617             ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
01618           }
01619 #else
01620         id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
01621 #endif /* ACE_DISABLE_SWAP_ON_READ */
01622 
01623         }
01624 
01625       // Print.
01626       ACE_DEBUG ((LM_DEBUG,
01627                   "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
01628                   "%s GIOP v%c.%c msg, %d data bytes, %s endian, "
01629                   "Type %s[%u]\n",
01630                   ACE_TEXT_CHAR_TO_TCHAR (label),
01631                   digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
01632                   digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
01633                   len - TAO_GIOP_MESSAGE_HEADER_LEN ,
01634                   (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
01635                   ACE_TEXT_CHAR_TO_TCHAR(message_name),
01636                   *id));
01637 
01638       if (TAO_debug_level >= 10)
01639         ACE_HEX_DUMP ((LM_DEBUG,
01640                        (const char *) ptr,
01641                        len,
01642                        ACE_TEXT ("GIOP message")));
01643     }
01644 }
01645 
01646 int
01647 TAO_GIOP_Message_Base::generate_locate_reply_header (
01648     TAO_OutputCDR & /*cdr*/,
01649     TAO_Pluggable_Reply_Params_Base & /*params*/)
01650 {
01651   return 0;
01652 }
01653 
01654 int
01655 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
01656 {
01657   // Get a parser for us
01658   TAO_GIOP_Message_Generator_Parser *parser = 0;
01659 
01660   CORBA::Octet major, minor = 0;
01661 
01662   msg.get_version (major, minor);
01663 
01664   // Get the state information that we need to use
01665   this->set_state (major,
01666                    minor,
01667                    parser);
01668 
01669   // We dont really know.. So ask the generator and parser objects that
01670   // we know.
01671   // @@ TODO: Need to make this faster, instead of making virtual
01672   // call, try todo the check within this class
01673   return parser->is_ready_for_bidirectional ();
01674 }
01675 
01676 
01677 TAO_Queued_Data *
01678 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
01679 {
01680   // Get a node for the queue..
01681   TAO_Queued_Data *qd =
01682     TAO_Queued_Data::make_queued_data (
01683       this->orb_core_->transport_message_buffer_allocator ());
01684 
01685   if (qd == 0)
01686     {
01687       if (TAO_debug_level > 0)
01688         {
01689           ACE_ERROR ((LM_ERROR,
01690             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01691             ACE_TEXT ("our of memory, failed to allocate queued data object\n")));
01692         }
01693       return 0; // NULL pointer
01694     }
01695 
01696   // @@todo: We have a similar method in Transport.cpp. Need to see how
01697   // we can factor them out..
01698   // Make a datablock for the size requested + something. The
01699   // "something" is required because we are going to align the data
01700   // block in the message block. During alignment we could loose some
01701   // bytes. As we may not know how many bytes will be lost, we will
01702   // allocate ACE_CDR::MAX_ALIGNMENT extra.
01703   ACE_Data_Block *db =
01704     this->orb_core_->create_input_cdr_data_block (sz +
01705                                                   ACE_CDR::MAX_ALIGNMENT);
01706 
01707   if (db == 0)
01708     {
01709       TAO_Queued_Data::release (qd);
01710 
01711       if (TAO_debug_level > 0)
01712         {
01713           ACE_ERROR ((LM_ERROR,
01714             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01715             ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"),
01716             sz));
01717         }
01718       return 0; // NULL pointer
01719     }
01720 
01721   ACE_Allocator *alloc =
01722     this->orb_core_->input_cdr_msgblock_allocator ();
01723 
01724   if (alloc == 0)
01725     {
01726       if (TAO_debug_level >= 8)
01727         {
01728           ACE_DEBUG ((LM_DEBUG,
01729              ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,")
01730              ACE_TEXT (" no ACE_Allocator defined\n")));
01731         }
01732     }
01733 
01734 
01735   ACE_Message_Block mb (db,
01736                         0,
01737                         alloc);
01738 
01739   ACE_Message_Block *new_mb = mb.duplicate ();
01740 
01741   if (new_mb == 0)
01742     {
01743       TAO_Queued_Data::release (qd);
01744       db->release();
01745 
01746       if (TAO_debug_level > 0)
01747         {
01748           ACE_ERROR ((LM_ERROR,
01749             ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
01750             ACE_TEXT ("out of memory, failed to allocate message block\n")));
01751         }
01752       return 0;
01753     }
01754 
01755   ACE_CDR::mb_align (new_mb);
01756 
01757   qd->msg_block_ = new_mb;
01758 
01759   return qd;
01760 }
01761 
01762 size_t
01763 TAO_GIOP_Message_Base::header_length (void) const
01764 {
01765   return TAO_GIOP_MESSAGE_HEADER_LEN;
01766 }
01767 
01768 size_t
01769 TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major,
01770                                                CORBA::Octet minor) const
01771 {
01772   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01773 
01774   // Get the state information that we need to use
01775   this->set_state (major,
01776                    minor,
01777                    generator_parser);
01778 
01779   return generator_parser->fragment_header_length ();
01780 }
01781 
01782 void
01783 TAO_GIOP_Message_Base::init_queued_data (
01784   TAO_Queued_Data* qd,
01785   const TAO_GIOP_Message_State& state) const
01786 {
01787   qd->byte_order_     = state.byte_order_;
01788   qd->major_version_  = state.giop_version_.major;
01789   qd->minor_version_  = state.giop_version_.minor;
01790   qd->more_fragments_ = state.more_fragments_;
01791   qd->msg_type_       = this->message_type (state);
01792 }
01793 
01794 int
01795 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const
01796 {
01797   // Get a parser for us
01798   TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
01799 
01800   // Get the state information that we need to use
01801   this->set_state (qd->major_version_,
01802                    qd->minor_version_,
01803                    generator_parser);
01804 
01805   // Get the read and write positions before we steal data.
01806   size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
01807   size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
01808   rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
01809 
01810   // Create a input CDR stream. We do the following
01811   //  1 - If the incoming message block has a data block with a flag
01812   //      DONT_DELETE  (for the data block) we create an input CDR
01813   //      stream the same way.
01814   //  2 - If the incoming message block had a datablock from heap just
01815   //      use it by duplicating it and make the flag 0.
01816   // NOTE: We use the same data block in which we read the message and
01817   // we pass it on to the higher layers of the ORB. So we dont to any
01818   // copies at all here. The same is also done in the higher layers.
01819 
01820   ACE_Message_Block::Message_Flags flg = 0;
01821   ACE_Data_Block *db = 0;
01822 
01823   // Get the flag in the message block
01824   flg = qd->msg_block_->self_flags ();
01825 
01826   if (ACE_BIT_ENABLED (flg,
01827                        ACE_Message_Block::DONT_DELETE))
01828     {
01829       // Use the same datablock
01830       db = qd->msg_block_->data_block ();
01831     }
01832   else
01833     {
01834       // Use a duplicated datablock as the datablock has come off the
01835       // heap.
01836       db = qd->msg_block_->data_block ()->duplicate ();
01837     }
01838 
01839 
01840   TAO_InputCDR input_cdr (db,
01841                           flg,
01842                           rd_pos,
01843                           wr_pos,
01844                           qd->byte_order_,
01845                           qd->major_version_,
01846                           qd->minor_version_,
01847                           this->orb_core_);
01848 
01849   if (qd->major_version_ >= 1 &&
01850       (qd->minor_version_ == 0 || qd->minor_version_ == 1))
01851     {
01852       if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01853           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY)
01854         {
01855           IOP::ServiceContextList service_context;
01856 
01857           if ( ! (input_cdr >> service_context &&
01858                   input_cdr >> request_id) )
01859             {
01860               return -1;
01861             }
01862 
01863           return 0;
01864         }
01865       else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01866                qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01867                qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01868         {
01869           if ( ! (input_cdr >> request_id) )
01870             {
01871               return -1;
01872             }
01873 
01874           return 0;
01875         }
01876       else
01877         {
01878           return -1;
01879         }
01880     }
01881   else
01882     {
01883       if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST  ||
01884           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY    ||
01885           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT ||
01886           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
01887           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
01888           qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01889         {
01890           // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header.
01891           // This is true for all message types that might be sent in form of fragments or cancel-requests.
01892           if ( ! (input_cdr >> request_id) )
01893             {
01894               return -1;
01895             }
01896 
01897           return 0;
01898         }
01899       else
01900         {
01901           return -1;
01902         }
01903     }
01904 
01905   return -1;
01906 }
01907 
01908 /* @return -1 error, 0 ok, +1 outstanding fragments */
01909 int
01910 TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg)
01911 {
01912   TAO::Incoming_Message_Stack reverse_stack;
01913 
01914   TAO_Queued_Data *tail = 0;
01915   TAO_Queued_Data *head = 0;
01916 
01917   //
01918   // CONSOLIDATE FRAGMENTED MESSAGE
01919   //
01920 
01921   // check for error-condition
01922   if (qd == 0)
01923     {
01924       return -1;
01925     }
01926 
01927   if (qd->major_version_ == 1 && qd->minor_version_ == 0)
01928     {
01929       TAO_Queued_Data::release (qd);
01930       return -1; // error: GIOP-1.0 does not support fragments
01931     }
01932 
01933   // If this is not the last fragment, push it onto stack for later processing
01934   if (qd->more_fragments_)
01935     {
01936       this->fragment_stack_.push (qd);
01937 
01938       msg = 0;   // no consolidated message available yet
01939       return 1;  // status: more messages expected.
01940     }
01941 
01942   tail = qd;  // init
01943 
01944   // Add the current message block to the end of the chain
01945   // after adjusting the read pointer to skip the header(s)
01946   const size_t header_adjustment =
01947     this->header_length () +
01948     this->fragment_header_length (tail->major_version_,
01949                                   tail->minor_version_);
01950 
01951   if (tail->msg_block_->length () < header_adjustment)
01952     {
01953       // buffer length not sufficient
01954       TAO_Queued_Data::release (qd);
01955       return -1;
01956     }
01957 
01958   // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
01959   if (tail->major_version_ == 1 && tail->minor_version_ == 1)
01960     {
01961       // GIOP-1.1
01962 
01963       while (this->fragment_stack_.pop (head) != -1)
01964         {
01965           if (head->more_fragments_ &&
01966               head->major_version_ == 1 &&
01967               head->minor_version_ == 1 &&
01968               head->msg_block_->length () >= header_adjustment)
01969             {
01970               // adjust the read-pointer, skip the fragment header
01971               tail->msg_block_->rd_ptr(header_adjustment);
01972 
01973               head->msg_block_->cont (tail->msg_block_);
01974 
01975               tail->msg_block_ = 0;
01976 
01977               TAO_Queued_Data::release (tail);
01978 
01979               tail = head;
01980             }
01981           else
01982             {
01983               reverse_stack.push (head);
01984             }
01985         }
01986     }
01987   else
01988     {
01989       // > GIOP-1.2
01990 
01991       CORBA::ULong tmp_request_id = 0;
01992       if (this->parse_request_id (tail, tmp_request_id) == -1)
01993         {
01994           return -1;
01995         }
01996 
01997       const CORBA::ULong request_id = tmp_request_id;
01998 
01999       while (this->fragment_stack_.pop (head) != -1)
02000         {
02001           CORBA::ULong head_request_id = 0;
02002           int parse_status = 0;
02003 
02004           if (head->more_fragments_ &&
02005               head->major_version_ >= 1 &&
02006               head->minor_version_ >= 2 &&
02007               head->msg_block_->length () >= header_adjustment &&
02008               (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
02009               request_id == head_request_id)
02010             {
02011               // adjust the read-pointer, skip the fragment header
02012               tail->msg_block_->rd_ptr(header_adjustment);
02013 
02014               head->msg_block_->cont (tail->msg_block_);
02015 
02016               tail->msg_block_ = 0;
02017 
02018               TAO_Queued_Data::release (tail);
02019 
02020               tail = head;
02021             }
02022           else
02023             {
02024               if (parse_status == -1)
02025                 {
02026                   TAO_Queued_Data::release (head);
02027                   return -1;
02028                 }
02029 
02030               reverse_stack.push (head);
02031             }
02032         }
02033     }
02034 
02035   // restore stack
02036   while (reverse_stack.pop (head) != -1)
02037     {
02038       this->fragment_stack_.push (head);
02039     }
02040 
02041   if (tail->consolidate () == -1)
02042     {
02043       // memory allocation failed
02044       TAO_Queued_Data::release (tail);
02045       return -1;
02046     }
02047 
02048   // set out value
02049   msg = tail;
02050 
02051   return 0;
02052 }
02053 
02054 
02055 int
02056 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
02057 {
02058   // We must extract the specific request-id from message-buffer
02059   // and remove all fragments from stack that match this request-id.
02060 
02061   TAO::Incoming_Message_Stack reverse_stack;
02062 
02063   CORBA::ULong cancel_request_id;
02064 
02065   if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
02066     {
02067       return -1;
02068     }
02069 
02070   TAO_Queued_Data *head = 0;
02071 
02072   // Revert stack
02073   while (this->fragment_stack_.pop (head) != -1)
02074     {
02075       reverse_stack.push (head);
02076     }
02077 
02078   bool discard_all_GIOP11_messages = false;
02079 
02080   // Now we are able to process message in order they have arrived.
02081   // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
02082   // fragments belong to this message and must be discarded.
02083   // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
02084   // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
02085   // having encoded the request id will be discarded.
02086   while (reverse_stack.pop (head) != -1)
02087     {
02088       CORBA::ULong head_request_id;
02089 
02090       if (head->major_version_ == 1 &&
02091           head->minor_version_ <= 1 &&
02092           head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id
02093           this->parse_request_id (head, head_request_id) >= 0 &&
02094           cancel_request_id == head_request_id)
02095         {
02096           TAO_Queued_Data::release (head);
02097           discard_all_GIOP11_messages = true;
02098         }
02099       else if (head->major_version_ == 1 &&
02100                head->minor_version_ <= 1 &&
02101                discard_all_GIOP11_messages)
02102         {
02103           TAO_Queued_Data::release (head);
02104         }
02105       else if (head->major_version_ >= 1 &&
02106                head->minor_version_ >= 2 &&
02107                this->parse_request_id (head, head_request_id) >= 0 &&
02108                cancel_request_id == head_request_id)
02109         {
02110           TAO_Queued_Data::release (head);
02111         }
02112       else
02113         {
02114           this->fragment_stack_.push (head);
02115         }
02116     }
02117 
02118   return 0;
02119 }
02120 
02121 TAO_GIOP_Fragmentation_Strategy *
02122 TAO_GIOP_Message_Base::fragmentation_strategy (void)
02123 {
02124   return this->fragmentation_strategy_.get ();
02125 }
02126 
02127 void
02128 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
02129 {
02130   CORBA::Octet * const buf =
02131     reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
02132 
02133   CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
02134   CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
02135 
02136   // Flags for the GIOP protocol header "flags" field.
02137   CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
02138 
02139   // Least significant bit:        Byte order
02140   ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
02141 
02142   // Second least significant bit: More fragments
02143   //
02144   // Only supported in GIOP 1.1 or better.
02145   if (!(major <= 1 && minor == 0))
02146     ACE_SET_BITS (flags, msg.more_fragments () << 1);
02147 }
02148 
02149 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 11:54:12 2006 for TAO by doxygen 1.3.6