00001
00002
00003 #include "tao/Transport.h"
00004
00005 #include "tao/LF_Follower.h"
00006 #include "tao/Leader_Follower.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Wait_Strategy.h"
00009 #include "tao/Transport_Mux_Strategy.h"
00010 #include "tao/Stub.h"
00011 #include "tao/Transport_Queueing_Strategies.h"
00012 #include "tao/Connection_Handler.h"
00013 #include "tao/Pluggable_Messaging.h"
00014 #include "tao/Synch_Queued_Message.h"
00015 #include "tao/Asynch_Queued_Message.h"
00016 #include "tao/Flushing_Strategy.h"
00017 #include "tao/Thread_Lane_Resources.h"
00018 #include "tao/Resume_Handle.h"
00019 #include "tao/Codeset_Manager.h"
00020 #include "tao/Codeset_Translator_Base.h"
00021 #include "tao/debug.h"
00022 #include "tao/CDR.h"
00023 #include "tao/ORB_Core.h"
00024 #include "tao/MMAP_Allocator.h"
00025
00026 #include "ace/OS_NS_sys_time.h"
00027 #include "ace/OS_NS_stdio.h"
00028 #include "ace/Reactor.h"
00029 #include "ace/os_include/sys/os_uio.h"
00030
00031
00032
00033
00034
00035
00036
00037 #if !defined (__ACE_INLINE__)
00038 # include "tao/Transport.inl"
00039 #endif
00040
00041
00042 ACE_RCSID (tao,
00043 Transport,
00044 "Transport.cpp,v 1.150 2006/06/20 06:33:40 jwillemsen Exp")
00045
00046
00047
00048
00049 static void
00050 dump_iov (iovec *iov, int iovcnt, size_t id,
00051 size_t current_transfer,
00052 const char *location)
00053 {
00054 ACE_Log_Msg::instance ()->acquire ();
00055
00056 ACE_DEBUG ((LM_DEBUG,
00057 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00058 ACE_TEXT ("sending %d buffers\n"),
00059 id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
00060
00061 for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
00062 {
00063 size_t iov_len = iov[i].iov_len;
00064
00065
00066 if (current_transfer < iov_len)
00067 {
00068 iov_len = current_transfer;
00069 }
00070
00071 ACE_DEBUG ((LM_DEBUG,
00072 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00073 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
00074 id, ACE_TEXT_CHAR_TO_TCHAR(location),
00075 i, iovcnt,
00076 iov_len));
00077
00078 size_t len;
00079
00080 for (size_t offset = 0; offset < iov_len; offset += len)
00081 {
00082 ACE_TCHAR header[1024];
00083 ACE_OS::sprintf (header,
00084 ACE_TEXT("TAO - ")
00085 ACE_TEXT("Transport[")
00086 ACE_SIZE_T_FORMAT_SPECIFIER
00087 ACE_TEXT("]::%s")
00088 ACE_TEXT(" (")
00089 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
00090 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
00091 id, location, offset, iov_len);
00092
00093 len = iov_len - offset;
00094
00095 if (len > 512)
00096 {
00097 len = 512;
00098 }
00099
00100 ACE_HEX_DUMP ((LM_DEBUG,
00101 static_cast<char*> (iov[i].iov_base) + offset,
00102 len,
00103 header));
00104 }
00105 current_transfer -= iov_len;
00106 }
00107
00108 ACE_DEBUG ((LM_DEBUG,
00109 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
00110 ACE_TEXT ("end of data\n"),
00111 id, ACE_TEXT_CHAR_TO_TCHAR(location)));
00112
00113 ACE_Log_Msg::instance ()->release ();
00114 }
00115
00116 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00117
00118 TAO_Transport::TAO_Transport (CORBA::ULong tag,
00119 TAO_ORB_Core *orb_core)
00120 : tag_ (tag)
00121 , orb_core_ (orb_core)
00122 , cache_map_entry_ (0)
00123 , bidirectional_flag_ (-1)
00124 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
00125 , head_ (0)
00126 , tail_ (0)
00127 , incoming_message_queue_ (orb_core)
00128 , current_deadline_ (ACE_Time_Value::zero)
00129 , flush_timer_id_ (-1)
00130 , transport_timer_ (this)
00131 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
00132 , id_ ((size_t) this)
00133 , purging_order_ (0)
00134 , recv_buffer_size_ (0)
00135 , sent_byte_count_ (0)
00136 , is_connected_ (false)
00137 , char_translator_ (0)
00138 , wchar_translator_ (0)
00139 , tcs_set_ (0)
00140 , first_request_ (1)
00141 , partial_message_ (0)
00142 #ifdef ACE_HAS_SENDFILE
00143
00144
00145
00146
00147
00148 , mmap_allocator_ (
00149 dynamic_cast<TAO_MMAP_Allocator *> (
00150 orb_core->output_cdr_buffer_allocator ()))
00151 #endif
00152 {
00153 TAO_Client_Strategy_Factory *cf =
00154 this->orb_core_->client_factory ();
00155
00156
00157 this->ws_ = cf->create_wait_strategy (this);
00158
00159
00160 this->tms_ = cf->create_transport_mux_strategy (this);
00161
00162
00163
00164
00165
00166
00167
00168
00169 }
00170
00171 TAO_Transport::~TAO_Transport (void)
00172 {
00173 delete this->ws_;
00174
00175 delete this->tms_;
00176
00177 delete this->handler_lock_;
00178
00179 if (!this->is_connected_)
00180 {
00181
00182
00183 this->cleanup_queue_i();
00184
00185
00186 this->purge_entry();
00187 }
00188
00189
00190
00191 ACE_Message_Block::release (this->partial_message_);
00192
00193
00194
00195
00196
00197
00198 ACE_ASSERT (this->head_ == 0);
00199 ACE_ASSERT (this->cache_map_entry_ == 0);
00200
00201
00202
00203
00204
00205
00206
00207
00208 }
00209
00210 void
00211 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
00212 {
00213 (void) this->add_reference ();
00214
00215 handlers.insert (this->connection_handler_i ());
00216 }
00217
00218 bool
00219 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
00220 {
00221 if (this->ws_->non_blocking () ||
00222 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
00223 return false;
00224
00225 (void) this->add_reference ();
00226
00227 h.insert (this->connection_handler_i ());
00228
00229 return true;
00230 }
00231
00232 bool
00233 TAO_Transport::idle_after_send (void)
00234 {
00235 return this->tms ()->idle_after_send ();
00236 }
00237
00238 bool
00239 TAO_Transport::idle_after_reply (void)
00240 {
00241 return this->tms ()->idle_after_reply ();
00242 }
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254 int
00255 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
00256 {
00257 ACE_NOTSUP_RETURN (-1);
00258 }
00259
00260 int
00261 TAO_Transport::send_message_shared (TAO_Stub *stub,
00262 int message_semantics,
00263 const ACE_Message_Block *message_block,
00264 ACE_Time_Value *max_wait_time)
00265 {
00266 int result = 0;
00267
00268 {
00269 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00270
00271 result =
00272 this->send_message_shared_i (stub, message_semantics,
00273 message_block, max_wait_time);
00274 }
00275
00276 if (result == -1)
00277 {
00278 this->close_connection ();
00279 }
00280
00281 return result;
00282 }
00283
00284
00285
00286 bool
00287 TAO_Transport::post_connect_hook (void)
00288 {
00289 return true;
00290 }
00291
00292 void
00293 TAO_Transport::close_connection (void)
00294 {
00295 this->connection_handler_i ()->close_connection ();
00296 }
00297
00298 int
00299 TAO_Transport::register_handler (void)
00300 {
00301 if (TAO_debug_level > 4)
00302 {
00303 ACE_DEBUG ((LM_DEBUG,
00304 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
00305 this->id ()));
00306 }
00307
00308 ACE_Reactor * const r = this->orb_core_->reactor ();
00309
00310
00311
00312 ACE_GUARD_RETURN (ACE_Lock,
00313 ace_mon,
00314 *this->handler_lock_,
00315 false);
00316
00317 if (r == this->event_handler_i ()->reactor ())
00318 {
00319 return 0;
00320 }
00321
00322
00323
00324
00325 this->ws_->is_registered (true);
00326
00327
00328 return r->register_handler (this->event_handler_i (),
00329 ACE_Event_Handler::READ_MASK);
00330 }
00331
#ifdef ACE_HAS_SENDFILE
/**
 * Default sendfile() implementation: the allocator argument is ignored
 * and the data is sent through the regular send() call.
 *
 * @return Whatever send() returns; @a bytes_transferred is filled in
 *         by send().
 */
ssize_t
TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
                         iovec * iov,
                         int iovcnt,
                         size_t &bytes_transferred,
                         ACE_Time_Value const * timeout)
{
  return this->send (iov, iovcnt, bytes_transferred, timeout);
}
#endif
00350
00351 int
00352 TAO_Transport::generate_locate_request (
00353 TAO_Target_Specification &spec,
00354 TAO_Operation_Details &opdetails,
00355 TAO_OutputCDR &output)
00356 {
00357 if (this->messaging_object ()->generate_locate_request_header (opdetails,
00358 spec,
00359 output)
00360 == -1)
00361 {
00362 if (TAO_debug_level > 0)
00363 {
00364 ACE_DEBUG ((LM_DEBUG,
00365 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
00366 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
00367 this->id ()));
00368 }
00369
00370 return -1;
00371 }
00372
00373 return 0;
00374 }
00375
00376 int
00377 TAO_Transport::generate_request_header (
00378 TAO_Operation_Details &opdetails,
00379 TAO_Target_Specification &spec,
00380 TAO_OutputCDR &output)
00381 {
00382
00383
00384 if (this->first_request_)
00385 {
00386 TAO_Codeset_Manager * const csm = this->orb_core ()->codeset_manager ();
00387 if (csm)
00388 csm->generate_service_context (opdetails,*this);
00389 }
00390
00391 if (this->messaging_object ()->generate_request_header (opdetails,
00392 spec,
00393 output) == -1)
00394 {
00395 if (TAO_debug_level > 0)
00396 {
00397 ACE_DEBUG ((LM_DEBUG,
00398 ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
00399 ACE_TEXT ("error while marshalling the Request header\n"),
00400 this->id()));
00401 }
00402
00403 return -1;
00404 }
00405
00406 return 0;
00407 }
00408
00409
00410
00411 int
00412 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
00413 {
00414
00415 this->purge_entry ();
00416
00417
00418 return this->transport_cache_manager ().cache_transport (desc,
00419 this);
00420 }
00421
00422 int
00423 TAO_Transport::purge_entry (void)
00424 {
00425 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00426 }
00427
00428 int
00429 TAO_Transport::make_idle (void)
00430 {
00431 if (TAO_debug_level > 3)
00432 {
00433 ACE_DEBUG ((LM_DEBUG,
00434 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
00435 this->id ()));
00436 }
00437
00438 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00439 }
00440
00441 int
00442 TAO_Transport::update_transport (void)
00443 {
00444 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00445 }
00446
00447
00448
00449
00450
00451
00452 int
00453 TAO_Transport::handle_output (void)
00454 {
00455 if (TAO_debug_level > 3)
00456 {
00457 ACE_DEBUG ((LM_DEBUG,
00458 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
00459 this->id ()));
00460 }
00461
00462
00463
00464
00465 int const retval = this->drain_queue ();
00466
00467 if (TAO_debug_level > 3)
00468 {
00469 ACE_DEBUG ((LM_DEBUG,
00470 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
00471 ACE_TEXT ("drain_queue returns %d/%d\n"),
00472 this->id (),
00473 retval, errno));
00474 }
00475
00476
00477 return retval;
00478 }
00479
00480 int
00481 TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
00482 {
00483 if (this->messaging_object ()->format_message (stream) != 0)
00484 return -1;
00485
00486 return this->queue_message_i (stream.begin());
00487 }
00488
00489 int
00490 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
00491 size_t &bytes_transferred,
00492 ACE_Time_Value *max_wait_time)
00493 {
00494 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00495
00496 return this->send_message_block_chain_i (mb,
00497 bytes_transferred,
00498 max_wait_time);
00499 }
00500
00501 int
00502 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
00503 size_t &bytes_transferred,
00504 ACE_Time_Value *)
00505 {
00506 size_t const total_length = mb->total_length ();
00507
00508
00509
00510 TAO_Synch_Queued_Message synch_message (mb,
00511 this->orb_core_);
00512
00513 synch_message.push_back (this->head_, this->tail_);
00514
00515 int const n = this->drain_queue_i ();
00516
00517 if (n == -1)
00518 {
00519 synch_message.remove_from_list (this->head_, this->tail_);
00520 return -1;
00521 }
00522 else if (n == 1)
00523 {
00524 bytes_transferred = total_length;
00525 return 1;
00526 }
00527
00528
00529 synch_message.remove_from_list (this->head_, this->tail_);
00530
00531 bytes_transferred =
00532 total_length - synch_message.message_length ();
00533
00534 return 0;
00535 }
00536
00537 int
00538 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
00539 ACE_Time_Value *max_wait_time)
00540 {
00541
00542
00543 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00544
00545
00546 synch_message.push_back (this->head_, this->tail_);
00547
00548 int const n =
00549 this->send_synch_message_helper_i (synch_message,
00550 max_wait_time);
00551
00552 if (n == -1 || n == 1)
00553 {
00554 return n;
00555 }
00556
00557
00558
00559 TAO_Flushing_Strategy *flushing_strategy =
00560 this->orb_core ()->flushing_strategy ();
00561 (void) flushing_strategy->schedule_output (this);
00562
00563
00564
00565 int result;
00566 {
00567 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00568 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00569 ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
00570 ace_mon,
00571 reverse,
00572 -1);
00573
00574 result = flushing_strategy->flush_message (this,
00575 &synch_message,
00576 max_wait_time);
00577 }
00578
00579 if (result == -1)
00580 {
00581 synch_message.remove_from_list (this->head_, this->tail_);
00582
00583 if (errno == ETIME)
00584 {
00585 if (this->head_ == &synch_message)
00586 {
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601 synch_message.remove_from_list (this->head_, this->tail_);
00602 TAO_Queued_Message *queued_message = 0;
00603 ACE_NEW_RETURN (queued_message,
00604 TAO_Asynch_Queued_Message (
00605 synch_message.current_block (),
00606 this->orb_core_,
00607 0,
00608 1),
00609 -1);
00610 queued_message->push_front (this->head_, this->tail_);
00611 }
00612 }
00613
00614 if (TAO_debug_level > 0)
00615 {
00616 ACE_ERROR ((LM_ERROR,
00617 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
00618 ACE_TEXT ("error while flushing message - %m\n"),
00619 this->id ()));
00620 }
00621
00622 return -1;
00623 }
00624
00625 return 1;
00626 }
00627
00628
00629 int
00630 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
00631 ACE_Time_Value *max_wait_time)
00632 {
00633
00634 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
00635
00636 synch_message.push_back (this->head_,
00637 this->tail_);
00638
00639 int const n =
00640 this->send_synch_message_helper_i (synch_message,
00641 max_wait_time);
00642
00643 if (n == -1 || n == 1)
00644 {
00645 return n;
00646 }
00647
00648 if (TAO_debug_level > 3)
00649 {
00650 ACE_DEBUG ((LM_DEBUG,
00651 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
00652 ACE_TEXT ("preparing to add to queue before leaving \n"),
00653 this->id ()));
00654 }
00655
00656
00657
00658 synch_message.remove_from_list (this->head_,
00659 this->tail_);
00660
00661
00662 TAO_Queued_Message *msg =
00663 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00664
00665
00666 msg->push_back (this->head_,
00667 this->tail_);
00668
00669 TAO_Flushing_Strategy *flushing_strategy =
00670 this->orb_core ()->flushing_strategy ();
00671
00672 int result = flushing_strategy->schedule_output (this);
00673
00674 if (result == -1)
00675 {
00676 if (TAO_debug_level > 5)
00677 {
00678 ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
00679 "message_i dequeuing msg due to schedule_output "
00680 "failure\n", this->id ()));
00681 }
00682 msg->remove_from_list (this->head_, this->tail_);
00683 msg->destroy ();
00684 }
00685
00686 return 1;
00687 }
00688
00689 int
00690 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
00691 ACE_Time_Value * )
00692 {
00693
00694 int const n = this->drain_queue_i ();
00695
00696 if (n == -1)
00697 {
00698 synch_message.remove_from_list (this->head_, this->tail_);
00699 return -1;
00700 }
00701 else if (n == 1)
00702 {
00703 return 1;
00704 }
00705
00706 if (synch_message.all_data_sent ())
00707 {
00708 return 1;
00709 }
00710
00711 return 0;
00712 }
00713
00714 int
00715 TAO_Transport::queue_is_empty_i (void)
00716 {
00717 return (this->head_ == 0);
00718 }
00719
00720
00721 int
00722 TAO_Transport::schedule_output_i (void)
00723 {
00724 ACE_Event_Handler *eh = this->event_handler_i ();
00725 ACE_Reactor *reactor = eh->reactor ();
00726
00727
00728
00729
00730 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
00731 if (found != eh)
00732 {
00733 if(TAO_debug_level > 3)
00734 {
00735 ACE_DEBUG ((LM_DEBUG,
00736 "TAO (%P|%t) - Transport[%d]::schedule_output_i "
00737 "event handler not found in reactor, returning -1\n",
00738 this->id ()));
00739 }
00740 if (found)
00741 {
00742 found->remove_reference ();
00743 }
00744 return -1;
00745 }
00746 found->remove_reference ();
00747
00748 if (TAO_debug_level > 3)
00749 {
00750 ACE_DEBUG ((LM_DEBUG,
00751 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
00752 this->id ()));
00753 }
00754
00755 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00756 }
00757
00758 int
00759 TAO_Transport::cancel_output_i (void)
00760 {
00761 ACE_Event_Handler * const eh = this->event_handler_i ();
00762 ACE_Reactor *const reactor = eh->reactor ();
00763
00764 if (TAO_debug_level > 3)
00765 {
00766 ACE_DEBUG ((LM_DEBUG,
00767 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
00768 this->id ()));
00769 }
00770
00771 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00772 }
00773
00774 int
00775 TAO_Transport::handle_timeout (const ACE_Time_Value & ,
00776 const void *act)
00777 {
00778 if (TAO_debug_level > 6)
00779 {
00780 ACE_DEBUG ((LM_DEBUG,
00781 ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
00782 ACE_TEXT ("timer expired\n"),
00783 this->id ()));
00784 }
00785
00786
00787 if (act != &this->current_deadline_)
00788 {
00789 return -1;
00790 }
00791
00792 if (this->flush_timer_pending ())
00793 {
00794
00795
00796 this->reset_flush_timer ();
00797
00798 TAO_Flushing_Strategy *flushing_strategy =
00799 this->orb_core ()->flushing_strategy ();
00800 (void) flushing_strategy->schedule_output (this);
00801 }
00802
00803 return 0;
00804 }
00805
00806 int
00807 TAO_Transport::drain_queue (void)
00808 {
00809 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00810 int const retval = this->drain_queue_i ();
00811
00812 if (retval == 1)
00813 {
00814
00815
00816 TAO_Flushing_Strategy *flushing_strategy =
00817 this->orb_core ()->flushing_strategy ();
00818
00819 flushing_strategy->cancel_output (this);
00820
00821 return 0;
00822 }
00823
00824 return retval;
00825 }
00826
00827 int
00828 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
00829 {
00830 size_t byte_count = 0;
00831
00832
00833 ssize_t retval = -1;
00834
00835 #ifdef ACE_HAS_SENDFILE
00836 if (this->mmap_allocator_)
00837 retval = this->sendfile (this->mmap_allocator_,
00838 iov,
00839 iovcnt,
00840 byte_count);
00841 else
00842 #endif
00843 retval = this->send (iov, iovcnt, byte_count);
00844
00845 if (TAO_debug_level == 5)
00846 {
00847 dump_iov (iov, iovcnt, this->id (),
00848 byte_count, "drain_queue_helper");
00849 }
00850
00851
00852
00853
00854 this->cleanup_queue (byte_count);
00855 iovcnt = 0;
00856
00857 if (retval == 0)
00858 {
00859 if (TAO_debug_level > 4)
00860 {
00861 ACE_DEBUG ((LM_DEBUG,
00862 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00863 ACE_TEXT ("send() returns 0\n"),
00864 this->id ()));
00865 }
00866 return -1;
00867 }
00868 else if (retval == -1)
00869 {
00870 if (TAO_debug_level > 4)
00871 {
00872 ACE_DEBUG ((LM_DEBUG,
00873 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00874 ACE_TEXT ("error during %p\n"),
00875 this->id (), ACE_TEXT ("send()")));
00876 }
00877
00878 if (errno == EWOULDBLOCK || errno == EAGAIN)
00879 {
00880 return 0;
00881 }
00882
00883 return -1;
00884 }
00885
00886
00887
00888
00889
00890 this->sent_byte_count_ += byte_count;
00891
00892 if (TAO_debug_level > 4)
00893 {
00894 ACE_DEBUG ((LM_DEBUG,
00895 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
00896 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
00897 this->id(), byte_count, (this->head_ == 0)));
00898 }
00899
00900 return 1;
00901 }
00902
00903 int
00904 TAO_Transport::drain_queue_i (void)
00905 {
00906
00907
00908
00909 int iovcnt = 0;
00910 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
00911 iovec iov[ACE_IOV_MAX] = { 0 , 0 };
00912 #else
00913 iovec iov[ACE_IOV_MAX];
00914 #endif
00915
00916
00917 TAO_Queued_Message *i = this->head_;
00918
00919
00920
00921 this->sent_byte_count_ = 0;
00922
00923 while (i != 0)
00924 {
00925
00926 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00927
00928
00929
00930
00931 if (iovcnt == ACE_IOV_MAX)
00932 {
00933 int const retval =
00934 this->drain_queue_helper (iovcnt, iov);
00935
00936 if (TAO_debug_level > 4)
00937 {
00938 ACE_DEBUG ((LM_DEBUG,
00939 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00940 ACE_TEXT ("helper retval = %d\n"),
00941 this->id (), retval));
00942 }
00943
00944 if (retval != 1)
00945 {
00946 return retval;
00947 }
00948
00949 i = this->head_;
00950 continue;
00951 }
00952
00953
00954 i = i->next ();
00955 }
00956
00957 if (iovcnt != 0)
00958 {
00959 int const retval = this->drain_queue_helper (iovcnt, iov);
00960
00961 if (TAO_debug_level > 4)
00962 {
00963 ACE_DEBUG ((LM_DEBUG,
00964 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
00965 ACE_TEXT ("helper retval = %d\n"),
00966 this->id (), retval));
00967 }
00968
00969 if (retval != 1)
00970 {
00971 return retval;
00972 }
00973 }
00974
00975 if (this->head_ == 0)
00976 {
00977 if (this->flush_timer_pending ())
00978 {
00979 ACE_Event_Handler *eh = this->event_handler_i ();
00980 ACE_Reactor * const reactor = eh->reactor ();
00981 reactor->cancel_timer (this->flush_timer_id_);
00982 this->reset_flush_timer ();
00983 }
00984
00985 return 1;
00986 }
00987
00988 return 0;
00989 }
00990
00991 void
00992 TAO_Transport::cleanup_queue_i ()
00993 {
00994 if (TAO_debug_level > 4)
00995 {
00996 ACE_DEBUG ((LM_DEBUG,
00997 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
00998 ACE_TEXT ("cleaning up complete queue\n"),
00999 this->id ()));
01000 }
01001
01002
01003 while (this->head_ != 0)
01004 {
01005 TAO_Queued_Message *i = this->head_;
01006
01007
01008
01009 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
01010 this->orb_core_->leader_follower ());
01011
01012 i->remove_from_list (this->head_, this->tail_);
01013
01014 i->destroy ();
01015 }
01016 }
01017
01018 void
01019 TAO_Transport::cleanup_queue (size_t byte_count)
01020 {
01021 while (this->head_ != 0 && byte_count > 0)
01022 {
01023 TAO_Queued_Message *i = this->head_;
01024
01025 if (TAO_debug_level > 4)
01026 {
01027 ACE_DEBUG ((LM_DEBUG,
01028 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01029 ACE_TEXT ("byte_count = %d\n"),
01030 this->id (), byte_count));
01031 }
01032
01033
01034 i->bytes_transferred (byte_count);
01035
01036 if (TAO_debug_level > 4)
01037 {
01038 ACE_DEBUG ((LM_DEBUG,
01039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
01040 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
01041 this->id (), byte_count, i->all_data_sent (),
01042 i->message_length ()));
01043 }
01044
01045
01046
01047 if (i->all_data_sent ())
01048 {
01049 i->remove_from_list (this->head_, this->tail_);
01050 i->destroy ();
01051 }
01052 }
01053 }
01054
01055 int
01056 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
01057 bool &must_flush)
01058 {
01059
01060 size_t msg_count = 0;
01061 size_t total_bytes = 0;
01062
01063 for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01064 {
01065 ++msg_count;
01066 total_bytes += i->message_length ();
01067 }
01068
01069 bool set_timer;
01070 ACE_Time_Value new_deadline;
01071
01072 bool constraints_reached =
01073 stub->transport_queueing_strategy ().
01074 buffering_constraints_reached (stub,
01075 msg_count,
01076 total_bytes,
01077 must_flush,
01078 this->current_deadline_,
01079 set_timer,
01080 new_deadline);
01081
01082
01083 if (set_timer)
01084 {
01085 ACE_Event_Handler *eh = this->event_handler_i ();
01086 ACE_Reactor *reactor = eh->reactor ();
01087 this->current_deadline_ = new_deadline;
01088 ACE_Time_Value delay =
01089 new_deadline - ACE_OS::gettimeofday ();
01090
01091 if (this->flush_timer_pending ())
01092 {
01093 reactor->cancel_timer (this->flush_timer_id_);
01094 }
01095
01096 this->flush_timer_id_ =
01097 reactor->schedule_timer (&this->transport_timer_,
01098 &this->current_deadline_,
01099 delay);
01100 }
01101
01102 return constraints_reached;
01103 }
01104
01105 void
01106 TAO_Transport::report_invalid_event_handler (const char *caller)
01107 {
01108 if (TAO_debug_level > 0)
01109 {
01110 ACE_DEBUG ((LM_DEBUG,
01111 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
01112 ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
01113 this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
01114 }
01115 }
01116
01117 void
01118 TAO_Transport::send_connection_closed_notifications (void)
01119 {
01120 {
01121 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01122
01123 this->send_connection_closed_notifications_i ();
01124 }
01125
01126 this->tms ()->connection_closed ();
01127 }
01128
01129 void
01130 TAO_Transport::send_connection_closed_notifications_i (void)
01131 {
01132 this->cleanup_queue_i ();
01133
01134 this->messaging_object ()->reset ();
01135 }
01136
01137 int
01138 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
01139 int message_semantics,
01140 const ACE_Message_Block *message_block,
01141 ACE_Time_Value *max_wait_time)
01142 {
01143 switch (message_semantics)
01144 {
01145 case TAO_Transport::TAO_TWOWAY_REQUEST:
01146 return this->send_synchronous_message_i (message_block,
01147 max_wait_time);
01148 case TAO_Transport::TAO_REPLY:
01149 return this->send_reply_message_i (message_block,
01150 max_wait_time);
01151 case TAO_Transport::TAO_ONEWAY_REQUEST:
01152 return this->send_asynchronous_message_i (stub,
01153 message_block,
01154 max_wait_time);
01155 }
01156
01157 return -1;
01158 }
01159
01160 int
01161 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
01162 const ACE_Message_Block *message_block,
01163 ACE_Time_Value *max_wait_time)
01164 {
01165
01166
01167 bool try_sending_first = true;
01168
01169 const bool queue_empty = (this->head_ == 0);
01170
01171 if (!queue_empty)
01172 {
01173 try_sending_first = false;
01174 }
01175 else if (stub->transport_queueing_strategy ().must_queue (queue_empty))
01176 {
01177 try_sending_first = false;
01178 }
01179
01180 if (try_sending_first)
01181 {
01182 ssize_t n = 0;
01183 size_t byte_count = 0;
01184
01185
01186 const size_t total_length = message_block->total_length ();
01187
01188 if (TAO_debug_level > 6)
01189 {
01190 ACE_DEBUG ((LM_DEBUG,
01191 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01192 ACE_TEXT ("trying to send the message (ml = %d)\n"),
01193 this->id (), total_length));
01194 }
01195
01196
01197
01198
01199
01200 n = this->send_message_block_chain_i (message_block,
01201 byte_count,
01202 max_wait_time);
01203 if (n == -1)
01204 {
01205
01206
01207
01208
01209
01210 if (errno != EWOULDBLOCK && errno != ETIME)
01211 {
01212 if (TAO_debug_level > 0)
01213 {
01214 ACE_ERROR ((LM_ERROR,
01215 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01216 ACE_TEXT ("fatal error in ")
01217 ACE_TEXT ("send_message_block_chain_i - %m\n"),
01218 this->id ()));
01219 }
01220 return -1;
01221 }
01222 }
01223
01224
01225 if (total_length == byte_count)
01226 {
01227
01228
01229
01230
01231
01232 return 0;
01233 }
01234
01235 if (TAO_debug_level > 6)
01236 {
01237 ACE_DEBUG ((LM_DEBUG,
01238 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01239 ACE_TEXT ("partial send %d / %d bytes\n"),
01240 this->id (), byte_count, total_length));
01241 }
01242
01243
01244
01245 while (message_block != 0 && message_block->length () == 0)
01246 {
01247 message_block = message_block->cont ();
01248 }
01249
01250
01251
01252 }
01253
01254
01255
01256
01257 if (TAO_debug_level > 6)
01258 {
01259 ACE_DEBUG ((LM_DEBUG,
01260 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01261 ACE_TEXT ("message is queued\n"),
01262 this->id ()));
01263 }
01264
01265 if (this->queue_message_i(message_block) == -1)
01266 {
01267 if (TAO_debug_level > 0)
01268 ACE_DEBUG ((LM_DEBUG,
01269 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
01270 ACE_TEXT ("cannot queue message for ")
01271 ACE_TEXT (" - %m\n"),
01272 this->id ()));
01273 return -1;
01274 }
01275
01276
01277
01278 bool must_flush = false;
01279 const bool constraints_reached =
01280 this->check_buffering_constraints_i (stub,
01281 must_flush);
01282
01283
01284
01285
01286
01287 TAO_Flushing_Strategy *flushing_strategy =
01288 this->orb_core ()->flushing_strategy ();
01289
01290 if (constraints_reached || try_sending_first)
01291 {
01292 (void) flushing_strategy->schedule_output (this);
01293 }
01294
01295 if (must_flush)
01296 {
01297 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01298 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01299 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01300
01301 (void) flushing_strategy->flush_transport (this);
01302 }
01303
01304 return 0;
01305 }
01306
01307 int
01308 TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
01309 {
01310 TAO_Queued_Message *queued_message = 0;
01311 ACE_NEW_RETURN (queued_message,
01312 TAO_Asynch_Queued_Message (message_block,
01313 this->orb_core_,
01314 0,
01315 1),
01316 -1);
01317 queued_message->push_back (this->head_, this->tail_);
01318
01319 return 0;
01320 }
01321
01322
01323
01324
01325
01326
01327
01328 int
01329 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
01330 ACE_Time_Value * max_wait_time,
01331 int )
01332 {
01333 if (TAO_debug_level > 3)
01334 {
01335 ACE_DEBUG ((LM_DEBUG,
01336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
01337 this->id ()));
01338 }
01339
01340
01341 int const retval = this->process_queue_head (rh);
01342
01343 if (retval <= 0)
01344 {
01345 if (retval == -1)
01346 {
01347 if (TAO_debug_level > 2)
01348 {
01349 ACE_DEBUG ((LM_DEBUG,
01350 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01351 ACE_TEXT ("error while parsing the head of the queue\n"),
01352 this->id()));
01353
01354 }
01355 return -1;
01356 }
01357 else
01358 {
01359
01360
01361
01362
01363 return 0;
01364 }
01365 }
01366
01367 TAO_Queued_Data *q_data = 0;
01368
01369 if (this->incoming_message_stack_.top (q_data) != -1
01370 && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01371 {
01372
01373 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
01374 {
01375 if (TAO_debug_level > 0)
01376 {
01377 ACE_ERROR ((LM_ERROR,
01378 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01379 ACE_TEXT ("error consolidating incoming message\n"),
01380 this->id ()));
01381 }
01382 return -1;
01383 }
01384 }
01385 else
01386 {
01387 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
01388 {
01389 if (TAO_debug_level > 0)
01390 {
01391 ACE_ERROR ((LM_ERROR,
01392 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
01393 ACE_TEXT ("error parsing incoming message\n"),
01394 this->id ()));
01395 }
01396 return -1;
01397 }
01398 }
01399
01400 return 0;
01401 }
01402
01403 int
01404 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
01405 TAO_Resume_Handle &rh)
01406 {
01407
01408 if (q_data->missing_data_ != 0)
01409 {
01410 if (TAO_debug_level > 0)
01411 {
01412 ACE_ERROR ((LM_ERROR,
01413 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01414 ACE_TEXT ("missing data\n"),
01415 this->id ()));
01416 }
01417 return -1;
01418 }
01419
01420 if (q_data->more_fragments_ ||
01421 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01422 {
01423
01424 TAO_Queued_Data *new_q_data = 0;
01425
01426 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01427 {
01428 case -1:
01429 return -1;
01430
01431 case 0:
01432 if (!new_q_data)
01433 {
01434 if (TAO_debug_level > 0)
01435 {
01436 ACE_ERROR ((LM_ERROR,
01437 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01438 ACE_TEXT ("error, consolidated message is NULL\n"),
01439 this->id ()));
01440 }
01441 return -1;
01442 }
01443
01444
01445 if (this->process_parsed_messages (new_q_data, rh) == -1)
01446 {
01447 TAO_Queued_Data::release (new_q_data);
01448
01449 if (TAO_debug_level > 0)
01450 {
01451 ACE_ERROR ((LM_ERROR,
01452 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01453 ACE_TEXT ("error processing consolidated message\n"),
01454 this->id ()));
01455 }
01456 return -1;
01457 }
01458
01459 TAO_Queued_Data::release (new_q_data);
01460
01461 break;
01462
01463 case 1:
01464 break;
01465 }
01466 }
01467 else
01468 {
01469 if (this->process_parsed_messages (q_data, rh) == -1)
01470 {
01471 TAO_Queued_Data::release (q_data);
01472
01473 if (TAO_debug_level > 0)
01474 {
01475 ACE_ERROR ((LM_ERROR,
01476 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
01477 ACE_TEXT ("error processing message\n"),
01478 this->id ()));
01479 }
01480 return -1;
01481 }
01482
01483 TAO_Queued_Data::release (q_data);
01484
01485 }
01486
01487 return 0;
01488 }
01489
01490 int
01491 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
01492 {
01493
01494
01495
01496 if (q_data->missing_data_ != 0)
01497 {
01498 return -1;
01499 }
01500
01501 if (q_data->more_fragments_ ||
01502 q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01503 {
01504 TAO_Queued_Data *new_q_data = 0;
01505
01506 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
01507 {
01508 case -1:
01509 return -1;
01510
01511 case 0:
01512 if (!new_q_data)
01513 {
01514 if (TAO_debug_level > 0)
01515 {
01516 ACE_ERROR ((LM_ERROR,
01517 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
01518 ACE_TEXT ("error, consolidated message is NULL\n"),
01519 this->id ()));
01520 }
01521 return -1;
01522 }
01523
01524 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
01525 {
01526 TAO_Queued_Data::release (new_q_data);
01527 return -1;
01528 }
01529 break;
01530
01531 case 1:
01532 break;
01533 }
01534 }
01535 else
01536 {
01537 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
01538 {
01539 TAO_Queued_Data::release (q_data);
01540 return -1;
01541 }
01542 }
01543
01544 return 0;
01545 }
01546
01547 int
01548 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
01549 ACE_Time_Value * max_wait_time,
01550 TAO_Queued_Data *q_data)
01551 {
01552
01553 if (q_data == 0)
01554 {
01555 return -1;
01556 }
01557
01558 if (TAO_debug_level > 3)
01559 {
01560 ACE_DEBUG ((LM_DEBUG,
01561 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01562 ACE_TEXT ("enter (missing data == %d)\n"),
01563 this->id (), q_data->missing_data_));
01564 }
01565
01566 size_t const recv_size = q_data->missing_data_;
01567
01568
01569 size_t const message_size = recv_size
01570 + q_data->msg_block_->length();
01571
01572 if (q_data->msg_block_->space() < recv_size)
01573 {
01574 if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
01575 {
01576 return -1;
01577 }
01578 }
01579
01580
01581
01582
01583
01584
01585 this->recv_buffer_size_ = recv_size;
01586
01587
01588 ssize_t const n = this->recv (q_data->msg_block_->wr_ptr(),
01589 recv_size,
01590 max_wait_time);
01591
01592
01593 if (n <= 0)
01594 {
01595 return n;
01596 }
01597
01598 if (TAO_debug_level > 3)
01599 {
01600 ACE_DEBUG ((LM_DEBUG,
01601 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
01602 ACE_TEXT ("read bytes %d\n"),
01603 this->id (), n));
01604 }
01605
01606 q_data->msg_block_->wr_ptr(n);
01607 q_data->missing_data_ -= n;
01608
01609 if (q_data->missing_data_ == 0)
01610 {
01611
01612 if (this->incoming_message_stack_.pop (q_data) == -1)
01613 {
01614 return -1;
01615 }
01616
01617 if (this->consolidate_process_message (q_data, rh) == -1)
01618 {
01619 return -1;
01620 }
01621 }
01622
01623 return 0;
01624 }
01625
01626
01627 int
01628 TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
01629 {
01630
01631
01632
01633
01634 int buf_status = 0;
01635
01636 TAO_Queued_Data *q_data = 0;
01637
01638
01639
01640
01641 while (message_block.length () > 0 &&
01642 (buf_status = this->messaging_object ()->extract_next_message
01643 (message_block, q_data)) != -1 &&
01644 q_data != 0)
01645 {
01646 if (q_data->missing_data_ == 0)
01647 {
01648 if (this->consolidate_enqueue_message (q_data) == -1)
01649 {
01650 return -1;
01651 }
01652 }
01653 else
01654 {
01655
01656 this->incoming_message_stack_.push (q_data);
01657 }
01658
01659 q_data = 0;
01660 }
01661
01662 if (buf_status == -1)
01663 {
01664 return -1;
01665 }
01666
01667 return 0;
01668 }
01669
01670 int
01671 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
01672 ACE_Time_Value * max_wait_time)
01673 {
01674
01675 if (TAO_debug_level > 3)
01676 {
01677 ACE_DEBUG ((LM_DEBUG,
01678 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01679 ACE_TEXT ("enter\n"),
01680 this->id ()));
01681 }
01682
01683
01684
01685
01686
01687
01688 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
01689
01690 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
01691 (void) ACE_OS::memset (buf,
01692 '\0',
01693 sizeof buf);
01694 #endif
01695
01696
01697 ACE_Data_Block db (sizeof (buf),
01698 ACE_Message_Block::MB_DATA,
01699 buf,
01700 this->orb_core_->input_cdr_buffer_allocator (),
01701 this->orb_core_->locking_strategy (),
01702 ACE_Message_Block::DONT_DELETE,
01703 this->orb_core_->input_cdr_dblock_allocator ());
01704
01705
01706 ACE_Message_Block message_block (&db,
01707 ACE_Message_Block::DONT_DELETE,
01708 this->orb_core_->input_cdr_msgblock_allocator ());
01709
01710
01711
01712 ACE_CDR::mb_align (&message_block);
01713
01714 size_t recv_size = 0;
01715
01716
01717 TAO_Queued_Data *q_data = 0;
01718
01719
01720 const size_t header_length =
01721 this->messaging_object ()->header_length ();
01722
01723
01724 if (header_length > message_block.space ())
01725 {
01726 return -1;
01727 }
01728
01729 if (this->orb_core_->orb_params ()->single_read_optimization ())
01730 {
01731 recv_size =
01732 message_block.space ();
01733 }
01734 else
01735 {
01736
01737
01738
01739
01740
01741
01742 if (this->incoming_message_stack_.top (q_data) != -1
01743 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01744 {
01745
01746
01747
01748 recv_size = header_length - q_data->msg_block_->length ();
01749 }
01750 else
01751 {
01752
01753
01754 recv_size = header_length;
01755 }
01756
01757 }
01758
01759
01760
01761
01762 if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
01763 {
01764
01765
01766
01767
01768
01769 if (this->partial_message_->length () <= recv_size &&
01770 message_block.copy (this->partial_message_->rd_ptr (),
01771 this->partial_message_->length ()) == 0)
01772 {
01773
01774 recv_size -= this->partial_message_->length ();
01775 this->partial_message_->reset ();
01776 }
01777 else
01778 {
01779 return -1;
01780 }
01781 }
01782
01783
01784 if (0 >= recv_size)
01785 {
01786
01787
01788
01789
01790
01791 if (TAO_debug_level > 0)
01792 {
01793 ACE_ERROR ((LM_ERROR,
01794 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01795 ACE_TEXT ("Error - endless loop detection, closing connection"),
01796 this->id ()));
01797 }
01798 return -1;
01799 }
01800
01801
01802
01803
01804
01805
01806 this->recv_buffer_size_ = recv_size;
01807
01808
01809
01810 const ssize_t n = this->recv (message_block.wr_ptr (),
01811 recv_size,
01812 max_wait_time);
01813
01814
01815 if (n <= 0)
01816 {
01817 return n;
01818 }
01819
01820 if (TAO_debug_level > 3)
01821 {
01822 ACE_DEBUG ((LM_DEBUG,
01823 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01824 ACE_TEXT ("read %d bytes\n"),
01825 this->id (), n));
01826 }
01827
01828
01829 message_block.wr_ptr (n);
01830
01831
01832
01833
01834
01835
01836
01837 if (this->incoming_message_stack_.top (q_data) != -1
01838 && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
01839 {
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850 if (this->messaging_object ()->consolidate_node (q_data,
01851 message_block) == -1)
01852 {
01853 if (TAO_debug_level > 0)
01854 {
01855 ACE_ERROR ((LM_ERROR,
01856 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
01857 ACE_TEXT ("error consolidating message from input buffer\n"),
01858 this->id () ));
01859 }
01860 return -1;
01861 }
01862
01863
01864 if (q_data->missing_data_ == 0)
01865 {
01866 if (this->incoming_message_stack_.pop (q_data) == -1)
01867 {
01868 return -1;
01869 }
01870
01871 if (this->consolidate_enqueue_message (q_data) == -1)
01872 {
01873 return -1;
01874 }
01875 }
01876
01877 if (message_block.length () > 0
01878 && this->handle_input_parse_extra_messages (message_block) == -1)
01879 {
01880 return -1;
01881 }
01882
01883
01884 if (this->process_queue_head (rh) == -1)
01885 {
01886 return -1;
01887 }
01888 }
01889 else
01890 {
01891
01892
01893
01894
01895
01896
01897
01898
01899
01900 TAO_Queued_Data qd (&message_block,
01901 this->orb_core_->transport_message_buffer_allocator ());
01902
01903 size_t mesg_length = 0;
01904
01905 if (this->messaging_object ()->parse_next_message (message_block,
01906 qd,
01907 mesg_length) == -1
01908 || (qd.missing_data_ == 0
01909 && mesg_length > message_block.length ()) )
01910 {
01911
01912 return -1;
01913 }
01914
01915
01916
01917 if (qd.missing_data_ != 0 ||
01918 qd.more_fragments_ ||
01919 qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
01920 {
01921 if (qd.missing_data_ == 0)
01922 {
01923
01924 TAO_Queued_Data *nqd =
01925 TAO_Queued_Data::duplicate (qd);
01926
01927 if (nqd == 0)
01928 {
01929 return -1;
01930 }
01931
01932
01933 char* end_mark = nqd->msg_block_->rd_ptr ()
01934 + mesg_length;
01935 nqd->msg_block_->wr_ptr (end_mark);
01936
01937
01938 message_block.rd_ptr (mesg_length);
01939
01940
01941 if (this->consolidate_enqueue_message (nqd) == -1)
01942 {
01943 return -1;
01944 }
01945
01946 if (message_block.length () > 0
01947 && this->handle_input_parse_extra_messages (message_block) == -1)
01948 {
01949 return -1;
01950 }
01951
01952
01953 if (this->process_queue_head (rh) == -1)
01954 {
01955 return -1;
01956 }
01957 }
01958 else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
01959 {
01960
01961
01962 if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
01963 qd.missing_data_ > message_block.space ())
01964 {
01965
01966 if (ACE_CDR::grow (qd.msg_block_,
01967 message_block.length ()
01968 + qd.missing_data_) == -1)
01969 {
01970 return -1;
01971 }
01972 }
01973
01974 TAO_Queued_Data *nqd =
01975 TAO_Queued_Data::duplicate (qd);
01976
01977 if (nqd == 0)
01978 {
01979 return -1;
01980 }
01981
01982
01983 message_block.rd_ptr (message_block.length());
01984
01985 this->incoming_message_stack_.push (nqd);
01986 }
01987 }
01988 else
01989 {
01990
01991
01992
01993
01994
01995
01996
01997
01998
01999
02000 char * end_marker = message_block.rd_ptr ()
02001 + mesg_length;
02002
02003 if (message_block.length () > mesg_length)
02004 {
02005
02006
02007 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
02008
02009
02010
02011 message_block.rd_ptr (mesg_length);
02012
02013
02014
02015 if (this->handle_input_parse_extra_messages (message_block) == -1)
02016 {
02017 return -1;
02018 }
02019
02020
02021 end_marker = message_block.rd_ptr ();
02022
02023
02024 message_block.rd_ptr (rd_ptr_stack_mesg);
02025 }
02026
02027
02028
02029
02030
02031
02032
02033
02034 if (this->incoming_message_queue_.queue_length () > 0)
02035 {
02036 if (TAO_debug_level > 0)
02037 {
02038 ACE_DEBUG ((LM_DEBUG,
02039 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
02040 ACE_TEXT ("notify reactor\n"),
02041 this->id ()));
02042
02043 }
02044
02045 const int retval = this->notify_reactor ();
02046
02047 if (retval == 1)
02048 {
02049
02050
02051 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02052 }
02053 else if (retval < 0)
02054 return -1;
02055 }
02056 else
02057 {
02058
02059
02060 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02061 }
02062
02063
02064 if (this->process_parsed_messages (&qd,
02065 rh) == -1)
02066 {
02067 return -1;
02068 }
02069
02070
02071 message_block.rd_ptr (end_marker);
02072 }
02073 }
02074
02075
02076
02077 if (message_block.length () > 0)
02078 {
02079 if (this->partial_message_ == 0)
02080 {
02081 this->allocate_partial_message_block ();
02082 }
02083
02084 if (this->partial_message_ != 0 &&
02085 this->partial_message_->copy (message_block.rd_ptr (),
02086 message_block.length ()) == 0)
02087 {
02088 message_block.rd_ptr (message_block.length ());
02089 }
02090 else
02091 {
02092 return -1;
02093 }
02094 }
02095
02096 return 0;
02097 }
02098
02099
02100 int
02101 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
02102 TAO_Resume_Handle &rh)
02103 {
02104 if (TAO_debug_level > 7)
02105 {
02106 ACE_DEBUG ((LM_DEBUG,
02107 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02108 ACE_TEXT ("entering (missing data == %d)\n"),
02109 this->id(), qd->missing_data_));
02110 }
02111
02112
02113 const TAO_Pluggable_Message_Type t = qd->msg_type_;
02114
02115
02116
02117 if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
02118 {
02119 if (TAO_debug_level > 0)
02120 ACE_DEBUG ((LM_DEBUG,
02121 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02122 ACE_TEXT ("received CloseConnection message - %m\n"),
02123 this->id()));
02124
02125
02126
02127 return -1;
02128 }
02129 else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
02130 t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
02131 {
02132
02133
02134 rh.resume_handle ();
02135
02136 if (this->messaging_object ()->process_request_message (
02137 this,
02138 qd) == -1)
02139 {
02140
02141
02142 return -1;
02143 }
02144 }
02145 else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
02146 t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
02147 {
02148 rh.resume_handle ();
02149
02150 TAO_Pluggable_Reply_Params params (this);
02151
02152 if (this->messaging_object ()->process_reply_message (params,
02153 qd) == -1)
02154 {
02155 if (TAO_debug_level > 0)
02156 ACE_DEBUG ((LM_DEBUG,
02157 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02158 ACE_TEXT ("error in process_reply_message - %m\n"),
02159 this->id ()));
02160
02161 return -1;
02162 }
02163
02164 }
02165 else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
02166 {
02167
02168
02169
02170
02171 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
02172 {
02173 if (TAO_debug_level > 0)
02174 {
02175 ACE_ERROR ((LM_ERROR,
02176 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02177 ACE_TEXT ("error processing CancelRequest\n"),
02178 this->id ()));
02179 }
02180 }
02181
02182
02183
02184
02185
02186
02187
02188 }
02189 else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
02190 {
02191 if (TAO_debug_level > 0)
02192 {
02193 ACE_ERROR ((LM_ERROR,
02194 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
02195 ACE_TEXT ("received MessageError, closing connection\n"),
02196 this->id ()));
02197 }
02198 return -1;
02199 }
02200
02201
02202 return 0;
02203 }
02204
02205 int
02206 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
02207 {
02208 if (TAO_debug_level > 3)
02209 {
02210 ACE_DEBUG ((LM_DEBUG,
02211 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
02212 this->id (), this->incoming_message_queue_.queue_length () ));
02213 }
02214
02215
02216 if (this->incoming_message_queue_.queue_length () > 0)
02217 {
02218
02219 TAO_Queued_Data *qd =
02220 this->incoming_message_queue_.dequeue_head ();
02221
02222 if (TAO_debug_level > 3)
02223 {
02224 ACE_DEBUG ((LM_DEBUG,
02225 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02226 ACE_TEXT ("the size of the queue is [%d]\n"),
02227 this->id (),
02228 this->incoming_message_queue_.queue_length()));
02229 }
02230
02231
02232 if (this->incoming_message_queue_.queue_length () > 0)
02233 {
02234 if (TAO_debug_level > 0)
02235 {
02236 ACE_DEBUG ((LM_DEBUG,
02237 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
02238 ACE_TEXT ("notify reactor\n"),
02239 this->id ()));
02240
02241 }
02242
02243 const int retval = this->notify_reactor ();
02244
02245 if (retval == 1)
02246 {
02247
02248
02249 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02250 }
02251 else if (retval < 0)
02252 return -1;
02253 }
02254 else
02255 {
02256
02257
02258 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02259 }
02260
02261
02262 if (this->process_parsed_messages (qd, rh) == -1)
02263 {
02264 return -1;
02265 }
02266
02267
02268 TAO_Queued_Data::release (qd);
02269
02270 return 0;
02271 }
02272
02273 return 1;
02274 }
02275
02276 int
02277 TAO_Transport::notify_reactor (void)
02278 {
02279 if (!this->ws_->is_registered ())
02280 {
02281 return 0;
02282 }
02283
02284 ACE_Event_Handler *eh = this->event_handler_i ();
02285
02286
02287 ACE_Reactor *reactor = this->orb_core ()->reactor ();
02288
02289 if (TAO_debug_level > 0)
02290 {
02291 ACE_DEBUG ((LM_DEBUG,
02292 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02293 ACE_TEXT ("notify to Reactor\n"),
02294 this->id ()));
02295 }
02296
02297
02298
02299 const int retval = reactor->notify (eh,
02300 ACE_Event_Handler::READ_MASK);
02301
02302 if (retval < 0 && TAO_debug_level > 2)
02303 {
02304
02305
02306 ACE_DEBUG ((LM_DEBUG,
02307 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
02308 ACE_TEXT ("notify to the reactor failed..\n"),
02309 this->id ()));
02310 }
02311
02312 return 1;
02313 }
02314
02315 TAO::Transport_Cache_Manager &
02316 TAO_Transport::transport_cache_manager (void)
02317 {
02318 return this->orb_core_->lane_resources ().transport_cache ();
02319 }
02320
02321 void
02322 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02323 {
02324 if (this->char_translator_)
02325 {
02326 this->char_translator_->assign (inp);
02327 this->char_translator_->assign (outp);
02328 }
02329 if (this->wchar_translator_)
02330 {
02331 this->wchar_translator_->assign (inp);
02332 this->wchar_translator_->assign (outp);
02333 }
02334 }
02335
02336 void
02337 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
02338 {
02339 if (inp)
02340 {
02341 inp->char_translator (0);
02342 inp->wchar_translator (0);
02343 }
02344 if (outp)
02345 {
02346 outp->char_translator (0);
02347 outp->wchar_translator (0);
02348 }
02349 }
02350
02351 ACE_Event_Handler::Reference_Count
02352 TAO_Transport::add_reference (void)
02353 {
02354 return this->event_handler_i ()->add_reference ();
02355 }
02356
02357 ACE_Event_Handler::Reference_Count
02358 TAO_Transport::remove_reference (void)
02359 {
02360 return this->event_handler_i ()->remove_reference ();
02361 }
02362
02363 TAO_OutputCDR &
02364 TAO_Transport::out_stream (void)
02365 {
02366 return this->messaging_object ()->out_stream ();
02367 }
02368
02369 bool
02370 TAO_Transport::post_open (size_t id)
02371 {
02372 this->id_ = id;
02373
02374 {
02375 ACE_GUARD_RETURN (ACE_Lock,
02376 ace_mon,
02377 *this->handler_lock_,
02378 false);
02379 this->is_connected_ = true;
02380 }
02381
02382
02383
02384 if (this->queue_is_empty_i ())
02385 return true;
02386
02387
02388
02389
02390 if (this->wait_strategy ()->register_handler () == 0)
02391 {
02392 TAO_Flushing_Strategy *flushing_strategy =
02393 this->orb_core ()->flushing_strategy ();
02394 (void) flushing_strategy->schedule_output (this);
02395 }
02396 else
02397 {
02398
02399
02400
02401
02402 (void) this->purge_entry ();
02403
02404
02405 (void) this->close_connection ();
02406
02407 if (TAO_debug_level > 0)
02408 ACE_ERROR ((LM_ERROR,
02409 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
02410 ACE_TEXT ("could not register the transport ")
02411 ACE_TEXT ("in the reactor.\n"),
02412 this->id ()));
02413
02414 return false;
02415 }
02416
02417 return true;
02418 }
02419
02420 void
02421 TAO_Transport::allocate_partial_message_block (void)
02422 {
02423 if (this->partial_message_ == 0)
02424 {
02425
02426
02427 const size_t partial_message_size =
02428 this->messaging_object ()->header_length ();
02429
02430
02431
02432 ACE_NEW (this->partial_message_,
02433 ACE_Message_Block (partial_message_size));
02434 }
02435 }
02436
02437
02438
02439
02440
02441
02442
02443
02444 TAO_END_VERSIONED_NAMESPACE_DECL