00001 // $Id: Leader_Follower_Flushing_Strategy.cpp 88011 2009-12-09 09:50:25Z vzykov $ 00002 00003 #include "tao/Leader_Follower_Flushing_Strategy.h" 00004 #include "tao/LF_Follower.h" 00005 #include "tao/Leader_Follower.h" 00006 #include "tao/Transport.h" 00007 #include "tao/Queued_Message.h" 00008 #include "tao/ORB_Core.h" 00009 00010 ACE_RCSID (tao, 00011 Leader_Follower_Flushing_Strategy, 00012 "$Id: Leader_Follower_Flushing_Strategy.cpp 88011 2009-12-09 09:50:25Z vzykov $") 00013 00014 00015 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00016 00017 int 00018 TAO_Leader_Follower_Flushing_Strategy::schedule_output (TAO_Transport *transport) 00019 { 00020 return transport->schedule_output_i (); 00021 } 00022 00023 int 00024 TAO_Leader_Follower_Flushing_Strategy::cancel_output ( 00025 TAO_Transport *transport) 00026 { 00027 return transport->cancel_output_i (); 00028 } 00029 00030 int 00031 TAO_Leader_Follower_Flushing_Strategy::flush_message ( 00032 TAO_Transport *transport, 00033 TAO_Queued_Message *msg, 00034 ACE_Time_Value *max_wait_time) 00035 { 00036 TAO_Leader_Follower &leader_follower = 00037 transport->orb_core ()->leader_follower (); 00038 return leader_follower.wait_for_event (msg, transport, max_wait_time); 00039 } 00040 00041 int 00042 TAO_Leader_Follower_Flushing_Strategy::flush_transport ( 00043 TAO_Transport *transport, 00044 ACE_Time_Value *max_wait_time) 00045 { 00046 try 00047 { 00048 TAO_ORB_Core * const orb_core = transport->orb_core (); 00049 00050 while (!transport->queue_is_empty ()) 00051 { 00052 // In case max_wait_time==0 we cannot simply run the orb because 00053 // in multi-threaded applications it can easily happen that 00054 // the other thread will run the orb and drain the queue in the 00055 // transport we're coping with here and this thread will block. 00056 // Instead we do run for a small amount of time and then recheck 00057 // the queue. 00058 if (max_wait_time == 0) 00059 { 00060 ACE_Errno_Guard eguard (errno); 00061 00062 // Poll the reactor's queue. 00063 ACE_Time_Value tv = ACE_Time_Value::zero; 00064 orb_core->orb ()->perform_work (&tv); 00065 } 00066 else 00067 { 00068 orb_core->orb ()->perform_work (max_wait_time); 00069 } 00070 00071 if (max_wait_time != 0) { 00072 if (*max_wait_time <= ACE_Time_Value::zero) { 00073 errno = ETIME; 00074 return -1; 00075 } 00076 } 00077 } 00078 } 00079 catch (const ::CORBA::Exception&) 00080 { 00081 return -1; 00082 } 00083 00084 return 0; 00085 } 00086 00087 TAO_END_VERSIONED_NAMESPACE_DECL