00001
00002
00003 #include "tao/Strategies/SHMIOP_Transport.h"
00004
00005 #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0)
00006
00007 #include "tao/Strategies/SHMIOP_Connection_Handler.h"
00008 #include "tao/Strategies/SHMIOP_Profile.h"
00009 #include "tao/Timeprobe.h"
00010 #include "tao/CDR.h"
00011 #include "tao/Transport_Mux_Strategy.h"
00012 #include "tao/Wait_Strategy.h"
00013 #include "tao/Stub.h"
00014 #include "tao/ORB_Core.h"
00015 #include "tao/debug.h"
00016 #include "tao/Resume_Handle.h"
00017 #include "tao/GIOP_Message_Base.h"
00018 #include "tao/GIOP_Message_Lite.h"
00019
00020 ACE_RCSID (Strategies, SHMIOP_Transport, "SHMIOP_Transport.cpp,v 1.43 2006/04/20 12:37:17 jwillemsen Exp")
00021
00022
00023 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
00026 TAO_ORB_Core *orb_core,
00027 CORBA::Boolean flag)
00028 : TAO_Transport (TAO_TAG_SHMEM_PROFILE,
00029 orb_core),
00030 connection_handler_ (handler),
00031 messaging_object_ (0)
00032 {
00033
00034
00035
00036
00037
00038 if (flag)
00039 {
00040
00041 ACE_NEW (this->messaging_object_,
00042 TAO_GIOP_Message_Lite (orb_core));
00043 }
00044 else
00045 {
00046
00047 ACE_NEW (this->messaging_object_,
00048 TAO_GIOP_Message_Base (orb_core, this));
00049 }
00050
00051 }
00052
00053 TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport (void)
00054 {
00055 delete this->messaging_object_;
00056 }
00057
00058 ACE_Event_Handler *
00059 TAO_SHMIOP_Transport::event_handler_i (void)
00060 {
00061 return this->connection_handler_;
00062 }
00063
00064 TAO_Connection_Handler *
00065 TAO_SHMIOP_Transport::connection_handler_i (void)
00066 {
00067 return this->connection_handler_;
00068 }
00069
00070 TAO_Pluggable_Messaging *
00071 TAO_SHMIOP_Transport::messaging_object (void)
00072 {
00073 return this->messaging_object_;
00074 }
00075
00076
00077 ssize_t
00078 TAO_SHMIOP_Transport::send (iovec *iov, int iovcnt,
00079 size_t &bytes_transferred,
00080 const ACE_Time_Value *max_wait_time)
00081 {
00082 bytes_transferred = 0;
00083 for (int i = 0; i < iovcnt; ++i)
00084 {
00085 ssize_t retval =
00086 this->connection_handler_->peer ().send (iov[i].iov_base,
00087 iov[i].iov_len,
00088 max_wait_time);
00089 if (retval > 0)
00090 bytes_transferred += retval;
00091 if (retval <= 0)
00092 return retval;
00093 }
00094 return bytes_transferred;
00095 }
00096
00097 ssize_t
00098 TAO_SHMIOP_Transport::recv (char *buf,
00099 size_t len,
00100 const ACE_Time_Value *max_wait_time)
00101 {
00102 ssize_t n = 0;
00103
00104 int read_break = 0;
00105
00106 while (!read_break)
00107 {
00108 n = this->connection_handler_->peer ().recv (buf,
00109 len,
00110 max_wait_time);
00111
00112
00113 if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
00114 {
00115 n = 0;
00116 continue;
00117 }
00118
00119
00120 read_break = 1;
00121 }
00122
00123 if (n == -1)
00124 {
00125 if (TAO_debug_level > 3 && errno != ETIME)
00126 {
00127 ACE_DEBUG ((LM_DEBUG,
00128 ACE_TEXT ("TAO (%P|%t) - %p \n"),
00129 ACE_TEXT ("TAO - read message failure ")
00130 ACE_TEXT ("recv_i () \n")));
00131 }
00132 }
00133 else if (n == 0)
00134 {
00135 n = -1;
00136 }
00137 return n;
00138 }
00139
00140 int
00141 TAO_SHMIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00142 ACE_Time_Value *max_wait_time,
00143 int)
00144 {
00145 if (TAO_debug_level > 3)
00146 {
00147 ACE_DEBUG ((LM_DEBUG,
00148 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input\n",
00149 this->id ()));
00150 }
00151
00152
00153
00154 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
00155
00156
00157 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00158 (void) ACE_OS::memset (buf,
00159 '\0',
00160 sizeof buf);
00161 #endif
00162
00163
00164 ACE_Data_Block db (sizeof (buf),
00165 ACE_Message_Block::MB_DATA,
00166 buf,
00167 this->orb_core_->input_cdr_buffer_allocator (),
00168 this->orb_core_->locking_strategy (),
00169 ACE_Message_Block::DONT_DELETE,
00170 this->orb_core_->input_cdr_dblock_allocator ());
00171
00172
00173 ACE_Message_Block message_block (&db,
00174 ACE_Message_Block::DONT_DELETE,
00175 this->orb_core_->input_cdr_msgblock_allocator ());
00176
00177
00178
00179 ACE_CDR::mb_align (&message_block);
00180
00181 const size_t missing_header_data = this->messaging_object ()->header_length ();
00182
00183 if (missing_header_data == 0)
00184 {
00185 return -1;
00186 }
00187
00188
00189 ssize_t bytes = 0;
00190
00191
00192
00193 for (size_t m = missing_header_data;
00194 m != 0;
00195 m -= bytes)
00196 {
00197 bytes = 0;
00198
00199
00200
00201
00202
00203
00204
00205
00206 bytes = this->recv (message_block.wr_ptr (),
00207 m,
00208 max_wait_time);
00209
00210 if (bytes == 0 ||
00211 bytes == -1)
00212 {
00213 return -1;
00214 }
00215
00216 message_block.wr_ptr (bytes);
00217 }
00218
00219 TAO_Queued_Data qd (&message_block);
00220 size_t mesg_length;
00221
00222
00223
00224 if (this->messaging_object ()->parse_next_message (message_block,
00225 qd,
00226 mesg_length) == -1)
00227 return -1;
00228
00229 if (qd.missing_data_ == TAO_MISSING_DATA_UNDEFINED)
00230 {
00231
00232 return -1;
00233 }
00234
00235 if (message_block.length () > mesg_length)
00236 {
00237
00238 return -1;
00239 }
00240
00241 if (message_block.space () < qd.missing_data_)
00242 {
00243 const size_t message_size = message_block.length ()
00244 + qd.missing_data_;
00245
00246
00247 if (ACE_CDR::grow (&message_block, message_size) == -1)
00248 {
00249 if (TAO_debug_level > 0)
00250 {
00251 ACE_ERROR ((LM_ERROR,
00252 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input, "
00253 "error growing message buffer\n",
00254 this->id () ));
00255 }
00256 return -1;
00257 }
00258
00259 }
00260
00261
00262
00263 for (size_t n = qd.missing_data_;
00264 n != 0;
00265 n -= bytes)
00266 {
00267 bytes = 0;
00268
00269
00270
00271
00272
00273
00274
00275
00276 bytes = this->recv (message_block.wr_ptr (),
00277 n,
00278 max_wait_time);
00279
00280 if (bytes == 0 ||
00281 bytes == -1)
00282 {
00283 return -1;
00284 }
00285
00286 message_block.wr_ptr (bytes);
00287
00288 }
00289
00290 qd.missing_data_ = 0;
00291
00292
00293
00294 if (this->process_parsed_messages (&qd, rh) == -1)
00295 {
00296 return -1;
00297 }
00298
00299 return 0;
00300 }
00301
00302
00303
00304 int
00305 TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
00306 TAO_ORB_Core *orb_core,
00307 TAO_OutputCDR &stream,
00308 int message_semantics,
00309 ACE_Time_Value *max_wait_time)
00310 {
00311 if (this->ws_->sending_request (orb_core,
00312 message_semantics) == -1)
00313 return -1;
00314
00315 if (this->send_message (stream,
00316 stub,
00317 message_semantics,
00318 max_wait_time) == -1)
00319
00320 return -1;
00321 this->first_request_sent();
00322
00323 return 0;
00324 }
00325
00326 int
00327 TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
00328 TAO_Stub *stub,
00329 int message_semantics,
00330 ACE_Time_Value *max_wait_time)
00331 {
00332
00333 if (this->messaging_object_->format_message (stream) != 0)
00334 return -1;
00335
00336
00337
00338
00339
00340
00341 ssize_t n = this->send_message_shared (stub,
00342 message_semantics,
00343 stream.begin (),
00344 max_wait_time);
00345
00346 if (n == -1)
00347 {
00348 if (TAO_debug_level)
00349 ACE_DEBUG ((LM_DEBUG,
00350 ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
00351 this->id (),
00352 ACE_TEXT ("send_message ()\n")));
00353
00354 return -1;
00355 }
00356
00357 return 1;
00358 }
00359
00360
00361 int
00362 TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major,
00363 CORBA::Octet minor)
00364 {
00365 this->messaging_object_->init (major,
00366 minor);
00367 return 1;
00368 }
00369
00370 TAO_END_VERSIONED_NAMESPACE_DECL
00371
00372 #endif