DIOP_Transport.cpp

Go to the documentation of this file.
00001 // $Id: DIOP_Transport.cpp 80603 2008-02-11 22:14:39Z johnc $
00002 
00003 #include "tao/Strategies/DIOP_Transport.h"
00004 
00005 #if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
00006 
00007 #include "tao/Strategies/DIOP_Connection_Handler.h"
00008 #include "tao/Strategies/DIOP_Acceptor.h"
00009 #include "tao/Strategies/DIOP_Profile.h"
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021 
00022 ACE_RCSID (tao, DIOP_Transport, "$Id: DIOP_Transport.cpp 80603 2008-02-11 22:14:39Z johnc $")
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
00027                                         TAO_ORB_Core *orb_core)
00028   : TAO_Transport (TAO_TAG_DIOP_PROFILE,
00029                    orb_core,
00030                    ACE_MAX_DGRAM_SIZE)
00031   , connection_handler_ (handler)
00032 {
00033 }
00034 
00035 TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
00036 {
00037 }
00038 
00039 ACE_Event_Handler *
00040 TAO_DIOP_Transport::event_handler_i (void)
00041 {
00042   return this->connection_handler_;
00043 }
00044 
00045 TAO_Connection_Handler *
00046 TAO_DIOP_Transport::connection_handler_i (void)
00047 {
00048   return this->connection_handler_;
00049 }
00050 
00051 ssize_t
00052 TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
00053                           size_t &bytes_transferred,
00054                           const ACE_Time_Value *)
00055 {
00056   const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00057 
00058   ssize_t bytes_to_send = 0;
00059   for (int i = 0; i < iovcnt; i++)
00060      bytes_to_send += iov[i].iov_len;
00061 
00062   this->connection_handler_->peer ().send (iov, iovcnt, addr);
00063 
00064   // @@ Michael:
00065   // Always return a positive number of bytes sent, as we do
00066   // not handle sending errors in DIOP.
00067 
00068   bytes_transferred = bytes_to_send;
00069 
00070   return 1;
00071 }
00072 
00073 ssize_t
00074 TAO_DIOP_Transport::recv (char *buf,
00075                           size_t len,
00076                           const ACE_Time_Value * /* max_wait_time */)
00077 {
00078   ACE_INET_Addr from_addr;
00079 
00080   ssize_t const n = this->connection_handler_->peer ().recv (buf, len, from_addr);
00081 
00082   if (TAO_debug_level > 0)
00083     {
00084       ACE_DEBUG ((LM_DEBUG,
00085                   "TAO (%P|%t) - DIOP_Transport::recv, received %d bytes from %s:%d %d\n",
00086                   n,
00087                   ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
00088                   from_addr.get_port_number (),
00089                   errno));
00090     }
00091 
00092   // Most of the errors handling is common for
00093   // Now the message has been read
00094   if (n == -1 && TAO_debug_level > 4)
00095     {
00096       ACE_DEBUG ((LM_DEBUG,
00097                   ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::recv, %p \n"),
00098                   ACE_TEXT ("TAO - read message failure ")
00099                   ACE_TEXT ("recv () \n")));
00100     }
00101 
00102   // Error handling
00103   if (n == -1)
00104     {
00105       if (errno == EWOULDBLOCK)
00106         return 0;
00107 
00108       return -1;
00109     }
00110   // @@ What are the other error handling here??
00111   else if (n == 0)
00112     {
00113       return -1;
00114     }
00115 
00116   // Remember the from addr to eventually use it as remote
00117   // addr for the reply.
00118   this->connection_handler_->addr (from_addr);
00119 
00120   return n;
00121 }
00122 
00123 int
00124 TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00125                                   ACE_Time_Value *max_wait_time)
00126 {
00127   // If there are no messages then we can go ahead to read from the
00128   // handle for further reading..
00129 
00130   // The buffer on the stack which will be used to hold the input
00131   // messages
00132   char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00133 
00134 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00135   (void) ACE_OS::memset (buf,
00136                          '\0',
00137                          sizeof buf);
00138 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00139 
00140   // Create a data block
00141   ACE_Data_Block db (sizeof (buf),
00142                      ACE_Message_Block::MB_DATA,
00143                      buf,
00144                      this->orb_core_->input_cdr_buffer_allocator (),
00145                      this->orb_core_->locking_strategy (),
00146                      ACE_Message_Block::DONT_DELETE,
00147                      this->orb_core_->input_cdr_dblock_allocator ());
00148 
00149   // Create a message block
00150   ACE_Message_Block message_block (&db,
00151                                    ACE_Message_Block::DONT_DELETE,
00152                                    this->orb_core_->input_cdr_msgblock_allocator ());
00153 
00154 
00155   // Align the message block
00156   ACE_CDR::mb_align (&message_block);
00157 
00158 
00159   // Read the message into the  message block that we have created on
00160   // the stack.
00161   ssize_t n = this->recv (message_block.rd_ptr (),
00162                           message_block.space (),
00163                           max_wait_time);
00164 
00165   // If there is an error return to the reactor..
00166   if (n <= 0)
00167     {
00168       if (n == -1)
00169         {
00170           this->tms_->connection_closed ();
00171         }
00172 
00173       return n;
00174     }
00175 
00176   // Set the write pointer in the stack buffer
00177   message_block.wr_ptr (n);
00178 
00179   // Make a node of the message block..
00180   TAO_Queued_Data qd (&message_block);
00181   size_t mesg_length = 0;
00182 
00183   // Parse the incoming message for validity. The check needs to be
00184   // performed by the messaging objects.
00185   if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1)
00186     return -1;
00187 
00188   if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED)
00189     {
00190       // parse/marshal error
00191       return -1;
00192     }
00193 
00194   if (message_block.length () > mesg_length)
00195     {
00196       // we read too much data
00197       return -1;
00198     }
00199 
00200   // NOTE: We are not performing any queueing nor any checking for
00201   // missing data. We are assuming that ALL the data would be got in a
00202   // single read.
00203 
00204   // Process the message
00205   return this->process_parsed_messages (&qd, rh);
00206 }
00207 
00208 
00209 int
00210 TAO_DIOP_Transport::register_handler (void)
00211 {
00212   // @@ Michael:
00213   //
00214   // We do never register register the handler with the reactor
00215   // as we never need to be informed about any incoming data,
00216   // assuming we only use one-ways.
00217   // If we would register and ICMP Messages would arrive, e.g
00218   // due to a not reachable server, we would get informed - as this
00219   // disturbs the general DIOP assumptions of not being
00220   // interested in any network failures, we ignore ICMP messages.
00221   return 0;
00222 }
00223 
00224 
00225 int
00226 TAO_DIOP_Transport::send_request (TAO_Stub *stub,
00227                                   TAO_ORB_Core *orb_core,
00228                                   TAO_OutputCDR &stream,
00229                                   TAO_Message_Semantics message_semantics,
00230                                   ACE_Time_Value *max_wait_time)
00231 {
00232   if (this->ws_->sending_request (orb_core, message_semantics) == -1)
00233     return -1;
00234 
00235   if (this->send_message (stream,
00236                           stub,
00237                           message_semantics,
00238                           max_wait_time) == -1)
00239 
00240     return -1;
00241 
00242   this->first_request_sent ();
00243 
00244   return 0;
00245 }
00246 
00247 int
00248 TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
00249                                   TAO_Stub *stub,
00250                                   TAO_Message_Semantics message_semantics,
00251                                   ACE_Time_Value *max_wait_time)
00252 {
00253   // Format the message in the stream first
00254   if (this->messaging_object_->format_message (stream) != 0)
00255     return -1;
00256 
00257   // Strictly speaking, should not need to loop here because the
00258   // socket never gets set to a nonblocking mode ... some Linux
00259   // versions seem to need it though.  Leaving it costs little.
00260 
00261   // This guarantees to send all data (bytes) or return an error.
00262   ssize_t const n = this->send_message_shared (stub,
00263                                                message_semantics,
00264                                                stream.begin (),
00265                                                max_wait_time);
00266 
00267   if (n == -1)
00268     {
00269       if (TAO_debug_level)
00270         ACE_DEBUG ((LM_DEBUG,
00271                     ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::send_message, ")
00272                     ACE_TEXT ("closing transport %d after fault %p\n"),
00273                     this->id (),
00274                     ACE_TEXT ("send_message ()\n")));
00275 
00276       return -1;
00277     }
00278 
00279   return 1;
00280 }
00281 
00282 TAO_END_VERSIONED_NAMESPACE_DECL
00283 
00284 #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */

Generated on Tue Feb 2 17:47:18 2010 for TAO_Strategies by  doxygen 1.4.7