00001
00002
00003 #ifndef ACE_IOS_STREAM_HANDLER_CPP
00004 #define ACE_IOS_STREAM_HANDLER_CPP
00005
00006 #include "ace/INet/INet_Log.h"
00007 #include "ace/INet/StreamHandler.h"
00008 #include "ace/OS_NS_Thread.h"
00009 #include "ace/OS_NS_errno.h"
00010 #include "ace/Countdown_Time.h"
00011 #include "ace/Truncate.h"
00012
00013 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace ACE
00016 {
00017 namespace IOS
00018 {
00019 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00020 StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::StreamHandler (
00021 const ACE_Synch_Options &synch_options,
00022 ACE_Thread_Manager *thr_mgr,
00023 mq_type *mq,
00024 ACE_Reactor *reactor)
00025 : ACE_Svc_Handler<ACE_PEER_STREAM, ACE_SYNCH_USE> (thr_mgr, mq, reactor),
00026 connected_ (false),
00027 send_timeout_ (false),
00028 receive_timeout_ (false),
00029 notification_strategy_ (reactor,
00030 this,
00031 ACE_Event_Handler::WRITE_MASK)
00032 {
00033 INET_TRACE ("ACE_IOS_StreamHandler - ctor");
00034
00035 unsigned long opt = synch_options[ACE_Synch_Options::USE_REACTOR] ?
00036 ACE_Synch_Options::USE_REACTOR : 0;
00037 if (synch_options[ACE_Synch_Options::USE_TIMEOUT])
00038 opt |= ACE_Synch_Options::USE_TIMEOUT;
00039 this->sync_opt_.set (opt,
00040 synch_options.timeout (),
00041 synch_options.arg ());
00042 }
00043
00044 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00045 StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::~StreamHandler ()
00046 {
00047 INET_TRACE ("ACE_IOS_StreamHandler - dtor");
00048
00049 this->connected_ = false;
00050 }
00051
00052 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00053 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::open (void * )
00054 {
00055 this->connected_ = true;
00056 return 0;
00057 }
00058
00059 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00060 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::close (u_long flags)
00061 {
00062 this->connected_ = false;
00063 return base_type::close (flags);
00064 }
00065
00066 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00067 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_input (ACE_HANDLE)
00068 {
00069
00070 ACE_Time_Value to = ACE_Time_Value::zero;
00071 return this->handle_input_i (MAX_INPUT_SIZE, &to);
00072 }
00073
00074 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00075 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_input_i (size_t rdlen, ACE_Time_Value* timeout)
00076 {
00077 INET_TRACE ("ACE_IOS_StreamHandler::handle_input_i");
00078
00079 char buffer[MAX_INPUT_SIZE];
00080 ssize_t recv_cnt;
00081 size_t bytes_in = 0;
00082
00083
00084 bool no_wait = timeout && (*timeout == ACE_Time_Value::zero);
00085
00086 recv_cnt = this->peer ().recv_n (buffer,
00087 rdlen <= sizeof(buffer) ? rdlen : sizeof(buffer),
00088 timeout,
00089 &bytes_in);
00090
00091 if (bytes_in > 0)
00092 {
00093 INET_HEX_DUMP (11, (LM_DEBUG, buffer, bytes_in, DLINFO
00094 ACE_TEXT ("ACE_IOS_StreamHandler::handle_input_i <--")));
00095
00096 ACE_Message_Block *mb = 0;
00097 ACE_NEW_RETURN (mb, ACE_Message_Block (bytes_in), -1);
00098 mb->copy (buffer, bytes_in);
00099 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
00100 if (this->putq (mb, &nowait) == -1)
00101 {
00102 INET_ERROR (1, (LM_ERROR, DLINFO
00103 ACE_TEXT ("ACE_IOS_StreamHandler - discarding input data, "),
00104 ACE_TEXT ("enqueue failed (%d)\n"),
00105 ACE_OS::last_error ()));
00106 mb->release ();
00107 this->connected_ = false;
00108 return -1;
00109 }
00110 }
00111
00112 if (recv_cnt == 0 || (recv_cnt < 0 && !no_wait))
00113 {
00114 if (recv_cnt < 0)
00115 {
00116 INET_ERROR (1, (LM_ERROR, DLINFO
00117 ACE_TEXT ("ACE_IOS_StreamHandler - receive failed (%d)\n"),
00118 ACE_OS::last_error ()));
00119 }
00120 this->connected_ = false;
00121 return this->using_reactor () ? -1 : 0;
00122 }
00123 return 0;
00124 }
00125
00126 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00127 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_output (ACE_HANDLE)
00128 {
00129 if (this->use_timeout ())
00130 {
00131 ACE_Time_Value to = this->sync_opt_.timeout ();
00132 return this->handle_output_i (&to);
00133 }
00134 else
00135 return this->handle_output_i (0);
00136 }
00137
00138 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00139 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::handle_output_i (ACE_Time_Value* timeout)
00140 {
00141 INET_TRACE ("ACE_IOS_StreamHandler::handle_output_i");
00142
00143 ACE_Message_Block *mb = 0;
00144 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
00145 size_t bytes_out = 0;
00146 if (-1 != this->getq (mb, &nowait))
00147 {
00148 ssize_t send_cnt =
00149 this->peer ().send_n (mb->rd_ptr (), mb->length (), timeout, &bytes_out);
00150 if (bytes_out > 0)
00151 {
00152 INET_HEX_DUMP (11, (LM_DEBUG, mb->rd_ptr (), bytes_out, DLINFO
00153 ACE_TEXT ("ACE_IOS_StreamHandler::handle_output_i -->")));
00154
00155 mb->rd_ptr (static_cast<size_t> (bytes_out));
00156 if (mb->length () > 0)
00157 this->ungetq (mb);
00158 else
00159 mb->release ();
00160 }
00161 if (send_cnt <= 0)
00162 {
00163 INET_ERROR (1, (LM_ERROR, DLINFO
00164 ACE_TEXT ("%p; ACE_IOS_StreamHandler - "),
00165 ACE_TEXT ("send failed\n")));
00166 this->connected_ = false;
00167 return this->using_reactor () ? -1 : 0;
00168 }
00169 }
00170 return (this->msg_queue ()->is_empty ()) ? -1 : 0;
00171 }
00172
00173 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00174 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::read_from_stream (
00175 void * buf,
00176 size_t length,
00177 u_short char_size)
00178 {
00179 INET_TRACE ("ACE_IOS_StreamHandler::read_from_stream");
00180
00181 size_t recv_char_count = 0;
00182 char* wptr = (char*)buf;
00183 size_t char_length = length * char_size;
00184 ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
00185 int result = 0;
00186 if (this->using_reactor ())
00187 {
00188 ACE_thread_t tid;
00189 this->reactor ()->owner (&tid);
00190 bool reactor_thread =
00191 ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
00192
00193 if (this->connected_)
00194 {
00195 if (this->reactor ()->register_handler(this,
00196 ACE_Event_Handler::READ_MASK) != 0)
00197 {
00198 return -1;
00199 }
00200 }
00201
00202
00203
00204 while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
00205 {
00206 result = 0;
00207 if (reactor_thread && !this->char_in_queue (char_size))
00208 {
00209
00210 result = this->reactor ()->handle_events (this->use_timeout () ?
00211 &max_wait_time : 0);
00212 }
00213
00214 if (result != -1)
00215 {
00216 result = this->process_input (&wptr[recv_char_count],
00217 char_length,
00218 char_size,
00219 this->use_timeout () ?
00220 &max_wait_time : 0);
00221 }
00222
00223 if (result == -1)
00224 {
00225 this->reactor ()->remove_handler (this,
00226 ACE_Event_Handler::READ_MASK);
00227 return -1;
00228 }
00229
00230 recv_char_count += result;
00231
00232 if (recv_char_count > 0)
00233 {
00234 break;
00235 }
00236
00237 if (this->use_timeout () &&
00238 max_wait_time == ACE_Time_Value::zero)
00239 {
00240 this->reactor ()->remove_handler (this,
00241 ACE_Event_Handler::READ_MASK);
00242 this->receive_timeout_ = true;
00243 return -1;
00244 }
00245 }
00246
00247 this->reactor ()->remove_handler (this,
00248 ACE_Event_Handler::READ_MASK);
00249 }
00250 else
00251 {
00252
00253
00254
00255
00256
00257 size_t rdlen = MAX_INPUT_SIZE;
00258 ACE_Time_Value timeout = ACE_Time_Value::zero;
00259 ACE_Time_Value* to = &timeout;
00260 while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
00261 {
00262 if (!this->char_in_queue (char_size))
00263 {
00264
00265 result = this->handle_input_i (rdlen, to);
00266 }
00267
00268 if (result == -1)
00269 return result;
00270
00271 result = this->process_input (&wptr[recv_char_count],
00272 char_length,
00273 char_size,
00274 this->use_timeout () ?
00275 &max_wait_time : 0);
00276
00277 if (result == -1)
00278 return result;
00279
00280 recv_char_count += result;
00281
00282 if (recv_char_count > 0)
00283 {
00284
00285
00286 break;
00287 }
00288
00289 if (this->use_timeout () &&
00290 max_wait_time == ACE_Time_Value::zero)
00291 {
00292 this->receive_timeout_ = true;
00293 return -1;
00294 }
00295
00296 if (this->connected_ && char_length >0)
00297 {
00298
00299
00300 rdlen = char_size;
00301 to = this->use_timeout () ? &max_wait_time : 0;
00302 }
00303 }
00304 }
00305
00306 return ACE_Utils::truncate_cast<int> (recv_char_count / char_size);
00307 }
00308
00309
00310 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00311 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::process_input (
00312 char* buf,
00313 size_t& char_length,
00314 u_short char_size,
00315 ACE_Time_Value* timeout)
00316 {
00317 INET_TRACE ("ACE_IOS_StreamHandler::process_input");
00318
00319 ACE_Time_Value wait (ACE_OS::gettimeofday ());
00320
00321 ACE_Countdown_Time timeout_countdown (timeout);
00322
00323
00324 if (timeout)
00325 {
00326 wait += *timeout;
00327 timeout_countdown.start ();
00328 }
00329 ACE_Message_Block *mb_remain = 0;
00330 size_t recv_char_count = 0;
00331 while (!this->msg_queue ()->is_empty () && char_length > 0)
00332 {
00333 ACE_Message_Block *mb = 0;
00334 if (this->getq (mb, &wait) == -1)
00335 {
00336 if (ACE_OS::last_error () == EWOULDBLOCK)
00337 break;
00338 else
00339 return -1;
00340 }
00341
00342 size_t copy_len = 0;
00343
00344 if (mb_remain)
00345 {
00346 if ((mb_remain->length () + mb->length ()) < char_size)
00347 {
00348 ACE_Message_Block *mb_new = 0;
00349 ACE_NEW_NORETURN (mb,
00350 ACE_Message_Block (mb_remain->length () + mb->length ()));
00351 if (mb_new == 0)
00352 {
00353 mb->release ();
00354 mb_remain->release ();
00355 return -1;
00356 }
00357 mb_new->copy (mb_remain->rd_ptr (), mb_remain->length ());
00358 mb_remain->release ();
00359 mb_new->copy (mb->rd_ptr (), mb->length ());
00360 mb->release ();
00361 mb_remain = mb_new;
00362 continue;
00363 }
00364
00365 copy_len = (mb_remain->length () > char_length) ?
00366 char_length :
00367 mb_remain->length ();
00368 ACE_OS::memmove (&buf[recv_char_count],
00369 mb_remain->rd_ptr (),
00370 copy_len);
00371 char_length -= copy_len;
00372 recv_char_count += copy_len;
00373 mb_remain->rd_ptr (copy_len);
00374 if (mb_remain->length () > 0)
00375 {
00376 continue;
00377 }
00378
00379
00380 mb_remain->release ();
00381 mb_remain = 0;
00382 }
00383
00384
00385 size_t total_char_len = ((mb->length () + copy_len)/ char_size) * char_size;
00386
00387 size_t max_copy_len = (total_char_len > char_length) ?
00388 char_length :
00389 total_char_len;
00390
00391 copy_len = max_copy_len - copy_len;
00392
00393 ACE_OS::memmove (&buf[recv_char_count],
00394 mb->rd_ptr (),
00395 copy_len);
00396 recv_char_count += copy_len;
00397 char_length -= copy_len;
00398 mb->rd_ptr (copy_len);
00399 if (mb->length () > 0)
00400 {
00401 mb_remain = mb;
00402 }
00403 else
00404 mb->release ();
00405 }
00406
00407 if (mb_remain)
00408 {
00409 this->ungetq (mb_remain);
00410 }
00411
00412 if (timeout)
00413 {
00414
00415 timeout_countdown.stop ();
00416 }
00417
00418 return ACE_Utils::truncate_cast<int> (recv_char_count);
00419 }
00420
00421 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00422 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::use_timeout () const
00423 {
00424 return this->sync_opt_[ACE_Synch_Options::USE_TIMEOUT];
00425 }
00426
00427 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00428 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::char_in_queue (u_short char_size)
00429 {
00430 return this->msg_queue ()->message_bytes () >= char_size;
00431 }
00432
00433 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00434 int StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::write_to_stream (const void * buf, size_t length, u_short char_size)
00435 {
00436 INET_TRACE ("ACE_IOS_StreamHandler::write_to_stream");
00437
00438
00439 bool use_reactor = this->using_reactor ();
00440 if (use_reactor)
00441 {
00442 ACE_thread_t tid;
00443 this->reactor ()->owner (&tid);
00444 use_reactor =
00445 ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
00446 }
00447
00448
00449 NotificationStrategyGuard ns_guard__(*this,
00450 use_reactor ?
00451 &this->notification_strategy_ : 0);
00452
00453 size_t datasz = length * char_size;
00454 ACE_Message_Block *mb = 0;
00455 ACE_NEW_RETURN (mb, ACE_Message_Block (datasz), -1);
00456 mb->copy ((const char*)buf, datasz);
00457 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
00458 if (this->putq (mb, &nowait) == -1)
00459 {
00460 INET_ERROR (1, (LM_ERROR, DLINFO
00461 ACE_TEXT ("(%d) ACE_IOS_StreamHandler - discarding output data, "),
00462 ACE_TEXT ("enqueue failed\n"),
00463 ACE_OS::last_error ()));
00464 mb->release ();
00465 return 0;
00466 }
00467
00468 ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
00469 int result = 0;
00470
00471 if (use_reactor)
00472 {
00473 if (this->reactor ()->register_handler(this,
00474 ACE_Event_Handler::WRITE_MASK) != 0)
00475 {
00476 return -1;
00477 }
00478
00479
00480
00481 while (this->connected_)
00482 {
00483
00484 result = this->reactor ()->handle_events (this->use_timeout () ?
00485 &max_wait_time : 0);
00486
00487 if (result == -1)
00488 {
00489 INET_ERROR (1, (LM_ERROR, DLINFO
00490 ACE_TEXT ("(%d) ACE_IOS_StreamHandler::write_to_stream - ")
00491 ACE_TEXT ("handle_events failed\n"),
00492 ACE_OS::last_error ()));
00493 }
00494
00495
00496
00497 if (this->msg_queue ()->is_empty ())
00498 {
00499 break;
00500 }
00501
00502
00503 if (result == 0
00504 && this->use_timeout ()
00505 && max_wait_time == ACE_Time_Value::zero)
00506 {
00507 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
00508 this->send_timeout_ = true;
00509 return ACE_Utils::truncate_cast<int>
00510 (length - (this->msg_queue ()->message_bytes () / char_size));
00511 }
00512
00513
00514 if (result == -1)
00515 {
00516 this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
00517 return -1;
00518 }
00519
00520
00521 }
00522 }
00523 else
00524 {
00525 while (this->connected_)
00526 {
00527 result = this->handle_output_i (this->use_timeout () ?
00528 &max_wait_time : 0);
00529
00530
00531
00532 if (this->msg_queue ()->is_empty ())
00533 {
00534 break;
00535 }
00536
00537
00538 if (result == 0
00539 && this->use_timeout ()
00540 && max_wait_time == ACE_Time_Value::zero)
00541 {
00542 this->send_timeout_ = true;
00543 return ACE_Utils::truncate_cast<int>
00544 (length - (this->msg_queue ()->message_bytes () / char_size));
00545 }
00546
00547
00548 }
00549 }
00550
00551 if (this->connected_)
00552 return ACE_Utils::truncate_cast<int> (length);
00553 else
00554 return ACE_Utils::truncate_cast<int>
00555 (length - (this->msg_queue ()->message_bytes () / char_size));
00556 }
00557
00558 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00559 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::is_connected () const
00560 {
00561 return this->connected_;
00562 }
00563
00564 template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL>
00565 bool StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE>::using_reactor () const
00566 {
00567 return this->sync_opt_[ACE_Synch_Options::USE_REACTOR];
00568 }
00569
00570 }
00571 }
00572
00573 ACE_END_VERSIONED_NAMESPACE_DECL
00574
00575 #endif