00001
00002
00003 #include "tao/Strategies/SHMIOP_Transport.h"
00004
00005 #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0)
00006
00007 #include "tao/Strategies/SHMIOP_Connection_Handler.h"
00008 #include "tao/Strategies/SHMIOP_Profile.h"
00009 #include "tao/Timeprobe.h"
00010 #include "tao/CDR.h"
00011 #include "tao/Transport_Mux_Strategy.h"
00012 #include "tao/Wait_Strategy.h"
00013 #include "tao/Stub.h"
00014 #include "tao/ORB_Core.h"
00015 #include "tao/debug.h"
00016 #include "tao/Resume_Handle.h"
00017 #include "tao/GIOP_Message_Base.h"
00018
00019 ACE_RCSID (Strategies, SHMIOP_Transport, "$Id: SHMIOP_Transport.cpp 80288 2007-12-17 20:05:04Z johnnyw $")
00020
00021
00022 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
00025 TAO_ORB_Core *orb_core)
00026 : TAO_Transport (TAO_TAG_SHMEM_PROFILE,
00027 orb_core),
00028 connection_handler_ (handler)
00029 {
00030 }
00031
00032 TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport (void)
00033 {
00034 }
00035
00036 ACE_Event_Handler *
00037 TAO_SHMIOP_Transport::event_handler_i (void)
00038 {
00039 return this->connection_handler_;
00040 }
00041
00042 TAO_Connection_Handler *
00043 TAO_SHMIOP_Transport::connection_handler_i (void)
00044 {
00045 return this->connection_handler_;
00046 }
00047
00048 ssize_t
00049 TAO_SHMIOP_Transport::send (iovec *iov, int iovcnt,
00050 size_t &bytes_transferred,
00051 const ACE_Time_Value *max_wait_time)
00052 {
00053 bytes_transferred = 0;
00054 for (int i = 0; i < iovcnt; ++i)
00055 {
00056 ssize_t retval =
00057 this->connection_handler_->peer ().send (iov[i].iov_base,
00058 iov[i].iov_len,
00059 max_wait_time);
00060 if (retval > 0)
00061 bytes_transferred += retval;
00062 if (retval <= 0)
00063 return retval;
00064 }
00065 return bytes_transferred;
00066 }
00067
00068 ssize_t
00069 TAO_SHMIOP_Transport::recv (char *buf,
00070 size_t len,
00071 const ACE_Time_Value *max_wait_time)
00072 {
00073 ssize_t n = 0;
00074
00075 int read_break = 0;
00076
00077 while (!read_break)
00078 {
00079 n = this->connection_handler_->peer ().recv (buf,
00080 len,
00081 max_wait_time);
00082
00083
00084 if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
00085 {
00086 n = 0;
00087 continue;
00088 }
00089
00090
00091 read_break = 1;
00092 }
00093
00094 if (n == -1)
00095 {
00096 if (TAO_debug_level > 3 && errno != ETIME)
00097 {
00098 ACE_DEBUG ((LM_DEBUG,
00099 ACE_TEXT ("TAO (%P|%t) - SHMIOP_Transport::recv, %p \n"),
00100 ACE_TEXT ("TAO - read message failure ")
00101 ACE_TEXT ("recv_i () \n")));
00102 }
00103 }
00104 else if (n == 0)
00105 {
00106 n = -1;
00107 }
00108 return n;
00109 }
00110
00111 int
00112 TAO_SHMIOP_Transport::handle_input (TAO_Resume_Handle &rh,
00113 ACE_Time_Value *max_wait_time)
00114 {
00115 if (TAO_debug_level > 3)
00116 {
00117 ACE_DEBUG ((LM_DEBUG,
00118 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input\n",
00119 this->id ()));
00120 }
00121
00122
00123
00124 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
00125
00126
00127 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00128 (void) ACE_OS::memset (buf,
00129 '\0',
00130 sizeof buf);
00131 #endif
00132
00133
00134 ACE_Data_Block db (sizeof (buf),
00135 ACE_Message_Block::MB_DATA,
00136 buf,
00137 this->orb_core_->input_cdr_buffer_allocator (),
00138 this->orb_core_->locking_strategy (),
00139 ACE_Message_Block::DONT_DELETE,
00140 this->orb_core_->input_cdr_dblock_allocator ());
00141
00142
00143 ACE_Message_Block message_block (&db,
00144 ACE_Message_Block::DONT_DELETE,
00145 this->orb_core_->input_cdr_msgblock_allocator ());
00146
00147
00148
00149 ACE_CDR::mb_align (&message_block);
00150
00151 const size_t missing_header_data = this->messaging_object ()->header_length ();
00152
00153 if (missing_header_data == 0)
00154 {
00155 return -1;
00156 }
00157
00158
00159 ssize_t bytes = 0;
00160
00161
00162
00163 for (size_t m = missing_header_data;
00164 m != 0;
00165 m -= bytes)
00166 {
00167 bytes = 0;
00168
00169
00170
00171
00172
00173
00174
00175
00176 bytes = this->recv (message_block.wr_ptr (),
00177 m,
00178 max_wait_time);
00179
00180 if (bytes == 0 ||
00181 bytes == -1)
00182 {
00183 return -1;
00184 }
00185
00186 message_block.wr_ptr (bytes);
00187 }
00188
00189 TAO_Queued_Data qd (&message_block);
00190 size_t mesg_length = 0;
00191
00192
00193
00194 if (this->messaging_object ()->parse_next_message (qd,
00195 mesg_length) == -1)
00196 return -1;
00197
00198 if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED)
00199 {
00200
00201 return -1;
00202 }
00203
00204 if (message_block.length () > mesg_length)
00205 {
00206
00207 return -1;
00208 }
00209
00210 if (message_block.space () < qd.missing_data ())
00211 {
00212 size_t const message_size = message_block.length ()
00213 + qd.missing_data ();
00214
00215
00216 if (ACE_CDR::grow (&message_block, message_size) == -1)
00217 {
00218 if (TAO_debug_level > 0)
00219 {
00220 ACE_ERROR ((LM_ERROR,
00221 "TAO (%P|%t) - SHMIOP_Transport[%d]::handle_input, "
00222 "error growing message buffer\n",
00223 this->id () ));
00224 }
00225 return -1;
00226 }
00227
00228 }
00229
00230
00231
00232 for (size_t n = qd.missing_data ();
00233 n != 0;
00234 n -= bytes)
00235 {
00236 bytes = 0;
00237
00238
00239
00240
00241
00242
00243
00244
00245 bytes = this->recv (message_block.wr_ptr (),
00246 n,
00247 max_wait_time);
00248
00249 if (bytes == 0 ||
00250 bytes == -1)
00251 {
00252 return -1;
00253 }
00254
00255 message_block.wr_ptr (bytes);
00256 }
00257
00258 qd.missing_data (0);
00259
00260
00261
00262 if (this->process_parsed_messages (&qd, rh) == -1)
00263 {
00264 return -1;
00265 }
00266
00267 return 0;
00268 }
00269
00270
00271
00272 int
00273 TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
00274 TAO_ORB_Core *orb_core,
00275 TAO_OutputCDR &stream,
00276 TAO_Message_Semantics message_semantics,
00277 ACE_Time_Value *max_wait_time)
00278 {
00279 if (this->ws_->sending_request (orb_core,
00280 message_semantics) == -1)
00281 return -1;
00282
00283 if (this->send_message (stream,
00284 stub,
00285 message_semantics,
00286 max_wait_time) == -1)
00287
00288 return -1;
00289 this->first_request_sent();
00290
00291 return 0;
00292 }
00293
00294 int
00295 TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
00296 TAO_Stub *stub,
00297 TAO_Message_Semantics message_semantics,
00298 ACE_Time_Value *max_wait_time)
00299 {
00300
00301 if (this->messaging_object_->format_message (stream) != 0)
00302 return -1;
00303
00304
00305
00306
00307
00308
00309 ssize_t n = this->send_message_shared (stub,
00310 message_semantics,
00311 stream.begin (),
00312 max_wait_time);
00313
00314 if (n == -1)
00315 {
00316 if (TAO_debug_level)
00317 ACE_DEBUG ((LM_DEBUG,
00318 ACE_TEXT ("TAO (%P|%t) closing transport %d after fault %p\n"),
00319 this->id (),
00320 ACE_TEXT ("send_message ()\n")));
00321
00322 return -1;
00323 }
00324
00325 return 1;
00326 }
00327
00328 TAO_END_VERSIONED_NAMESPACE_DECL
00329
00330 #endif