DIOP_Transport.cpp

Go to the documentation of this file.
00001 // DIOP_Transport.cpp,v 1.29 2006/04/20 12:37:17 jwillemsen Exp
00002 
00003 #include "tao/Strategies/DIOP_Transport.h"
00004 
00005 #if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
00006 
00007 #include "tao/Strategies/DIOP_Connection_Handler.h"
00008 #include "tao/Strategies/DIOP_Acceptor.h"
00009 #include "tao/Strategies/DIOP_Profile.h"
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021 #include "tao/GIOP_Message_Lite.h"
00022 
00023 ACE_RCSID (tao, DIOP_Transport, "DIOP_Transport.cpp,v 1.29 2006/04/20 12:37:17 jwillemsen Exp")
00024 
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
00028                                         TAO_ORB_Core *orb_core,
00029                                         CORBA::Boolean flag)
00030   : TAO_Transport (TAO_TAG_DIOP_PROFILE,
00031                    orb_core)
00032   , connection_handler_ (handler)
00033   , messaging_object_ (0)
00034 {
00035 /*
00036  * Hook to customize the messaging object when the concrete messaging
00037  * object is known a priori. In this case, the flag is ignored.
00038  */
00039 //@@ MESSAGING_SPL_COMMENT_HOOK_START
00040 
00041   // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that
00042   //             we read the whole UDP packet on a single read.
00043   if (flag)
00044     {
00045       // Use the lite version of the protocol
00046       ACE_NEW (this->messaging_object_,
00047                TAO_GIOP_Message_Lite (orb_core,
00048                                       ACE_MAX_DGRAM_SIZE));
00049                                       }
00050   else
00051     {
00052       // Use the normal GIOP object
00053       ACE_NEW (this->messaging_object_,
00054                TAO_GIOP_Message_Base (orb_core,
00055                                       this,
00056                                       ACE_MAX_DGRAM_SIZE));
00057     }
00058 
00059 //@@ MESSAGING_SPL_COMMENT_HOOK_END
00060 
00061 }
00062 
00063 TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
00064 {
00065   delete this->messaging_object_;
00066 }
00067 
00068 ACE_Event_Handler *
00069 TAO_DIOP_Transport::event_handler_i (void)
00070 {
00071   return this->connection_handler_;
00072 }
00073 
00074 TAO_Connection_Handler *
00075 TAO_DIOP_Transport::connection_handler_i (void)
00076 {
00077   return this->connection_handler_;
00078 }
00079 
00080 TAO_Pluggable_Messaging *
00081 TAO_DIOP_Transport::messaging_object (void)
00082 {
00083   return this->messaging_object_;
00084 }
00085 
00086 ssize_t
00087 TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
00088                           size_t &bytes_transferred,
00089                           const ACE_Time_Value *)
00090 {
00091   const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00092 
00093   ssize_t bytes_to_send = 0;
00094   for (int i = 0; i < iovcnt; i++)
00095      bytes_to_send += iov[i].iov_len;
00096 
00097   this->connection_handler_->dgram ().send (iov,
00098                                             iovcnt,
00099                                             addr);
00100   // @@ Michael:
00101   // Always return a positive number of bytes sent, as we do
00102   // not handle sending errors in DIOP.
00103 
00104   bytes_transferred = bytes_to_send;
00105 
00106   return 1;
00107 }
00108 
00109 ssize_t
00110 TAO_DIOP_Transport::recv (char *buf,
00111                           size_t len,
00112                           const ACE_Time_Value * /* max_wait_time */)
00113 {
00114   ACE_INET_Addr from_addr;
00115 
00116   ssize_t n = this->connection_handler_->dgram ().recv (buf,
00117                                                         len,
00118                                                         from_addr);
00119 
00120   if (TAO_debug_level > 0)
00121     {
00122       ACE_DEBUG ((LM_DEBUG,
00123                   "TAO_DIOP_Transport::recv_i: received %d bytes from %s:%d %d\n",
00124                   n,
00125                   ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
00126                   from_addr.get_port_number (),
00127                   errno));
00128     }
00129 
00130   // Most of the errors handling is common for
00131   // Now the message has been read
00132   if (n == -1 && TAO_debug_level > 4)
00133     {
00134       ACE_DEBUG ((LM_DEBUG,
00135                   ACE_TEXT ("TAO (%P|%t) - %p \n"),
00136                   ACE_TEXT ("TAO - read message failure ")
00137                   ACE_TEXT ("recv () \n")));
00138     }
00139 
00140   // Error handling
00141   if (n == -1)
00142     {
00143       if (errno == EWOULDBLOCK)
00144         return 0;
00145 
00146       return -1;
00147     }
00148   // @@ What are the other error handling here??
00149   else if (n == 0)
00150     {
00151       return -1;
00152     }
00153 
00154   // Remember the from addr to eventually use it as remote
00155   // addr for the reply.
00156   this->connection_handler_->addr (from_addr);
00157 
00158   return n;
00159 }
00160 
00161 int
00162 TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00163                                   ACE_Time_Value *max_wait_time,
00164                                   int /*block*/)
00165 {
00166   // If there are no messages then we can go ahead to read from the
00167   // handle for further reading..
00168 
00169   // The buffer on the stack which will be used to hold the input
00170   // messages
00171   char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00172 
00173 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00174   (void) ACE_OS::memset (buf,
00175                          '\0',
00176                          sizeof buf);
00177 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
00178 
00179   // Create a data block
00180   ACE_Data_Block db (sizeof (buf),
00181                      ACE_Message_Block::MB_DATA,
00182                      buf,
00183                      this->orb_core_->input_cdr_buffer_allocator (),
00184                      this->orb_core_->locking_strategy (),
00185                      ACE_Message_Block::DONT_DELETE,
00186                      this->orb_core_->input_cdr_dblock_allocator ());
00187 
00188   // Create a message block
00189   ACE_Message_Block message_block (&db,
00190                                    ACE_Message_Block::DONT_DELETE,
00191                                    this->orb_core_->input_cdr_msgblock_allocator ());
00192 
00193 
00194   // Align the message block
00195   ACE_CDR::mb_align (&message_block);
00196 
00197 
00198   // Read the message into the  message block that we have created on
00199   // the stack.
00200   ssize_t n = this->recv (message_block.rd_ptr (),
00201                           message_block.space (),
00202                           max_wait_time);
00203 
00204   // If there is an error return to the reactor..
00205   if (n <= 0)
00206     {
00207       if (n == -1)
00208         this->tms_->connection_closed ();
00209 
00210       return n;
00211     }
00212 
00213   // Set the write pointer in the stack buffer
00214   message_block.wr_ptr (n);
00215 
00216   // Make a node of the message block..
00217   TAO_Queued_Data qd (&message_block);
00218   size_t mesg_length;
00219 
00220   // Parse the incoming message for validity. The check needs to be
00221   // performed by the messaging objects.
00222   if (this->messaging_object ()->parse_next_message (message_block, 
00223                                                      qd,
00224                                                      mesg_length) == -1) 
00225     return -1;
00226 
00227   if (qd.missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00228     {
00229       // parse/marshal error
00230       return -1;
00231     }
00232   
00233   if (message_block.length () > mesg_length)
00234     {
00235       // we read too much data
00236       return -1;
00237     }
00238 
00239   // NOTE: We are not performing any queueing nor any checking for
00240   // missing data. We are assuming that ALL the data would be got in a
00241   // single read.
00242 
00243   // Process the message
00244   return this->process_parsed_messages (&qd, rh);
00245 }
00246 
00247 
00248 int
00249 TAO_DIOP_Transport::register_handler (void)
00250 {
00251   // @@ Michael:
00252   //
00253   // We do never register register the handler with the reactor
00254   // as we never need to be informed about any incoming data,
00255   // assuming we only use one-ways.
00256   // If we would register and ICMP Messages would arrive, e.g
00257   // due to a not reachable server, we would get informed - as this
00258   // disturbs the general DIOP assumptions of not being
00259   // interested in any network failures, we ignore ICMP messages.
00260   return 0;
00261 }
00262 
00263 
00264 int
00265 TAO_DIOP_Transport::send_request (TAO_Stub *stub,
00266                                   TAO_ORB_Core *orb_core,
00267                                   TAO_OutputCDR &stream,
00268                                   int message_semantics,
00269                                   ACE_Time_Value *max_wait_time)
00270 {
00271   if (this->ws_->sending_request (orb_core,
00272                                   message_semantics) == -1)
00273     return -1;
00274 
00275   if (this->send_message (stream,
00276                           stub,
00277                           message_semantics,
00278                           max_wait_time) == -1)
00279 
00280     return -1;
00281 
00282   this->first_request_sent();
00283 
00284   return 0;
00285 }
00286 
00287 int
00288 TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
00289                                   TAO_Stub *stub,
00290                                   int message_semantics,
00291                                   ACE_Time_Value *max_wait_time)
00292 {
00293   // Format the message in the stream first
00294   if (this->messaging_object_->format_message (stream) != 0)
00295     return -1;
00296 
00297   // Strictly speaking, should not need to loop here because the
00298   // socket never gets set to a nonblocking mode ... some Linux
00299   // versions seem to need it though.  Leaving it costs little.
00300 
00301   // This guarantees to send all data (bytes) or return an error.
00302   ssize_t n = this->send_message_shared (stub,
00303                                          message_semantics,
00304                                          stream.begin (),
00305                                          max_wait_time);
00306 
00307   if (n == -1)
00308     {
00309       if (TAO_debug_level)
00310         ACE_DEBUG ((LM_DEBUG,
00311                     ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
00312                     this->id (),
00313                     ACE_TEXT ("send_message ()\n")));
00314 
00315       return -1;
00316     }
00317 
00318   return 1;
00319 }
00320 
00321 int
00322 TAO_DIOP_Transport::send_message_shared (TAO_Stub *stub,
00323                                          int message_semantics,
00324                                          const ACE_Message_Block *message_block,
00325                                          ACE_Time_Value *max_wait_time)
00326 {
00327   int result;
00328 
00329   {
00330     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00331 
00332     result =
00333       this->send_message_shared_i (stub, message_semantics,
00334                                    message_block, max_wait_time);
00335   }
00336 
00337   if (result == -1)
00338     {
00339       this->close_connection ();
00340     }
00341 
00342   return result;
00343 }
00344 
00345 int
00346 TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
00347                                     CORBA::Octet minor)
00348 {
00349   this->messaging_object_->init (major, minor);
00350   return 1;
00351 }
00352 
00353 TAO_END_VERSIONED_NAMESPACE_DECL
00354 
00355 #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */

Generated on Thu Nov 9 13:39:28 2006 for TAO_Strategies by doxygen 1.3.6