00001
00002
00003 #include "tao/Strategies/DIOP_Transport.h"
00004
00005 #if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
00006
00007 #include "tao/Strategies/DIOP_Connection_Handler.h"
00008 #include "tao/Strategies/DIOP_Acceptor.h"
00009 #include "tao/Strategies/DIOP_Profile.h"
00010 #include "tao/Acceptor_Registry.h"
00011 #include "tao/operation_details.h"
00012 #include "tao/Timeprobe.h"
00013 #include "tao/CDR.h"
00014 #include "tao/Transport_Mux_Strategy.h"
00015 #include "tao/Wait_Strategy.h"
00016 #include "tao/Stub.h"
00017 #include "tao/ORB_Core.h"
00018 #include "tao/debug.h"
00019 #include "tao/Resume_Handle.h"
00020 #include "tao/GIOP_Message_Base.h"
00021 #include "tao/GIOP_Message_Lite.h"
00022
00023 ACE_RCSID (tao, DIOP_Transport, "DIOP_Transport.cpp,v 1.29 2006/04/20 12:37:17 jwillemsen Exp")
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
00028 TAO_ORB_Core *orb_core,
00029 CORBA::Boolean flag)
00030 : TAO_Transport (TAO_TAG_DIOP_PROFILE,
00031 orb_core)
00032 , connection_handler_ (handler)
00033 , messaging_object_ (0)
00034 {
00035
00036
00037
00038
00039
00040
00041
00042
00043 if (flag)
00044 {
00045
00046 ACE_NEW (this->messaging_object_,
00047 TAO_GIOP_Message_Lite (orb_core,
00048 ACE_MAX_DGRAM_SIZE));
00049 }
00050 else
00051 {
00052
00053 ACE_NEW (this->messaging_object_,
00054 TAO_GIOP_Message_Base (orb_core,
00055 this,
00056 ACE_MAX_DGRAM_SIZE));
00057 }
00058
00059
00060
00061 }
00062
00063 TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
00064 {
00065 delete this->messaging_object_;
00066 }
00067
00068 ACE_Event_Handler *
00069 TAO_DIOP_Transport::event_handler_i (void)
00070 {
00071 return this->connection_handler_;
00072 }
00073
00074 TAO_Connection_Handler *
00075 TAO_DIOP_Transport::connection_handler_i (void)
00076 {
00077 return this->connection_handler_;
00078 }
00079
00080 TAO_Pluggable_Messaging *
00081 TAO_DIOP_Transport::messaging_object (void)
00082 {
00083 return this->messaging_object_;
00084 }
00085
00086 ssize_t
00087 TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
00088 size_t &bytes_transferred,
00089 const ACE_Time_Value *)
00090 {
00091 const ACE_INET_Addr &addr = this->connection_handler_->addr ();
00092
00093 ssize_t bytes_to_send = 0;
00094 for (int i = 0; i < iovcnt; i++)
00095 bytes_to_send += iov[i].iov_len;
00096
00097 this->connection_handler_->dgram ().send (iov,
00098 iovcnt,
00099 addr);
00100
00101
00102
00103
00104 bytes_transferred = bytes_to_send;
00105
00106 return 1;
00107 }
00108
00109 ssize_t
00110 TAO_DIOP_Transport::recv (char *buf,
00111 size_t len,
00112 const ACE_Time_Value * )
00113 {
00114 ACE_INET_Addr from_addr;
00115
00116 ssize_t n = this->connection_handler_->dgram ().recv (buf,
00117 len,
00118 from_addr);
00119
00120 if (TAO_debug_level > 0)
00121 {
00122 ACE_DEBUG ((LM_DEBUG,
00123 "TAO_DIOP_Transport::recv_i: received %d bytes from %s:%d %d\n",
00124 n,
00125 ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
00126 from_addr.get_port_number (),
00127 errno));
00128 }
00129
00130
00131
00132 if (n == -1 && TAO_debug_level > 4)
00133 {
00134 ACE_DEBUG ((LM_DEBUG,
00135 ACE_TEXT ("TAO (%P|%t) - %p \n"),
00136 ACE_TEXT ("TAO - read message failure ")
00137 ACE_TEXT ("recv () \n")));
00138 }
00139
00140
00141 if (n == -1)
00142 {
00143 if (errno == EWOULDBLOCK)
00144 return 0;
00145
00146 return -1;
00147 }
00148
00149 else if (n == 0)
00150 {
00151 return -1;
00152 }
00153
00154
00155
00156 this->connection_handler_->addr (from_addr);
00157
00158 return n;
00159 }
00160
00161 int
00162 TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00163 ACE_Time_Value *max_wait_time,
00164 int )
00165 {
00166
00167
00168
00169
00170
00171 char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
00172
00173 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00174 (void) ACE_OS::memset (buf,
00175 '\0',
00176 sizeof buf);
00177 #endif
00178
00179
00180 ACE_Data_Block db (sizeof (buf),
00181 ACE_Message_Block::MB_DATA,
00182 buf,
00183 this->orb_core_->input_cdr_buffer_allocator (),
00184 this->orb_core_->locking_strategy (),
00185 ACE_Message_Block::DONT_DELETE,
00186 this->orb_core_->input_cdr_dblock_allocator ());
00187
00188
00189 ACE_Message_Block message_block (&db,
00190 ACE_Message_Block::DONT_DELETE,
00191 this->orb_core_->input_cdr_msgblock_allocator ());
00192
00193
00194
00195 ACE_CDR::mb_align (&message_block);
00196
00197
00198
00199
00200 ssize_t n = this->recv (message_block.rd_ptr (),
00201 message_block.space (),
00202 max_wait_time);
00203
00204
00205 if (n <= 0)
00206 {
00207 if (n == -1)
00208 this->tms_->connection_closed ();
00209
00210 return n;
00211 }
00212
00213
00214 message_block.wr_ptr (n);
00215
00216
00217 TAO_Queued_Data qd (&message_block);
00218 size_t mesg_length;
00219
00220
00221
00222 if (this->messaging_object ()->parse_next_message (message_block,
00223 qd,
00224 mesg_length) == -1)
00225 return -1;
00226
00227 if (qd.missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00228 {
00229
00230 return -1;
00231 }
00232
00233 if (message_block.length () > mesg_length)
00234 {
00235
00236 return -1;
00237 }
00238
00239
00240
00241
00242
00243
00244 return this->process_parsed_messages (&qd, rh);
00245 }
00246
00247
00248 int
00249 TAO_DIOP_Transport::register_handler (void)
00250 {
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260 return 0;
00261 }
00262
00263
00264 int
00265 TAO_DIOP_Transport::send_request (TAO_Stub *stub,
00266 TAO_ORB_Core *orb_core,
00267 TAO_OutputCDR &stream,
00268 int message_semantics,
00269 ACE_Time_Value *max_wait_time)
00270 {
00271 if (this->ws_->sending_request (orb_core,
00272 message_semantics) == -1)
00273 return -1;
00274
00275 if (this->send_message (stream,
00276 stub,
00277 message_semantics,
00278 max_wait_time) == -1)
00279
00280 return -1;
00281
00282 this->first_request_sent();
00283
00284 return 0;
00285 }
00286
00287 int
00288 TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
00289 TAO_Stub *stub,
00290 int message_semantics,
00291 ACE_Time_Value *max_wait_time)
00292 {
00293
00294 if (this->messaging_object_->format_message (stream) != 0)
00295 return -1;
00296
00297
00298
00299
00300
00301
00302 ssize_t n = this->send_message_shared (stub,
00303 message_semantics,
00304 stream.begin (),
00305 max_wait_time);
00306
00307 if (n == -1)
00308 {
00309 if (TAO_debug_level)
00310 ACE_DEBUG ((LM_DEBUG,
00311 ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
00312 this->id (),
00313 ACE_TEXT ("send_message ()\n")));
00314
00315 return -1;
00316 }
00317
00318 return 1;
00319 }
00320
00321 int
00322 TAO_DIOP_Transport::send_message_shared (TAO_Stub *stub,
00323 int message_semantics,
00324 const ACE_Message_Block *message_block,
00325 ACE_Time_Value *max_wait_time)
00326 {
00327 int result;
00328
00329 {
00330 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00331
00332 result =
00333 this->send_message_shared_i (stub, message_semantics,
00334 message_block, max_wait_time);
00335 }
00336
00337 if (result == -1)
00338 {
00339 this->close_connection ();
00340 }
00341
00342 return result;
00343 }
00344
00345 int
00346 TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
00347 CORBA::Octet minor)
00348 {
00349 this->messaging_object_->init (major, minor);
00350 return 1;
00351 }
00352
00353 TAO_END_VERSIONED_NAMESPACE_DECL
00354
00355 #endif