DIOP_Transport.cpp

Go to the documentation of this file.
00001 // $Id: DIOP_Transport.cpp 79151 2007-08-01 09:04:36Z johnnyw $
00002 
00003 #include "tao/Strategies/DIOP_Transport.h"
00004 
00005 #if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
00006 
00007 #include "tao/Strategies/DIOP_Connection_Handler.h"
00008 #include "tao/Strategies/DIOP_Acceptor.h"
00009 #include "tao/Strategies/DIOP_Profile.h"
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021 
00022 ACE_RCSID (tao, DIOP_Transport, "$Id: DIOP_Transport.cpp 79151 2007-08-01 09:04:36Z johnnyw $")
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
00027                                         TAO_ORB_Core *orb_core)
00028   : TAO_Transport (TAO_TAG_DIOP_PROFILE,
00029                    orb_core)
00030   , connection_handler_ (handler)
00031   , messaging_object_ (0)
00032 {
00033 /*
00034  * Hook to customize the messaging object when the concrete messaging
00035  * object is known a priori. In this case, the flag is ignored.
00036  */
00037 //@@ MESSAGING_SPL_COMMENT_HOOK_START
00038 
00039   // Use the normal GIOP object
00040   ACE_NEW (this->messaging_object_,
00041             TAO_GIOP_Message_Base (orb_core,
00042                                   this,
00043                                   ACE_MAX_DGRAM_SIZE));
00044 
00045 //@@ MESSAGING_SPL_COMMENT_HOOK_END
00046 
00047 }
00048 
00049 TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
00050 {
00051   delete this->messaging_object_;
00052 }
00053 
00054 ACE_Event_Handler *
00055 TAO_DIOP_Transport::event_handler_i (void)
00056 {
00057   return this->connection_handler_;
00058 }
00059 
00060 TAO_Connection_Handler *
00061 TAO_DIOP_Transport::connection_handler_i (void)
00062 {
00063   return this->connection_handler_;
00064 }
00065 
00066 TAO_Pluggable_Messaging *
00067 TAO_DIOP_Transport::messaging_object (void)
00068 {
00069   return this->messaging_object_;
00070 }
00071 
00072 ssize_t
00073 TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
00074                           size_t &bytes_transferred,
00075                           const ACE_Time_Value *)
00076 {
00077   const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00078 
00079   ssize_t bytes_to_send = 0;
00080   for (int i = 0; i < iovcnt; i++)
00081      bytes_to_send += iov[i].iov_len;
00082 
00083   this->connection_handler_->peer ().send (iov, iovcnt, addr);
00084 
00085   // @@ Michael:
00086   // Always return a positive number of bytes sent, as we do
00087   // not handle sending errors in DIOP.
00088 
00089   bytes_transferred = bytes_to_send;
00090 
00091   return 1;
00092 }
00093 
00094 ssize_t
00095 TAO_DIOP_Transport::recv (char *buf,
00096                           size_t len,
00097                           const ACE_Time_Value * /* max_wait_time */)
00098 {
00099   ACE_INET_Addr from_addr;
00100 
00101   ssize_t const n = this->connection_handler_->peer ().recv (buf, len, from_addr);
00102 
00103   if (TAO_debug_level > 0)
00104     {
00105       ACE_DEBUG ((LM_DEBUG,
00106                   "TAO (%P|%t) - DIOP_Transport::recv, received %d bytes from %s:%d %d\n",
00107                   n,
00108                   ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
00109                   from_addr.get_port_number (),
00110                   errno));
00111     }
00112 
00113   // Most of the errors handling is common for
00114   // Now the message has been read
00115   if (n == -1 && TAO_debug_level > 4)
00116     {
00117       ACE_DEBUG ((LM_DEBUG,
00118                   ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::recv, %p \n"),
00119                   ACE_TEXT ("TAO - read message failure ")
00120                   ACE_TEXT ("recv () \n")));
00121     }
00122 
00123   // Error handling
00124   if (n == -1)
00125     {
00126       if (errno == EWOULDBLOCK)
00127         return 0;
00128 
00129       return -1;
00130     }
00131   // @@ What are the other error handling here??
00132   else if (n == 0)
00133     {
00134       return -1;
00135     }
00136 
00137   // Remember the from addr to eventually use it as remote
00138   // addr for the reply.
00139   this->connection_handler_->addr (from_addr);
00140 
00141   return n;
00142 }
00143 
00144 int
00145 TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00146                                   ACE_Time_Value *max_wait_time)
00147 {
00148   // If there are no messages then we can go ahead to read from the
00149   // handle for further reading..
00150 
00151   // The buffer on the stack which will be used to hold the input
00152   // messages
00153   char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00154 
00155 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00156   (void) ACE_OS::memset (buf,
00157                          '\0',
00158                          sizeof buf);
00159 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00160 
00161   // Create a data block
00162   ACE_Data_Block db (sizeof (buf),
00163                      ACE_Message_Block::MB_DATA,
00164                      buf,
00165                      this->orb_core_->input_cdr_buffer_allocator (),
00166                      this->orb_core_->locking_strategy (),
00167                      ACE_Message_Block::DONT_DELETE,
00168                      this->orb_core_->input_cdr_dblock_allocator ());
00169 
00170   // Create a message block
00171   ACE_Message_Block message_block (&db,
00172                                    ACE_Message_Block::DONT_DELETE,
00173                                    this->orb_core_->input_cdr_msgblock_allocator ());
00174 
00175 
00176   // Align the message block
00177   ACE_CDR::mb_align (&message_block);
00178 
00179 
00180   // Read the message into the  message block that we have created on
00181   // the stack.
00182   ssize_t n = this->recv (message_block.rd_ptr (),
00183                           message_block.space (),
00184                           max_wait_time);
00185 
00186   // If there is an error return to the reactor..
00187   if (n <= 0)
00188     {
00189       if (n == -1)
00190         {
00191           this->tms_->connection_closed ();
00192         }
00193 
00194       return n;
00195     }
00196 
00197   // Set the write pointer in the stack buffer
00198   message_block.wr_ptr (n);
00199 
00200   // Make a node of the message block..
00201   TAO_Queued_Data qd (&message_block);
00202   size_t mesg_length = 0;
00203 
00204   // Parse the incoming message for validity. The check needs to be
00205   // performed by the messaging objects.
00206   if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1)
00207     return -1;
00208 
00209   if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED)
00210     {
00211       // parse/marshal error
00212       return -1;
00213     }
00214 
00215   if (message_block.length () > mesg_length)
00216     {
00217       // we read too much data
00218       return -1;
00219     }
00220 
00221   // NOTE: We are not performing any queueing nor any checking for
00222   // missing data. We are assuming that ALL the data would be got in a
00223   // single read.
00224 
00225   // Process the message
00226   return this->process_parsed_messages (&qd, rh);
00227 }
00228 
00229 
00230 int
00231 TAO_DIOP_Transport::register_handler (void)
00232 {
00233   // @@ Michael:
00234   //
00235   // We do never register register the handler with the reactor
00236   // as we never need to be informed about any incoming data,
00237   // assuming we only use one-ways.
00238   // If we would register and ICMP Messages would arrive, e.g
00239   // due to a not reachable server, we would get informed - as this
00240   // disturbs the general DIOP assumptions of not being
00241   // interested in any network failures, we ignore ICMP messages.
00242   return 0;
00243 }
00244 
00245 
00246 int
00247 TAO_DIOP_Transport::send_request (TAO_Stub *stub,
00248                                   TAO_ORB_Core *orb_core,
00249                                   TAO_OutputCDR &stream,
00250                                   int message_semantics,
00251                                   ACE_Time_Value *max_wait_time)
00252 {
00253   if (this->ws_->sending_request (orb_core, message_semantics) == -1)
00254     return -1;
00255 
00256   if (this->send_message (stream,
00257                           stub,
00258                           message_semantics,
00259                           max_wait_time) == -1)
00260 
00261     return -1;
00262 
00263   this->first_request_sent ();
00264 
00265   return 0;
00266 }
00267 
00268 int
00269 TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
00270                                   TAO_Stub *stub,
00271                                   int message_semantics,
00272                                   ACE_Time_Value *max_wait_time)
00273 {
00274   // Format the message in the stream first
00275   if (this->messaging_object_->format_message (stream) != 0)
00276     return -1;
00277 
00278   // Strictly speaking, should not need to loop here because the
00279   // socket never gets set to a nonblocking mode ... some Linux
00280   // versions seem to need it though.  Leaving it costs little.
00281 
00282   // This guarantees to send all data (bytes) or return an error.
00283   ssize_t const n = this->send_message_shared (stub,
00284                                                message_semantics,
00285                                                stream.begin (),
00286                                                max_wait_time);
00287 
00288   if (n == -1)
00289     {
00290       if (TAO_debug_level)
00291         ACE_DEBUG ((LM_DEBUG,
00292                     ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::send_message, ")
00293                     ACE_TEXT ("closing transport %d after fault %p\n"),
00294                     this->id (),
00295                     ACE_TEXT ("send_message ()\n")));
00296 
00297       return -1;
00298     }
00299 
00300   return 1;
00301 }
00302 
00303 int
00304 TAO_DIOP_Transport::send_message_shared (TAO_Stub *stub,
00305                                          int message_semantics,
00306                                          const ACE_Message_Block *message_block,
00307                                          ACE_Time_Value *max_wait_time)
00308 {
00309   int result;
00310 
00311   {
00312     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00313 
00314     result =
00315       this->send_message_shared_i (stub, message_semantics,
00316                                    message_block, max_wait_time);
00317   }
00318 
00319   if (result == -1)
00320     {
00321       this->close_connection ();
00322     }
00323 
00324   return result;
00325 }
00326 
00327 int
00328 TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
00329                                     CORBA::Octet minor)
00330 {
00331   this->messaging_object_->init (major, minor);
00332   return 1;
00333 }
00334 
00335 TAO_END_VERSIONED_NAMESPACE_DECL
00336 
00337 #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */

Generated on Sun Jan 27 15:59:47 2008 for TAO_Strategies by doxygen 1.3.6