00001
00002
00003 #include "tao/Transport.h"
00004
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/GIOP_Message_Base.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 #include "tao/SystemException.h"
00026 #include "tao/operation_details.h"
00027 #include "tao/Transport_Descriptor_Interface.h"
00028
00029 #include "ace/OS_NS_sys_time.h"
00030 #include "ace/OS_NS_stdio.h"
00031 #include "ace/Reactor.h"
00032 #include "ace/os_include/sys/os_uio.h"
00033 #include "ace/High_Res_Timer.h"
00034 #include "ace/Countdown_Time.h"
00035 #include "ace/CORBA_macros.h"
00036
00037
00038
00039
00040
00041
00042
00043 #if !defined (__ACE_INLINE__)
00044 # include "tao/Transport.inl"
00045 #endif
00046
00047
00048 ACE_RCSID (tao,
00049 Transport,
00050 "$Id: Transport.cpp 88803 2010-02-02 11:13:27Z vzykov $")
00051
00052
00053
00054
00055 static void
00056 dump_iov (iovec *iov, int iovcnt, size_t id,
00057 size_t current_transfer,
00058 const ACE_TCHAR *location)
00059 {
00060 ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ());
00061
00062 ACE_DEBUG ((LM_DEBUG,
00063 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00064 ACE_TEXT ("sending %d buffers\n"),
00065 id, location, iovcnt));
00066
00067 for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00068 {
00069 size_t iov_len = iov[i].iov_len;
00070
00071
00072 if (current_transfer < iov_len)
00073 {
00074 iov_len = current_transfer;
00075 }
00076
00077 ACE_DEBUG ((LM_DEBUG,
00078 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00079 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00080 id, location,
00081 i, iovcnt,
00082 iov_len));
00083
00084 size_t len;
00085
00086 for (size_t offset = 0; offset < iov_len; offset += len)
00087 {
00088 ACE_TCHAR header[1024];
00089 ACE_OS::sprintf (header,
00090 ACE_TEXT("TAO - ")
00091 ACE_TEXT("Transport[")
00092 ACE_SIZE_T_FORMAT_SPECIFIER
00093 ACE_TEXT("]::%s")
00094 ACE_TEXT(" (")
00095 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00096 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00097 id, location, offset, iov_len);
00098
00099 len = iov_len - offset;
00100
00101 if (len > 512)
00102 {
00103 len = 512;
00104 }
00105
00106 ACE_HEX_DUMP ((LM_DEBUG,
00107 static_cast<char*> (iov[i].iov_base) + offset,
00108 len,
00109 header));
00110 }
00111 current_transfer -= iov_len;
00112 }
00113
00114 ACE_DEBUG ((LM_DEBUG,
00115 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00116 ACE_TEXT ("end of data\n"),
00117 id, location));
00118 }
00119
00120 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00121
00122 #if TAO_HAS_TRANSPORT_CURRENT == 1
00123 TAO::Transport::Stats::~Stats ()
00124 {
00125
00126 }
00127 #endif
00128
00129 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00130 TAO_ORB_Core *orb_core,
00131 size_t input_cdr_size)
00132 : tag_ (tag)
00133 , orb_core_ (orb_core)
00134 , cache_map_entry_ (0)
00135 , tms_ (0)
00136 , ws_ (0)
00137 , bidirectional_flag_ (-1)
00138 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00139 , head_ (0)
00140 , tail_ (0)
00141 , incoming_message_queue_ (orb_core)
00142 , current_deadline_ (ACE_Time_Value::zero)
00143 , flush_timer_id_ (-1)
00144 , transport_timer_ (this)
00145 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00146 , id_ ((size_t) this)
00147 , purging_order_ (0)
00148 , recv_buffer_size_ (0)
00149 , sent_byte_count_ (0)
00150 , is_connected_ (false)
00151 , messaging_object_ (0)
00152 , char_translator_ (0)
00153 , wchar_translator_ (0)
00154 , tcs_set_ (0)
00155 , first_request_ (true)
00156 , partial_message_ (0)
00157 #if TAO_HAS_SENDFILE == 1
00158
00159
00160
00161
00162
00163 , mmap_allocator_ (
00164 dynamic_cast<TAO_MMAP_Allocator *> (
00165 orb_core->output_cdr_buffer_allocator ()))
00166 #endif
00167 #if TAO_HAS_TRANSPORT_CURRENT == 1
00168 , stats_ (0)
00169 #endif
00170 , flush_in_post_open_ (false)
00171 {
00172 ACE_NEW (this->messaging_object_,
00173 TAO_GIOP_Message_Base (orb_core,
00174 this,
00175 input_cdr_size));
00176
00177 TAO_Client_Strategy_Factory *cf =
00178 this->orb_core_->client_factory ();
00179
00180
00181 this->ws_ = cf->create_wait_strategy (this);
00182
00183
00184 this->tms_ = cf->create_transport_mux_strategy (this);
00185
00186 #if TAO_HAS_TRANSPORT_CURRENT == 1
00187
00188 ACE_NEW_THROW_EX (this->stats_,
00189 TAO::Transport::Stats,
00190 CORBA::NO_MEMORY ());
00191 #endif
00192
00193
00194
00195
00196
00197
00198
00199
00200 }
00201
00202 TAO_Transport::~TAO_Transport (void)
00203 {
00204 delete this->messaging_object_;
00205
00206 delete this->ws_;
00207
00208 delete this->tms_;
00209
00210 delete this->handler_lock_;
00211
00212 if (!this->is_connected_)
00213 {
00214
00215
00216 this->cleanup_queue_i();
00217 }
00218
00219
00220
00221 ACE_Message_Block::release (this->partial_message_);
00222
00223
00224
00225
00226
00227
00228 ACE_ASSERT (this->queue_is_empty_i ());
00229 ACE_ASSERT (this->cache_map_entry_ == 0);
00230
00231 #if TAO_HAS_TRANSPORT_CURRENT == 1
00232 delete this->stats_;
00233 #endif
00234
00235
00236
00237
00238
00239
00240
00241
00242 }
00243
00244 void
00245 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00246 {
00247 (void) this->add_reference ();
00248
00249 handlers.insert (this->connection_handler_i ());
00250 }
00251
00252 bool
00253 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00254 {
00255 if (this->ws_->non_blocking () ||
00256 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00257 return false;
00258
00259 (void) this->add_reference ();
00260
00261 h.insert (this->connection_handler_i ());
00262
00263 return true;
00264 }
00265
00266 bool
00267 TAO_Transport::idle_after_send (void)
00268 {
00269 return this->tms ()->idle_after_send ();
00270 }
00271
00272 bool
00273 TAO_Transport::idle_after_reply (void)
00274 {
00275 return this->tms ()->idle_after_reply ();
00276 }
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288 int
00289 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00290 {
00291 ACE_NOTSUP_RETURN (-1);
00292 }
00293
00294 int
00295 TAO_Transport::send_message_shared (TAO_Stub *stub,
00296 TAO_Message_Semantics message_semantics,
00297 const ACE_Message_Block *message_block,
00298 ACE_Time_Value *max_wait_time)
00299 {
00300 int result = 0;
00301
00302 {
00303 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00304
00305 result =
00306 this->send_message_shared_i (stub, message_semantics,
00307 message_block, max_wait_time);
00308 }
00309
00310 if (result == -1)
00311 {
00312
00313
00314
00315
00316 this->close_connection ();
00317 }
00318
00319 return result;
00320 }
00321
00322
00323
00324 bool
00325 TAO_Transport::post_connect_hook (void)
00326 {
00327 return true;
00328 }
00329
00330 bool
00331 TAO_Transport::register_if_necessary (void)
00332 {
00333 if (this->is_connected_ &&
00334 this->wait_strategy ()->register_handler () == -1)
00335 {
00336
00337 if (TAO_debug_level > 0)
00338 {
00339 ACE_ERROR ((LM_ERROR,
00340 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ")
00341 ACE_TEXT ("could not register the transport ")
00342 ACE_TEXT ("in the reactor.\n"),
00343 this->id ()));
00344 }
00345
00346
00347
00348 (void) this->purge_entry ();
00349
00350
00351 (void) this->close_connection ();
00352
00353 return false;
00354 }
00355 return true;
00356 }
00357
00358 void
00359 TAO_Transport::close_connection (void)
00360 {
00361 this->connection_handler_i ()->close_connection ();
00362 }
00363
00364 int
00365 TAO_Transport::register_handler (void)
00366 {
00367 if (TAO_debug_level > 4)
00368 {
00369 ACE_DEBUG ((LM_DEBUG,
00370 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00371 this->id ()));
00372 }
00373
00374 ACE_Reactor * const r = this->orb_core_->reactor ();
00375
00376
00377
00378 ACE_GUARD_RETURN (ACE_Lock,
00379 ace_mon,
00380 *this->handler_lock_,
00381 false);
00382
00383 if (r == this->event_handler_i ()->reactor ())
00384 {
00385 return 0;
00386 }
00387
00388
00389
00390
00391 this->ws_->is_registered (true);
00392
00393
00394 return r->register_handler (this->event_handler_i (),
00395 ACE_Event_Handler::READ_MASK);
00396 }
00397
00398 #if TAO_HAS_SENDFILE == 1
00399 ssize_t
00400 TAO_Transport::sendfile (TAO_MMAP_Allocator * ,
00401 iovec * iov,
00402 int iovcnt,
00403 size_t &bytes_transferred,
00404 TAO::Transport::Drain_Constraints const & dc)
00405 {
00406
00407
00408
00409
00410
00411
00412
00413 return this->send (iov, iovcnt, bytes_transferred,
00414 this->io_timeout (dc));
00415 }
00416 #endif
00417
00418 int
00419 TAO_Transport::generate_locate_request (
00420 TAO_Target_Specification &spec,
00421 TAO_Operation_Details &opdetails,
00422 TAO_OutputCDR &output)
00423 {
00424 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00425 spec,
00426 output) == -1)
00427 {
00428 if (TAO_debug_level > 0)
00429 {
00430 ACE_ERROR ((LM_ERROR,
00431 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00432 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00433 this->id ()));
00434 }
00435
00436 return -1;
00437 }
00438
00439 return 0;
00440 }
00441
00442 int
00443 TAO_Transport::generate_request_header (
00444 TAO_Operation_Details &opdetails,
00445 TAO_Target_Specification &spec,
00446 TAO_OutputCDR &output)
00447 {
00448 if (this->messaging_object ()->generate_request_header (opdetails,
00449 spec,
00450 output) == -1)
00451 {
00452 if (TAO_debug_level > 0)
00453 {
00454 ACE_ERROR ((LM_ERROR,
00455 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ")
00456 ACE_TEXT ("error while marshalling the Request header\n"),
00457 this->id()));
00458 }
00459
00460 return -1;
00461 }
00462
00463 return 0;
00464 }
00465
00466
00467
00468 int
00469 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00470 {
00471
00472 this->purge_entry ();
00473
00474
00475 return this->transport_cache_manager ().cache_transport (desc, this);
00476 }
00477
00478 int
00479 TAO_Transport::purge_entry (void)
00480 {
00481
00482
00483
00484
00485 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY* entry = 0;
00486 {
00487 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00488 entry = this->cache_map_entry_;
00489 this->cache_map_entry_ = 0;
00490 }
00491 return this->transport_cache_manager ().purge_entry (entry);
00492 }
00493
00494 bool
00495 TAO_Transport::can_be_purged (void)
00496 {
00497 return !this->tms_->has_request ();
00498 }
00499
00500 int
00501 TAO_Transport::make_idle (void)
00502 {
00503 if (TAO_debug_level > 3)
00504 {
00505 ACE_DEBUG ((LM_DEBUG,
00506 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00507 this->id ()));
00508 }
00509
00510 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00511 }
00512
00513 int
00514 TAO_Transport::update_transport (void)
00515 {
00516 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00517 }
00518
00519
00520
00521
00522
00523
00524 TAO_Transport::Drain_Result
00525 TAO_Transport::handle_output (TAO::Transport::Drain_Constraints const & dc)
00526 {
00527 if (TAO_debug_level > 3)
00528 {
00529 ACE_DEBUG ((LM_DEBUG,
00530 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output")
00531 ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"),
00532 this->id (),
00533 dc.block_on_io(),
00534 dc.timeout() ? dc.timeout()->sec() : -1,
00535 dc.timeout() ? dc.timeout()->usec() : -1 ));
00536 }
00537
00538
00539
00540
00541 Drain_Result const retval = this->drain_queue (dc);
00542
00543 if (TAO_debug_level > 3)
00544 {
00545 ACE_DEBUG ((LM_DEBUG,
00546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00547 ACE_TEXT ("drain_queue returns %d/%d\n"),
00548 this->id (),
00549 static_cast<int> (retval.dre_), ACE_ERRNO_GET));
00550 }
00551
00552
00553 return retval;
00554 }
00555
00556 int
00557 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
00558 ACE_Time_Value *max_wait_time,
00559 TAO_Stub* stub)
00560 {
00561 if (this->messaging_object ()->format_message (stream, stub) != 0)
00562 return -1;
00563
00564 return this->queue_message_i (stream.begin (), max_wait_time);
00565 }
00566
00567 int
00568 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00569 size_t &bytes_transferred,
00570 ACE_Time_Value *max_wait_time)
00571 {
00572 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00573
00574 TAO::Transport::Drain_Constraints dc(
00575 max_wait_time, true);
00576
00577 return this->send_message_block_chain_i (mb,
00578 bytes_transferred,
00579 dc);
00580 }
00581
00582 int
00583 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00584 size_t &bytes_transferred,
00585 TAO::Transport::Drain_Constraints const & dc)
00586 {
00587 size_t const total_length = mb->total_length ();
00588
00589
00590
00591 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00592
00593 synch_message.push_back (this->head_, this->tail_);
00594
00595 Drain_Result const n = this->drain_queue_i (dc);
00596
00597 if (n == DR_ERROR)
00598 {
00599 synch_message.remove_from_list (this->head_, this->tail_);
00600 return -1;
00601 }
00602 else if (n == DR_QUEUE_EMPTY)
00603 {
00604 bytes_transferred = total_length;
00605 return 1;
00606 }
00607
00608
00609 synch_message.remove_from_list (this->head_, this->tail_);
00610
00611 bytes_transferred = total_length - synch_message.message_length ();
00612
00613 return 0;
00614 }
00615
00616 int
00617 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00618 ACE_Time_Value *max_wait_time)
00619 {
00620
00621
00622 size_t const total_length = mb->total_length ();
00623 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00624
00625 synch_message.push_back (this->head_, this->tail_);
00626
00627 int const result = this->send_synch_message_helper_i (synch_message,
00628 max_wait_time);
00629 if (result == -1 && errno == ETIME)
00630 {
00631 if (total_length == synch_message.message_length ())
00632 {
00633 if (TAO_debug_level > 2)
00634 {
00635 ACE_DEBUG ((LM_DEBUG,
00636 ACE_TEXT ("TAO (%P|%t) - ")
00637 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00638 ACE_TEXT ("timeout encountered before any bytes sent\n"),
00639 this->id ()));
00640 }
00641 throw ::CORBA::TIMEOUT (
00642 CORBA::SystemException::_tao_minor_code (
00643 TAO_TIMEOUT_SEND_MINOR_CODE,
00644 ETIME),
00645 CORBA::COMPLETED_NO);
00646 }
00647 else
00648 {
00649 return -1;
00650 }
00651 }
00652 else if(result == -1 || result == 1)
00653 {
00654 return result;
00655 }
00656
00657 TAO_Flushing_Strategy *flushing_strategy =
00658 this->orb_core ()->flushing_strategy ();
00659 if (flushing_strategy->schedule_output (this) == -1)
00660 {
00661 synch_message.remove_from_list (this->head_, this->tail_);
00662 if (TAO_debug_level > 0)
00663 {
00664 ACE_ERROR ((LM_ERROR,
00665 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00666 ACE_TEXT ("send_synchronous_message_i, ")
00667 ACE_TEXT ("error while scheduling flush - %m\n"),
00668 this->id ()));
00669 }
00670 return -1;
00671 }
00672
00673
00674
00675
00676
00677
00678 int flush_result;
00679 {
00680 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00681 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00682 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00683
00684 flush_result = flushing_strategy->flush_message (this,
00685 &synch_message,
00686 max_wait_time);
00687 }
00688
00689 if (flush_result == -1)
00690 {
00691 synch_message.remove_from_list (this->head_, this->tail_);
00692
00693
00694
00695
00696
00697 if (TAO_debug_level > 0)
00698 {
00699 ACE_ERROR ((LM_ERROR,
00700 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00701 ACE_TEXT ("error while sending message - %m\n"),
00702 this->id ()));
00703 }
00704
00705 return -1;
00706 }
00707
00708 return 1;
00709 }
00710
00711
00712 int
00713 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00714 ACE_Time_Value *max_wait_time)
00715 {
00716
00717 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00718
00719 synch_message.push_back (this->head_, this->tail_);
00720
00721 int const n =
00722 this->send_synch_message_helper_i (synch_message, max_wait_time);
00723
00724
00725 if (n == -1 || n == 1)
00726 {
00727 return n;
00728 }
00729
00730 if (TAO_debug_level > 3)
00731 {
00732 ACE_DEBUG ((LM_DEBUG,
00733 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00734 ACE_TEXT ("preparing to add to queue before leaving\n"),
00735 this->id ()));
00736 }
00737
00738
00739
00740 synch_message.remove_from_list (this->head_, this->tail_);
00741
00742
00743 TAO_Queued_Message *msg =
00744 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00745
00746
00747 msg->push_back (this->head_, this->tail_);
00748
00749 TAO_Flushing_Strategy *flushing_strategy =
00750 this->orb_core ()->flushing_strategy ();
00751
00752 int const result = flushing_strategy->schedule_output (this);
00753
00754 if (result == -1)
00755 {
00756 if (TAO_debug_level > 5)
00757 {
00758 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00759 "message_i, dequeuing msg due to schedule_output "
00760 "failure\n", this->id ()));
00761 }
00762 msg->remove_from_list (this->head_, this->tail_);
00763 msg->destroy ();
00764 }
00765 else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00766 {
00767 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00768 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00769 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00770 (void) flushing_strategy->flush_transport (this, 0);
00771 }
00772
00773 return 1;
00774 }
00775
00776 int
00777 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00778 ACE_Time_Value * max_wait_time)
00779 {
00780 TAO::Transport::Drain_Constraints dc(
00781 max_wait_time, this->using_blocking_io_for_synch_messages());
00782
00783 Drain_Result const n = this->drain_queue_i (dc);
00784
00785 if (n == DR_ERROR)
00786 {
00787 synch_message.remove_from_list (this->head_, this->tail_);
00788 return -1;
00789 }
00790 else if (n == DR_QUEUE_EMPTY)
00791 {
00792 return 1;
00793 }
00794
00795 if (synch_message.all_data_sent ())
00796 {
00797 return 1;
00798 }
00799
00800 return 0;
00801 }
00802
00803 int
00804 TAO_Transport::schedule_output_i (void)
00805 {
00806 ACE_Event_Handler * const eh = this->event_handler_i ();
00807 ACE_Reactor * const reactor = eh->reactor ();
00808
00809 if (reactor == 0)
00810 {
00811 if (TAO_debug_level > 1)
00812 {
00813 ACE_ERROR ((LM_ERROR,
00814 ACE_TEXT ("TAO (%P|%t) - ")
00815 ACE_TEXT ("Transport[%d]::schedule_output_i, ")
00816 ACE_TEXT ("no reactor,")
00817 ACE_TEXT ("returning -1\n"),
00818 this->id ()));
00819 }
00820 return -1;
00821 }
00822
00823
00824
00825
00826 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00827 if (found)
00828 {
00829 found->remove_reference ();
00830
00831 if (found != eh)
00832 {
00833 if (TAO_debug_level > 3)
00834 {
00835 ACE_ERROR ((LM_ERROR,
00836 ACE_TEXT ("TAO (%P|%t) - ")
00837 ACE_TEXT ("Transport[%d]::schedule_output_i ")
00838 ACE_TEXT ("event handler not found in reactor,")
00839 ACE_TEXT ("returning -1\n"),
00840 this->id ()));
00841 }
00842
00843 return -1;
00844 }
00845 }
00846
00847 if (TAO_debug_level > 3)
00848 {
00849 ACE_DEBUG ((LM_DEBUG,
00850 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00851 this->id ()));
00852 }
00853
00854 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00855 }
00856
00857 int
00858 TAO_Transport::cancel_output_i (void)
00859 {
00860 ACE_Event_Handler * const eh = this->event_handler_i ();
00861 ACE_Reactor *const reactor = eh->reactor ();
00862
00863 if (TAO_debug_level > 3)
00864 {
00865 ACE_DEBUG ((LM_DEBUG,
00866 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00867 this->id ()));
00868 }
00869
00870 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00871 }
00872
00873 int
00874 TAO_Transport::handle_timeout (const ACE_Time_Value & ,
00875 const void *act)
00876 {
00877 if (TAO_debug_level > 6)
00878 {
00879 ACE_DEBUG ((LM_DEBUG,
00880 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ")
00881 ACE_TEXT ("timer expired\n"),
00882 this->id ()));
00883 }
00884
00885
00886 if (act != &this->current_deadline_)
00887 {
00888 return -1;
00889 }
00890
00891 if (this->flush_timer_pending ())
00892 {
00893
00894
00895 this->reset_flush_timer ();
00896
00897 TAO_Flushing_Strategy *flushing_strategy =
00898 this->orb_core ()->flushing_strategy ();
00899 int const result = flushing_strategy->schedule_output (this);
00900 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00901 {
00902 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00903 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00904 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00905 if (flushing_strategy->flush_transport (this, 0) == -1) {
00906 return -1;
00907 }
00908 }
00909 }
00910
00911 return 0;
00912 }
00913
00914 TAO_Transport::Drain_Result
00915 TAO_Transport::drain_queue (TAO::Transport::Drain_Constraints const & dc)
00916 {
00917 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR);
00918 Drain_Result const retval = this->drain_queue_i (dc);
00919
00920 if (retval == DR_QUEUE_EMPTY)
00921 {
00922
00923
00924 TAO_Flushing_Strategy *flushing_strategy =
00925 this->orb_core ()->flushing_strategy ();
00926
00927 flushing_strategy->cancel_output (this);
00928
00929 return DR_OK;
00930 }
00931
00932 return retval;
00933 }
00934
00935 TAO_Transport::Drain_Result
00936 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[],
00937 TAO::Transport::Drain_Constraints const & dc)
00938 {
00939
00940
00941
00942 ACE_Countdown_Time countdown(dc.timeout());
00943
00944 size_t byte_count = 0;
00945
00946
00947 ssize_t retval = -1;
00948
00949 #if TAO_HAS_SENDFILE == 1
00950 if (this->mmap_allocator_)
00951 retval = this->sendfile (this->mmap_allocator_,
00952 iov,
00953 iovcnt,
00954 byte_count,
00955 dc);
00956 else
00957 #endif
00958 retval = this->send (iov, iovcnt, byte_count,
00959 this->io_timeout (dc));
00960
00961 if (TAO_debug_level == 5)
00962 {
00963 dump_iov (iov, iovcnt, this->id (),
00964 byte_count, ACE_TEXT("drain_queue_helper"));
00965 }
00966
00967 if (retval == 0)
00968 {
00969 if (TAO_debug_level > 4)
00970 {
00971 ACE_DEBUG ((LM_DEBUG,
00972 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00973 ACE_TEXT ("send() returns 0\n"),
00974 this->id ()));
00975 }
00976 return DR_ERROR;
00977 }
00978 else if (retval == -1)
00979 {
00980 if (TAO_debug_level > 4)
00981 {
00982 ACE_DEBUG ((LM_DEBUG,
00983 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00984 ACE_TEXT ("error during send() (errno: %d) - %m\n"),
00985 this->id (), ACE_ERRNO_GET));
00986 }
00987
00988 if (errno == EWOULDBLOCK || errno == EAGAIN)
00989 {
00990 return DR_WOULDBLOCK;
00991 }
00992
00993 return DR_ERROR;
00994 }
00995
00996
00997
00998
00999 this->cleanup_queue (byte_count);
01000 iovcnt = 0;
01001
01002
01003
01004
01005
01006 this->sent_byte_count_ += byte_count;
01007
01008 if (TAO_debug_level > 4)
01009 {
01010 ACE_DEBUG ((LM_DEBUG,
01011 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
01012 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
01013 this->id(), byte_count, this->queue_is_empty_i ()));
01014 }
01015
01016 return DR_QUEUE_EMPTY;
01017
01018 }
01019
01020 TAO_Transport::Drain_Result
01021 TAO_Transport::drain_queue_i (TAO::Transport::Drain_Constraints const & dc)
01022 {
01023
01024
01025
01026 int iovcnt = 0;
01027 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01028 iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
01029 #else
01030 iovec iov[ACE_IOV_MAX];
01031 #endif
01032
01033
01034 TAO_Queued_Message *i = this->head_;
01035
01036
01037
01038 this->sent_byte_count_ = 0;
01039
01040
01041
01042
01043 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
01044
01045 while (i != 0)
01046 {
01047 if (i->is_expired (now))
01048 {
01049 if (TAO_debug_level > 3)
01050 {
01051 ACE_DEBUG ((LM_DEBUG,
01052 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01053 ACE_TEXT ("Discarding expired queued message.\n"),
01054 this->id ()));
01055 }
01056 TAO_Queued_Message *next = i->next ();
01057 i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
01058 this->orb_core_->leader_follower ());
01059 i->remove_from_list (this->head_, this->tail_);
01060 i->destroy ();
01061 i = next;
01062 continue;
01063 }
01064
01065 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01066
01067
01068
01069
01070 if (iovcnt == ACE_IOV_MAX)
01071 {
01072 Drain_Result const retval =
01073 this->drain_queue_helper (iovcnt, iov, dc);
01074
01075 if (TAO_debug_level > 4)
01076 {
01077 ACE_DEBUG ((LM_DEBUG,
01078 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01079 ACE_TEXT ("helper retval = %d\n"),
01080 this->id (), static_cast<int> (retval.dre_)));
01081 }
01082
01083 if (retval != DR_QUEUE_EMPTY)
01084 {
01085 return retval;
01086 }
01087
01088 now = ACE_High_Res_Timer::gettimeofday_hr ();
01089
01090 i = this->head_;
01091 continue;
01092 }
01093
01094
01095 i = i->next ();
01096 }
01097
01098 if (iovcnt != 0)
01099 {
01100 Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc);
01101
01102 if (TAO_debug_level > 4)
01103 {
01104 ACE_DEBUG ((LM_DEBUG,
01105 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01106 ACE_TEXT ("helper retval = %d\n"),
01107 this->id (), static_cast<int> (retval.dre_)));
01108 }
01109
01110 if (retval != DR_QUEUE_EMPTY)
01111 {
01112 return retval;
01113 }
01114 }
01115
01116 if (this->queue_is_empty_i ())
01117 {
01118 if (this->flush_timer_pending ())
01119 {
01120 ACE_Event_Handler *eh = this->event_handler_i ();
01121 ACE_Reactor * const reactor = eh->reactor ();
01122 reactor->cancel_timer (this->flush_timer_id_);
01123 this->reset_flush_timer ();
01124 }
01125
01126 return DR_QUEUE_EMPTY;
01127 }
01128
01129 return DR_OK;
01130 }
01131
01132 void
01133 TAO_Transport::cleanup_queue_i ()
01134 {
01135 if (TAO_debug_level > 4)
01136 {
01137 ACE_DEBUG ((LM_DEBUG,
01138 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01139 ACE_TEXT ("cleaning up complete queue\n"),
01140 this->id ()));
01141 }
01142
01143 size_t byte_count = 0;
01144 int msg_count = 0;
01145
01146
01147 while (!this->queue_is_empty_i ())
01148 {
01149 TAO_Queued_Message *i = this->head_;
01150
01151 if (TAO_debug_level > 4)
01152 {
01153 byte_count += i->message_length();
01154 ++msg_count;
01155 }
01156
01157
01158 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01159 this->orb_core_->leader_follower ());
01160
01161 i->remove_from_list (this->head_, this->tail_);
01162
01163 i->destroy ();
01164 }
01165
01166 if (TAO_debug_level > 4)
01167 {
01168 ACE_DEBUG ((LM_DEBUG,
01169 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01170 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01171 this->id (), msg_count, byte_count));
01172 }
01173 }
01174
01175 void
01176 TAO_Transport::cleanup_queue (size_t byte_count)
01177 {
01178 while (!this->queue_is_empty_i () && byte_count > 0)
01179 {
01180 TAO_Queued_Message *i = this->head_;
01181
01182 if (TAO_debug_level > 4)
01183 {
01184 ACE_DEBUG ((LM_DEBUG,
01185 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01186 ACE_TEXT ("byte_count = %d\n"),
01187 this->id (), byte_count));
01188 }
01189
01190
01191 i->bytes_transferred (byte_count);
01192
01193 if (TAO_debug_level > 4)
01194 {
01195 ACE_DEBUG ((LM_DEBUG,
01196 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01197 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01198 this->id (), byte_count, i->all_data_sent (),
01199 i->message_length ()));
01200 }
01201
01202
01203
01204 if (i->all_data_sent ())
01205 {
01206 i->remove_from_list (this->head_, this->tail_);
01207 i->destroy ();
01208 }
01209 else if (byte_count == 0)
01210 {
01211
01212
01213
01214
01215
01216
01217 i->copy_if_necessary (this->out_stream ().begin ());
01218 }
01219 }
01220 }
01221
01222 int
01223 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
01224 {
01225
01226 size_t msg_count = 0;
01227 size_t total_bytes = 0;
01228
01229 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01230 {
01231 ++msg_count;
01232 total_bytes += i->message_length ();
01233 }
01234
01235 bool set_timer = false;
01236 ACE_Time_Value new_deadline;
01237
01238 TAO::Transport_Queueing_Strategy *queue_strategy =
01239 stub->transport_queueing_strategy ();
01240
01241 bool constraints_reached = true;
01242
01243 if (queue_strategy)
01244 {
01245 constraints_reached =
01246 queue_strategy->buffering_constraints_reached (stub,
01247 msg_count,
01248 total_bytes,
01249 must_flush,
01250 this->current_deadline_,
01251 set_timer,
01252 new_deadline);
01253 }
01254 else
01255 {
01256 must_flush = false;
01257 }
01258
01259
01260 if (set_timer)
01261 {
01262 ACE_Event_Handler *eh = this->event_handler_i ();
01263 ACE_Reactor * const reactor = eh->reactor ();
01264 this->current_deadline_ = new_deadline;
01265 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01266
01267 if (this->flush_timer_pending ())
01268 {
01269 reactor->cancel_timer (this->flush_timer_id_);
01270 }
01271
01272 this->flush_timer_id_ =
01273 reactor->schedule_timer (&this->transport_timer_,
01274 &this->current_deadline_,
01275 delay);
01276 }
01277
01278 return constraints_reached;
01279 }
01280
01281 void
01282 TAO_Transport::report_invalid_event_handler (const char *caller)
01283 {
01284 if (TAO_debug_level > 0)
01285 {
01286 ACE_DEBUG ((LM_DEBUG,
01287 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01288 ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"),
01289 this->id (), caller, this->tag_));
01290 }
01291 }
01292
01293 void
01294 TAO_Transport::send_connection_closed_notifications (void)
01295 {
01296 {
01297 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01298
01299 this->send_connection_closed_notifications_i ();
01300 }
01301
01302 this->tms ()->connection_closed ();
01303 }
01304
01305 void
01306 TAO_Transport::send_connection_closed_notifications_i (void)
01307 {
01308 this->cleanup_queue_i ();
01309 }
01310
01311 int
01312 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01313 TAO_Message_Semantics message_semantics,
01314 const ACE_Message_Block *message_block,
01315 ACE_Time_Value *max_wait_time)
01316 {
01317 int ret = 0;
01318
01319 #if TAO_HAS_TRANSPORT_CURRENT == 1
01320 size_t const message_length = message_block->length ();
01321 #endif
01322
01323 switch (message_semantics)
01324 {
01325 case TAO_TWOWAY_REQUEST:
01326 ret = this->send_synchronous_message_i (message_block, max_wait_time);
01327 break;
01328
01329 case TAO_REPLY:
01330 ret = this->send_reply_message_i (message_block, max_wait_time);
01331 break;
01332
01333 case TAO_ONEWAY_REQUEST:
01334 ret = this->send_asynchronous_message_i (stub,
01335 message_block,
01336 max_wait_time);
01337 break;
01338 }
01339
01340 #if TAO_HAS_TRANSPORT_CURRENT == 1
01341
01342 if (ret != -1 && this->stats_ != 0)
01343 this->stats_->messages_sent (message_length);
01344 #endif
01345
01346 return ret;
01347 }
01348
01349 int
01350 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01351 const ACE_Message_Block *message_block,
01352 ACE_Time_Value *max_wait_time)
01353 {
01354
01355
01356 bool try_sending_first = true;
01357
01358 bool const queue_empty = this->queue_is_empty_i ();
01359
01360 TAO::Transport_Queueing_Strategy *queue_strategy =
01361 stub->transport_queueing_strategy ();
01362
01363 if (!queue_empty)
01364 {
01365 try_sending_first = false;
01366 }
01367 else if (queue_strategy)
01368 {
01369 if (queue_strategy->must_queue (queue_empty))
01370 {
01371 try_sending_first = false;
01372 }
01373 }
01374
01375 bool partially_sent = false;
01376 bool timeout_encountered = false;
01377
01378 TAO::Transport::Drain_Constraints dc(
01379 max_wait_time, this->using_blocking_io_for_asynch_messages());
01380
01381 if (try_sending_first)
01382 {
01383 ssize_t n = 0;
01384 size_t byte_count = 0;
01385
01386
01387 size_t const total_length = message_block->total_length ();
01388
01389 if (TAO_debug_level > 6)
01390 {
01391 ACE_DEBUG ((LM_DEBUG,
01392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01393 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01394 this->id (), total_length));
01395 }
01396
01397
01398
01399
01400
01401 n = this->send_message_block_chain_i (message_block,
01402 byte_count,
01403 dc);
01404
01405 if (n == -1)
01406 {
01407
01408
01409
01410
01411
01412 if (errno != EWOULDBLOCK && errno != ETIME)
01413 {
01414 if (TAO_debug_level > 0)
01415 {
01416 ACE_ERROR ((LM_ERROR,
01417 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01418 ACE_TEXT ("fatal error in ")
01419 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01420 this->id ()));
01421 }
01422 return -1;
01423 }
01424 }
01425
01426
01427 if (total_length == byte_count)
01428 {
01429
01430
01431
01432
01433
01434 return 0;
01435 }
01436
01437 if (byte_count > 0)
01438 {
01439 partially_sent = true;
01440 }
01441
01442
01443 if (errno == ETIME)
01444 {
01445 timeout_encountered = true;
01446 if (byte_count == 0)
01447 {
01448
01449
01450 if (TAO_debug_level > 2)
01451 {
01452 ACE_DEBUG ((LM_DEBUG,
01453 ACE_TEXT ("TAO (%P|%t) - ")
01454 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01455 ACE_TEXT ("timeout encountered before any bytes sent\n"),
01456 this->id ()));
01457 }
01458 throw ::CORBA::TIMEOUT (
01459 CORBA::SystemException::_tao_minor_code (
01460 TAO_TIMEOUT_SEND_MINOR_CODE,
01461 ETIME),
01462 CORBA::COMPLETED_NO);
01463 }
01464 }
01465
01466 if (TAO_debug_level > 6)
01467 {
01468 ACE_DEBUG ((LM_DEBUG,
01469 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01470 ACE_TEXT ("partial send %d / %d bytes\n"),
01471 this->id (), byte_count, total_length));
01472 }
01473
01474
01475
01476 while (message_block != 0 && message_block->length () == 0)
01477 {
01478 message_block = message_block->cont ();
01479 }
01480
01481
01482
01483 }
01484
01485
01486
01487
01488 ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01489 if (this->queue_message_i (message_block, wait_time, !partially_sent)
01490 == -1)
01491 {
01492 if (TAO_debug_level > 0)
01493 {
01494 ACE_DEBUG ((LM_DEBUG,
01495 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01496 ACE_TEXT ("send_asynchronous_message_i, ")
01497 ACE_TEXT ("cannot queue message for - %m\n"),
01498 this->id ()));
01499 }
01500 return -1;
01501 }
01502
01503 if (TAO_debug_level > 6)
01504 {
01505 ACE_DEBUG ((LM_DEBUG,
01506 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01507 ACE_TEXT ("message is queued\n"),
01508 this->id ()));
01509 }
01510
01511 if (timeout_encountered && partially_sent)
01512 {
01513
01514
01515
01516 if (TAO_debug_level > 0)
01517 {
01518 ACE_DEBUG ((LM_DEBUG,
01519 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01520 ACE_TEXT ("send_asynchronous_message_i, ")
01521 ACE_TEXT ("timeout after partial send, closing.\n"),
01522 this->id ()));
01523 }
01524 return -1;
01525 }
01526 else if (!timeout_encountered)
01527 {
01528
01529
01530
01531 bool must_flush = false;
01532 const bool constraints_reached =
01533 this->check_buffering_constraints_i (stub,
01534 must_flush);
01535
01536
01537
01538
01539
01540 TAO_Flushing_Strategy *flushing_strategy =
01541 this->orb_core ()->flushing_strategy ();
01542
01543 if (constraints_reached || try_sending_first)
01544 {
01545 int const result = flushing_strategy->schedule_output (this);
01546 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01547 {
01548 must_flush = true;
01549 }
01550 }
01551
01552 if (must_flush)
01553 {
01554 if (TAO_debug_level > 0)
01555 {
01556 ACE_DEBUG ((LM_DEBUG,
01557 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01558 ACE_TEXT ("send_asynchronous_message_i, ")
01559 ACE_TEXT ("flushing transport.\n"),
01560 this->id ()));
01561 }
01562
01563 size_t sent_byte = sent_byte_count_;
01564 int ret = 0;
01565 {
01566 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01567 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01568 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01569 ret = flushing_strategy->flush_transport (this, max_wait_time);
01570 }
01571
01572 if (ret == -1)
01573 {
01574 if (errno == ETIME)
01575 {
01576 if (sent_byte == sent_byte_count_)
01577 {
01578
01579
01580 if (TAO_debug_level > 2)
01581 {
01582 ACE_DEBUG ((LM_DEBUG,
01583 ACE_TEXT ("TAO (%P|%t) - ")
01584 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01585 ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01586 this->id ()));
01587 }
01588 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01589 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01590 CORBA::COMPLETED_NO);
01591 }
01592 }
01593 return -1;
01594 }
01595 }
01596 }
01597 return 0;
01598 }
01599
01600 int
01601 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
01602 ACE_Time_Value *max_wait_time, bool back)
01603 {
01604 TAO_Queued_Message *queued_message = 0;
01605 ACE_NEW_RETURN (queued_message,
01606 TAO_Asynch_Queued_Message (message_block,
01607 this->orb_core_,
01608 max_wait_time,
01609 0,
01610 true),
01611 -1);
01612 if (back) {
01613 queued_message->push_back (this->head_, this->tail_);
01614 }
01615 else {
01616 queued_message->push_front (this->head_, this->tail_);
01617 }
01618
01619 return 0;
01620 }
01621
01622
01623
01624
01625
01626
01627
01628 int
01629 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01630 ACE_Time_Value * max_wait_time)
01631 {
01632 if (TAO_debug_level > 3)
01633 {
01634 ACE_DEBUG ((LM_DEBUG,
01635 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01636 this->id ()));
01637 }
01638
01639
01640 int const retval = this->process_queue_head (rh);
01641
01642 if (retval <= 0)
01643 {
01644 if (retval == -1)
01645 {
01646 if (TAO_debug_level > 2)
01647 {
01648 ACE_ERROR ((LM_ERROR,
01649 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01650 ACE_TEXT ("error while parsing the head of the queue\n"),
01651 this->id()));
01652
01653 }
01654 return -1;
01655 }
01656 else
01657 {
01658
01659
01660
01661
01662 return 0;
01663 }
01664 }
01665
01666 TAO_Queued_Data *q_data = 0;
01667
01668 if (this->incoming_message_stack_.top (q_data) != -1
01669 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01670 {
01671
01672 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01673 {
01674 if (TAO_debug_level > 0)
01675 {
01676 ACE_ERROR ((LM_ERROR,
01677 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01678 ACE_TEXT ("error consolidating incoming message\n"),
01679 this->id ()));
01680 }
01681 return -1;
01682 }
01683 }
01684 else
01685 {
01686 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01687 {
01688 if (TAO_debug_level > 0)
01689 {
01690 ACE_ERROR ((LM_ERROR,
01691 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01692 ACE_TEXT ("error parsing incoming message\n"),
01693 this->id ()));
01694 }
01695 return -1;
01696 }
01697 }
01698
01699 return 0;
01700 }
01701
01702 int
01703 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01704 TAO_Resume_Handle &rh)
01705 {
01706
01707 if (q_data->missing_data () != 0)
01708 {
01709 if (TAO_debug_level > 0)
01710 {
01711 ACE_ERROR ((LM_ERROR,
01712 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01713 ACE_TEXT ("missing data\n"),
01714 this->id ()));
01715 }
01716 return -1;
01717 }
01718
01719 if (q_data->more_fragments () ||
01720 q_data->msg_type () == GIOP::Fragment)
01721 {
01722
01723 TAO_Queued_Data *new_q_data = 0;
01724
01725 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01726 {
01727 case -1:
01728 return -1;
01729
01730 case 0:
01731 if (!new_q_data)
01732 {
01733 if (TAO_debug_level > 0)
01734 {
01735 ACE_ERROR ((LM_ERROR,
01736 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01737 ACE_TEXT ("error, consolidated message is NULL\n"),
01738 this->id ()));
01739 }
01740 return -1;
01741 }
01742
01743
01744 if (this->process_parsed_messages (new_q_data, rh) == -1)
01745 {
01746 TAO_Queued_Data::release (new_q_data);
01747
01748 if (TAO_debug_level > 0)
01749 {
01750 ACE_ERROR ((LM_ERROR,
01751 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01752 ACE_TEXT ("error processing consolidated message\n"),
01753 this->id ()));
01754 }
01755 return -1;
01756 }
01757
01758 TAO_Queued_Data::release (new_q_data);
01759
01760 break;
01761
01762 case 1:
01763 break;
01764 }
01765 }
01766 else
01767 {
01768 if (this->process_parsed_messages (q_data, rh) == -1)
01769 {
01770 TAO_Queued_Data::release (q_data);
01771
01772 if (TAO_debug_level > 0)
01773 {
01774 ACE_ERROR ((LM_ERROR,
01775 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01776 ACE_TEXT ("error processing message\n"),
01777 this->id ()));
01778 }
01779 return -1;
01780 }
01781
01782 TAO_Queued_Data::release (q_data);
01783
01784 }
01785
01786 return 0;
01787 }
01788
01789 int
01790 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01791 {
01792
01793
01794
01795 if (q_data->missing_data () != 0)
01796 {
01797 return -1;
01798 }
01799
01800 if (q_data->more_fragments () ||
01801 q_data->msg_type () == GIOP::Fragment)
01802 {
01803 TAO_Queued_Data *new_q_data = 0;
01804
01805 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01806 {
01807 case -1:
01808 return -1;
01809
01810 case 0:
01811 if (!new_q_data)
01812 {
01813 if (TAO_debug_level > 0)
01814 {
01815 ACE_ERROR ((LM_ERROR,
01816 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01817 ACE_TEXT ("error, consolidated message is NULL\n"),
01818 this->id ()));
01819 }
01820 return -1;
01821 }
01822
01823 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01824 {
01825 TAO_Queued_Data::release (new_q_data);
01826 return -1;
01827 }
01828 break;
01829
01830 case 1:
01831 break;
01832 }
01833 }
01834 else
01835 {
01836 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01837 {
01838 TAO_Queued_Data::release (q_data);
01839 return -1;
01840 }
01841 }
01842
01843 return 0;
01844 }
01845
01846 int
01847 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01848 ACE_Time_Value * max_wait_time,
01849 TAO_Queued_Data *q_data)
01850 {
01851
01852 if (q_data == 0)
01853 {
01854 return -1;
01855 }
01856
01857 if (TAO_debug_level > 3)
01858 {
01859 ACE_DEBUG ((LM_DEBUG,
01860 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01861 ACE_TEXT ("enter (missing data == %d)\n"),
01862 this->id (), q_data->missing_data ()));
01863 }
01864
01865 size_t const recv_size = q_data->missing_data ();
01866
01867 if (q_data->msg_block ()->space() < recv_size)
01868 {
01869
01870 size_t const message_size = recv_size + q_data->msg_block ()->length();
01871
01872 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01873 {
01874 return -1;
01875 }
01876 }
01877
01878
01879
01880
01881
01882
01883 this->recv_buffer_size_ = recv_size;
01884
01885
01886 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01887 recv_size,
01888 max_wait_time);
01889
01890 if (n <= 0)
01891 {
01892 return n;
01893 }
01894
01895 if (TAO_debug_level > 3)
01896 {
01897 ACE_DEBUG ((LM_DEBUG,
01898 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01899 ACE_TEXT ("read bytes %d\n"),
01900 this->id (), n));
01901 }
01902
01903 q_data->msg_block ()->wr_ptr(n);
01904 q_data->missing_data (q_data->missing_data () - n);
01905
01906 if (q_data->missing_data () == 0)
01907 {
01908
01909 if (this->incoming_message_stack_.pop (q_data) == -1)
01910 {
01911 return -1;
01912 }
01913
01914 if (this->consolidate_process_message (q_data, rh) == -1)
01915 {
01916 return -1;
01917 }
01918 }
01919
01920 return 0;
01921 }
01922
01923
01924 int
01925 TAO_Transport::handle_input_parse_extra_messages (
01926 ACE_Message_Block &message_block)
01927 {
01928
01929
01930
01931 int buf_status = 0;
01932
01933 TAO_Queued_Data *q_data = 0;
01934
01935
01936
01937
01938 while (message_block.length () > 0 &&
01939 (buf_status = this->messaging_object ()->extract_next_message
01940 (message_block, q_data)) != -1 &&
01941 q_data != 0)
01942 {
01943 if (q_data->missing_data () == 0)
01944 {
01945 if (this->consolidate_enqueue_message (q_data) == -1)
01946 {
01947 return -1;
01948 }
01949 }
01950 else
01951 {
01952
01953 this->incoming_message_stack_.push (q_data);
01954 }
01955
01956 q_data = 0;
01957 }
01958
01959 if (buf_status == -1)
01960 {
01961 return -1;
01962 }
01963
01964 return 0;
01965 }
01966
01967 int
01968 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
01969 ACE_Time_Value * max_wait_time)
01970 {
01971 if (TAO_debug_level > 3)
01972 {
01973 ACE_DEBUG ((LM_DEBUG,
01974 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01975 ACE_TEXT ("enter\n"),
01976 this->id ()));
01977 }
01978
01979
01980
01981
01982
01983 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01984
01985 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01986 (void) ACE_OS::memset (buf,
01987 '\0',
01988 sizeof buf);
01989 #endif
01990
01991
01992 ACE_Data_Block db (sizeof (buf),
01993 ACE_Message_Block::MB_DATA,
01994 buf,
01995 this->orb_core_->input_cdr_buffer_allocator (),
01996 this->orb_core_->locking_strategy (),
01997 ACE_Message_Block::DONT_DELETE,
01998 this->orb_core_->input_cdr_dblock_allocator ());
01999
02000
02001 ACE_Message_Block message_block (&db,
02002 ACE_Message_Block::DONT_DELETE,
02003 this->orb_core_->input_cdr_msgblock_allocator ());
02004
02005
02006 ACE_CDR::mb_align (&message_block);
02007
02008 size_t recv_size = 0;
02009
02010
02011 TAO_Queued_Data *q_data = 0;
02012
02013
02014 size_t const header_length = this->messaging_object ()->header_length ();
02015
02016
02017 if (header_length > message_block.space ())
02018 {
02019 return -1;
02020 }
02021
02022 if (this->orb_core_->orb_params ()->single_read_optimization ())
02023 {
02024 recv_size = message_block.space ();
02025 }
02026 else
02027 {
02028
02029
02030
02031
02032
02033
02034 if (this->incoming_message_stack_.top (q_data) != -1
02035 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02036 {
02037
02038
02039
02040 recv_size = header_length - q_data->msg_block ()->length ();
02041 }
02042 else
02043 {
02044
02045
02046 recv_size = header_length;
02047 }
02048
02049 }
02050
02051
02052
02053
02054 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
02055 {
02056
02057
02058
02059
02060
02061 if (this->partial_message_->length () <= recv_size &&
02062 message_block.copy (this->partial_message_->rd_ptr (),
02063 this->partial_message_->length ()) == 0)
02064 {
02065
02066 recv_size -= this->partial_message_->length ();
02067 this->partial_message_->reset ();
02068 }
02069 else
02070 {
02071 return -1;
02072 }
02073 }
02074
02075
02076 if (0 >= recv_size)
02077 {
02078
02079
02080
02081
02082
02083 if (TAO_debug_level > 0)
02084 {
02085 ACE_ERROR ((LM_ERROR,
02086 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02087 ACE_TEXT ("Error - endless loop detection, closing connection"),
02088 this->id ()));
02089 }
02090 return -1;
02091 }
02092
02093
02094
02095
02096
02097
02098 this->recv_buffer_size_ = recv_size;
02099
02100
02101
02102 ssize_t const n = this->recv (message_block.wr_ptr (),
02103 recv_size,
02104 max_wait_time);
02105
02106
02107 if (n <= 0)
02108 {
02109 return n;
02110 }
02111
02112 if (TAO_debug_level > 3)
02113 {
02114 ACE_DEBUG ((LM_DEBUG,
02115 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02116 ACE_TEXT ("read %d bytes\n"),
02117 this->id (), n));
02118 }
02119
02120
02121 message_block.wr_ptr (n);
02122
02123
02124
02125
02126
02127
02128
02129 if (this->incoming_message_stack_.top (q_data) != -1
02130 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02131 {
02132
02133
02134
02135
02136
02137
02138
02139
02140
02141
02142 if (this->messaging_object ()->consolidate_node (q_data,
02143 message_block) == -1)
02144 {
02145 if (TAO_debug_level > 0)
02146 {
02147 ACE_ERROR ((LM_ERROR,
02148 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02149 ACE_TEXT ("error consolidating message from input buffer\n"),
02150 this->id () ));
02151 }
02152 return -1;
02153 }
02154
02155
02156 if (q_data->missing_data () == 0)
02157 {
02158 if (this->incoming_message_stack_.pop (q_data) == -1)
02159 {
02160 return -1;
02161 }
02162
02163 if (this->consolidate_enqueue_message (q_data) == -1)
02164 {
02165 return -1;
02166 }
02167 }
02168
02169 if (message_block.length () > 0
02170 && this->handle_input_parse_extra_messages (message_block) == -1)
02171 {
02172 return -1;
02173 }
02174
02175
02176 if (this->process_queue_head (rh) == -1)
02177 {
02178 return -1;
02179 }
02180 }
02181 else
02182 {
02183
02184
02185
02186
02187
02188
02189
02190
02191
02192 TAO_Queued_Data qd (&message_block,
02193 this->orb_core_->transport_message_buffer_allocator ());
02194
02195 size_t mesg_length = 0;
02196
02197 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1
02198 || (qd.missing_data () == 0
02199 && mesg_length > message_block.length ()) )
02200 {
02201
02202 return -1;
02203 }
02204
02205
02206
02207 if (qd.missing_data () != 0 ||
02208 qd.more_fragments () ||
02209 qd.msg_type () == GIOP::Fragment)
02210 {
02211 if (qd.missing_data () == 0)
02212 {
02213
02214 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02215
02216 if (nqd == 0)
02217 {
02218 return -1;
02219 }
02220
02221
02222 char* end_mark = nqd->msg_block ()->rd_ptr ()
02223 + mesg_length;
02224 nqd->msg_block ()->wr_ptr (end_mark);
02225
02226
02227 message_block.rd_ptr (mesg_length);
02228
02229
02230 if (this->consolidate_enqueue_message (nqd) == -1)
02231 {
02232 return -1;
02233 }
02234
02235 if (message_block.length () > 0
02236 && this->handle_input_parse_extra_messages (message_block) == -1)
02237 {
02238 return -1;
02239 }
02240
02241
02242 if (this->process_queue_head (rh) == -1)
02243 {
02244 return -1;
02245 }
02246 }
02247 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02248 {
02249
02250
02251 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02252 qd.missing_data () > message_block.space ())
02253 {
02254
02255 if (ACE_CDR::grow (qd.msg_block (),
02256 message_block.length ()
02257 + qd.missing_data ()) == -1)
02258 {
02259 return -1;
02260 }
02261 }
02262
02263 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
02264
02265 if (nqd == 0)
02266 {
02267 return -1;
02268 }
02269
02270
02271 message_block.rd_ptr (message_block.length());
02272
02273 this->incoming_message_stack_.push (nqd);
02274 }
02275 }
02276 else
02277 {
02278
02279
02280
02281
02282
02283
02284
02285
02286
02287 char * end_marker = message_block.rd_ptr ()
02288 + mesg_length;
02289
02290 if (message_block.length () > mesg_length)
02291 {
02292
02293
02294 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02295
02296
02297
02298 message_block.rd_ptr (mesg_length);
02299
02300
02301
02302 if (this->handle_input_parse_extra_messages (message_block) == -1)
02303 {
02304 return -1;
02305 }
02306
02307
02308
02309
02310 message_block.wr_ptr (end_marker);
02311
02312
02313 message_block.rd_ptr (rd_ptr_stack_mesg);
02314 }
02315
02316
02317
02318
02319
02320
02321
02322
02323 if (this->incoming_message_queue_.queue_length () > 0)
02324 {
02325 if (TAO_debug_level > 0)
02326 {
02327 ACE_DEBUG ((LM_DEBUG,
02328 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02329 ACE_TEXT ("notify reactor\n"),
02330 this->id ()));
02331 }
02332
02333 int const retval = this->notify_reactor ();
02334
02335 if (retval == 1)
02336 {
02337
02338
02339 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02340 }
02341 else if (retval < 0)
02342 return -1;
02343 }
02344 else
02345 {
02346
02347
02348 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02349 }
02350
02351
02352 if (this->process_parsed_messages (&qd, rh) == -1)
02353 {
02354 return -1;
02355 }
02356
02357 message_block.rd_ptr (end_marker);
02358 }
02359 }
02360
02361
02362
02363 if (message_block.length () > 0)
02364 {
02365 if (this->partial_message_ == 0)
02366 {
02367 this->allocate_partial_message_block ();
02368 }
02369
02370 if (this->partial_message_ != 0 &&
02371 this->partial_message_->copy (message_block.rd_ptr (),
02372 message_block.length ()) == 0)
02373 {
02374 message_block.rd_ptr (message_block.length ());
02375 }
02376 else
02377 {
02378 return -1;
02379 }
02380 }
02381
02382 return 0;
02383 }
02384
02385
02386 int
02387 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02388 TAO_Resume_Handle &rh)
02389 {
02390 if (TAO_debug_level > 7)
02391 {
02392 ACE_DEBUG ((LM_DEBUG,
02393 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02394 ACE_TEXT ("entering (missing data == %d)\n"),
02395 this->id(), qd->missing_data ()));
02396 }
02397
02398 #if TAO_HAS_TRANSPORT_CURRENT == 1
02399
02400 if (this->stats_ != 0)
02401 this->stats_->messages_received (qd->msg_block ()->length ());
02402 #endif
02403
02404 switch (qd->msg_type ())
02405 {
02406 case GIOP::CloseConnection:
02407 {
02408 if (TAO_debug_level > 0)
02409 {
02410 ACE_DEBUG ((LM_DEBUG,
02411 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02412 ACE_TEXT ("received CloseConnection message - %m\n"),
02413 this->id()));
02414 }
02415
02416
02417
02418 return -1;
02419 }
02420 break;
02421 case GIOP::Request:
02422 case GIOP::LocateRequest:
02423 {
02424
02425
02426 rh.resume_handle ();
02427
02428 if (this->messaging_object ()->process_request_message (this, qd) == -1)
02429 {
02430
02431
02432 return -1;
02433 }
02434 }
02435 break;
02436 case GIOP::Reply:
02437 case GIOP::LocateReply:
02438 {
02439 rh.resume_handle ();
02440
02441 TAO_Pluggable_Reply_Params params (this);
02442
02443 if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02444 {
02445 if (TAO_debug_level > 0)
02446 {
02447 ACE_ERROR ((LM_ERROR,
02448 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02449 ACE_TEXT ("error in process_reply_message - %m\n"),
02450 this->id ()));
02451 }
02452
02453 return -1;
02454 }
02455
02456 }
02457 break;
02458 case GIOP::CancelRequest:
02459 {
02460
02461
02462
02463 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02464 {
02465 if (TAO_debug_level > 0)
02466 {
02467 ACE_ERROR ((LM_ERROR,
02468 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02469 ACE_TEXT ("error processing CancelRequest\n"),
02470 this->id ()));
02471 }
02472 }
02473
02474
02475
02476
02477
02478
02479
02480 }
02481 break;
02482 case GIOP::MessageError:
02483 {
02484 if (TAO_debug_level > 0)
02485 {
02486 ACE_ERROR ((LM_ERROR,
02487 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02488 ACE_TEXT ("received MessageError, closing connection\n"),
02489 this->id ()));
02490 }
02491 return -1;
02492 }
02493 break;
02494 case GIOP::Fragment:
02495 {
02496
02497 }
02498 break;
02499 }
02500
02501
02502 return 0;
02503 }
02504
02505 int
02506 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02507 {
02508 if (TAO_debug_level > 3)
02509 {
02510 ACE_DEBUG ((LM_DEBUG,
02511 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02512 this->id (), this->incoming_message_queue_.queue_length () ));
02513 }
02514
02515
02516 if (this->incoming_message_queue_.queue_length () > 0)
02517 {
02518
02519 TAO_Queued_Data *qd =
02520 this->incoming_message_queue_.dequeue_head ();
02521
02522 if (TAO_debug_level > 3)
02523 {
02524 ACE_DEBUG ((LM_DEBUG,
02525 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02526 ACE_TEXT ("the size of the queue is [%d]\n"),
02527 this->id (),
02528 this->incoming_message_queue_.queue_length()));
02529 }
02530
02531
02532 if (this->incoming_message_queue_.queue_length () > 0)
02533 {
02534 if (TAO_debug_level > 0)
02535 {
02536 ACE_DEBUG ((LM_DEBUG,
02537 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02538 ACE_TEXT ("notify reactor\n"),
02539 this->id ()));
02540 }
02541
02542 int const retval = this->notify_reactor ();
02543
02544 if (retval == 1)
02545 {
02546
02547
02548 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02549 }
02550 else if (retval < 0)
02551 return -1;
02552 }
02553 else
02554 {
02555
02556
02557 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02558 }
02559
02560
02561 int const retval = this->process_parsed_messages (qd, rh);
02562
02563
02564 TAO_Queued_Data::release (qd);
02565
02566 return retval;
02567 }
02568
02569 return 1;
02570 }
02571
02572 int
02573 TAO_Transport::notify_reactor (void)
02574 {
02575 if (!this->ws_->is_registered ())
02576 {
02577 return 0;
02578 }
02579
02580 ACE_Event_Handler *eh = this->event_handler_i ();
02581
02582
02583 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02584
02585 if (TAO_debug_level > 0)
02586 {
02587 ACE_DEBUG ((LM_DEBUG,
02588 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02589 ACE_TEXT ("notify to Reactor\n"),
02590 this->id ()));
02591 }
02592
02593
02594 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02595
02596 if (retval < 0 && TAO_debug_level > 2)
02597 {
02598
02599
02600 ACE_ERROR ((LM_ERROR,
02601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02602 ACE_TEXT ("notify to the reactor failed..\n"),
02603 this->id ()));
02604 }
02605
02606 return 1;
02607 }
02608
02609 TAO::Transport_Cache_Manager &
02610 TAO_Transport::transport_cache_manager (void)
02611 {
02612 return this->orb_core_->lane_resources ().transport_cache ();
02613 }
02614
02615 void
02616 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02617 {
02618 if (this->char_translator_)
02619 {
02620 this->char_translator_->assign (inp);
02621 this->char_translator_->assign (outp);
02622 }
02623 if (this->wchar_translator_)
02624 {
02625 this->wchar_translator_->assign (inp);
02626 this->wchar_translator_->assign (outp);
02627 }
02628 }
02629
02630 void
02631 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02632 {
02633 if (inp)
02634 {
02635 inp->char_translator (0);
02636 inp->wchar_translator (0);
02637 }
02638 if (outp)
02639 {
02640 outp->char_translator (0);
02641 outp->wchar_translator (0);
02642 }
02643 }
02644
02645 ACE_Event_Handler::Reference_Count
02646 TAO_Transport::add_reference (void)
02647 {
02648 return this->event_handler_i ()->add_reference ();
02649 }
02650
02651 ACE_Event_Handler::Reference_Count
02652 TAO_Transport::remove_reference (void)
02653 {
02654 return this->event_handler_i ()->remove_reference ();
02655 }
02656
02657 TAO_OutputCDR &
02658 TAO_Transport::out_stream (void)
02659 {
02660 return this->messaging_object ()->out_stream ();
02661 }
02662
02663 TAO_SYNCH_MUTEX &
02664 TAO_Transport::output_cdr_lock (void)
02665 {
02666 return this->output_cdr_mutex_;
02667 }
02668
02669 void
02670 TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version)
02671 {
02672 this->messaging_object ()->init (version.major, version.minor);
02673 }
02674
02675 void
02676 TAO_Transport::pre_close (void)
02677 {
02678
02679
02680
02681
02682
02683 this->is_connected_ = false;
02684 this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02685 false);
02686 this->purge_entry ();
02687 {
02688 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02689 this->cleanup_queue_i ();
02690 }
02691 }
02692
02693 bool
02694 TAO_Transport::post_open (size_t id)
02695 {
02696 if (TAO_debug_level > 9)
02697 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ")
02698 ACE_TEXT ("tport id changed from %d to %d\n"), this->id_, id));
02699 this->id_ = id;
02700
02701
02702
02703 if (!this->queue_is_empty_i ())
02704 {
02705
02706
02707
02708 if (this->wait_strategy ()->register_handler () == 0)
02709 {
02710 if (this->flush_in_post_open_)
02711 {
02712 TAO_Flushing_Strategy *flushing_strategy =
02713 this->orb_core ()->flushing_strategy ();
02714
02715 if (flushing_strategy == 0)
02716 throw CORBA::INTERNAL ();
02717
02718 this->flush_in_post_open_ = false;
02719 (void) flushing_strategy->schedule_output (this);
02720 }
02721 }
02722 else
02723 {
02724
02725
02726
02727
02728 (void) this->purge_entry ();
02729
02730
02731 (void) this->close_connection ();
02732
02733 if (TAO_debug_level > 0)
02734 {
02735 ACE_ERROR ((LM_ERROR,
02736 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ")
02737 ACE_TEXT ("could not register the transport ")
02738 ACE_TEXT ("in the reactor.\n"),
02739 this->id ()));
02740 }
02741
02742 return false;
02743 }
02744 }
02745
02746 {
02747 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
02748 this->is_connected_ = true;
02749 }
02750
02751 if (TAO_debug_level > 9 && !this->cache_map_entry_)
02752 {
02753 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open")
02754 ACE_TEXT (", cache_map_entry_ is 0\n"), this->id_));
02755 }
02756
02757 this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
02758 true);
02759
02760
02761 this->transport_cache_manager ().set_entry_state (
02762 this->cache_map_entry_,
02763 TAO::ENTRY_IDLE_AND_PURGABLE);
02764
02765 return true;
02766 }
02767
02768 void
02769 TAO_Transport::allocate_partial_message_block (void)
02770 {
02771 if (this->partial_message_ == 0)
02772 {
02773
02774
02775 size_t const partial_message_size =
02776 this->messaging_object ()->header_length ();
02777
02778
02779
02780 ACE_NEW (this->partial_message_,
02781 ACE_Message_Block (partial_message_size));
02782 }
02783 }
02784
02785 void
02786 TAO_Transport::set_bidir_context_info (TAO_Operation_Details &)
02787 {
02788 }
02789
02790 ACE_Time_Value const *
02791 TAO_Transport::io_timeout(
02792 TAO::Transport::Drain_Constraints const & dc) const
02793 {
02794 if (dc.block_on_io())
02795 {
02796 return dc.timeout();
02797 }
02798 if (this->wait_strategy()->can_process_upcalls())
02799 {
02800 return 0;
02801 }
02802 return dc.timeout();
02803 }
02804
02805 bool
02806 TAO_Transport::using_blocking_io_for_synch_messages() const
02807 {
02808 if (this->wait_strategy()->can_process_upcalls())
02809 {
02810 return false;
02811 }
02812 return true;
02813 }
02814
02815 bool
02816 TAO_Transport::using_blocking_io_for_asynch_messages() const
02817 {
02818 return false;
02819 }
02820
02821
02822
02823
02824
02825
02826
02827
02828 TAO_END_VERSIONED_NAMESPACE_DECL