00001
00002
00003 #include "tao/Strategies/DIOP_Transport.h"
00004
00005 #if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
00006
00007 #include "tao/Strategies/DIOP_Connection_Handler.h"
00008 #include "tao/Strategies/DIOP_Acceptor.h"
00009 #include "tao/Strategies/DIOP_Profile.h"
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021
00022 ACE_RCSID (tao, DIOP_Transport, "$Id: DIOP_Transport.cpp 79151 2007-08-01 09:04:36Z johnnyw $")
00023
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
00027 TAO_ORB_Core *orb_core)
00028 : TAO_Transport (TAO_TAG_DIOP_PROFILE,
00029 orb_core)
00030 , connection_handler_ (handler)
00031 , messaging_object_ (0)
00032 {
00033
00034
00035
00036
00037
00038
00039
00040 ACE_NEW (this->messaging_object_,
00041 TAO_GIOP_Message_Base (orb_core,
00042 this,
00043 ACE_MAX_DGRAM_SIZE));
00044
00045
00046
00047 }
00048
00049 TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
00050 {
00051 delete this->messaging_object_;
00052 }
00053
00054 ACE_Event_Handler *
00055 TAO_DIOP_Transport::event_handler_i (void)
00056 {
00057 return this->connection_handler_;
00058 }
00059
00060 TAO_Connection_Handler *
00061 TAO_DIOP_Transport::connection_handler_i (void)
00062 {
00063 return this->connection_handler_;
00064 }
00065
00066 TAO_Pluggable_Messaging *
00067 TAO_DIOP_Transport::messaging_object (void)
00068 {
00069 return this->messaging_object_;
00070 }
00071
00072 ssize_t
00073 TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
00074 size_t &bytes_transferred,
00075 const ACE_Time_Value *)
00076 {
00077 const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00078
00079 ssize_t bytes_to_send = 0;
00080 for (int i = 0; i < iovcnt; i++)
00081 bytes_to_send += iov[i].iov_len;
00082
00083 this->connection_handler_->peer ().send (iov, iovcnt, addr);
00084
00085
00086
00087
00088
00089 bytes_transferred = bytes_to_send;
00090
00091 return 1;
00092 }
00093
00094 ssize_t
00095 TAO_DIOP_Transport::recv (char *buf,
00096 size_t len,
00097 const ACE_Time_Value * )
00098 {
00099 ACE_INET_Addr from_addr;
00100
00101 ssize_t const n = this->connection_handler_->peer ().recv (buf, len, from_addr);
00102
00103 if (TAO_debug_level > 0)
00104 {
00105 ACE_DEBUG ((LM_DEBUG,
00106 "TAO (%P|%t) - DIOP_Transport::recv, received %d bytes from %s:%d %d\n",
00107 n,
00108 ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
00109 from_addr.get_port_number (),
00110 errno));
00111 }
00112
00113
00114
00115 if (n == -1 && TAO_debug_level > 4)
00116 {
00117 ACE_DEBUG ((LM_DEBUG,
00118 ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::recv, %p \n"),
00119 ACE_TEXT ("TAO - read message failure ")
00120 ACE_TEXT ("recv () \n")));
00121 }
00122
00123
00124 if (n == -1)
00125 {
00126 if (errno == EWOULDBLOCK)
00127 return 0;
00128
00129 return -1;
00130 }
00131
00132 else if (n == 0)
00133 {
00134 return -1;
00135 }
00136
00137
00138
00139 this->connection_handler_->addr (from_addr);
00140
00141 return n;
00142 }
00143
00144 int
00145 TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00146 ACE_Time_Value *max_wait_time)
00147 {
00148
00149
00150
00151
00152
00153 char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00154
00155 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00156 (void) ACE_OS::memset (buf,
00157 '\0',
00158 sizeof buf);
00159 #endif
00160
00161
00162 ACE_Data_Block db (sizeof (buf),
00163 ACE_Message_Block::MB_DATA,
00164 buf,
00165 this->orb_core_->input_cdr_buffer_allocator (),
00166 this->orb_core_->locking_strategy (),
00167 ACE_Message_Block::DONT_DELETE,
00168 this->orb_core_->input_cdr_dblock_allocator ());
00169
00170
00171 ACE_Message_Block message_block (&db,
00172 ACE_Message_Block::DONT_DELETE,
00173 this->orb_core_->input_cdr_msgblock_allocator ());
00174
00175
00176
00177 ACE_CDR::mb_align (&message_block);
00178
00179
00180
00181
00182 ssize_t n = this->recv (message_block.rd_ptr (),
00183 message_block.space (),
00184 max_wait_time);
00185
00186
00187 if (n <= 0)
00188 {
00189 if (n == -1)
00190 {
00191 this->tms_->connection_closed ();
00192 }
00193
00194 return n;
00195 }
00196
00197
00198 message_block.wr_ptr (n);
00199
00200
00201 TAO_Queued_Data qd (&message_block);
00202 size_t mesg_length = 0;
00203
00204
00205
00206 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1)
00207 return -1;
00208
00209 if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED)
00210 {
00211
00212 return -1;
00213 }
00214
00215 if (message_block.length () > mesg_length)
00216 {
00217
00218 return -1;
00219 }
00220
00221
00222
00223
00224
00225
00226 return this->process_parsed_messages (&qd, rh);
00227 }
00228
00229
00230 int
00231 TAO_DIOP_Transport::register_handler (void)
00232 {
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242 return 0;
00243 }
00244
00245
00246 int
00247 TAO_DIOP_Transport::send_request (TAO_Stub *stub,
00248 TAO_ORB_Core *orb_core,
00249 TAO_OutputCDR &stream,
00250 int message_semantics,
00251 ACE_Time_Value *max_wait_time)
00252 {
00253 if (this->ws_->sending_request (orb_core, message_semantics) == -1)
00254 return -1;
00255
00256 if (this->send_message (stream,
00257 stub,
00258 message_semantics,
00259 max_wait_time) == -1)
00260
00261 return -1;
00262
00263 this->first_request_sent ();
00264
00265 return 0;
00266 }
00267
00268 int
00269 TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
00270 TAO_Stub *stub,
00271 int message_semantics,
00272 ACE_Time_Value *max_wait_time)
00273 {
00274
00275 if (this->messaging_object_->format_message (stream) != 0)
00276 return -1;
00277
00278
00279
00280
00281
00282
00283 ssize_t const n = this->send_message_shared (stub,
00284 message_semantics,
00285 stream.begin (),
00286 max_wait_time);
00287
00288 if (n == -1)
00289 {
00290 if (TAO_debug_level)
00291 ACE_DEBUG ((LM_DEBUG,
00292 ACE_TEXT ("TAO (%P|%t) - DIOP_Transport::send_message, ")
00293 ACE_TEXT ("closing transport %d after fault %p\n"),
00294 this->id (),
00295 ACE_TEXT ("send_message ()\n")));
00296
00297 return -1;
00298 }
00299
00300 return 1;
00301 }
00302
00303 int
00304 TAO_DIOP_Transport::send_message_shared (TAO_Stub *stub,
00305 int message_semantics,
00306 const ACE_Message_Block *message_block,
00307 ACE_Time_Value *max_wait_time)
00308 {
00309 int result;
00310
00311 {
00312 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00313
00314 result =
00315 this->send_message_shared_i (stub, message_semantics,
00316 message_block, max_wait_time);
00317 }
00318
00319 if (result == -1)
00320 {
00321 this->close_connection ();
00322 }
00323
00324 return result;
00325 }
00326
00327 int
00328 TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
00329 CORBA::Octet minor)
00330 {
00331 this->messaging_object_->init (major, minor);
00332 return 1;
00333 }
00334
00335 TAO_END_VERSIONED_NAMESPACE_DECL
00336
00337 #endif