#include <Leader_Follower.h>
Collaboration diagram for TAO_Leader_Follower:
Public Member Functions | |
TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator=0) | |
Constructor. | |
~TAO_Leader_Follower (void) | |
Destructor. | |
int | set_event_loop_thread (ACE_Time_Value *max_wait_time) |
void | reset_event_loop_thread (void) |
void | set_upcall_thread (void) |
int | leader_available (void) const |
Is there any thread running as a leader? | |
void | set_client_thread (void) |
A server thread is making a request. | |
void | reset_client_thread (void) |
A server thread has finished is making a request. | |
int | wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time) |
Wait on the Leader/Followers loop until one event happens. | |
void | set_client_leader_thread (void) |
void | reset_client_leader_thread (void) |
void | set_client_leader_thread (ACE_thread_t thread_ID) |
int | is_client_leader_thread (void) const |
checks if we are a leader thread | |
int | elect_new_leader (void) |
TAO_SYNCH_MUTEX & | lock (void) |
Get a reference to the underlying mutex. | |
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & | reverse_lock (void) |
int | has_clients (void) const |
Check if there are any client threads running. | |
ACE_Reactor * | reactor (void) |
Accesor to the reactor. | |
void | no_leaders_available (void) |
Called when we are out of leaders. | |
Follower creation/destructions | |
The Leader/Followers set acts as a factory for the Follower objects. Followers are used to represent a thread blocked waiting in the Follower set. The Leader/Followers abstraction keeps a list of the waiting followers, so it can wake up one when the leader thread stops handling events. For performance reasons the Leader/Followers set uses a pool (or free-list) to keep Follower objects unattached to any thread. It could be tempting to use TSS to keep such followers, after all a thread can only need one such Follower object, however, that does not work with multiple Leader/Followers sets, consult this bug report for more details: | |
TAO_LF_Follower * | allocate_follower (void) |
Allocate a new follower to the caller. | |
void | release_follower (TAO_LF_Follower *) |
The caller has finished using a follower. | |
Follower Set Operations | |
void | add_follower (TAO_LF_Follower *follower) |
Add a new follower to the set. | |
void | remove_follower (TAO_LF_Follower *follower) |
Removes a follower from the leader-follower set. | |
int | follower_available (void) const |
Checks if there are any followers available. | |
Private Types | |
typedef ACE_Intrusive_List< TAO_LF_Follower > | Follower_Set |
Implement the Leader/Followers set using an intrusive list. | |
Private Member Functions | |
TAO_ORB_Core_TSS_Resources * | get_tss_resources (void) const |
Shortcut to obtain the TSS resources of the orb core. | |
int | wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time) |
Wait for the client leader to complete. | |
void | reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss) |
Follower Set Operations | |
int | elect_new_leader_i (void) |
Private Attributes | |
TAO_ORB_Core * | orb_core_ |
The orb core. | |
TAO_SYNCH_MUTEX | lock_ |
To synchronize access to the members. | |
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > | reverse_lock_ |
do protect the access to the following three members | |
Follower_Set | follower_set_ |
Follower_Set | follower_free_list_ |
Use a free list to allocate and release Follower objects. | |
int | leaders_ |
int | clients_ |
ACE_Reactor * | reactor_ |
The reactor. | |
int | client_thread_is_leader_ |
Is a client thread the current leader? | |
int | event_loop_threads_waiting_ |
Are server threads waiting for the client leader to complete? | |
TAO_SYNCH_CONDITION | event_loop_threads_condition_ |
TAO_New_Leader_Generator * | new_leader_generator_ |
TAO_Leader_Follower
Definition at line 49 of file Leader_Follower.h.
|
Implement the Leader/Followers set using an intrusive list.
Definition at line 227 of file Leader_Follower.h. |
|
Constructor.
Definition at line 13 of file Leader_Follower.i.
00015 : orb_core_ (orb_core), 00016 reverse_lock_ (lock_), 00017 leaders_ (0), 00018 clients_ (0), 00019 reactor_ (0), 00020 client_thread_is_leader_ (0), 00021 event_loop_threads_waiting_ (0), 00022 event_loop_threads_condition_ (lock_), 00023 new_leader_generator_ (new_leader_generator) 00024 { 00025 } |
|
Destructor.
Definition at line 27 of file Leader_Follower.cpp. References ACE_Intrusive_List< T >::empty(), follower_free_list_, TAO_ORB_Core::gui_resource_factory(), ACE_Intrusive_List< T >::pop_front(), TAO_Resource_Factory::reclaim_reactor(), TAO::GUIResource_Factory::reclaim_reactor(), and TAO_ORB_Core::resource_factory().
00028 { 00029 while (!this->follower_free_list_.empty ()) 00030 { 00031 TAO_LF_Follower *follower = 00032 this->follower_free_list_.pop_front (); 00033 delete follower; 00034 } 00035 // Hand the reactor back to the resource factory. 00036 // use GUI reactor factory if available 00037 if ( this->orb_core_->gui_resource_factory () ) 00038 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_); 00039 else 00040 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_); 00041 00042 this->reactor_ = 0; 00043 } |
|
Add a new follower to the set.
Definition at line 176 of file Leader_Follower.i. References follower_set_, and ACE_Intrusive_List< T >::push_back(). Referenced by TAO_LF_Follower_Auto_Adder::TAO_LF_Follower_Auto_Adder().
00177 { 00178 this->follower_set_.push_back (follower); 00179 } |
|
Allocate a new follower to the caller.
Definition at line 46 of file Leader_Follower.cpp. References ACE_Intrusive_List< T >::empty(), follower_free_list_, ACE_Intrusive_List< T >::pop_front(), and TAO_LF_Follower.
00047 { 00048 if (!this->follower_free_list_.empty ()) 00049 return this->follower_free_list_.pop_front (); 00050 00051 return new TAO_LF_Follower (*this); 00052 } |
|
A leader thread is relinquishing its role, unless there are more leader threads running pick up a follower (if there is any) to play the leader role. Definition at line 47 of file Leader_Follower.i. References elect_new_leader_i(), event_loop_threads_condition_, event_loop_threads_waiting_, follower_available(), leaders_, and no_leaders_available(). Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), set_upcall_thread(), and wait_for_event().
00048 { 00049 if (this->leaders_ == 0) 00050 { 00051 if (this->event_loop_threads_waiting_) 00052 { 00053 return this->event_loop_threads_condition_.broadcast (); 00054 } 00055 else if (this->follower_available ()) 00056 { 00057 return this->elect_new_leader_i (); 00058 } 00059 else 00060 { 00061 this->no_leaders_available (); 00062 } 00063 } 00064 return 0; 00065 } |
|
This is a helper routine for elect_new_leader(), after verifying that all the pre-conditions are satisfied the Follower set is changed and the promoted Follower is signaled. Definition at line 61 of file Leader_Follower.cpp. References ACE_DEBUG, follower_set_, ACE_Intrusive_List< T >::head(), LM_DEBUG, and TAO_LF_Follower::signal(). Referenced by elect_new_leader().
00062 { 00063 TAO_LF_Follower* const follower = 00064 this->follower_set_.head (); 00065 00066 #if defined (TAO_DEBUG_LEADER_FOLLOWER) 00067 ACE_DEBUG ((LM_DEBUG, 00068 "TAO (%P|%t) LF::elect_new_leader_i - " 00069 "follower is %x\n", 00070 follower)); 00071 #endif /* TAO_DEBUG_LEADER_FOLLOWER */ 00072 00073 return follower->signal (); 00074 } |
|
Checks if there are any followers available.
Definition at line 34 of file Leader_Follower.i. References ACE_Intrusive_List< T >::empty(), and follower_set_. Referenced by elect_new_leader().
00035 { 00036 return !this->follower_set_.empty (); 00037 } |
|
Shortcut to obtain the TSS resources of the orb core.
Definition at line 28 of file Leader_Follower.i. References TAO_ORB_Core::get_tss_resources(). Referenced by is_client_leader_thread(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread(), set_client_leader_thread(), set_client_thread(), set_event_loop_thread(), and set_upcall_thread().
00029 { 00030 return this->orb_core_->get_tss_resources (); 00031 } |
|
Check if there are any client threads running.
Definition at line 194 of file Leader_Follower.i. References clients_. Referenced by TAO_Thread_Lane_Resources::shutdown_reactor().
00195 { 00196 return this->clients_; 00197 } |
|
checks if we are a leader thread
Definition at line 169 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, and get_tss_resources().
00170 { 00171 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00172 return tss->client_leader_thread_ != 0; 00173 } |
|
Is there any thread running as a leader?
Definition at line 145 of file Leader_Follower.i. References leaders_. Referenced by wait_for_event().
00146 { 00147 return this->leaders_ != 0; 00148 } |
|
Get a reference to the underlying mutex.
Definition at line 125 of file Leader_Follower.i. Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), TAO_LF_Strategy_Complete::set_event_loop_thread(), TAO_Thread_Lane_Resources::shutdown_reactor(), and TAO_LF_Event::state_changed().
00126 { 00127 return this->lock_; 00128 } |
|
Called when we are out of leaders.
Definition at line 40 of file Leader_Follower.i. References new_leader_generator_, and TAO_New_Leader_Generator::no_leaders_available(). Referenced by elect_new_leader().
00041 { 00042 if (this->new_leader_generator_) 00043 this->new_leader_generator_->no_leaders_available (); 00044 } |
|
Accesor to the reactor.
Definition at line 123 of file Leader_Follower.cpp. References ACE_GUARD_RETURN, TAO_Resource_Factory::get_reactor(), TAO::GUIResource_Factory::get_reactor(), TAO_ORB_Core::gui_resource_factory(), TAO_ORB_Core::resource_factory(), and TAO_SYNCH_MUTEX. Referenced by TAO_ORB_Core::reactor(), and TAO_Thread_Lane_Resources::shutdown_reactor().
00124 { 00125 if (this->reactor_ == 0) 00126 { 00127 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0); 00128 if (this->reactor_ == 0) 00129 { 00130 // use GUI reactor factory if available 00131 if ( this->orb_core_->gui_resource_factory () ) 00132 this->reactor_ = 00133 this->orb_core_->gui_resource_factory ()->get_reactor (); 00134 else 00135 this->reactor_ = 00136 this->orb_core_->resource_factory ()->get_reactor (); 00137 } 00138 } 00139 return this->reactor_; 00140 } |
|
The caller has finished using a follower.
Definition at line 55 of file Leader_Follower.cpp. References follower_free_list_, and ACE_Intrusive_List< T >::push_front(). Referenced by TAO_LF_Follower_Auto_Ptr::~TAO_LF_Follower_Auto_Ptr().
00056 { 00057 this->follower_free_list_.push_front (follower); 00058 } |
|
Removes a follower from the leader-follower set.
Definition at line 182 of file Leader_Follower.i. References follower_set_, and ACE_Intrusive_List< T >::remove(). Referenced by TAO_LF_Follower::signal(), and TAO_LF_Follower_Auto_Adder::~TAO_LF_Follower_Auto_Adder().
00183 { 00184 this->follower_set_.remove (follower); 00185 } |
|
The current thread is no longer the leader thread in the client side leader-follower set. Definition at line 160 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_. Referenced by TAO_LF_Client_Leader_Thread_Helper::~TAO_LF_Client_Leader_Thread_Helper().
00161 { 00162 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00163 --tss->client_leader_thread_; 00164 --this->leaders_; 00165 --this->client_thread_is_leader_; 00166 } |
|
A server thread has finished is making a request.
Definition at line 167 of file Leader_Follower.cpp. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, ACE_Reactor::end_reactor_event_loop(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, and TAO_ORB_Core::reactor(). Referenced by TAO_LF_Client_Thread_Helper::~TAO_LF_Client_Thread_Helper().
00168 { 00169 // If we were a leader thread or an event loop thread, take back 00170 // leadership. 00171 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00172 if (tss->event_loop_thread_ || 00173 tss->client_leader_thread_) 00174 { 00175 ++this->leaders_; 00176 } 00177 00178 --this->clients_; 00179 if (this->clients_ == 0 && 00180 this->orb_core_->has_shutdown ()) 00181 { 00182 // The ORB has shutdown and we are the last client thread, we 00183 // must stop the reactor to ensure that any server threads go 00184 // away. 00185 this->orb_core_->reactor ()->end_reactor_event_loop (); 00186 } 00187 } |
|
The current thread is not a server thread anymore, reset any flags and counters. Definition at line 117 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), and reset_event_loop_thread_i(). Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread().
00118 { 00119 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00120 if (tss->event_loop_thread_ > 0) 00121 this->reset_event_loop_thread_i (tss); 00122 } |
|
Implement the reset_event_loop_thread() method, once the TSS resources have been acquired. Also used in the set_upcall_thread. Definition at line 101 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, and leaders_. Referenced by reset_event_loop_thread(), and set_upcall_thread().
00102 { 00103 // Always decrement <event_loop_thread_>. If <event_loop_thread_> 00104 // reaches 0 and we are not a client leader, we are done with our 00105 // duties of running the event loop. Therefore, decrement the 00106 // leaders. Otherwise, we just got done with a nested call to the 00107 // event loop or a call to the event loop when we were the client 00108 // leader. 00109 --tss->event_loop_thread_; 00110 00111 if (tss->event_loop_thread_ == 0 && 00112 tss->client_leader_thread_ == 0) 00113 --this->leaders_; 00114 } |
|
The Leader/Followers set mutex must be release during some long running operations. This helper class simplifies the process of releasing and reacquiring said mutex. Definition at line 188 of file Leader_Follower.i.
00189 { 00190 return this->reverse_lock_; 00191 } |
|
sets the thread ID of the leader thread in the leader-follower model |
|
The current thread has become the leader thread in the client side leader-follower set. Definition at line 151 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_. Referenced by TAO_LF_Client_Leader_Thread_Helper::TAO_LF_Client_Leader_Thread_Helper().
00152 { 00153 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00154 ++this->leaders_; 00155 ++this->client_thread_is_leader_; 00156 ++tss->client_leader_thread_; 00157 } |
|
A server thread is making a request.
Definition at line 143 of file Leader_Follower.cpp. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, TAO_Resource_Factory::drop_replies_during_shutdown(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, TAO_ORB_Core::reactor(), ACE_Reactor::reset_reactor_event_loop(), and TAO_ORB_Core::resource_factory(). Referenced by TAO_LF_Client_Thread_Helper::TAO_LF_Client_Thread_Helper().
00144 { 00145 // If we were a leader thread or an event loop thread, give up 00146 // leadership. 00147 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00148 if (tss->event_loop_thread_ || 00149 tss->client_leader_thread_) 00150 { 00151 --this->leaders_; 00152 } 00153 00154 if (this->clients_ == 0 && 00155 this->orb_core_->has_shutdown () && 00156 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ()) 00157 { 00158 // The ORB has shutdown and we are the first client after 00159 // that. This means that the reactor is disabled, we must 00160 // re-enable it if we want to receive any replys... 00161 this->orb_core_->reactor ()->reset_reactor_event_loop (); 00162 } 00163 ++this->clients_; 00164 } |
|
The current thread has become a server thread (i.e. called ORB::run), update any flags and counters. Definition at line 68 of file Leader_Follower.i. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), leaders_, and wait_for_client_leader_to_complete(). Referenced by TAO_LF_Strategy_Complete::set_event_loop_thread().
00069 { 00070 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00071 00072 // Make sure that there is no other client thread run the show. If 00073 // we are the client thread running the show, then it is ok. 00074 if (this->client_thread_is_leader_ && 00075 tss->client_leader_thread_ == 0) 00076 { 00077 int result = 00078 this->wait_for_client_leader_to_complete (max_wait_time); 00079 00080 if (result != 0) 00081 return result; 00082 } 00083 00084 // If <event_loop_thread_> == 0 and <client_leader_thread_> == 0, we 00085 // are running the event loop for the first time. Therefore, 00086 // increment the leaders. Otherwise, simply increment 00087 // <event_loop_thread_> since either (a) if <event_loop_thread_> != 00088 // 0 this is a nested call to the event loop, or (b) 00089 // <client_leader_thread_> != 0 this is a call to the event loop 00090 // while we are a client leader. 00091 if (tss->event_loop_thread_ == 0 && 00092 tss->client_leader_thread_ == 0) 00093 ++this->leaders_; 00094 00095 ++tss->event_loop_thread_; 00096 00097 return 0; 00098 } |
|
This thread is going to perform an upcall, it will no longer be an event loop thread. Definition at line 131 of file Leader_Follower.i. References ACE_GUARD, elect_new_leader(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), reset_event_loop_thread_i(), and TAO_SYNCH_MUTEX. Referenced by TAO_LF_Strategy_Complete::set_upcall_thread().
00132 { 00133 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); 00134 00135 if (tss->event_loop_thread_ > 0) 00136 { 00137 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock ()); 00138 this->reset_event_loop_thread_i (tss); 00139 00140 this->elect_new_leader (); 00141 } 00142 } |
|
Wait for the client leader to complete.
Definition at line 77 of file Leader_Follower.cpp. References ACE_ERROR, ACE_TEXT, client_thread_is_leader_, ETIME, event_loop_threads_condition_, event_loop_threads_waiting_, ACE_OS::gettimeofday(), LM_ERROR, and ACE_Countdown_Time::update(). Referenced by set_event_loop_thread().
00078 { 00079 int result = 0; 00080 ACE_Countdown_Time countdown (max_wait_time); 00081 00082 // Note that we are waiting. 00083 ++this->event_loop_threads_waiting_; 00084 00085 while (this->client_thread_is_leader_ && 00086 result != -1) 00087 { 00088 if (max_wait_time == 0) 00089 { 00090 if (this->event_loop_threads_condition_.wait () == -1) 00091 { 00092 ACE_ERROR ((LM_ERROR, 00093 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ") 00094 ACE_TEXT ("Condition variable wait failed\n"))); 00095 00096 result = -1; 00097 } 00098 } 00099 else 00100 { 00101 countdown.update (); 00102 ACE_Time_Value tv = ACE_OS::gettimeofday (); 00103 tv += *max_wait_time; 00104 if (this->event_loop_threads_condition_.wait (&tv) == -1) 00105 { 00106 if (errno != ETIME) 00107 ACE_ERROR ((LM_ERROR, 00108 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ") 00109 ACE_TEXT ("Condition variable wait failed\n"))); 00110 00111 result = -1; 00112 } 00113 } 00114 } 00115 00116 // Reset waiting state. 00117 --this->event_loop_threads_waiting_; 00118 00119 return result; 00120 } |
|
Wait on the Leader/Followers loop until one event happens.
There should be no reason to reset the value of result here. If there was an error in handle_events () that the leader saw, I (Bala) beleave it should be propogated to the clients. result = 0; Definition at line 190 of file Leader_Follower.cpp. References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, elect_new_leader(), TAO_LF_Event::error_detected(), ETIME, TAO_LF_Follower_Auto_Ptr::get(), ACE_OS::gettimeofday(), ACE_Reactor::handle_events(), TAO_Transport::id(), TAO_LF_Event::keep_waiting(), leader_available(), LM_DEBUG, LM_ERROR, ACE_Reactor::owner(), ACE_Reactor::reactor_event_loop_done(), TAO_LF_Event::set_state(), TAO_LF_Event::successful(), TAO_debug_level, TAO_SYNCH_MUTEX, and ACE_Countdown_Time::update(). Referenced by TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Wait_On_Leader_Follower::wait(), and TAO_LF_Connect_Strategy::wait_i().
00193 { 00194 // Obtain the lock. 00195 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1); 00196 00197 ACE_Countdown_Time countdown (max_wait_time); 00198 00199 // Optmize the first iteration [no access to errno] 00200 int result = 1; 00201 00202 // For some cases the transport may dissappear like when waiting for 00203 // connection to be initiated or closed. So cache the id. 00204 // @@ NOTE: This is not completely safe either. We will be fine for 00205 // cases that dont access the id ie. when debug level is off but 00206 // with debugging level on we are on a sticky wicket. Hopefully none 00207 // of our users should run TAO with debugging enabled like they did 00208 // in PathFinder 00209 size_t t_id = 0; 00210 00211 if (TAO_debug_level) 00212 { 00213 t_id = transport->id (); 00214 } 00215 00216 { 00217 // Calls this->set_client_thread () on construction and 00218 // this->reset_client_thread () on destruction. 00219 TAO_LF_Client_Thread_Helper client_thread_helper (*this); 00220 ACE_UNUSED_ARG (client_thread_helper); 00221 00222 // Check if there is a leader. Note that it cannot be us since we 00223 // gave up our leadership when we became a client. 00224 if (this->leader_available ()) 00225 { 00226 // = Wait as a follower. 00227 00228 // Grab a follower: 00229 TAO_LF_Follower_Auto_Ptr follower (*this); 00230 if (follower.get () == 0) 00231 return -1; 00232 00233 if (TAO_debug_level >= 5) 00234 ACE_DEBUG ((LM_DEBUG, 00235 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00236 " (follower), cond <%x>\n", 00237 t_id, follower.get ())); 00238 00239 // Bound the follower and the LF_Event, this is important to 00240 // get a signal when the event terminates 00241 TAO_LF_Event_Binder event_binder (event, follower.get ()); 00242 00243 while (event->keep_waiting () && 00244 this->leader_available ()) 00245 { 00246 // Add ourselves to the list, do it everytime we wake up 00247 // from the CV loop. Because: 00248 // 00249 // - The leader thread could have elected us as the new 00250 // leader. 00251 // - Before we can assume the role another thread becomes 00252 // the leader 00253 // - But our condition variable could have been removed 00254 // already, if we don't add it again we will never wake 00255 // up. 00256 // 00257 // Notice that we can have spurious wake ups, in that case 00258 // adding the leader results in an error, that must be 00259 // ignored. 00260 // You may be thinking of not removing the condition 00261 // variable in the code that sends the signal, but 00262 // removing it here, that does not work either, in that 00263 // case the condition variable may be used twice: 00264 // 00265 // - Wake up because its reply arrived 00266 // - Wake up because it must become the leader 00267 // 00268 // but only the first one has any effect, so the leader is 00269 // lost. 00270 // 00271 00272 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower); 00273 00274 if (max_wait_time == 0) 00275 { 00276 if (follower->wait (max_wait_time) == -1) 00277 { 00278 if (TAO_debug_level >= 5) 00279 ACE_DEBUG ((LM_DEBUG, 00280 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " 00281 " (follower) [no timer, cond failed]\n", 00282 t_id)); 00283 00284 // @@ Michael: What is our error handling in this case? 00285 // We could be elected as leader and 00286 // no leader would come in? 00287 return -1; 00288 } 00289 } 00290 else 00291 { 00292 countdown.update (); 00293 ACE_Time_Value tv = ACE_OS::gettimeofday (); 00294 tv += *max_wait_time; 00295 if (follower->wait (&tv) == -1) 00296 { 00297 if (TAO_debug_level >= 5) 00298 ACE_DEBUG ((LM_DEBUG, 00299 "TAO (%P|%t) - Leader_Follower[%d]::wait, " 00300 "(follower) [has timer, follower failed]\n", 00301 t_id )); 00302 00303 // If we have timedout set the state in the 00304 // LF_Event. We call the non-locking, 00305 // no-signalling method on LF_Event. 00306 if (errno == ETIME) 00307 // We have timedout 00308 event->set_state (TAO_LF_Event::LFS_TIMEOUT); 00309 00310 if (!event->successful ()) 00311 { 00312 // Remove follower can fail because either 00313 // 1) the condition was satisfied (i.e. reply 00314 // received or queue drained), or 00315 // 2) somebody elected us as leader, or 00316 // 3) the connection got closed. 00317 // 00318 // Therefore: 00319 // If remove_follower fails and the condition 00320 // was not satisfied, we know that we got 00321 // elected as a leader. 00322 // But we got a timeout, so we cannot become 00323 // the leader, therefore, we have to select a 00324 // new leader. 00325 // 00326 00327 if (this->elect_new_leader () == -1 00328 && TAO_debug_level > 0) 00329 { 00330 ACE_ERROR ((LM_ERROR, 00331 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, " 00332 "elect_new_leader failed\n", 00333 t_id )); 00334 } 00335 } 00336 00337 00338 return -1; 00339 } 00340 } 00341 } 00342 00343 countdown.update (); 00344 00345 // @@ Michael: This is an old comment why we do not want to 00346 // remove the follower here. 00347 // We should not remove the follower here, we *must* remove it when 00348 // we signal it so the same condition is not signalled for 00349 // both wake up as a follower and as the next leader. 00350 00351 if (TAO_debug_level >= 5) 00352 ACE_DEBUG ((LM_DEBUG, 00353 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00354 " done (follower), successful %d\n", 00355 t_id, 00356 event->successful ())); 00357 00358 // Now somebody woke us up to become a leader or to handle our 00359 // input. We are already removed from the follower queue. 00360 00361 if (event->successful ()) 00362 return 0; 00363 00364 if (event->error_detected ()) 00365 return -1; 00366 00367 // FALLTHROUGH 00368 // We only get here if we woke up but the reply is not 00369 // complete yet, time to assume the leader role.... 00370 // i.e. ACE_ASSERT (event->successful () == 0); 00371 } 00372 00373 // = Leader Code. 00374 00375 // The only way to reach this point is if we must become the 00376 // leader, because there is no leader or we have to update to a 00377 // leader or we are doing nested upcalls in this case we do 00378 // increase the refcount on the leader in TAO_ORB_Core. 00379 00380 // Calls this->set_client_leader_thread () on 00381 // construction and this->reset_client_leader_thread () 00382 // on destruction. Note that this may increase the refcount of 00383 // the leader. 00384 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); 00385 ACE_UNUSED_ARG (client_leader_thread_helper); 00386 00387 { 00388 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, 00389 this->reverse_lock (), -1); 00390 00391 // Become owner of the reactor. 00392 ACE_Reactor *reactor = this->reactor_; 00393 reactor->owner (ACE_Thread::self ()); 00394 00395 // Run the reactor event loop. 00396 00397 if (TAO_debug_level >= 5) 00398 ACE_DEBUG ((LM_DEBUG, 00399 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00400 " (leader) enter reactor event loop\n", 00401 t_id)); 00402 00403 // If we got our event, no need to run the event loop any 00404 // further. 00405 while (event->keep_waiting ()) 00406 { 00407 // Run the event loop. 00408 result = reactor->handle_events (max_wait_time); 00409 00410 // Did we timeout? If so, stop running the loop. 00411 if (result == 0 && 00412 max_wait_time != 0 && 00413 *max_wait_time == ACE_Time_Value::zero) 00414 break; 00415 00416 // Other errors? If so, stop running the loop. 00417 if (result == -1) 00418 break; 00419 00420 // Otherwise, keep going... 00421 } 00422 00423 if (TAO_debug_level >= 5) 00424 ACE_DEBUG ((LM_DEBUG, 00425 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00426 " (leader) exit reactor event loop\n", 00427 t_id)); 00428 } 00429 } 00430 // 00431 // End artificial scope for auto_ptr like helpers calling: 00432 // this->reset_client_thread () and (maybe) 00433 // this->reset_client_leader_thread (). 00434 // 00435 00436 // Wake up the next leader, we cannot do that in handle_input, 00437 // because the woken up thread would try to get into handle_events, 00438 // which is at the time in handle_input still occupied. But do it 00439 // before checking the error in <result>, even if there is an error 00440 // in our input we should continue running the loop in another 00441 // thread. 00442 00443 if (this->elect_new_leader () == -1) 00444 ACE_ERROR_RETURN ((LM_ERROR, 00445 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00446 " failed to elect new leader\n", 00447 t_id), 00448 -1); 00449 00450 if (result == -1 && !this->reactor_->reactor_event_loop_done ()) 00451 ACE_ERROR_RETURN ((LM_ERROR, 00452 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event," 00453 " handle_events failed\n", 00454 t_id), 00455 -1); 00456 00457 // Return an error if there was a problem receiving the reply... 00458 if (max_wait_time != 0) 00459 { 00460 if (!event->successful () 00461 && *max_wait_time == ACE_Time_Value::zero) 00462 { 00463 result = -1; 00464 errno = ETIME; 00465 } 00466 else if (event->error_detected ()) 00467 { 00468 // If the time did not expire yet, but we get a failure, 00469 // e.g. the connections closed, we should still return an error. 00470 result = -1; 00471 } 00472 } 00473 else 00474 { 00475 /** 00476 * There should be no reason to reset the value of result 00477 * here. If there was an error in handle_events () that the 00478 * leader saw, I (Bala) beleave it should be propogated to the 00479 * clients. 00480 * result = 0; 00481 */ 00482 if (event->error_detected ()) 00483 { 00484 result = -1; 00485 } 00486 } 00487 00488 return result; 00489 } |
|
Is a client thread the current leader?
Definition at line 249 of file Leader_Follower.h. Referenced by reset_client_leader_thread(), set_client_leader_thread(), set_event_loop_thread(), and wait_for_client_leader_to_complete(). |
|
Count the number of active clients, this is useful to know when to deactivate the reactor Definition at line 243 of file Leader_Follower.h. Referenced by has_clients(), reset_client_thread(), and set_client_thread(). |
|
Condition variable for server threads waiting for the client leader to complete. Definition at line 256 of file Leader_Follower.h. Referenced by elect_new_leader(), and wait_for_client_leader_to_complete(). |
|
Are server threads waiting for the client leader to complete?
Definition at line 252 of file Leader_Follower.h. Referenced by elect_new_leader(), and wait_for_client_leader_to_complete(). |
|
Use a free list to allocate and release Follower objects.
Definition at line 231 of file Leader_Follower.h. Referenced by allocate_follower(), release_follower(), and ~TAO_Leader_Follower(). |
|
Definition at line 228 of file Leader_Follower.h. Referenced by add_follower(), elect_new_leader_i(), follower_available(), and remove_follower(). |
|
Count the number of active leaders. There could be many leaders in the thread pool (i.e. calling ORB::run), and the same leader could show up multiple times as it receives nested upcalls and sends more requests. Definition at line 239 of file Leader_Follower.h. Referenced by elect_new_leader(), leader_available(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread_i(), set_client_leader_thread(), set_client_thread(), and set_event_loop_thread(). |
|
To synchronize access to the members.
Definition at line 221 of file Leader_Follower.h. |
|
Leader/Follower class uses this method to notify the system that we are out of leaders. Definition at line 260 of file Leader_Follower.h. Referenced by no_leaders_available(). |
|
The orb core.
Definition at line 218 of file Leader_Follower.h. |
|
The reactor.
Definition at line 246 of file Leader_Follower.h. |
|
do protect the access to the following three members
Definition at line 224 of file Leader_Follower.h. |