00001
00002
00003 #include "tao/Transport.h"
00004
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Pluggable_Messaging.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 #include "tao/SystemException.h"
00026
00027 #include "ace/OS_NS_sys_time.h"
00028 #include "ace/OS_NS_stdio.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/os_include/sys/os_uio.h"
00031 #include "ace/High_Res_Timer.h"
00032 #include "ace/CORBA_macros.h"
00033
00034
00035
00036
00037
00038
00039
00040 #if !defined (__ACE_INLINE__)
00041 # include "tao/Transport.inl"
00042 #endif
00043
00044
00045 ACE_RCSID (tao,
00046 Transport,
00047 "$Id: Transport.cpp 79388 2007-08-17 16:05:00Z wilsond $")
00048
00049
00050
00051
00052 static void
00053 dump_iov (iovec *iov, int iovcnt, size_t id,
00054 size_t current_transfer,
00055 const char *location)
00056 {
00057 ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ());
00058
00059 ACE_DEBUG ((LM_DEBUG,
00060 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00061 ACE_TEXT ("sending %d buffers\n"),
00062 id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00063
00064 for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00065 {
00066 size_t iov_len = iov[i].iov_len;
00067
00068
00069 if (current_transfer < iov_len)
00070 {
00071 iov_len = current_transfer;
00072 }
00073
00074 ACE_DEBUG ((LM_DEBUG,
00075 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00076 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00077 id, ACE_TEXT_CHAR_TO_TCHAR(location),
00078 i, iovcnt,
00079 iov_len));
00080
00081 size_t len;
00082
00083 for (size_t offset = 0; offset < iov_len; offset += len)
00084 {
00085 ACE_TCHAR header[1024];
00086 ACE_OS::sprintf (header,
00087 ACE_TEXT("TAO - ")
00088 ACE_TEXT("Transport[")
00089 ACE_SIZE_T_FORMAT_SPECIFIER
00090 ACE_TEXT("]::%s")
00091 ACE_TEXT(" (")
00092 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00093 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00094 id, location, offset, iov_len);
00095
00096 len = iov_len - offset;
00097
00098 if (len > 512)
00099 {
00100 len = 512;
00101 }
00102
00103 ACE_HEX_DUMP ((LM_DEBUG,
00104 static_cast<char*> (iov[i].iov_base) + offset,
00105 len,
00106 header));
00107 }
00108 current_transfer -= iov_len;
00109 }
00110
00111 ACE_DEBUG ((LM_DEBUG,
00112 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00113 ACE_TEXT ("end of data\n"),
00114 id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00115 }
00116
00117 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00118
#if TAO_HAS_TRANSPORT_CURRENT == 1
// Out-of-line destructor for the transport statistics object.
// Intentionally empty.
TAO::Transport::Stats::~Stats (void)
{
}
#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
00125
00126 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00127 TAO_ORB_Core *orb_core)
00128 : tag_ (tag)
00129 , orb_core_ (orb_core)
00130 , cache_map_entry_ (0)
00131 , bidirectional_flag_ (-1)
00132 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00133 , head_ (0)
00134 , tail_ (0)
00135 , incoming_message_queue_ (orb_core)
00136 , current_deadline_ (ACE_Time_Value::zero)
00137 , flush_timer_id_ (-1)
00138 , transport_timer_ (this)
00139 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00140 , id_ ((size_t) this)
00141 , purging_order_ (0)
00142 , recv_buffer_size_ (0)
00143 , sent_byte_count_ (0)
00144 , is_connected_ (false)
00145 , char_translator_ (0)
00146 , wchar_translator_ (0)
00147 , tcs_set_ (0)
00148 , first_request_ (1)
00149 , partial_message_ (0)
00150 #if TAO_HAS_SENDFILE == 1
00151
00152
00153
00154
00155
00156 , mmap_allocator_ (
00157 dynamic_cast<TAO_MMAP_Allocator *> (
00158 orb_core->output_cdr_buffer_allocator ()))
00159 #endif
00160 {
00161 TAO_Client_Strategy_Factory *cf =
00162 this->orb_core_->client_factory ();
00163
00164
00165 this->ws_ = cf->create_wait_strategy (this);
00166
00167
00168 this->tms_ = cf->create_transport_mux_strategy (this);
00169
00170 #if TAO_HAS_TRANSPORT_CURRENT == 1
00171
00172 ACE_NEW_THROW_EX (this->stats_,
00173 TAO::Transport::Stats,
00174 CORBA::NO_MEMORY ());
00175 #endif
00176
00177
00178
00179
00180
00181
00182
00183
00184 }
00185
00186 TAO_Transport::~TAO_Transport (void)
00187 {
00188 delete this->ws_;
00189
00190 delete this->tms_;
00191
00192 delete this->handler_lock_;
00193
00194 if (!this->is_connected_)
00195 {
00196
00197
00198 this->cleanup_queue_i();
00199
00200
00201 this->purge_entry();
00202 }
00203
00204
00205
00206 ACE_Message_Block::release (this->partial_message_);
00207
00208
00209
00210
00211
00212
00213 ACE_ASSERT (this->head_ == 0);
00214 ACE_ASSERT (this->cache_map_entry_ == 0);
00215
00216 #if TAO_HAS_TRANSPORT_CURRENT == 1
00217 delete this->stats_;
00218 #endif
00219
00220
00221
00222
00223
00224
00225
00226
00227 }
00228
00229 void
00230 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00231 {
00232 (void) this->add_reference ();
00233
00234 handlers.insert (this->connection_handler_i ());
00235 }
00236
00237 bool
00238 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00239 {
00240 if (this->ws_->non_blocking () ||
00241 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00242 return false;
00243
00244 (void) this->add_reference ();
00245
00246 h.insert (this->connection_handler_i ());
00247
00248 return true;
00249 }
00250
00251 bool
00252 TAO_Transport::idle_after_send (void)
00253 {
00254 return this->tms ()->idle_after_send ();
00255 }
00256
00257 bool
00258 TAO_Transport::idle_after_reply (void)
00259 {
00260 return this->tms ()->idle_after_reply ();
00261 }
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273 int
00274 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00275 {
00276 ACE_NOTSUP_RETURN (-1);
00277 }
00278
00279 int
00280 TAO_Transport::send_message_shared (TAO_Stub *stub,
00281 int message_semantics,
00282 const ACE_Message_Block *message_block,
00283 ACE_Time_Value *max_wait_time)
00284 {
00285 int result = 0;
00286
00287 {
00288 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00289
00290 result =
00291 this->send_message_shared_i (stub, message_semantics,
00292 message_block, max_wait_time);
00293 }
00294
00295 if (result == -1)
00296 {
00297 this->close_connection ();
00298 }
00299
00300 return result;
00301 }
00302
00303
00304
00305 bool
00306 TAO_Transport::post_connect_hook (void)
00307 {
00308 return true;
00309 }
00310
00311 void
00312 TAO_Transport::close_connection (void)
00313 {
00314 this->connection_handler_i ()->close_connection ();
00315 }
00316
00317 int
00318 TAO_Transport::register_handler (void)
00319 {
00320 if (TAO_debug_level > 4)
00321 {
00322 ACE_DEBUG ((LM_DEBUG,
00323 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00324 this->id ()));
00325 }
00326
00327 ACE_Reactor * const r = this->orb_core_->reactor ();
00328
00329
00330
00331 ACE_GUARD_RETURN (ACE_Lock,
00332 ace_mon,
00333 *this->handler_lock_,
00334 false);
00335
00336 if (r == this->event_handler_i ()->reactor ())
00337 {
00338 return 0;
00339 }
00340
00341
00342
00343
00344 this->ws_->is_registered (true);
00345
00346
00347 return r->register_handler (this->event_handler_i (),
00348 ACE_Event_Handler::READ_MASK);
00349 }
00350
#if TAO_HAS_SENDFILE == 1
// Base-class sendfile(): the allocator argument is unused and the data is
// sent through the regular send() path.
//
// @param iov                I/O vectors to send.
// @param iovcnt             Number of entries in @a iov.
// @param bytes_transferred  Receives the number of bytes sent.
// @param timeout            Optional timeout, passed to send().
// @return The result of send().
ssize_t
TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
                         iovec * iov,
                         int iovcnt,
                         size_t &bytes_transferred,
                         ACE_Time_Value const * timeout)
{
  ssize_t const sent =
    this->send (iov, iovcnt, bytes_transferred, timeout);

  return sent;
}
#endif  /* TAO_HAS_SENDFILE == 1 */
00369
00370 int
00371 TAO_Transport::generate_locate_request (
00372 TAO_Target_Specification &spec,
00373 TAO_Operation_Details &opdetails,
00374 TAO_OutputCDR &output)
00375 {
00376 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00377 spec,
00378 output) == -1)
00379 {
00380 if (TAO_debug_level > 0)
00381 {
00382 ACE_DEBUG ((LM_DEBUG,
00383 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00384 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00385 this->id ()));
00386 }
00387
00388 return -1;
00389 }
00390
00391 return 0;
00392 }
00393
00394 int
00395 TAO_Transport::generate_request_header (
00396 TAO_Operation_Details &opdetails,
00397 TAO_Target_Specification &spec,
00398 TAO_OutputCDR &output)
00399 {
00400
00401
00402 if (this->first_request_)
00403 {
00404 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00405 if (csm)
00406 csm->generate_service_context (opdetails,*this);
00407 }
00408
00409 if (this->messaging_object ()->generate_request_header (opdetails,
00410 spec,
00411 output) == -1)
00412 {
00413 if (TAO_debug_level > 0)
00414 {
00415 ACE_DEBUG ((LM_DEBUG,
00416 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00417 ACE_TEXT ("error while marshalling the Request header\n"),
00418 this->id()));
00419 }
00420
00421 return -1;
00422 }
00423
00424 return 0;
00425 }
00426
00427
00428
00429 int
00430 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00431 {
00432
00433 this->purge_entry ();
00434
00435
00436 return this->transport_cache_manager ().cache_transport (desc,
00437 this);
00438 }
00439
00440 int
00441 TAO_Transport::purge_entry (void)
00442 {
00443 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00444 }
00445
00446 int
00447 TAO_Transport::make_idle (void)
00448 {
00449 if (TAO_debug_level > 3)
00450 {
00451 ACE_DEBUG ((LM_DEBUG,
00452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00453 this->id ()));
00454 }
00455
00456 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00457 }
00458
00459 int
00460 TAO_Transport::update_transport (void)
00461 {
00462 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00463 }
00464
00465
00466
00467
00468
00469
00470 int
00471 TAO_Transport::handle_output (void)
00472 {
00473 if (TAO_debug_level > 3)
00474 {
00475 ACE_DEBUG ((LM_DEBUG,
00476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00477 this->id ()));
00478 }
00479
00480
00481
00482
00483 int const retval = this->drain_queue ();
00484
00485 if (TAO_debug_level > 3)
00486 {
00487 ACE_DEBUG ((LM_DEBUG,
00488 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00489 ACE_TEXT ("drain_queue returns %d/%d\n"),
00490 this->id (),
00491 retval, errno));
00492 }
00493
00494
00495 return retval;
00496 }
00497
00498 int
00499 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
00500 ACE_Time_Value *max_wait_time)
00501 {
00502 if (this->messaging_object ()->format_message (stream) != 0)
00503 return -1;
00504
00505 return this->queue_message_i (stream.begin (), max_wait_time);
00506 }
00507
00508 int
00509 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00510 size_t &bytes_transferred,
00511 ACE_Time_Value *max_wait_time)
00512 {
00513 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00514
00515 return this->send_message_block_chain_i (mb,
00516 bytes_transferred,
00517 max_wait_time);
00518 }
00519
00520 int
00521 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00522 size_t &bytes_transferred,
00523 ACE_Time_Value *)
00524 {
00525 size_t const total_length = mb->total_length ();
00526
00527
00528
00529 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00530
00531 synch_message.push_back (this->head_, this->tail_);
00532
00533 int const n = this->drain_queue_i ();
00534
00535 if (n == -1)
00536 {
00537 synch_message.remove_from_list (this->head_, this->tail_);
00538 return -1;
00539 }
00540 else if (n == 1)
00541 {
00542 bytes_transferred = total_length;
00543 return 1;
00544 }
00545
00546
00547 synch_message.remove_from_list (this->head_, this->tail_);
00548
00549 bytes_transferred = total_length - synch_message.message_length ();
00550
00551 return 0;
00552 }
00553
00554 int
00555 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00556 ACE_Time_Value *max_wait_time)
00557 {
00558
00559
00560 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00561 size_t const message_length = synch_message.message_length ();
00562
00563 synch_message.push_back (this->head_, this->tail_);
00564
00565 int const n = this->send_synch_message_helper_i (synch_message,
00566 0 );
00567 if (n == -1 || n == 1)
00568 {
00569 return n;
00570 }
00571
00572
00573
00574 TAO_Flushing_Strategy *flushing_strategy =
00575 this->orb_core ()->flushing_strategy ();
00576 int result = flushing_strategy->schedule_output (this);
00577 if (result == -1)
00578 {
00579 synch_message.remove_from_list (this->head_, this->tail_);
00580 if (TAO_debug_level > 0)
00581 {
00582 ACE_ERROR ((LM_ERROR,
00583 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00584 ACE_TEXT ("send_synchronous_message_i, ")
00585 ACE_TEXT ("error while scheduling flush - %m\n"),
00586 this->id ()));
00587 }
00588 return -1;
00589 }
00590
00591
00592
00593
00594
00595
00596 {
00597 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00598 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00599 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00600
00601 result = flushing_strategy->flush_message (this,
00602 &synch_message,
00603 max_wait_time);
00604 }
00605
00606 if (result == -1)
00607 {
00608 synch_message.remove_from_list (this->head_, this->tail_);
00609
00610 if (errno == ETIME)
00611 {
00612
00613 if (message_length != synch_message.message_length ())
00614 {
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 TAO_Queued_Message *queued_message = 0;
00629 ACE_NEW_RETURN (queued_message,
00630 TAO_Asynch_Queued_Message (
00631 synch_message.current_block (),
00632 this->orb_core_,
00633 0,
00634 0,
00635 true),
00636 -1);
00637 queued_message->push_front (this->head_, this->tail_);
00638 }
00639 }
00640
00641 if (TAO_debug_level > 0)
00642 {
00643 ACE_ERROR ((LM_ERROR,
00644 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00645 ACE_TEXT ("error while flushing message - %m\n"),
00646 this->id ()));
00647 }
00648
00649 return -1;
00650 }
00651
00652 return 1;
00653 }
00654
00655
00656 int
00657 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00658 ACE_Time_Value *max_wait_time)
00659 {
00660
00661 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00662
00663 synch_message.push_back (this->head_,
00664 this->tail_);
00665
00666 int const n =
00667 this->send_synch_message_helper_i (synch_message,
00668 max_wait_time);
00669
00670 if (n == -1 || n == 1)
00671 {
00672 return n;
00673 }
00674
00675 if (TAO_debug_level > 3)
00676 {
00677 ACE_DEBUG ((LM_DEBUG,
00678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00679 ACE_TEXT ("preparing to add to queue before leaving\n"),
00680 this->id ()));
00681 }
00682
00683
00684
00685 synch_message.remove_from_list (this->head_, this->tail_);
00686
00687
00688 TAO_Queued_Message *msg =
00689 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00690
00691
00692 msg->push_back (this->head_, this->tail_);
00693
00694 TAO_Flushing_Strategy *flushing_strategy =
00695 this->orb_core ()->flushing_strategy ();
00696
00697 int const result = flushing_strategy->schedule_output (this);
00698
00699 if (result == -1)
00700 {
00701 if (TAO_debug_level > 5)
00702 {
00703 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00704 "message_i, dequeuing msg due to schedule_output "
00705 "failure\n", this->id ()));
00706 }
00707 msg->remove_from_list (this->head_, this->tail_);
00708 msg->destroy ();
00709 }
00710 else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00711 {
00712 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00713 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00714 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00715 (void) flushing_strategy->flush_message(this, msg, 0);
00716 }
00717
00718 return 1;
00719 }
00720
00721 int
00722 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00723 ACE_Time_Value * )
00724 {
00725
00726 int const n = this->drain_queue_i ();
00727
00728 if (n == -1)
00729 {
00730 synch_message.remove_from_list (this->head_, this->tail_);
00731 return -1;
00732 }
00733 else if (n == 1)
00734 {
00735 return 1;
00736 }
00737
00738 if (synch_message.all_data_sent ())
00739 {
00740 return 1;
00741 }
00742
00743 return 0;
00744 }
00745
00746 bool
00747 TAO_Transport::queue_is_empty_i (void)
00748 {
00749 return (this->head_ == 0);
00750 }
00751
00752
00753 int
00754 TAO_Transport::schedule_output_i (void)
00755 {
00756 ACE_Event_Handler * const eh = this->event_handler_i ();
00757 ACE_Reactor * const reactor = eh->reactor ();
00758
00759 if (reactor == 0)
00760 return -1;
00761
00762
00763
00764
00765 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00766 if (found != eh)
00767 {
00768 if(TAO_debug_level > 3)
00769 {
00770 ACE_DEBUG ((LM_DEBUG,
00771 "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00772 "event handler not found in reactor, returning -1\n",
00773 this->id ()));
00774 }
00775 if (found)
00776 {
00777 found->remove_reference ();
00778 }
00779 return -1;
00780 }
00781 found->remove_reference ();
00782
00783 if (TAO_debug_level > 3)
00784 {
00785 ACE_DEBUG ((LM_DEBUG,
00786 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00787 this->id ()));
00788 }
00789
00790 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00791 }
00792
00793 int
00794 TAO_Transport::cancel_output_i (void)
00795 {
00796 ACE_Event_Handler * const eh = this->event_handler_i ();
00797 ACE_Reactor *const reactor = eh->reactor ();
00798
00799 if (TAO_debug_level > 3)
00800 {
00801 ACE_DEBUG ((LM_DEBUG,
00802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00803 this->id ()));
00804 }
00805
00806 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00807 }
00808
00809 int
00810 TAO_Transport::handle_timeout (const ACE_Time_Value & ,
00811 const void *act)
00812 {
00813 if (TAO_debug_level > 6)
00814 {
00815 ACE_DEBUG ((LM_DEBUG,
00816 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00817 ACE_TEXT ("timer expired\n"),
00818 this->id ()));
00819 }
00820
00821
00822 if (act != &this->current_deadline_)
00823 {
00824 return -1;
00825 }
00826
00827 if (this->flush_timer_pending ())
00828 {
00829
00830
00831 this->reset_flush_timer ();
00832
00833 TAO_Flushing_Strategy *flushing_strategy =
00834 this->orb_core ()->flushing_strategy ();
00835 int const result = flushing_strategy->schedule_output (this);
00836 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00837 {
00838 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00839 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00840 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00841 (void) flushing_strategy->flush_transport (this);
00842 }
00843 }
00844
00845 return 0;
00846 }
00847
00848 int
00849 TAO_Transport::drain_queue (void)
00850 {
00851 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00852 int const retval = this->drain_queue_i ();
00853
00854 if (retval == 1)
00855 {
00856
00857
00858 TAO_Flushing_Strategy *flushing_strategy =
00859 this->orb_core ()->flushing_strategy ();
00860
00861 flushing_strategy->cancel_output (this);
00862
00863 return 0;
00864 }
00865
00866 return retval;
00867 }
00868
00869 int
00870 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
00871 {
00872 size_t byte_count = 0;
00873
00874
00875 ssize_t retval = -1;
00876
00877 #if TAO_HAS_SENDFILE == 1
00878 if (this->mmap_allocator_)
00879 retval = this->sendfile (this->mmap_allocator_,
00880 iov,
00881 iovcnt,
00882 byte_count);
00883 else
00884 #endif
00885 retval = this->send (iov, iovcnt, byte_count);
00886
00887 if (TAO_debug_level == 5)
00888 {
00889 dump_iov (iov, iovcnt, this->id (),
00890 byte_count, "drain_queue_helper");
00891 }
00892
00893
00894
00895
00896 this->cleanup_queue (byte_count);
00897 iovcnt = 0;
00898
00899 if (retval == 0)
00900 {
00901 if (TAO_debug_level > 4)
00902 {
00903 ACE_DEBUG ((LM_DEBUG,
00904 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00905 ACE_TEXT ("send() returns 0\n"),
00906 this->id ()));
00907 }
00908 return -1;
00909 }
00910 else if (retval == -1)
00911 {
00912 if (TAO_debug_level > 4)
00913 {
00914 ACE_DEBUG ((LM_DEBUG,
00915 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00916 ACE_TEXT ("error during %p\n"),
00917 this->id (), ACE_TEXT ("send()")));
00918 }
00919
00920 if (errno == EWOULDBLOCK || errno == EAGAIN)
00921 {
00922 return 0;
00923 }
00924
00925 return -1;
00926 }
00927
00928
00929
00930
00931
00932 this->sent_byte_count_ += byte_count;
00933
00934 if (TAO_debug_level > 4)
00935 {
00936 ACE_DEBUG ((LM_DEBUG,
00937 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00938 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00939 this->id(), byte_count, (this->head_ == 0)));
00940 }
00941
00942 return 1;
00943 }
00944
00945 int
00946 TAO_Transport::drain_queue_i (void)
00947 {
00948
00949
00950
00951 int iovcnt = 0;
00952 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00953 iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00954 #else
00955 iovec iov[ACE_IOV_MAX];
00956 #endif
00957
00958
00959 TAO_Queued_Message *i = this->head_;
00960
00961
00962
00963 this->sent_byte_count_ = 0;
00964
00965
00966
00967
00968 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00969
00970 while (i != 0)
00971 {
00972 if (i->is_expired (now))
00973 {
00974 if (TAO_debug_level > 3)
00975 {
00976 ACE_DEBUG ((LM_DEBUG,
00977 ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00978 ACE_TEXT ("Discarding expired queued message.\n"),
00979 this->id ()));
00980 }
00981 TAO_Queued_Message *next = i->next ();
00982 i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00983 this->orb_core_->leader_follower ());
00984 i->remove_from_list (this->head_, this->tail_);
00985 i->destroy ();
00986 i = next;
00987 continue;
00988 }
00989
00990 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00991
00992
00993
00994
00995 if (iovcnt == ACE_IOV_MAX)
00996 {
00997 int const retval =
00998 this->drain_queue_helper (iovcnt, iov);
00999
01000 now = ACE_High_Res_Timer::gettimeofday_hr ();
01001
01002 if (TAO_debug_level > 4)
01003 {
01004 ACE_DEBUG ((LM_DEBUG,
01005 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01006 ACE_TEXT ("helper retval = %d\n"),
01007 this->id (), retval));
01008 }
01009
01010 if (retval != 1)
01011 {
01012 return retval;
01013 }
01014
01015 i = this->head_;
01016 continue;
01017 }
01018
01019
01020 i = i->next ();
01021 }
01022
01023 if (iovcnt != 0)
01024 {
01025 int const retval = this->drain_queue_helper (iovcnt, iov);
01026
01027 if (TAO_debug_level > 4)
01028 {
01029 ACE_DEBUG ((LM_DEBUG,
01030 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01031 ACE_TEXT ("helper retval = %d\n"),
01032 this->id (), retval));
01033 }
01034
01035 if (retval != 1)
01036 {
01037 return retval;
01038 }
01039 }
01040
01041 if (this->head_ == 0)
01042 {
01043 if (this->flush_timer_pending ())
01044 {
01045 ACE_Event_Handler *eh = this->event_handler_i ();
01046 ACE_Reactor * const reactor = eh->reactor ();
01047 reactor->cancel_timer (this->flush_timer_id_);
01048 this->reset_flush_timer ();
01049 }
01050
01051 return 1;
01052 }
01053
01054 return 0;
01055 }
01056
01057 void
01058 TAO_Transport::cleanup_queue_i ()
01059 {
01060 if (TAO_debug_level > 4)
01061 {
01062 ACE_DEBUG ((LM_DEBUG,
01063 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01064 ACE_TEXT ("cleaning up complete queue\n"),
01065 this->id ()));
01066 }
01067
01068 size_t byte_count = 0;
01069 int msg_count = 0;
01070
01071
01072 while (this->head_ != 0)
01073 {
01074 TAO_Queued_Message *i = this->head_;
01075
01076 if (TAO_debug_level > 4)
01077 {
01078 byte_count += i->message_length();
01079 ++msg_count;
01080 }
01081
01082
01083 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01084 this->orb_core_->leader_follower ());
01085
01086 i->remove_from_list (this->head_, this->tail_);
01087
01088 i->destroy ();
01089 }
01090
01091 if (TAO_debug_level > 4)
01092 {
01093 ACE_DEBUG ((LM_DEBUG,
01094 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01095 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01096 this->id (), msg_count, byte_count));
01097 }
01098 }
01099
01100 void
01101 TAO_Transport::cleanup_queue (size_t byte_count)
01102 {
01103 while (this->head_ != 0 && byte_count > 0)
01104 {
01105 TAO_Queued_Message *i = this->head_;
01106
01107 if (TAO_debug_level > 4)
01108 {
01109 ACE_DEBUG ((LM_DEBUG,
01110 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01111 ACE_TEXT ("byte_count = %d\n"),
01112 this->id (), byte_count));
01113 }
01114
01115
01116 i->bytes_transferred (byte_count);
01117
01118 if (TAO_debug_level > 4)
01119 {
01120 ACE_DEBUG ((LM_DEBUG,
01121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01122 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01123 this->id (), byte_count, i->all_data_sent (),
01124 i->message_length ()));
01125 }
01126
01127
01128
01129 if (i->all_data_sent ())
01130 {
01131 i->remove_from_list (this->head_, this->tail_);
01132 i->destroy ();
01133 }
01134 }
01135 }
01136
01137 int
01138 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
01139 {
01140
01141 size_t msg_count = 0;
01142 size_t total_bytes = 0;
01143
01144 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01145 {
01146 ++msg_count;
01147 total_bytes += i->message_length ();
01148 }
01149
01150 bool set_timer = false;
01151 ACE_Time_Value new_deadline;
01152
01153 TAO::Transport_Queueing_Strategy *queue_strategy =
01154 stub->transport_queueing_strategy ();
01155
01156 bool constraints_reached = true;
01157
01158 if (queue_strategy)
01159 {
01160 constraints_reached =
01161 queue_strategy->buffering_constraints_reached (stub,
01162 msg_count,
01163 total_bytes,
01164 must_flush,
01165 this->current_deadline_,
01166 set_timer,
01167 new_deadline);
01168 }
01169 else
01170 {
01171 must_flush = false;
01172 }
01173
01174
01175 if (set_timer)
01176 {
01177 ACE_Event_Handler *eh = this->event_handler_i ();
01178 ACE_Reactor * const reactor = eh->reactor ();
01179 this->current_deadline_ = new_deadline;
01180 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01181
01182 if (this->flush_timer_pending ())
01183 {
01184 reactor->cancel_timer (this->flush_timer_id_);
01185 }
01186
01187 this->flush_timer_id_ =
01188 reactor->schedule_timer (&this->transport_timer_,
01189 &this->current_deadline_,
01190 delay);
01191 }
01192
01193 return constraints_reached;
01194 }
01195
01196 void
01197 TAO_Transport::report_invalid_event_handler (const char *caller)
01198 {
01199 if (TAO_debug_level > 0)
01200 {
01201 ACE_DEBUG ((LM_DEBUG,
01202 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01203 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01204 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01205 }
01206 }
01207
01208 void
01209 TAO_Transport::send_connection_closed_notifications (void)
01210 {
01211 {
01212 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01213
01214 this->send_connection_closed_notifications_i ();
01215 }
01216
01217 this->tms ()->connection_closed ();
01218 }
01219
01220 void
01221 TAO_Transport::send_connection_closed_notifications_i (void)
01222 {
01223 this->cleanup_queue_i ();
01224
01225 this->messaging_object ()->reset ();
01226 }
01227
01228 int
01229 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01230 int message_semantics,
01231 const ACE_Message_Block *message_block,
01232 ACE_Time_Value *max_wait_time)
01233 {
01234 int ret = 0;
01235
01236 #if TAO_HAS_TRANSPORT_CURRENT == 1
01237 size_t const message_length = message_block->length ();
01238 #endif
01239
01240 switch (message_semantics)
01241 {
01242 case TAO_Transport::TAO_TWOWAY_REQUEST:
01243 ret = this->send_synchronous_message_i (message_block,
01244 max_wait_time);
01245 break;
01246
01247 case TAO_Transport::TAO_REPLY:
01248 ret = this->send_reply_message_i (message_block,
01249 max_wait_time);
01250 break;
01251
01252 case TAO_Transport::TAO_ONEWAY_REQUEST:
01253 ret = this->send_asynchronous_message_i (stub,
01254 message_block,
01255 max_wait_time);
01256 break;
01257 }
01258
01259 #if TAO_HAS_TRANSPORT_CURRENT == 1
01260
01261 if (ret != -1 && this->stats_ != 0)
01262 this->stats_->messages_sent (message_length);
01263 #endif
01264
01265 return ret;
01266 }
01267
01268 int
01269 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01270 const ACE_Message_Block *message_block,
01271 ACE_Time_Value *max_wait_time)
01272 {
01273
01274
01275 bool try_sending_first = true;
01276
01277 bool const queue_empty = (this->head_ == 0);
01278
01279 TAO::Transport_Queueing_Strategy *queue_strategy =
01280 stub->transport_queueing_strategy ();
01281
01282 if (!queue_empty)
01283 {
01284 try_sending_first = false;
01285 }
01286 else if (queue_strategy)
01287 {
01288 if (queue_strategy->must_queue (queue_empty))
01289 {
01290 try_sending_first = false;
01291 }
01292 }
01293
01294 if (try_sending_first)
01295 {
01296 ssize_t n = 0;
01297 size_t byte_count = 0;
01298
01299
01300 size_t const total_length = message_block->total_length ();
01301
01302 if (TAO_debug_level > 6)
01303 {
01304 ACE_DEBUG ((LM_DEBUG,
01305 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01306 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01307 this->id (), total_length));
01308 }
01309
01310
01311
01312
01313
01314 n = this->send_message_block_chain_i (message_block,
01315 byte_count,
01316 max_wait_time);
01317 if (n == -1)
01318 {
01319
01320
01321
01322
01323
01324 if (errno != EWOULDBLOCK && errno != ETIME)
01325 {
01326 if (TAO_debug_level > 0)
01327 {
01328 ACE_ERROR ((LM_ERROR,
01329 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01330 ACE_TEXT ("fatal error in ")
01331 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01332 this->id ()));
01333 }
01334 return -1;
01335 }
01336 }
01337
01338
01339 if (total_length == byte_count)
01340 {
01341
01342
01343
01344
01345
01346 return 0;
01347 }
01348
01349
01350 if (byte_count > 0)
01351 max_wait_time = 0;
01352
01353 if (TAO_debug_level > 6)
01354 {
01355 ACE_DEBUG ((LM_DEBUG,
01356 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01357 ACE_TEXT ("partial send %d / %d bytes\n"),
01358 this->id (), byte_count, total_length));
01359 }
01360
01361
01362
01363 while (message_block != 0 && message_block->length () == 0)
01364 {
01365 message_block = message_block->cont ();
01366 }
01367
01368
01369
01370 }
01371
01372
01373
01374
01375 if (TAO_debug_level > 6)
01376 {
01377 ACE_DEBUG ((LM_DEBUG,
01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01379 ACE_TEXT ("message is queued\n"),
01380 this->id ()));
01381 }
01382
01383 if (this->queue_message_i (message_block, max_wait_time) == -1)
01384 {
01385 if (TAO_debug_level > 0)
01386 {
01387 ACE_DEBUG ((LM_DEBUG,
01388 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01389 ACE_TEXT ("send_asynchronous_message_i, ")
01390 ACE_TEXT ("cannot queue message for - %m\n"),
01391 this->id ()));
01392 }
01393 return -1;
01394 }
01395
01396
01397
01398 bool must_flush = false;
01399 const bool constraints_reached =
01400 this->check_buffering_constraints_i (stub,
01401 must_flush);
01402
01403
01404
01405
01406
01407 TAO_Flushing_Strategy *flushing_strategy =
01408 this->orb_core ()->flushing_strategy ();
01409
01410 if (constraints_reached || try_sending_first)
01411 {
01412 int const result = flushing_strategy->schedule_output (this);
01413 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01414 {
01415 must_flush = true;
01416 }
01417 }
01418
01419 if (must_flush)
01420 {
01421 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01422 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01423 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01424
01425 (void) flushing_strategy->flush_transport (this);
01426 }
01427
01428 return 0;
01429 }
01430
01431 int
01432 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
01433 ACE_Time_Value *max_wait_time)
01434 {
01435 TAO_Queued_Message *queued_message = 0;
01436 ACE_NEW_RETURN (queued_message,
01437 TAO_Asynch_Queued_Message (message_block,
01438 this->orb_core_,
01439 max_wait_time,
01440 0,
01441 true),
01442 -1);
01443 queued_message->push_back (this->head_, this->tail_);
01444
01445 return 0;
01446 }
01447
01448
01449
01450
01451
01452
01453
01454 int
01455 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01456 ACE_Time_Value * max_wait_time)
01457 {
01458 if (TAO_debug_level > 3)
01459 {
01460 ACE_DEBUG ((LM_DEBUG,
01461 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01462 this->id ()));
01463 }
01464
01465
01466 int const retval = this->process_queue_head (rh);
01467
01468 if (retval <= 0)
01469 {
01470 if (retval == -1)
01471 {
01472 if (TAO_debug_level > 2)
01473 {
01474 ACE_DEBUG ((LM_DEBUG,
01475 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01476 ACE_TEXT ("error while parsing the head of the queue\n"),
01477 this->id()));
01478
01479 }
01480 return -1;
01481 }
01482 else
01483 {
01484
01485
01486
01487
01488 return 0;
01489 }
01490 }
01491
01492 TAO_Queued_Data *q_data = 0;
01493
01494 if (this->incoming_message_stack_.top (q_data) != -1
01495 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01496 {
01497
01498 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01499 {
01500 if (TAO_debug_level > 0)
01501 {
01502 ACE_ERROR ((LM_ERROR,
01503 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01504 ACE_TEXT ("error consolidating incoming message\n"),
01505 this->id ()));
01506 }
01507 return -1;
01508 }
01509 }
01510 else
01511 {
01512 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01513 {
01514 if (TAO_debug_level > 0)
01515 {
01516 ACE_ERROR ((LM_ERROR,
01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01518 ACE_TEXT ("error parsing incoming message\n"),
01519 this->id ()));
01520 }
01521 return -1;
01522 }
01523 }
01524
01525 return 0;
01526 }
01527
01528 int
01529 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01530 TAO_Resume_Handle &rh)
01531 {
01532
01533 if (q_data->missing_data () != 0)
01534 {
01535 if (TAO_debug_level > 0)
01536 {
01537 ACE_ERROR ((LM_ERROR,
01538 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01539 ACE_TEXT ("missing data\n"),
01540 this->id ()));
01541 }
01542 return -1;
01543 }
01544
01545 if (q_data->more_fragments () ||
01546 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01547 {
01548
01549 TAO_Queued_Data *new_q_data = 0;
01550
01551 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01552 {
01553 case -1:
01554 return -1;
01555
01556 case 0:
01557 if (!new_q_data)
01558 {
01559 if (TAO_debug_level > 0)
01560 {
01561 ACE_ERROR ((LM_ERROR,
01562 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01563 ACE_TEXT ("error, consolidated message is NULL\n"),
01564 this->id ()));
01565 }
01566 return -1;
01567 }
01568
01569
01570 if (this->process_parsed_messages (new_q_data, rh) == -1)
01571 {
01572 TAO_Queued_Data::release (new_q_data);
01573
01574 if (TAO_debug_level > 0)
01575 {
01576 ACE_ERROR ((LM_ERROR,
01577 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01578 ACE_TEXT ("error processing consolidated message\n"),
01579 this->id ()));
01580 }
01581 return -1;
01582 }
01583
01584 TAO_Queued_Data::release (new_q_data);
01585
01586 break;
01587
01588 case 1:
01589 break;
01590 }
01591 }
01592 else
01593 {
01594 if (this->process_parsed_messages (q_data, rh) == -1)
01595 {
01596 TAO_Queued_Data::release (q_data);
01597
01598 if (TAO_debug_level > 0)
01599 {
01600 ACE_ERROR ((LM_ERROR,
01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01602 ACE_TEXT ("error processing message\n"),
01603 this->id ()));
01604 }
01605 return -1;
01606 }
01607
01608 TAO_Queued_Data::release (q_data);
01609
01610 }
01611
01612 return 0;
01613 }
01614
01615 int
01616 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01617 {
01618
01619
01620
01621 if (q_data->missing_data () != 0)
01622 {
01623 return -1;
01624 }
01625
01626 if (q_data->more_fragments () ||
01627 q_data->msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01628 {
01629 TAO_Queued_Data *new_q_data = 0;
01630
01631 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01632 {
01633 case -1:
01634 return -1;
01635
01636 case 0:
01637 if (!new_q_data)
01638 {
01639 if (TAO_debug_level > 0)
01640 {
01641 ACE_ERROR ((LM_ERROR,
01642 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01643 ACE_TEXT ("error, consolidated message is NULL\n"),
01644 this->id ()));
01645 }
01646 return -1;
01647 }
01648
01649 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01650 {
01651 TAO_Queued_Data::release (new_q_data);
01652 return -1;
01653 }
01654 break;
01655
01656 case 1:
01657 break;
01658 }
01659 }
01660 else
01661 {
01662 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01663 {
01664 TAO_Queued_Data::release (q_data);
01665 return -1;
01666 }
01667 }
01668
01669 return 0;
01670 }
01671
01672 int
01673 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01674 ACE_Time_Value * max_wait_time,
01675 TAO_Queued_Data *q_data)
01676 {
01677
01678 if (q_data == 0)
01679 {
01680 return -1;
01681 }
01682
01683 if (TAO_debug_level > 3)
01684 {
01685 ACE_DEBUG ((LM_DEBUG,
01686 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01687 ACE_TEXT ("enter (missing data == %d)\n"),
01688 this->id (), q_data->missing_data ()));
01689 }
01690
01691 size_t const recv_size = q_data->missing_data ();
01692
01693 if (q_data->msg_block ()->space() < recv_size)
01694 {
01695
01696 size_t const message_size = recv_size + q_data->msg_block ()->length();
01697
01698 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01699 {
01700 return -1;
01701 }
01702 }
01703
01704
01705
01706
01707
01708
01709 this->recv_buffer_size_ = recv_size;
01710
01711
01712 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01713 recv_size,
01714 max_wait_time);
01715
01716
01717 if (n <= 0)
01718 {
01719 return n;
01720 }
01721
01722 if (TAO_debug_level > 3)
01723 {
01724 ACE_DEBUG ((LM_DEBUG,
01725 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01726 ACE_TEXT ("read bytes %d\n"),
01727 this->id (), n));
01728 }
01729
01730 q_data->msg_block ()->wr_ptr(n);
01731 q_data->missing_data (q_data->missing_data () - n);
01732
01733 if (q_data->missing_data () == 0)
01734 {
01735
01736 if (this->incoming_message_stack_.pop (q_data) == -1)
01737 {
01738 return -1;
01739 }
01740
01741 if (this->consolidate_process_message (q_data, rh) == -1)
01742 {
01743 return -1;
01744 }
01745 }
01746
01747 return 0;
01748 }
01749
01750
01751 int
01752 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01753 {
01754
01755
01756
01757
01758 int buf_status = 0;
01759
01760 TAO_Queued_Data *q_data = 0;
01761
01762
01763
01764
01765 while (message_block.length () > 0 &&
01766 (buf_status = this->messaging_object ()->extract_next_message
01767 (message_block, q_data)) != -1 &&
01768 q_data != 0)
01769 {
01770 if (q_data->missing_data () == 0)
01771 {
01772 if (this->consolidate_enqueue_message (q_data) == -1)
01773 {
01774 return -1;
01775 }
01776 }
01777 else
01778 {
01779
01780 this->incoming_message_stack_.push (q_data);
01781 }
01782
01783 q_data = 0;
01784 }
01785
01786 if (buf_status == -1)
01787 {
01788 return -1;
01789 }
01790
01791 return 0;
01792 }
01793
01794 int
01795 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
01796 ACE_Time_Value * max_wait_time)
01797 {
01798
01799 if (TAO_debug_level > 3)
01800 {
01801 ACE_DEBUG ((LM_DEBUG,
01802 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01803 ACE_TEXT ("enter\n"),
01804 this->id ()));
01805 }
01806
01807
01808
01809
01810
01811
01812 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01813
01814 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01815 (void) ACE_OS::memset (buf,
01816 '\0',
01817 sizeof buf);
01818 #endif
01819
01820
01821 ACE_Data_Block db (sizeof (buf),
01822 ACE_Message_Block::MB_DATA,
01823 buf,
01824 this->orb_core_->input_cdr_buffer_allocator (),
01825 this->orb_core_->locking_strategy (),
01826 ACE_Message_Block::DONT_DELETE,
01827 this->orb_core_->input_cdr_dblock_allocator ());
01828
01829
01830 ACE_Message_Block message_block (&db,
01831 ACE_Message_Block::DONT_DELETE,
01832 this->orb_core_->input_cdr_msgblock_allocator ());
01833
01834
01835
01836 ACE_CDR::mb_align (&message_block);
01837
01838 size_t recv_size = 0;
01839
01840
01841 TAO_Queued_Data *q_data = 0;
01842
01843
01844 size_t const header_length = this->messaging_object ()->header_length ();
01845
01846
01847 if (header_length > message_block.space ())
01848 {
01849 return -1;
01850 }
01851
01852 if (this->orb_core_->orb_params ()->single_read_optimization ())
01853 {
01854 recv_size = message_block.space ();
01855 }
01856 else
01857 {
01858
01859
01860
01861
01862
01863
01864 if (this->incoming_message_stack_.top (q_data) != -1
01865 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01866 {
01867
01868
01869
01870 recv_size = header_length - q_data->msg_block ()->length ();
01871 }
01872 else
01873 {
01874
01875
01876 recv_size = header_length;
01877 }
01878
01879 }
01880
01881
01882
01883
01884 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01885 {
01886
01887
01888
01889
01890
01891 if (this->partial_message_->length () <= recv_size &&
01892 message_block.copy (this->partial_message_->rd_ptr (),
01893 this->partial_message_->length ()) == 0)
01894 {
01895
01896 recv_size -= this->partial_message_->length ();
01897 this->partial_message_->reset ();
01898 }
01899 else
01900 {
01901 return -1;
01902 }
01903 }
01904
01905
01906 if (0 >= recv_size)
01907 {
01908
01909
01910
01911
01912
01913 if (TAO_debug_level > 0)
01914 {
01915 ACE_ERROR ((LM_ERROR,
01916 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01917 ACE_TEXT ("Error - endless loop detection, closing connection"),
01918 this->id ()));
01919 }
01920 return -1;
01921 }
01922
01923
01924
01925
01926
01927
01928 this->recv_buffer_size_ = recv_size;
01929
01930
01931
01932 ssize_t const n = this->recv (message_block.wr_ptr (),
01933 recv_size,
01934 max_wait_time);
01935
01936
01937 if (n <= 0)
01938 {
01939 return n;
01940 }
01941
01942 if (TAO_debug_level > 3)
01943 {
01944 ACE_DEBUG ((LM_DEBUG,
01945 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01946 ACE_TEXT ("read %d bytes\n"),
01947 this->id (), n));
01948 }
01949
01950
01951 message_block.wr_ptr (n);
01952
01953
01954
01955
01956
01957
01958
01959 if (this->incoming_message_stack_.top (q_data) != -1
01960 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01961 {
01962
01963
01964
01965
01966
01967
01968
01969
01970
01971
01972 if (this->messaging_object ()->consolidate_node (q_data,
01973 message_block) == -1)
01974 {
01975 if (TAO_debug_level > 0)
01976 {
01977 ACE_ERROR ((LM_ERROR,
01978 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01979 ACE_TEXT ("error consolidating message from input buffer\n"),
01980 this->id () ));
01981 }
01982 return -1;
01983 }
01984
01985
01986 if (q_data->missing_data () == 0)
01987 {
01988 if (this->incoming_message_stack_.pop (q_data) == -1)
01989 {
01990 return -1;
01991 }
01992
01993 if (this->consolidate_enqueue_message (q_data) == -1)
01994 {
01995 return -1;
01996 }
01997 }
01998
01999 if (message_block.length () > 0
02000 && this->handle_input_parse_extra_messages (message_block) == -1)
02001 {
02002 return -1;
02003 }
02004
02005
02006 if (this->process_queue_head (rh) == -1)
02007 {
02008 return -1;
02009 }
02010 }
02011 else
02012 {
02013
02014
02015
02016
02017
02018
02019
02020
02021
02022 TAO_Queued_Data qd (&message_block,
02023 this->orb_core_->transport_message_buffer_allocator ());
02024
02025 size_t mesg_length = 0;
02026
02027 if (this->messaging_object ()->parse_next_message (qd,
02028 mesg_length) == -1
02029 || (qd.missing_data () == 0
02030 && mesg_length > message_block.length ()) )
02031 {
02032
02033 return -1;
02034 }
02035
02036
02037
02038 if (qd.missing_data () != 0 ||
02039 qd.more_fragments () ||
02040 qd.msg_type () == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
02041 {
02042 if (qd.missing_data () == 0)
02043 {
02044
02045 TAO_Queued_Data *nqd =
02046 TAO_Queued_Data::duplicate (qd);
02047
02048 if (nqd == 0)
02049 {
02050 return -1;
02051 }
02052
02053
02054 char* end_mark = nqd->msg_block ()->rd_ptr ()
02055 + mesg_length;
02056 nqd->msg_block ()->wr_ptr (end_mark);
02057
02058
02059 message_block.rd_ptr (mesg_length);
02060
02061
02062 if (this->consolidate_enqueue_message (nqd) == -1)
02063 {
02064 return -1;
02065 }
02066
02067 if (message_block.length () > 0
02068 && this->handle_input_parse_extra_messages (message_block) == -1)
02069 {
02070 return -1;
02071 }
02072
02073
02074 if (this->process_queue_head (rh) == -1)
02075 {
02076 return -1;
02077 }
02078 }
02079 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02080 {
02081
02082
02083 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02084 qd.missing_data () > message_block.space ())
02085 {
02086
02087 if (ACE_CDR::grow (qd.msg_block (),
02088 message_block.length ()
02089 + qd.missing_data ()) == -1)
02090 {
02091 return -1;
02092 }
02093 }
02094
02095 TAO_Queued_Data *nqd =
02096 TAO_Queued_Data::duplicate (qd);
02097
02098 if (nqd == 0)
02099 {
02100 return -1;
02101 }
02102
02103
02104 message_block.rd_ptr (message_block.length());
02105
02106 this->incoming_message_stack_.push (nqd);
02107 }
02108 }
02109 else
02110 {
02111
02112
02113
02114
02115
02116
02117
02118
02119
02120
02121 char * end_marker = message_block.rd_ptr ()
02122 + mesg_length;
02123
02124 if (message_block.length () > mesg_length)
02125 {
02126
02127
02128 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02129
02130
02131
02132 message_block.rd_ptr (mesg_length);
02133
02134
02135
02136 if (this->handle_input_parse_extra_messages (message_block) == -1)
02137 {
02138 return -1;
02139 }
02140
02141
02142 end_marker = message_block.rd_ptr ();
02143
02144
02145 message_block.rd_ptr (rd_ptr_stack_mesg);
02146 }
02147
02148
02149
02150
02151
02152
02153
02154
02155 if (this->incoming_message_queue_.queue_length () > 0)
02156 {
02157 if (TAO_debug_level > 0)
02158 {
02159 ACE_DEBUG ((LM_DEBUG,
02160 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02161 ACE_TEXT ("notify reactor\n"),
02162 this->id ()));
02163
02164 }
02165
02166 const int retval = this->notify_reactor ();
02167
02168 if (retval == 1)
02169 {
02170
02171
02172 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02173 }
02174 else if (retval < 0)
02175 return -1;
02176 }
02177 else
02178 {
02179
02180
02181 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02182 }
02183
02184
02185 if (this->process_parsed_messages (&qd,
02186 rh) == -1)
02187 {
02188 return -1;
02189 }
02190
02191
02192 message_block.rd_ptr (end_marker);
02193 }
02194 }
02195
02196
02197
02198 if (message_block.length () > 0)
02199 {
02200 if (this->partial_message_ == 0)
02201 {
02202 this->allocate_partial_message_block ();
02203 }
02204
02205 if (this->partial_message_ != 0 &&
02206 this->partial_message_->copy (message_block.rd_ptr (),
02207 message_block.length ()) == 0)
02208 {
02209 message_block.rd_ptr (message_block.length ());
02210 }
02211 else
02212 {
02213 return -1;
02214 }
02215 }
02216
02217 return 0;
02218 }
02219
02220
02221 int
02222 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02223 TAO_Resume_Handle &rh)
02224 {
02225 if (TAO_debug_level > 7)
02226 {
02227 ACE_DEBUG ((LM_DEBUG,
02228 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02229 ACE_TEXT ("entering (missing data == %d)\n"),
02230 this->id(), qd->missing_data ()));
02231 }
02232
02233 #if TAO_HAS_TRANSPORT_CURRENT == 1
02234
02235 if (this->stats_ != 0)
02236 this->stats_->messages_received (qd->msg_block ()->length ());
02237 #endif
02238
02239 switch (qd->msg_type ())
02240 {
02241 case TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION:
02242 {
02243 if (TAO_debug_level > 0)
02244 ACE_DEBUG ((LM_DEBUG,
02245 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02246 ACE_TEXT ("received CloseConnection message - %m\n"),
02247 this->id()));
02248
02249
02250
02251 return -1;
02252 }
02253 break;
02254 case TAO_PLUGGABLE_MESSAGE_REQUEST:
02255 case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
02256 {
02257
02258
02259 rh.resume_handle ();
02260
02261 if (this->messaging_object ()->process_request_message (
02262 this,
02263 qd) == -1)
02264 {
02265
02266
02267 return -1;
02268 }
02269 }
02270 break;
02271 case TAO_PLUGGABLE_MESSAGE_REPLY:
02272 case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
02273 {
02274 rh.resume_handle ();
02275
02276 TAO_Pluggable_Reply_Params params (this);
02277
02278 if (this->messaging_object ()->process_reply_message (params,
02279 qd) == -1)
02280 {
02281 if (TAO_debug_level > 0)
02282 ACE_DEBUG ((LM_DEBUG,
02283 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02284 ACE_TEXT ("error in process_reply_message - %m\n"),
02285 this->id ()));
02286
02287 return -1;
02288 }
02289
02290 }
02291 break;
02292 case TAO_PLUGGABLE_MESSAGE_CANCELREQUEST:
02293 {
02294
02295
02296
02297 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02298 {
02299 if (TAO_debug_level > 0)
02300 {
02301 ACE_ERROR ((LM_ERROR,
02302 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02303 ACE_TEXT ("error processing CancelRequest\n"),
02304 this->id ()));
02305 }
02306 }
02307
02308
02309
02310
02311
02312
02313
02314 }
02315 break;
02316 case TAO_PLUGGABLE_MESSAGE_MESSAGERROR:
02317 {
02318 if (TAO_debug_level > 0)
02319 {
02320 ACE_ERROR ((LM_ERROR,
02321 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02322 ACE_TEXT ("received MessageError, closing connection\n"),
02323 this->id ()));
02324 }
02325 return -1;
02326 }
02327 break;
02328 case TAO_PLUGGABLE_MESSAGE_FRAGMENT:
02329 {
02330
02331 }
02332 break;
02333 }
02334
02335
02336 return 0;
02337 }
02338
02339 int
02340 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02341 {
02342 if (TAO_debug_level > 3)
02343 {
02344 ACE_DEBUG ((LM_DEBUG,
02345 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02346 this->id (), this->incoming_message_queue_.queue_length () ));
02347 }
02348
02349
02350 if (this->incoming_message_queue_.queue_length () > 0)
02351 {
02352
02353 TAO_Queued_Data *qd =
02354 this->incoming_message_queue_.dequeue_head ();
02355
02356 if (TAO_debug_level > 3)
02357 {
02358 ACE_DEBUG ((LM_DEBUG,
02359 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02360 ACE_TEXT ("the size of the queue is [%d]\n"),
02361 this->id (),
02362 this->incoming_message_queue_.queue_length()));
02363 }
02364
02365
02366 if (this->incoming_message_queue_.queue_length () > 0)
02367 {
02368 if (TAO_debug_level > 0)
02369 {
02370 ACE_DEBUG ((LM_DEBUG,
02371 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02372 ACE_TEXT ("notify reactor\n"),
02373 this->id ()));
02374 }
02375
02376 int const retval = this->notify_reactor ();
02377
02378 if (retval == 1)
02379 {
02380
02381
02382 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02383 }
02384 else if (retval < 0)
02385 return -1;
02386 }
02387 else
02388 {
02389
02390
02391 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02392 }
02393
02394
02395 if (this->process_parsed_messages (qd, rh) == -1)
02396 {
02397 return -1;
02398 }
02399
02400
02401 TAO_Queued_Data::release (qd);
02402
02403 return 0;
02404 }
02405
02406 return 1;
02407 }
02408
02409 int
02410 TAO_Transport::notify_reactor (void)
02411 {
02412 if (!this->ws_->is_registered ())
02413 {
02414 return 0;
02415 }
02416
02417 ACE_Event_Handler *eh = this->event_handler_i ();
02418
02419
02420 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02421
02422 if (TAO_debug_level > 0)
02423 {
02424 ACE_DEBUG ((LM_DEBUG,
02425 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02426 ACE_TEXT ("notify to Reactor\n"),
02427 this->id ()));
02428 }
02429
02430
02431
02432 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02433
02434 if (retval < 0 && TAO_debug_level > 2)
02435 {
02436
02437
02438 ACE_DEBUG ((LM_DEBUG,
02439 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02440 ACE_TEXT ("notify to the reactor failed..\n"),
02441 this->id ()));
02442 }
02443
02444 return 1;
02445 }
02446
02447 TAO::Transport_Cache_Manager &
02448 TAO_Transport::transport_cache_manager (void)
02449 {
02450 return this->orb_core_->lane_resources ().transport_cache ();
02451 }
02452
02453 void
02454 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02455 {
02456 if (this->char_translator_)
02457 {
02458 this->char_translator_->assign (inp);
02459 this->char_translator_->assign (outp);
02460 }
02461 if (this->wchar_translator_)
02462 {
02463 this->wchar_translator_->assign (inp);
02464 this->wchar_translator_->assign (outp);
02465 }
02466 }
02467
02468 void
02469 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02470 {
02471 if (inp)
02472 {
02473 inp->char_translator (0);
02474 inp->wchar_translator (0);
02475 }
02476 if (outp)
02477 {
02478 outp->char_translator (0);
02479 outp->wchar_translator (0);
02480 }
02481 }
02482
02483 ACE_Event_Handler::Reference_Count
02484 TAO_Transport::add_reference (void)
02485 {
02486 return this->event_handler_i ()->add_reference ();
02487 }
02488
02489 ACE_Event_Handler::Reference_Count
02490 TAO_Transport::remove_reference (void)
02491 {
02492 return this->event_handler_i ()->remove_reference ();
02493 }
02494
02495 TAO_OutputCDR &
02496 TAO_Transport::out_stream (void)
02497 {
02498 return this->messaging_object ()->out_stream ();
02499 }
02500
02501 void
02502 TAO_Transport::pre_close (void)
02503 {
02504 this->is_connected_ = false;
02505 this->purge_entry ();
02506 {
02507 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02508 this->cleanup_queue_i ();
02509 }
02510 }
02511
02512 bool
02513 TAO_Transport::post_open (size_t id)
02514 {
02515 this->id_ = id;
02516
02517 {
02518 ACE_GUARD_RETURN (ACE_Lock,
02519 ace_mon,
02520 *this->handler_lock_,
02521 false);
02522 this->is_connected_ = true;
02523 }
02524
02525
02526
02527 if (this->queue_is_empty_i ())
02528 return true;
02529
02530
02531
02532
02533 if (this->wait_strategy ()->register_handler () != 0)
02534 {
02535
02536
02537
02538
02539 (void) this->purge_entry ();
02540
02541
02542 (void) this->close_connection ();
02543
02544 if (TAO_debug_level > 0)
02545 ACE_ERROR ((LM_ERROR,
02546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02547 ACE_TEXT ("could not register the transport ")
02548 ACE_TEXT ("in the reactor.\n"),
02549 this->id ()));
02550
02551 return false;
02552 }
02553
02554 return true;
02555 }
02556
02557 void
02558 TAO_Transport::allocate_partial_message_block (void)
02559 {
02560 if (this->partial_message_ == 0)
02561 {
02562
02563
02564 size_t const partial_message_size =
02565 this->messaging_object ()->header_length ();
02566
02567
02568
02569 ACE_NEW (this->partial_message_,
02570 ACE_Message_Block (partial_message_size));
02571 }
02572 }
02573
02574
02575
02576
02577
02578
02579
02580
02581 TAO_END_VERSIONED_NAMESPACE_DECL