00001
00002
00003 #include "tao/Transport.h"
00004
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/GIOP_Message_Base.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025 #include "tao/SystemException.h"
00026
00027 #include "ace/OS_NS_sys_time.h"
00028 #include "ace/OS_NS_stdio.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/os_include/sys/os_uio.h"
00031 #include "ace/High_Res_Timer.h"
00032 #include "ace/Countdown_Time.h"
00033 #include "ace/CORBA_macros.h"
00034
00035
00036
00037
00038
00039
00040
00041 #if !defined (__ACE_INLINE__)
00042 # include "tao/Transport.inl"
00043 #endif
00044
00045
00046 ACE_RCSID (tao,
00047 Transport,
00048 "$Id: Transport.cpp 81319 2008-04-10 11:31:40Z johnnyw $")
00049
00050
00051
00052
00053 static void
00054 dump_iov (iovec *iov, int iovcnt, size_t id,
00055 size_t current_transfer,
00056 const char *location)
00057 {
00058 ACE_Guard <ACE_Log_Msg> log_guard (*ACE_Log_Msg::instance ());
00059
00060 ACE_DEBUG ((LM_DEBUG,
00061 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00062 ACE_TEXT ("sending %d buffers\n"),
00063 id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00064
00065 for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00066 {
00067 size_t iov_len = iov[i].iov_len;
00068
00069
00070 if (current_transfer < iov_len)
00071 {
00072 iov_len = current_transfer;
00073 }
00074
00075 ACE_DEBUG ((LM_DEBUG,
00076 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00077 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00078 id, ACE_TEXT_CHAR_TO_TCHAR(location),
00079 i, iovcnt,
00080 iov_len));
00081
00082 size_t len;
00083
00084 for (size_t offset = 0; offset < iov_len; offset += len)
00085 {
00086 ACE_TCHAR header[1024];
00087 ACE_OS::sprintf (header,
00088 ACE_TEXT("TAO - ")
00089 ACE_TEXT("Transport[")
00090 ACE_SIZE_T_FORMAT_SPECIFIER
00091 ACE_TEXT("]::%s")
00092 ACE_TEXT(" (")
00093 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00094 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00095 id, location, offset, iov_len);
00096
00097 len = iov_len - offset;
00098
00099 if (len > 512)
00100 {
00101 len = 512;
00102 }
00103
00104 ACE_HEX_DUMP ((LM_DEBUG,
00105 static_cast<char*> (iov[i].iov_base) + offset,
00106 len,
00107 header));
00108 }
00109 current_transfer -= iov_len;
00110 }
00111
00112 ACE_DEBUG ((LM_DEBUG,
00113 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00114 ACE_TEXT ("end of data\n"),
00115 id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00116 }
00117
00118 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00119
00120 #if TAO_HAS_TRANSPORT_CURRENT == 1
00121 TAO::Transport::Stats::~Stats ()
00122 {
00123
00124 }
00125 #endif
00126
00127 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00128 TAO_ORB_Core *orb_core,
00129 size_t input_cdr_size)
00130 : tag_ (tag)
00131 , orb_core_ (orb_core)
00132 , cache_map_entry_ (0)
00133 , bidirectional_flag_ (-1)
00134 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00135 , head_ (0)
00136 , tail_ (0)
00137 , incoming_message_queue_ (orb_core)
00138 , current_deadline_ (ACE_Time_Value::zero)
00139 , flush_timer_id_ (-1)
00140 , transport_timer_ (this)
00141 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00142 , id_ ((size_t) this)
00143 , purging_order_ (0)
00144 , recv_buffer_size_ (0)
00145 , sent_byte_count_ (0)
00146 , is_connected_ (false)
00147 , messaging_object_ (0)
00148 , char_translator_ (0)
00149 , wchar_translator_ (0)
00150 , tcs_set_ (0)
00151 , first_request_ (1)
00152 , partial_message_ (0)
00153 #if TAO_HAS_SENDFILE == 1
00154
00155
00156
00157
00158
00159 , mmap_allocator_ (
00160 dynamic_cast<TAO_MMAP_Allocator *> (
00161 orb_core->output_cdr_buffer_allocator ()))
00162 #endif
00163 #if TAO_HAS_TRANSPORT_CURRENT == 1
00164 , stats_ (0)
00165 #endif
00166 {
00167 ACE_NEW (this->messaging_object_,
00168 TAO_GIOP_Message_Base (orb_core,
00169 this,
00170 input_cdr_size));
00171
00172 TAO_Client_Strategy_Factory *cf =
00173 this->orb_core_->client_factory ();
00174
00175
00176 this->ws_ = cf->create_wait_strategy (this);
00177
00178
00179 this->tms_ = cf->create_transport_mux_strategy (this);
00180
00181 #if TAO_HAS_TRANSPORT_CURRENT == 1
00182
00183 ACE_NEW_THROW_EX (this->stats_,
00184 TAO::Transport::Stats,
00185 CORBA::NO_MEMORY ());
00186 #endif
00187
00188
00189
00190
00191
00192
00193
00194
00195 }
00196
00197 TAO_Transport::~TAO_Transport (void)
00198 {
00199 delete this->messaging_object_;
00200
00201 delete this->ws_;
00202
00203 delete this->tms_;
00204
00205 delete this->handler_lock_;
00206
00207 if (!this->is_connected_)
00208 {
00209
00210
00211 this->cleanup_queue_i();
00212
00213
00214 this->purge_entry();
00215 }
00216
00217
00218
00219 ACE_Message_Block::release (this->partial_message_);
00220
00221
00222
00223
00224
00225
00226 ACE_ASSERT (this->head_ == 0);
00227 ACE_ASSERT (this->cache_map_entry_ == 0);
00228
00229 #if TAO_HAS_TRANSPORT_CURRENT == 1
00230 delete this->stats_;
00231 #endif
00232
00233
00234
00235
00236
00237
00238
00239
00240 }
00241
00242 void
00243 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00244 {
00245 (void) this->add_reference ();
00246
00247 handlers.insert (this->connection_handler_i ());
00248 }
00249
00250 bool
00251 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00252 {
00253 if (this->ws_->non_blocking () ||
00254 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00255 return false;
00256
00257 (void) this->add_reference ();
00258
00259 h.insert (this->connection_handler_i ());
00260
00261 return true;
00262 }
00263
00264 bool
00265 TAO_Transport::idle_after_send (void)
00266 {
00267 return this->tms ()->idle_after_send ();
00268 }
00269
00270 bool
00271 TAO_Transport::idle_after_reply (void)
00272 {
00273 return this->tms ()->idle_after_reply ();
00274 }
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286 int
00287 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00288 {
00289 ACE_NOTSUP_RETURN (-1);
00290 }
00291
00292 int
00293 TAO_Transport::send_message_shared (TAO_Stub *stub,
00294 TAO_Message_Semantics message_semantics,
00295 const ACE_Message_Block *message_block,
00296 ACE_Time_Value *max_wait_time)
00297 {
00298 int result = 0;
00299
00300 {
00301 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00302
00303 result =
00304 this->send_message_shared_i (stub, message_semantics,
00305 message_block, max_wait_time);
00306 }
00307
00308 if (result == -1)
00309 {
00310
00311
00312
00313
00314 this->close_connection ();
00315 }
00316
00317 return result;
00318 }
00319
00320
00321
00322 bool
00323 TAO_Transport::post_connect_hook (void)
00324 {
00325 return true;
00326 }
00327
00328 void
00329 TAO_Transport::close_connection (void)
00330 {
00331 this->connection_handler_i ()->close_connection ();
00332 }
00333
00334 int
00335 TAO_Transport::register_handler (void)
00336 {
00337 if (TAO_debug_level > 4)
00338 {
00339 ACE_DEBUG ((LM_DEBUG,
00340 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00341 this->id ()));
00342 }
00343
00344 ACE_Reactor * const r = this->orb_core_->reactor ();
00345
00346
00347
00348 ACE_GUARD_RETURN (ACE_Lock,
00349 ace_mon,
00350 *this->handler_lock_,
00351 false);
00352
00353 if (r == this->event_handler_i ()->reactor ())
00354 {
00355 return 0;
00356 }
00357
00358
00359
00360
00361 this->ws_->is_registered (true);
00362
00363
00364 return r->register_handler (this->event_handler_i (),
00365 ACE_Event_Handler::READ_MASK);
00366 }
00367
00368 #if TAO_HAS_SENDFILE == 1
00369 ssize_t
00370 TAO_Transport::sendfile (TAO_MMAP_Allocator * ,
00371 iovec * iov,
00372 int iovcnt,
00373 size_t &bytes_transferred,
00374 ACE_Time_Value const * timeout)
00375 {
00376
00377
00378
00379
00380
00381
00382
00383 return this->send (iov, iovcnt, bytes_transferred, timeout);
00384 }
00385 #endif
00386
00387 int
00388 TAO_Transport::generate_locate_request (
00389 TAO_Target_Specification &spec,
00390 TAO_Operation_Details &opdetails,
00391 TAO_OutputCDR &output)
00392 {
00393 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00394 spec,
00395 output) == -1)
00396 {
00397 if (TAO_debug_level > 0)
00398 {
00399 ACE_DEBUG ((LM_DEBUG,
00400 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00401 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00402 this->id ()));
00403 }
00404
00405 return -1;
00406 }
00407
00408 return 0;
00409 }
00410
00411 int
00412 TAO_Transport::generate_request_header (
00413 TAO_Operation_Details &opdetails,
00414 TAO_Target_Specification &spec,
00415 TAO_OutputCDR &output)
00416 {
00417
00418
00419 if (this->first_request_)
00420 {
00421 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00422 if (csm)
00423 csm->generate_service_context (opdetails,*this);
00424 }
00425
00426 if (this->messaging_object ()->generate_request_header (opdetails,
00427 spec,
00428 output) == -1)
00429 {
00430 if (TAO_debug_level > 0)
00431 {
00432 ACE_DEBUG ((LM_DEBUG,
00433 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00434 ACE_TEXT ("error while marshalling the Request header\n"),
00435 this->id()));
00436 }
00437
00438 return -1;
00439 }
00440
00441 return 0;
00442 }
00443
00444
00445
00446 int
00447 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00448 {
00449
00450 this->purge_entry ();
00451
00452
00453 return this->transport_cache_manager ().cache_transport (desc, this);
00454 }
00455
00456 int
00457 TAO_Transport::purge_entry (void)
00458 {
00459 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00460 }
00461
00462 int
00463 TAO_Transport::make_idle (void)
00464 {
00465 if (TAO_debug_level > 3)
00466 {
00467 ACE_DEBUG ((LM_DEBUG,
00468 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00469 this->id ()));
00470 }
00471
00472 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00473 }
00474
00475 int
00476 TAO_Transport::update_transport (void)
00477 {
00478 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00479 }
00480
00481
00482
00483
00484
00485
00486 int
00487 TAO_Transport::handle_output (ACE_Time_Value *max_wait_time)
00488 {
00489 if (TAO_debug_level > 3)
00490 {
00491 ACE_DEBUG ((LM_DEBUG,
00492 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00493 this->id ()));
00494 }
00495
00496
00497
00498
00499 int const retval = this->drain_queue (max_wait_time);
00500
00501 if (TAO_debug_level > 3)
00502 {
00503 ACE_DEBUG ((LM_DEBUG,
00504 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00505 ACE_TEXT ("drain_queue returns %d/%d\n"),
00506 this->id (),
00507 retval, errno));
00508 }
00509
00510
00511 return retval;
00512 }
00513
00514 int
00515 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
00516 ACE_Time_Value *max_wait_time)
00517 {
00518 if (this->messaging_object ()->format_message (stream) != 0)
00519 return -1;
00520
00521 return this->queue_message_i (stream.begin (), max_wait_time);
00522 }
00523
00524 int
00525 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00526 size_t &bytes_transferred,
00527 ACE_Time_Value *max_wait_time)
00528 {
00529 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00530
00531 return this->send_message_block_chain_i (mb,
00532 bytes_transferred,
00533 max_wait_time);
00534 }
00535
00536 int
00537 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00538 size_t &bytes_transferred,
00539 ACE_Time_Value *max_wait_time)
00540 {
00541 size_t const total_length = mb->total_length ();
00542
00543
00544
00545 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00546
00547 synch_message.push_back (this->head_, this->tail_);
00548
00549 int const n = this->drain_queue_i (max_wait_time);
00550
00551 if (n == -1)
00552 {
00553 synch_message.remove_from_list (this->head_, this->tail_);
00554 return -1;
00555 }
00556 else if (n == 1)
00557 {
00558 bytes_transferred = total_length;
00559 return 1;
00560 }
00561
00562
00563 synch_message.remove_from_list (this->head_, this->tail_);
00564
00565 bytes_transferred = total_length - synch_message.message_length ();
00566
00567 return 0;
00568 }
00569
00570 int
00571 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00572 ACE_Time_Value *max_wait_time)
00573 {
00574
00575
00576 size_t const total_length = mb->total_length ();
00577 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00578
00579 synch_message.push_back (this->head_, this->tail_);
00580
00581 int const result = this->send_synch_message_helper_i (synch_message,
00582 max_wait_time);
00583 if (result == -1 && errno == ETIME)
00584 {
00585 if (total_length == synch_message.message_length ())
00586 {
00587 if (TAO_debug_level > 2)
00588 {
00589 ACE_DEBUG ((LM_DEBUG,
00590 ACE_TEXT ("TAO (%P|%t) - ")
00591 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
00592 ACE_TEXT ("timeout encountered before any bytes sent\n"),
00593 this->id ()));
00594 }
00595 throw ::CORBA::TIMEOUT (
00596 CORBA::SystemException::_tao_minor_code (
00597 TAO_TIMEOUT_SEND_MINOR_CODE,
00598 ETIME),
00599 CORBA::COMPLETED_NO);
00600 }
00601 else
00602 {
00603 return -1;
00604 }
00605 }
00606 else if(result == -1 || result == 1)
00607 {
00608 return result;
00609 }
00610
00611 TAO_Flushing_Strategy *flushing_strategy =
00612 this->orb_core ()->flushing_strategy ();
00613 if (flushing_strategy->schedule_output (this) == -1)
00614 {
00615 synch_message.remove_from_list (this->head_, this->tail_);
00616 if (TAO_debug_level > 0)
00617 {
00618 ACE_ERROR ((LM_ERROR,
00619 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
00620 ACE_TEXT ("send_synchronous_message_i, ")
00621 ACE_TEXT ("error while scheduling flush - %m\n"),
00622 this->id ()));
00623 }
00624 return -1;
00625 }
00626
00627
00628
00629
00630
00631
00632 int flush_result;
00633 {
00634 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00635 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00636 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00637
00638 flush_result = flushing_strategy->flush_message (this,
00639 &synch_message,
00640 max_wait_time);
00641 }
00642
00643 if (flush_result == -1)
00644 {
00645 synch_message.remove_from_list (this->head_, this->tail_);
00646
00647
00648
00649
00650
00651 if (TAO_debug_level > 0)
00652 {
00653 ACE_ERROR ((LM_ERROR,
00654 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00655 ACE_TEXT ("error while sending message - %m\n"),
00656 this->id ()));
00657 }
00658
00659 return -1;
00660 }
00661
00662 return 1;
00663 }
00664
00665
00666 int
00667 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00668 ACE_Time_Value *max_wait_time)
00669 {
00670
00671 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00672
00673 synch_message.push_back (this->head_, this->tail_);
00674
00675 int const n =
00676 this->send_synch_message_helper_i (synch_message, max_wait_time);
00677
00678
00679 if (n == -1 || n == 1)
00680 {
00681 return n;
00682 }
00683
00684 if (TAO_debug_level > 3)
00685 {
00686 ACE_DEBUG ((LM_DEBUG,
00687 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00688 ACE_TEXT ("preparing to add to queue before leaving\n"),
00689 this->id ()));
00690 }
00691
00692
00693
00694 synch_message.remove_from_list (this->head_, this->tail_);
00695
00696
00697 TAO_Queued_Message *msg =
00698 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00699
00700
00701 msg->push_back (this->head_, this->tail_);
00702
00703 TAO_Flushing_Strategy *flushing_strategy =
00704 this->orb_core ()->flushing_strategy ();
00705
00706 int const result = flushing_strategy->schedule_output (this);
00707
00708 if (result == -1)
00709 {
00710 if (TAO_debug_level > 5)
00711 {
00712 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00713 "message_i, dequeuing msg due to schedule_output "
00714 "failure\n", this->id ()));
00715 }
00716 msg->remove_from_list (this->head_, this->tail_);
00717 msg->destroy ();
00718 }
00719 else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00720 {
00721 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00722 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00723 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00724 (void) flushing_strategy->flush_message (this, msg, 0);
00725 }
00726
00727 return 1;
00728 }
00729
00730 int
00731 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00732 ACE_Time_Value * max_wait_time)
00733 {
00734 int const n = this->drain_queue_i (max_wait_time);
00735
00736 if (n == -1)
00737 {
00738 synch_message.remove_from_list (this->head_, this->tail_);
00739 return -1;
00740 }
00741 else if (n == 1)
00742 {
00743 return 1;
00744 }
00745
00746 if (synch_message.all_data_sent ())
00747 {
00748 return 1;
00749 }
00750
00751 return 0;
00752 }
00753
00754 bool
00755 TAO_Transport::queue_is_empty_i (void)
00756 {
00757 return (this->head_ == 0);
00758 }
00759
00760
00761 int
00762 TAO_Transport::schedule_output_i (void)
00763 {
00764 ACE_Event_Handler * const eh = this->event_handler_i ();
00765 ACE_Reactor * const reactor = eh->reactor ();
00766
00767 if (reactor == 0)
00768 return -1;
00769
00770
00771
00772
00773 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00774 if (found)
00775 {
00776 found->remove_reference ();
00777
00778 if (found != eh)
00779 {
00780 if (TAO_debug_level > 3)
00781 {
00782 ACE_DEBUG ((LM_DEBUG,
00783 ACE_TEXT ("TAO (%P|%t) - ")
00784 ACE_TEXT ("Transport[%d]::schedule_output_i ")
00785 ACE_TEXT ("event handler not found in reactor,")
00786 ACE_TEXT ("returning -1\n"),
00787 this->id ()));
00788 }
00789
00790 return -1;
00791 }
00792 }
00793
00794 if (TAO_debug_level > 3)
00795 {
00796 ACE_DEBUG ((LM_DEBUG,
00797 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00798 this->id ()));
00799 }
00800
00801 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00802 }
00803
00804 int
00805 TAO_Transport::cancel_output_i (void)
00806 {
00807 ACE_Event_Handler * const eh = this->event_handler_i ();
00808 ACE_Reactor *const reactor = eh->reactor ();
00809
00810 if (TAO_debug_level > 3)
00811 {
00812 ACE_DEBUG ((LM_DEBUG,
00813 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00814 this->id ()));
00815 }
00816
00817 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00818 }
00819
00820 int
00821 TAO_Transport::handle_timeout (const ACE_Time_Value & ,
00822 const void *act)
00823 {
00824 if (TAO_debug_level > 6)
00825 {
00826 ACE_DEBUG ((LM_DEBUG,
00827 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00828 ACE_TEXT ("timer expired\n"),
00829 this->id ()));
00830 }
00831
00832
00833 if (act != &this->current_deadline_)
00834 {
00835 return -1;
00836 }
00837
00838 if (this->flush_timer_pending ())
00839 {
00840
00841
00842 this->reset_flush_timer ();
00843
00844 TAO_Flushing_Strategy *flushing_strategy =
00845 this->orb_core ()->flushing_strategy ();
00846 int const result = flushing_strategy->schedule_output (this);
00847 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
00848 {
00849 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00850 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00851 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00852 if (flushing_strategy->flush_transport (this, 0) == -1) {
00853 return -1;
00854 }
00855 }
00856 }
00857
00858 return 0;
00859 }
00860
00861 int
00862 TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time)
00863 {
00864 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00865 int const retval = this->drain_queue_i (max_wait_time);
00866
00867 if (retval == 1)
00868 {
00869
00870
00871 TAO_Flushing_Strategy *flushing_strategy =
00872 this->orb_core ()->flushing_strategy ();
00873
00874 flushing_strategy->cancel_output (this);
00875
00876 return 0;
00877 }
00878
00879 return retval;
00880 }
00881
00882 int
00883 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time)
00884 {
00885 size_t byte_count = 0;
00886 ACE_Countdown_Time countdown (max_wait_time);
00887
00888
00889 ssize_t retval = -1;
00890
00891 #if TAO_HAS_SENDFILE == 1
00892 if (this->mmap_allocator_)
00893 retval = this->sendfile (this->mmap_allocator_,
00894 iov,
00895 iovcnt,
00896 byte_count);
00897 else
00898 #endif
00899 retval = this->send (iov, iovcnt, byte_count, max_wait_time);
00900
00901 if (TAO_debug_level == 5)
00902 {
00903 dump_iov (iov, iovcnt, this->id (),
00904 byte_count, "drain_queue_helper");
00905 }
00906
00907
00908
00909
00910 this->cleanup_queue (byte_count);
00911 iovcnt = 0;
00912
00913 if (retval == 0)
00914 {
00915 if (TAO_debug_level > 4)
00916 {
00917 ACE_DEBUG ((LM_DEBUG,
00918 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00919 ACE_TEXT ("send() returns 0\n"),
00920 this->id ()));
00921 }
00922 return -1;
00923 }
00924 else if (retval == -1)
00925 {
00926 if (TAO_debug_level > 4)
00927 {
00928 ACE_DEBUG ((LM_DEBUG,
00929 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00930 ACE_TEXT ("error during send (errno: %d) - %m\n"),
00931 this->id (), errno));
00932 }
00933
00934 if (errno == EWOULDBLOCK || errno == EAGAIN)
00935 {
00936 return 0;
00937 }
00938
00939 return -1;
00940 }
00941
00942
00943
00944
00945
00946 this->sent_byte_count_ += byte_count;
00947
00948 if (TAO_debug_level > 4)
00949 {
00950 ACE_DEBUG ((LM_DEBUG,
00951 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00952 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00953 this->id(), byte_count, (this->head_ == 0)));
00954 }
00955
00956 return 1;
00957 }
00958
00959 int
00960 TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time)
00961 {
00962
00963
00964
00965 int iovcnt = 0;
00966 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00967 iovec iov[ACE_IOV_MAX] = { { 0 , 0 } };
00968 #else
00969 iovec iov[ACE_IOV_MAX];
00970 #endif
00971
00972
00973 TAO_Queued_Message *i = this->head_;
00974
00975
00976
00977 this->sent_byte_count_ = 0;
00978
00979
00980
00981
00982 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
00983
00984 while (i != 0)
00985 {
00986 if (i->is_expired (now))
00987 {
00988 if (TAO_debug_level > 3)
00989 {
00990 ACE_DEBUG ((LM_DEBUG,
00991 ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
00992 ACE_TEXT ("Discarding expired queued message.\n"),
00993 this->id ()));
00994 }
00995 TAO_Queued_Message *next = i->next ();
00996 i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
00997 this->orb_core_->leader_follower ());
00998 i->remove_from_list (this->head_, this->tail_);
00999 i->destroy ();
01000 i = next;
01001 continue;
01002 }
01003
01004 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
01005
01006
01007
01008
01009 if (iovcnt == ACE_IOV_MAX)
01010 {
01011 int const retval = this->drain_queue_helper (iovcnt, iov,
01012 max_wait_time);
01013
01014 now = ACE_High_Res_Timer::gettimeofday_hr ();
01015
01016 if (TAO_debug_level > 4)
01017 {
01018 ACE_DEBUG ((LM_DEBUG,
01019 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01020 ACE_TEXT ("helper retval = %d\n"),
01021 this->id (), retval));
01022 }
01023
01024 if (retval != 1)
01025 {
01026 return retval;
01027 }
01028
01029 i = this->head_;
01030 continue;
01031 }
01032
01033
01034 i = i->next ();
01035 }
01036
01037 if (iovcnt != 0)
01038 {
01039 int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time);
01040
01041 if (TAO_debug_level > 4)
01042 {
01043 ACE_DEBUG ((LM_DEBUG,
01044 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
01045 ACE_TEXT ("helper retval = %d\n"),
01046 this->id (), retval));
01047 }
01048
01049 if (retval != 1)
01050 {
01051 return retval;
01052 }
01053 }
01054
01055 if (this->head_ == 0)
01056 {
01057 if (this->flush_timer_pending ())
01058 {
01059 ACE_Event_Handler *eh = this->event_handler_i ();
01060 ACE_Reactor * const reactor = eh->reactor ();
01061 reactor->cancel_timer (this->flush_timer_id_);
01062 this->reset_flush_timer ();
01063 }
01064
01065 return 1;
01066 }
01067
01068 return 0;
01069 }
01070
01071 void
01072 TAO_Transport::cleanup_queue_i ()
01073 {
01074 if (TAO_debug_level > 4)
01075 {
01076 ACE_DEBUG ((LM_DEBUG,
01077 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01078 ACE_TEXT ("cleaning up complete queue\n"),
01079 this->id ()));
01080 }
01081
01082 size_t byte_count = 0;
01083 int msg_count = 0;
01084
01085
01086 while (this->head_ != 0)
01087 {
01088 TAO_Queued_Message *i = this->head_;
01089
01090 if (TAO_debug_level > 4)
01091 {
01092 byte_count += i->message_length();
01093 ++msg_count;
01094 }
01095
01096
01097 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01098 this->orb_core_->leader_follower ());
01099
01100 i->remove_from_list (this->head_, this->tail_);
01101
01102 i->destroy ();
01103 }
01104
01105 if (TAO_debug_level > 4)
01106 {
01107 ACE_DEBUG ((LM_DEBUG,
01108 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
01109 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
01110 this->id (), msg_count, byte_count));
01111 }
01112 }
01113
01114 void
01115 TAO_Transport::cleanup_queue (size_t byte_count)
01116 {
01117 while (this->head_ != 0 && byte_count > 0)
01118 {
01119 TAO_Queued_Message *i = this->head_;
01120
01121 if (TAO_debug_level > 4)
01122 {
01123 ACE_DEBUG ((LM_DEBUG,
01124 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01125 ACE_TEXT ("byte_count = %d\n"),
01126 this->id (), byte_count));
01127 }
01128
01129
01130 i->bytes_transferred (byte_count);
01131
01132 if (TAO_debug_level > 4)
01133 {
01134 ACE_DEBUG ((LM_DEBUG,
01135 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01136 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01137 this->id (), byte_count, i->all_data_sent (),
01138 i->message_length ()));
01139 }
01140
01141
01142
01143 if (i->all_data_sent ())
01144 {
01145 i->remove_from_list (this->head_, this->tail_);
01146 i->destroy ();
01147 }
01148 }
01149 }
01150
01151 int
01152 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
01153 {
01154
01155 size_t msg_count = 0;
01156 size_t total_bytes = 0;
01157
01158 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01159 {
01160 ++msg_count;
01161 total_bytes += i->message_length ();
01162 }
01163
01164 bool set_timer = false;
01165 ACE_Time_Value new_deadline;
01166
01167 TAO::Transport_Queueing_Strategy *queue_strategy =
01168 stub->transport_queueing_strategy ();
01169
01170 bool constraints_reached = true;
01171
01172 if (queue_strategy)
01173 {
01174 constraints_reached =
01175 queue_strategy->buffering_constraints_reached (stub,
01176 msg_count,
01177 total_bytes,
01178 must_flush,
01179 this->current_deadline_,
01180 set_timer,
01181 new_deadline);
01182 }
01183 else
01184 {
01185 must_flush = false;
01186 }
01187
01188
01189 if (set_timer)
01190 {
01191 ACE_Event_Handler *eh = this->event_handler_i ();
01192 ACE_Reactor * const reactor = eh->reactor ();
01193 this->current_deadline_ = new_deadline;
01194 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
01195
01196 if (this->flush_timer_pending ())
01197 {
01198 reactor->cancel_timer (this->flush_timer_id_);
01199 }
01200
01201 this->flush_timer_id_ =
01202 reactor->schedule_timer (&this->transport_timer_,
01203 &this->current_deadline_,
01204 delay);
01205 }
01206
01207 return constraints_reached;
01208 }
01209
01210 void
01211 TAO_Transport::report_invalid_event_handler (const char *caller)
01212 {
01213 if (TAO_debug_level > 0)
01214 {
01215 ACE_DEBUG ((LM_DEBUG,
01216 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01217 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01218 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01219 }
01220 }
01221
01222 void
01223 TAO_Transport::send_connection_closed_notifications (void)
01224 {
01225 {
01226 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01227
01228 this->send_connection_closed_notifications_i ();
01229 }
01230
01231 this->tms ()->connection_closed ();
01232 }
01233
01234 void
01235 TAO_Transport::send_connection_closed_notifications_i (void)
01236 {
01237 this->cleanup_queue_i ();
01238 }
01239
01240 int
01241 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01242 TAO_Message_Semantics message_semantics,
01243 const ACE_Message_Block *message_block,
01244 ACE_Time_Value *max_wait_time)
01245 {
01246 int ret = 0;
01247
01248 #if TAO_HAS_TRANSPORT_CURRENT == 1
01249 size_t const message_length = message_block->length ();
01250 #endif
01251
01252 switch (message_semantics)
01253 {
01254 case TAO_Transport::TAO_TWOWAY_REQUEST:
01255 ret = this->send_synchronous_message_i (message_block, max_wait_time);
01256 break;
01257
01258 case TAO_Transport::TAO_REPLY:
01259 ret = this->send_reply_message_i (message_block, max_wait_time);
01260 break;
01261
01262 case TAO_Transport::TAO_ONEWAY_REQUEST:
01263 ret = this->send_asynchronous_message_i (stub,
01264 message_block,
01265 max_wait_time);
01266 break;
01267 }
01268
01269 #if TAO_HAS_TRANSPORT_CURRENT == 1
01270
01271 if (ret != -1 && this->stats_ != 0)
01272 this->stats_->messages_sent (message_length);
01273 #endif
01274
01275 return ret;
01276 }
01277
01278 int
01279 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01280 const ACE_Message_Block *message_block,
01281 ACE_Time_Value *max_wait_time)
01282 {
01283
01284
01285 bool try_sending_first = true;
01286
01287 bool const queue_empty = (this->head_ == 0);
01288
01289 TAO::Transport_Queueing_Strategy *queue_strategy =
01290 stub->transport_queueing_strategy ();
01291
01292 if (!queue_empty)
01293 {
01294 try_sending_first = false;
01295 }
01296 else if (queue_strategy)
01297 {
01298 if (queue_strategy->must_queue (queue_empty))
01299 {
01300 try_sending_first = false;
01301 }
01302 }
01303
01304 bool partially_sent = false;
01305 bool timeout_encountered = false;
01306
01307 if (try_sending_first)
01308 {
01309 ssize_t n = 0;
01310 size_t byte_count = 0;
01311
01312
01313 size_t const total_length = message_block->total_length ();
01314
01315 if (TAO_debug_level > 6)
01316 {
01317 ACE_DEBUG ((LM_DEBUG,
01318 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01319 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01320 this->id (), total_length));
01321 }
01322
01323
01324
01325
01326
01327 n = this->send_message_block_chain_i (message_block,
01328 byte_count,
01329 max_wait_time);
01330
01331 if (n == -1)
01332 {
01333
01334
01335
01336
01337
01338 if (errno != EWOULDBLOCK && errno != ETIME)
01339 {
01340 if (TAO_debug_level > 0)
01341 {
01342 ACE_ERROR ((LM_ERROR,
01343 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01344 ACE_TEXT ("fatal error in ")
01345 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01346 this->id ()));
01347 }
01348 return -1;
01349 }
01350 }
01351
01352
01353 if (total_length == byte_count)
01354 {
01355
01356
01357
01358
01359
01360 return 0;
01361 }
01362
01363 if (byte_count > 0)
01364 {
01365 partially_sent = true;
01366 }
01367
01368
01369 if (errno == ETIME)
01370 {
01371 timeout_encountered = true;
01372 if (byte_count == 0)
01373 {
01374
01375
01376 if (TAO_debug_level > 2)
01377 {
01378 ACE_DEBUG ((LM_DEBUG,
01379 ACE_TEXT ("TAO (%P|%t) - ")
01380 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01381 ACE_TEXT ("timeout encountered before any bytes sent\n"),
01382 this->id ()));
01383 }
01384 throw ::CORBA::TIMEOUT (
01385 CORBA::SystemException::_tao_minor_code (
01386 TAO_TIMEOUT_SEND_MINOR_CODE,
01387 ETIME),
01388 CORBA::COMPLETED_NO);
01389 }
01390 }
01391
01392 if (TAO_debug_level > 6)
01393 {
01394 ACE_DEBUG ((LM_DEBUG,
01395 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01396 ACE_TEXT ("partial send %d / %d bytes\n"),
01397 this->id (), byte_count, total_length));
01398 }
01399
01400
01401
01402 while (message_block != 0 && message_block->length () == 0)
01403 {
01404 message_block = message_block->cont ();
01405 }
01406
01407
01408
01409 }
01410
01411
01412
01413
01414 ACE_Time_Value *wait_time = (partially_sent ? 0: max_wait_time);
01415 if (this->queue_message_i (message_block, wait_time, !partially_sent)
01416 == -1)
01417 {
01418 if (TAO_debug_level > 0)
01419 {
01420 ACE_DEBUG ((LM_DEBUG,
01421 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01422 ACE_TEXT ("send_asynchronous_message_i, ")
01423 ACE_TEXT ("cannot queue message for - %m\n"),
01424 this->id ()));
01425 }
01426 return -1;
01427 }
01428
01429 if (TAO_debug_level > 6)
01430 {
01431 ACE_DEBUG ((LM_DEBUG,
01432 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01433 ACE_TEXT ("message is queued\n"),
01434 this->id ()));
01435 }
01436
01437 if (timeout_encountered && partially_sent)
01438 {
01439
01440
01441
01442 if (TAO_debug_level > 0)
01443 {
01444 ACE_DEBUG ((LM_DEBUG,
01445 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01446 ACE_TEXT ("send_asynchronous_message_i, ")
01447 ACE_TEXT ("timeout after partial send, closing.\n"),
01448 this->id ()));
01449 }
01450 return -1;
01451 }
01452 else if (!timeout_encountered)
01453 {
01454
01455
01456
01457 bool must_flush = false;
01458 const bool constraints_reached =
01459 this->check_buffering_constraints_i (stub,
01460 must_flush);
01461
01462
01463
01464
01465
01466 TAO_Flushing_Strategy *flushing_strategy =
01467 this->orb_core ()->flushing_strategy ();
01468
01469 if (constraints_reached || try_sending_first)
01470 {
01471 int const result = flushing_strategy->schedule_output (this);
01472 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
01473 {
01474 must_flush = true;
01475 }
01476 }
01477
01478 if (must_flush)
01479 {
01480 if (TAO_debug_level > 0)
01481 {
01482 ACE_DEBUG ((LM_DEBUG,
01483 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
01484 ACE_TEXT ("send_asynchronous_message_i, ")
01485 ACE_TEXT ("flushing transport.\n"),
01486 this->id ()));
01487 }
01488
01489 size_t sent_byte = sent_byte_count_;
01490 int ret = 0;
01491 {
01492 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01493 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01494 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01495 ret = flushing_strategy->flush_transport (this, max_wait_time);
01496 }
01497
01498 if (ret == -1)
01499 {
01500 if (errno == ETIME)
01501 {
01502 if (sent_byte == sent_byte_count_)
01503 {
01504
01505
01506 if (TAO_debug_level > 2)
01507 {
01508 ACE_DEBUG ((LM_DEBUG,
01509 ACE_TEXT ("TAO (%P|%t) - ")
01510 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
01511 ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
01512 this->id ()));
01513 }
01514 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
01515 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
01516 CORBA::COMPLETED_NO);
01517 }
01518 }
01519 return -1;
01520 }
01521 }
01522 }
01523 return 0;
01524 }
01525
01526 int
01527 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
01528 ACE_Time_Value *max_wait_time, bool back)
01529 {
01530 TAO_Queued_Message *queued_message = 0;
01531 ACE_NEW_RETURN (queued_message,
01532 TAO_Asynch_Queued_Message (message_block,
01533 this->orb_core_,
01534 max_wait_time,
01535 0,
01536 true),
01537 -1);
01538 if (back) {
01539 queued_message->push_back (this->head_, this->tail_);
01540 }
01541 else {
01542 queued_message->push_front (this->head_, this->tail_);
01543 }
01544
01545 return 0;
01546 }
01547
01548
01549
01550
01551
01552
01553
01554 int
01555 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01556 ACE_Time_Value * max_wait_time)
01557 {
01558 if (TAO_debug_level > 3)
01559 {
01560 ACE_DEBUG ((LM_DEBUG,
01561 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01562 this->id ()));
01563 }
01564
01565
01566 int const retval = this->process_queue_head (rh);
01567
01568 if (retval <= 0)
01569 {
01570 if (retval == -1)
01571 {
01572 if (TAO_debug_level > 2)
01573 {
01574 ACE_DEBUG ((LM_DEBUG,
01575 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01576 ACE_TEXT ("error while parsing the head of the queue\n"),
01577 this->id()));
01578
01579 }
01580 return -1;
01581 }
01582 else
01583 {
01584
01585
01586
01587
01588 return 0;
01589 }
01590 }
01591
01592 TAO_Queued_Data *q_data = 0;
01593
01594 if (this->incoming_message_stack_.top (q_data) != -1
01595 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
01596 {
01597
01598 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01599 {
01600 if (TAO_debug_level > 0)
01601 {
01602 ACE_ERROR ((LM_ERROR,
01603 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01604 ACE_TEXT ("error consolidating incoming message\n"),
01605 this->id ()));
01606 }
01607 return -1;
01608 }
01609 }
01610 else
01611 {
01612 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01613 {
01614 if (TAO_debug_level > 0)
01615 {
01616 ACE_ERROR ((LM_ERROR,
01617 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01618 ACE_TEXT ("error parsing incoming message\n"),
01619 this->id ()));
01620 }
01621 return -1;
01622 }
01623 }
01624
01625 return 0;
01626 }
01627
01628 int
01629 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01630 TAO_Resume_Handle &rh)
01631 {
01632
01633 if (q_data->missing_data () != 0)
01634 {
01635 if (TAO_debug_level > 0)
01636 {
01637 ACE_ERROR ((LM_ERROR,
01638 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01639 ACE_TEXT ("missing data\n"),
01640 this->id ()));
01641 }
01642 return -1;
01643 }
01644
01645 if (q_data->more_fragments () ||
01646 q_data->msg_type () == GIOP::Fragment)
01647 {
01648
01649 TAO_Queued_Data *new_q_data = 0;
01650
01651 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01652 {
01653 case -1:
01654 return -1;
01655
01656 case 0:
01657 if (!new_q_data)
01658 {
01659 if (TAO_debug_level > 0)
01660 {
01661 ACE_ERROR ((LM_ERROR,
01662 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01663 ACE_TEXT ("error, consolidated message is NULL\n"),
01664 this->id ()));
01665 }
01666 return -1;
01667 }
01668
01669
01670 if (this->process_parsed_messages (new_q_data, rh) == -1)
01671 {
01672 TAO_Queued_Data::release (new_q_data);
01673
01674 if (TAO_debug_level > 0)
01675 {
01676 ACE_ERROR ((LM_ERROR,
01677 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01678 ACE_TEXT ("error processing consolidated message\n"),
01679 this->id ()));
01680 }
01681 return -1;
01682 }
01683
01684 TAO_Queued_Data::release (new_q_data);
01685
01686 break;
01687
01688 case 1:
01689 break;
01690 }
01691 }
01692 else
01693 {
01694 if (this->process_parsed_messages (q_data, rh) == -1)
01695 {
01696 TAO_Queued_Data::release (q_data);
01697
01698 if (TAO_debug_level > 0)
01699 {
01700 ACE_ERROR ((LM_ERROR,
01701 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01702 ACE_TEXT ("error processing message\n"),
01703 this->id ()));
01704 }
01705 return -1;
01706 }
01707
01708 TAO_Queued_Data::release (q_data);
01709
01710 }
01711
01712 return 0;
01713 }
01714
01715 int
01716 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01717 {
01718
01719
01720
01721 if (q_data->missing_data () != 0)
01722 {
01723 return -1;
01724 }
01725
01726 if (q_data->more_fragments () ||
01727 q_data->msg_type () == GIOP::Fragment)
01728 {
01729 TAO_Queued_Data *new_q_data = 0;
01730
01731 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01732 {
01733 case -1:
01734 return -1;
01735
01736 case 0:
01737 if (!new_q_data)
01738 {
01739 if (TAO_debug_level > 0)
01740 {
01741 ACE_ERROR ((LM_ERROR,
01742 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01743 ACE_TEXT ("error, consolidated message is NULL\n"),
01744 this->id ()));
01745 }
01746 return -1;
01747 }
01748
01749 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01750 {
01751 TAO_Queued_Data::release (new_q_data);
01752 return -1;
01753 }
01754 break;
01755
01756 case 1:
01757 break;
01758 }
01759 }
01760 else
01761 {
01762 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01763 {
01764 TAO_Queued_Data::release (q_data);
01765 return -1;
01766 }
01767 }
01768
01769 return 0;
01770 }
01771
01772 int
01773 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01774 ACE_Time_Value * max_wait_time,
01775 TAO_Queued_Data *q_data)
01776 {
01777
01778 if (q_data == 0)
01779 {
01780 return -1;
01781 }
01782
01783 if (TAO_debug_level > 3)
01784 {
01785 ACE_DEBUG ((LM_DEBUG,
01786 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01787 ACE_TEXT ("enter (missing data == %d)\n"),
01788 this->id (), q_data->missing_data ()));
01789 }
01790
01791 size_t const recv_size = q_data->missing_data ();
01792
01793 if (q_data->msg_block ()->space() < recv_size)
01794 {
01795
01796 size_t const message_size = recv_size + q_data->msg_block ()->length();
01797
01798 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
01799 {
01800 return -1;
01801 }
01802 }
01803
01804
01805
01806
01807
01808
01809 this->recv_buffer_size_ = recv_size;
01810
01811
01812 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
01813 recv_size,
01814 max_wait_time);
01815
01816 if (n <= 0)
01817 {
01818 return n;
01819 }
01820
01821 if (TAO_debug_level > 3)
01822 {
01823 ACE_DEBUG ((LM_DEBUG,
01824 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01825 ACE_TEXT ("read bytes %d\n"),
01826 this->id (), n));
01827 }
01828
01829 q_data->msg_block ()->wr_ptr(n);
01830 q_data->missing_data (q_data->missing_data () - n);
01831
01832 if (q_data->missing_data () == 0)
01833 {
01834
01835 if (this->incoming_message_stack_.pop (q_data) == -1)
01836 {
01837 return -1;
01838 }
01839
01840 if (this->consolidate_process_message (q_data, rh) == -1)
01841 {
01842 return -1;
01843 }
01844 }
01845
01846 return 0;
01847 }
01848
01849
01850 int
01851 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01852 {
01853
01854
01855
01856
01857 int buf_status = 0;
01858
01859 TAO_Queued_Data *q_data = 0;
01860
01861
01862
01863
01864 while (message_block.length () > 0 &&
01865 (buf_status = this->messaging_object ()->extract_next_message
01866 (message_block, q_data)) != -1 &&
01867 q_data != 0)
01868 {
01869 if (q_data->missing_data () == 0)
01870 {
01871 if (this->consolidate_enqueue_message (q_data) == -1)
01872 {
01873 return -1;
01874 }
01875 }
01876 else
01877 {
01878
01879 this->incoming_message_stack_.push (q_data);
01880 }
01881
01882 q_data = 0;
01883 }
01884
01885 if (buf_status == -1)
01886 {
01887 return -1;
01888 }
01889
01890 return 0;
01891 }
01892
01893 int
01894 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
01895 ACE_Time_Value * max_wait_time)
01896 {
01897
01898 if (TAO_debug_level > 3)
01899 {
01900 ACE_DEBUG ((LM_DEBUG,
01901 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01902 ACE_TEXT ("enter\n"),
01903 this->id ()));
01904 }
01905
01906
01907
01908
01909
01910
01911 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01912
01913 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01914 (void) ACE_OS::memset (buf,
01915 '\0',
01916 sizeof buf);
01917 #endif
01918
01919
01920 ACE_Data_Block db (sizeof (buf),
01921 ACE_Message_Block::MB_DATA,
01922 buf,
01923 this->orb_core_->input_cdr_buffer_allocator (),
01924 this->orb_core_->locking_strategy (),
01925 ACE_Message_Block::DONT_DELETE,
01926 this->orb_core_->input_cdr_dblock_allocator ());
01927
01928
01929 ACE_Message_Block message_block (&db,
01930 ACE_Message_Block::DONT_DELETE,
01931 this->orb_core_->input_cdr_msgblock_allocator ());
01932
01933
01934
01935 ACE_CDR::mb_align (&message_block);
01936
01937 size_t recv_size = 0;
01938
01939
01940 TAO_Queued_Data *q_data = 0;
01941
01942
01943 size_t const header_length = this->messaging_object ()->header_length ();
01944
01945
01946 if (header_length > message_block.space ())
01947 {
01948 return -1;
01949 }
01950
01951 if (this->orb_core_->orb_params ()->single_read_optimization ())
01952 {
01953 recv_size = message_block.space ();
01954 }
01955 else
01956 {
01957
01958
01959
01960
01961
01962
01963 if (this->incoming_message_stack_.top (q_data) != -1
01964 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
01965 {
01966
01967
01968
01969 recv_size = header_length - q_data->msg_block ()->length ();
01970 }
01971 else
01972 {
01973
01974
01975 recv_size = header_length;
01976 }
01977
01978 }
01979
01980
01981
01982
01983 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01984 {
01985
01986
01987
01988
01989
01990 if (this->partial_message_->length () <= recv_size &&
01991 message_block.copy (this->partial_message_->rd_ptr (),
01992 this->partial_message_->length ()) == 0)
01993 {
01994
01995 recv_size -= this->partial_message_->length ();
01996 this->partial_message_->reset ();
01997 }
01998 else
01999 {
02000 return -1;
02001 }
02002 }
02003
02004
02005 if (0 >= recv_size)
02006 {
02007
02008
02009
02010
02011
02012 if (TAO_debug_level > 0)
02013 {
02014 ACE_ERROR ((LM_ERROR,
02015 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02016 ACE_TEXT ("Error - endless loop detection, closing connection"),
02017 this->id ()));
02018 }
02019 return -1;
02020 }
02021
02022
02023
02024
02025
02026
02027 this->recv_buffer_size_ = recv_size;
02028
02029
02030
02031 ssize_t const n = this->recv (message_block.wr_ptr (),
02032 recv_size,
02033 max_wait_time);
02034
02035
02036 if (n <= 0)
02037 {
02038 return n;
02039 }
02040
02041 if (TAO_debug_level > 3)
02042 {
02043 ACE_DEBUG ((LM_DEBUG,
02044 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02045 ACE_TEXT ("read %d bytes\n"),
02046 this->id (), n));
02047 }
02048
02049
02050 message_block.wr_ptr (n);
02051
02052
02053
02054
02055
02056
02057
02058 if (this->incoming_message_stack_.top (q_data) != -1
02059 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
02060 {
02061
02062
02063
02064
02065
02066
02067
02068
02069
02070
02071 if (this->messaging_object ()->consolidate_node (q_data,
02072 message_block) == -1)
02073 {
02074 if (TAO_debug_level > 0)
02075 {
02076 ACE_ERROR ((LM_ERROR,
02077 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02078 ACE_TEXT ("error consolidating message from input buffer\n"),
02079 this->id () ));
02080 }
02081 return -1;
02082 }
02083
02084
02085 if (q_data->missing_data () == 0)
02086 {
02087 if (this->incoming_message_stack_.pop (q_data) == -1)
02088 {
02089 return -1;
02090 }
02091
02092 if (this->consolidate_enqueue_message (q_data) == -1)
02093 {
02094 return -1;
02095 }
02096 }
02097
02098 if (message_block.length () > 0
02099 && this->handle_input_parse_extra_messages (message_block) == -1)
02100 {
02101 return -1;
02102 }
02103
02104
02105 if (this->process_queue_head (rh) == -1)
02106 {
02107 return -1;
02108 }
02109 }
02110 else
02111 {
02112
02113
02114
02115
02116
02117
02118
02119
02120
02121 TAO_Queued_Data qd (&message_block,
02122 this->orb_core_->transport_message_buffer_allocator ());
02123
02124 size_t mesg_length = 0;
02125
02126 if (this->messaging_object ()->parse_next_message (qd,
02127 mesg_length) == -1
02128 || (qd.missing_data () == 0
02129 && mesg_length > message_block.length ()) )
02130 {
02131
02132 return -1;
02133 }
02134
02135
02136
02137 if (qd.missing_data () != 0 ||
02138 qd.more_fragments () ||
02139 qd.msg_type () == GIOP::Fragment)
02140 {
02141 if (qd.missing_data () == 0)
02142 {
02143
02144 TAO_Queued_Data *nqd =
02145 TAO_Queued_Data::duplicate (qd);
02146
02147 if (nqd == 0)
02148 {
02149 return -1;
02150 }
02151
02152
02153 char* end_mark = nqd->msg_block ()->rd_ptr ()
02154 + mesg_length;
02155 nqd->msg_block ()->wr_ptr (end_mark);
02156
02157
02158 message_block.rd_ptr (mesg_length);
02159
02160
02161 if (this->consolidate_enqueue_message (nqd) == -1)
02162 {
02163 return -1;
02164 }
02165
02166 if (message_block.length () > 0
02167 && this->handle_input_parse_extra_messages (message_block) == -1)
02168 {
02169 return -1;
02170 }
02171
02172
02173 if (this->process_queue_head (rh) == -1)
02174 {
02175 return -1;
02176 }
02177 }
02178 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
02179 {
02180
02181
02182 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
02183 qd.missing_data () > message_block.space ())
02184 {
02185
02186 if (ACE_CDR::grow (qd.msg_block (),
02187 message_block.length ()
02188 + qd.missing_data ()) == -1)
02189 {
02190 return -1;
02191 }
02192 }
02193
02194 TAO_Queued_Data *nqd =
02195 TAO_Queued_Data::duplicate (qd);
02196
02197 if (nqd == 0)
02198 {
02199 return -1;
02200 }
02201
02202
02203 message_block.rd_ptr (message_block.length());
02204
02205 this->incoming_message_stack_.push (nqd);
02206 }
02207 }
02208 else
02209 {
02210
02211
02212
02213
02214
02215
02216
02217
02218
02219
02220 char * end_marker = message_block.rd_ptr ()
02221 + mesg_length;
02222
02223 if (message_block.length () > mesg_length)
02224 {
02225
02226
02227 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02228
02229
02230
02231 message_block.rd_ptr (mesg_length);
02232
02233
02234
02235 if (this->handle_input_parse_extra_messages (message_block) == -1)
02236 {
02237 return -1;
02238 }
02239
02240
02241 end_marker = message_block.rd_ptr ();
02242
02243
02244 message_block.rd_ptr (rd_ptr_stack_mesg);
02245 }
02246
02247
02248
02249
02250
02251
02252
02253
02254 if (this->incoming_message_queue_.queue_length () > 0)
02255 {
02256 if (TAO_debug_level > 0)
02257 {
02258 ACE_DEBUG ((LM_DEBUG,
02259 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02260 ACE_TEXT ("notify reactor\n"),
02261 this->id ()));
02262
02263 }
02264
02265 const int retval = this->notify_reactor ();
02266
02267 if (retval == 1)
02268 {
02269
02270
02271 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02272 }
02273 else if (retval < 0)
02274 return -1;
02275 }
02276 else
02277 {
02278
02279
02280 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02281 }
02282
02283
02284 if (this->process_parsed_messages (&qd,
02285 rh) == -1)
02286 {
02287 return -1;
02288 }
02289
02290
02291 message_block.rd_ptr (end_marker);
02292 }
02293 }
02294
02295
02296
02297 if (message_block.length () > 0)
02298 {
02299 if (this->partial_message_ == 0)
02300 {
02301 this->allocate_partial_message_block ();
02302 }
02303
02304 if (this->partial_message_ != 0 &&
02305 this->partial_message_->copy (message_block.rd_ptr (),
02306 message_block.length ()) == 0)
02307 {
02308 message_block.rd_ptr (message_block.length ());
02309 }
02310 else
02311 {
02312 return -1;
02313 }
02314 }
02315
02316 return 0;
02317 }
02318
02319
02320 int
02321 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02322 TAO_Resume_Handle &rh)
02323 {
02324 if (TAO_debug_level > 7)
02325 {
02326 ACE_DEBUG ((LM_DEBUG,
02327 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02328 ACE_TEXT ("entering (missing data == %d)\n"),
02329 this->id(), qd->missing_data ()));
02330 }
02331
02332 #if TAO_HAS_TRANSPORT_CURRENT == 1
02333
02334 if (this->stats_ != 0)
02335 this->stats_->messages_received (qd->msg_block ()->length ());
02336 #endif
02337
02338 switch (qd->msg_type ())
02339 {
02340 case GIOP::CloseConnection:
02341 {
02342 if (TAO_debug_level > 0)
02343 ACE_DEBUG ((LM_DEBUG,
02344 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02345 ACE_TEXT ("received CloseConnection message - %m\n"),
02346 this->id()));
02347
02348
02349
02350 return -1;
02351 }
02352 break;
02353 case GIOP::Request:
02354 case GIOP::LocateRequest:
02355 {
02356
02357
02358 rh.resume_handle ();
02359
02360 if (this->messaging_object ()->process_request_message (
02361 this,
02362 qd) == -1)
02363 {
02364
02365
02366 return -1;
02367 }
02368 }
02369 break;
02370 case GIOP::Reply:
02371 case GIOP::LocateReply:
02372 {
02373 rh.resume_handle ();
02374
02375 TAO_Pluggable_Reply_Params params (this);
02376
02377 if (this->messaging_object ()->process_reply_message (params, qd) == -1)
02378 {
02379 if (TAO_debug_level > 0)
02380 ACE_DEBUG ((LM_DEBUG,
02381 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02382 ACE_TEXT ("error in process_reply_message - %m\n"),
02383 this->id ()));
02384
02385 return -1;
02386 }
02387
02388 }
02389 break;
02390 case GIOP::CancelRequest:
02391 {
02392
02393
02394
02395 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02396 {
02397 if (TAO_debug_level > 0)
02398 {
02399 ACE_ERROR ((LM_ERROR,
02400 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02401 ACE_TEXT ("error processing CancelRequest\n"),
02402 this->id ()));
02403 }
02404 }
02405
02406
02407
02408
02409
02410
02411
02412 }
02413 break;
02414 case GIOP::MessageError:
02415 {
02416 if (TAO_debug_level > 0)
02417 {
02418 ACE_ERROR ((LM_ERROR,
02419 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02420 ACE_TEXT ("received MessageError, closing connection\n"),
02421 this->id ()));
02422 }
02423 return -1;
02424 }
02425 break;
02426 case GIOP::Fragment:
02427 {
02428
02429 }
02430 break;
02431 }
02432
02433
02434 return 0;
02435 }
02436
02437 int
02438 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02439 {
02440 if (TAO_debug_level > 3)
02441 {
02442 ACE_DEBUG ((LM_DEBUG,
02443 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02444 this->id (), this->incoming_message_queue_.queue_length () ));
02445 }
02446
02447
02448 if (this->incoming_message_queue_.queue_length () > 0)
02449 {
02450
02451 TAO_Queued_Data *qd =
02452 this->incoming_message_queue_.dequeue_head ();
02453
02454 if (TAO_debug_level > 3)
02455 {
02456 ACE_DEBUG ((LM_DEBUG,
02457 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02458 ACE_TEXT ("the size of the queue is [%d]\n"),
02459 this->id (),
02460 this->incoming_message_queue_.queue_length()));
02461 }
02462
02463
02464 if (this->incoming_message_queue_.queue_length () > 0)
02465 {
02466 if (TAO_debug_level > 0)
02467 {
02468 ACE_DEBUG ((LM_DEBUG,
02469 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02470 ACE_TEXT ("notify reactor\n"),
02471 this->id ()));
02472 }
02473
02474 int const retval = this->notify_reactor ();
02475
02476 if (retval == 1)
02477 {
02478
02479
02480 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02481 }
02482 else if (retval < 0)
02483 return -1;
02484 }
02485 else
02486 {
02487
02488
02489 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02490 }
02491
02492
02493 if (this->process_parsed_messages (qd, rh) == -1)
02494 {
02495 return -1;
02496 }
02497
02498
02499 TAO_Queued_Data::release (qd);
02500
02501 return 0;
02502 }
02503
02504 return 1;
02505 }
02506
02507 int
02508 TAO_Transport::notify_reactor (void)
02509 {
02510 if (!this->ws_->is_registered ())
02511 {
02512 return 0;
02513 }
02514
02515 ACE_Event_Handler *eh = this->event_handler_i ();
02516
02517
02518 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02519
02520 if (TAO_debug_level > 0)
02521 {
02522 ACE_DEBUG ((LM_DEBUG,
02523 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02524 ACE_TEXT ("notify to Reactor\n"),
02525 this->id ()));
02526 }
02527
02528
02529
02530 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
02531
02532 if (retval < 0 && TAO_debug_level > 2)
02533 {
02534
02535
02536 ACE_DEBUG ((LM_DEBUG,
02537 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02538 ACE_TEXT ("notify to the reactor failed..\n"),
02539 this->id ()));
02540 }
02541
02542 return 1;
02543 }
02544
02545 TAO::Transport_Cache_Manager &
02546 TAO_Transport::transport_cache_manager (void)
02547 {
02548 return this->orb_core_->lane_resources ().transport_cache ();
02549 }
02550
02551 void
02552 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02553 {
02554 if (this->char_translator_)
02555 {
02556 this->char_translator_->assign (inp);
02557 this->char_translator_->assign (outp);
02558 }
02559 if (this->wchar_translator_)
02560 {
02561 this->wchar_translator_->assign (inp);
02562 this->wchar_translator_->assign (outp);
02563 }
02564 }
02565
02566 void
02567 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02568 {
02569 if (inp)
02570 {
02571 inp->char_translator (0);
02572 inp->wchar_translator (0);
02573 }
02574 if (outp)
02575 {
02576 outp->char_translator (0);
02577 outp->wchar_translator (0);
02578 }
02579 }
02580
02581 ACE_Event_Handler::Reference_Count
02582 TAO_Transport::add_reference (void)
02583 {
02584 return this->event_handler_i ()->add_reference ();
02585 }
02586
02587 ACE_Event_Handler::Reference_Count
02588 TAO_Transport::remove_reference (void)
02589 {
02590 return this->event_handler_i ()->remove_reference ();
02591 }
02592
02593 TAO_OutputCDR &
02594 TAO_Transport::out_stream (void)
02595 {
02596 return this->messaging_object ()->out_stream ();
02597 }
02598
02599 void
02600 TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version)
02601 {
02602 this->messaging_object ()->init (version.major, version.minor);
02603 }
02604
02605 void
02606 TAO_Transport::pre_close (void)
02607 {
02608 this->is_connected_ = false;
02609 this->purge_entry ();
02610 {
02611 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
02612 this->cleanup_queue_i ();
02613 }
02614 }
02615
02616 bool
02617 TAO_Transport::post_open (size_t id)
02618 {
02619 this->id_ = id;
02620
02621 {
02622 ACE_GUARD_RETURN (ACE_Lock,
02623 ace_mon,
02624 *this->handler_lock_,
02625 false);
02626 this->is_connected_ = true;
02627 }
02628
02629
02630
02631 if (this->queue_is_empty_i ())
02632 return true;
02633
02634
02635
02636
02637 if (this->wait_strategy ()->register_handler () != 0)
02638 {
02639
02640
02641
02642
02643 (void) this->purge_entry ();
02644
02645
02646 (void) this->close_connection ();
02647
02648 if (TAO_debug_level > 0)
02649 ACE_ERROR ((LM_ERROR,
02650 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02651 ACE_TEXT ("could not register the transport ")
02652 ACE_TEXT ("in the reactor.\n"),
02653 this->id ()));
02654
02655 return false;
02656 }
02657
02658 return true;
02659 }
02660
02661 void
02662 TAO_Transport::allocate_partial_message_block (void)
02663 {
02664 if (this->partial_message_ == 0)
02665 {
02666
02667
02668 size_t const partial_message_size =
02669 this->messaging_object ()->header_length ();
02670
02671
02672
02673 ACE_NEW (this->partial_message_,
02674 ACE_Message_Block (partial_message_size));
02675 }
02676 }
02677
02678
02679
02680
02681
02682
02683
02684
02685 TAO_END_VERSIONED_NAMESPACE_DECL