#include <Leader_Follower.h>
Public Member Functions | |
TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator=0) | |
Constructor. | |
~TAO_Leader_Follower (void) | |
Destructor. | |
int | set_event_loop_thread (ACE_Time_Value *max_wait_time) |
void | reset_event_loop_thread (void) |
void | set_upcall_thread (void) |
int | leader_available (void) const |
Is there any thread running as a leader? | |
void | set_client_thread (void) |
A server thread is making a request. | |
void | reset_client_thread (void) |
A server thread has finished making a request. | |
int | wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time) |
Wait on the Leader/Followers loop until one event happens. | |
void | set_client_leader_thread (void) |
void | reset_client_leader_thread (void) |
void | set_client_leader_thread (ACE_thread_t thread_ID) |
int | is_client_leader_thread (void) const |
Checks if we are a leader thread. | |
int | elect_new_leader (void) |
TAO_SYNCH_MUTEX & | lock (void) |
Get a reference to the underlying mutex. | |
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & | reverse_lock (void) |
int | has_clients (void) const |
Check if there are any client threads running. | |
ACE_Reactor * | reactor (void) |
Accessor to the reactor. | |
void | no_leaders_available (void) |
Called when we are out of leaders. | |
Follower creation/destructions | |
The Leader/Followers set acts as a factory for the Follower objects. A Follower represents a thread blocked waiting in the Follower set. The Leader/Followers abstraction keeps a list of the waiting followers so that it can wake one up when the leader thread stops handling events. For performance reasons, the Leader/Followers set uses a pool (or free list) to hold Follower objects that are not attached to any thread. It may be tempting to keep such followers in TSS, since a thread needs only one such Follower object; however, that does not work with multiple Leader/Followers sets. Consult this bug report for more details: | |
TAO_LF_Follower * | allocate_follower (void) |
Allocate a new follower to the caller. | |
void | release_follower (TAO_LF_Follower *) |
The caller has finished using a follower. | |
Follower Set Operations | |
void | add_follower (TAO_LF_Follower *follower) |
Add a new follower to the set. | |
void | remove_follower (TAO_LF_Follower *follower) |
Removes a follower from the leader-follower set. | |
int | follower_available (void) const |
Checks if there are any followers available. | |
Private Types | |
typedef ACE_Intrusive_List< TAO_LF_Follower > | Follower_Set |
Implement the Leader/Followers set using an intrusive list. | |
Private Member Functions | |
TAO_ORB_Core_TSS_Resources * | get_tss_resources (void) const |
Shortcut to obtain the TSS resources of the orb core. | |
int | wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time) |
Wait for the client leader to complete. | |
void | reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss) |
Follower Set Operations | |
int | elect_new_leader_i (void) |
Private Attributes | |
TAO_ORB_Core * | orb_core_ |
The orb core. | |
TAO_SYNCH_MUTEX | lock_ |
To synchronize access to the members. | |
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > | reverse_lock_ |
Do protect the access to the following three members. | |
Follower_Set | follower_set_ |
Follower_Set | follower_free_list_ |
Use a free list to allocate and release Follower objects. | |
int | leaders_ |
int | clients_ |
ACE_Reactor * | reactor_ |
The reactor. | |
int | client_thread_is_leader_ |
Is a client thread the current leader? | |
int | event_loop_threads_waiting_ |
Are server threads waiting for the client leader to complete? | |
TAO_SYNCH_CONDITION | event_loop_threads_condition_ |
TAO_New_Leader_Generator * | new_leader_generator_ |
Definition at line 49 of file Leader_Follower.h.
typedef ACE_Intrusive_List<TAO_LF_Follower> TAO_Leader_Follower::Follower_Set [private] |
Implement the Leader/Followers set using an intrusive list.
Definition at line 227 of file Leader_Follower.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE TAO_Leader_Follower::TAO_Leader_Follower | ( | TAO_ORB_Core * | orb_core, | |
TAO_New_Leader_Generator * | new_leader_generator = 0 | |||
) |
Constructor.
Definition at line 13 of file Leader_Follower.inl.
  : orb_core_ (orb_core),
    reverse_lock_ (lock_),
    leaders_ (0),
    clients_ (0),
    reactor_ (0),
    client_thread_is_leader_ (0),
    event_loop_threads_waiting_ (0),
    event_loop_threads_condition_ (lock_),
    new_leader_generator_ (new_leader_generator)
{
}
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Leader_Follower::~TAO_Leader_Follower | ( | void | ) |
Destructor.
Definition at line 27 of file Leader_Follower.cpp.
{
  while (!this->follower_free_list_.empty ())
    {
      TAO_LF_Follower *follower =
        this->follower_free_list_.pop_front ();
      delete follower;
    }
  // Hand the reactor back to the resource factory.
  // use GUI reactor factory if available
  if ( this->orb_core_->gui_resource_factory () )
    this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
  else
    this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);

  this->reactor_ = 0;
}
ACE_INLINE void TAO_Leader_Follower::add_follower | ( | TAO_LF_Follower * | follower | ) |
Add a new follower to the set.
Definition at line 176 of file Leader_Follower.inl.
References follower_set_, and ACE_Intrusive_List< T >::push_back().
{
  this->follower_set_.push_back (follower);
}
TAO_LF_Follower * TAO_Leader_Follower::allocate_follower | ( | void | ) |
Allocate a new follower to the caller.
Definition at line 46 of file Leader_Follower.cpp.
References ACE_NEW_RETURN.
{
  if (!this->follower_free_list_.empty ())
    return this->follower_free_list_.pop_front ();

  TAO_LF_Follower* ptr = 0;
  ACE_NEW_RETURN (ptr,
                  TAO_LF_Follower (*this),
                  0);
  return ptr;
}
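The free-list allocation above, together with release_follower(), can be sketched outside of ACE as follows. This is only an illustration under stated assumptions: `Follower` and `FollowerPool` are hypothetical names, and the sketch uses `std::vector` and `std::unique_ptr` where TAO uses an `ACE_Intrusive_List` of raw pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for TAO_LF_Follower; not the TAO class.
struct Follower {
  int wakeups = 0;
};

// Hypothetical pool. TAO keeps its free list in an ACE_Intrusive_List.
class FollowerPool {
public:
  // Reuse a pooled follower if one exists, otherwise create a new one.
  std::unique_ptr<Follower> allocate() {
    if (!free_list_.empty()) {
      std::unique_ptr<Follower> f = std::move(free_list_.back());
      free_list_.pop_back();
      return f;
    }
    return std::make_unique<Follower>();
  }

  // Return a follower to the pool so that a later allocate() can reuse it.
  void release(std::unique_ptr<Follower> f) {
    assert(f != nullptr);
    free_list_.push_back(std::move(f));
  }

  // Number of followers currently parked in the pool.
  std::size_t pooled() const { return free_list_.size(); }

private:
  std::vector<std::unique_ptr<Follower>> free_list_;
};
```

Because the pool belongs to one Leader/Followers set rather than to a thread, the same thread can hold followers from several sets at once, which is the case the TSS approach cannot handle.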
ACE_INLINE int TAO_Leader_Follower::elect_new_leader | ( | void | ) |
A leader thread is relinquishing its role. Unless other leader threads are still running, pick a follower (if there is one) to take over the leader role.
Definition at line 47 of file Leader_Follower.inl.
References event_loop_threads_condition_, and no_leaders_available().
Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), and set_upcall_thread().
{
  if (this->leaders_ == 0)
    {
      if (this->event_loop_threads_waiting_)
        {
          return this->event_loop_threads_condition_.broadcast ();
        }
      else if (this->follower_available ())
        {
          return this->elect_new_leader_i ();
        }
      else
        {
          this->no_leaders_available ();
        }
    }
  return 0;
}
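The order of the checks in elect_new_leader() matters: waiting event loop threads take precedence over followers, and the new-leader generator is consulted only as a last resort. A minimal sketch of that decision as a pure function follows; `Election`, `LeaderState`, and `decide` are hypothetical names introduced here for illustration and are not part of TAO.

```cpp
#include <cassert>

// Hypothetical outcome labels; they mirror the branches of
// elect_new_leader() and nothing else.
enum class Election {
  None,                  // a leader is still running, nothing to do
  WakeEventLoopThreads,  // broadcast on the event loop condition variable
  PromoteFollower,       // signal the follower at the head of the set
  RequestNewLeader       // notify the new-leader generator, if any
};

// Hypothetical snapshot of the counters consulted by the election.
struct LeaderState {
  int leaders = 0;
  int event_loop_threads_waiting = 0;
  bool follower_available = false;
};

// Decide what the election should do for a given snapshot.
Election decide(const LeaderState& s) {
  assert(s.leaders >= 0);
  if (s.leaders != 0)
    return Election::None;
  if (s.event_loop_threads_waiting != 0)
    return Election::WakeEventLoopThreads;
  if (s.follower_available)
    return Election::PromoteFollower;
  return Election::RequestNewLeader;
}
```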
int TAO_Leader_Follower::elect_new_leader_i | ( | void | ) | [private] |
This is a helper routine for elect_new_leader(). After verifying that all the preconditions are satisfied, the Follower set is changed and the promoted Follower is signaled.
Definition at line 65 of file Leader_Follower.cpp.
References ACE_DEBUG, follower_set_, ACE_Intrusive_List< T >::head(), LM_DEBUG, and TAO_LF_Follower::signal().
{
  TAO_LF_Follower* const follower =
    this->follower_set_.head ();

#if defined (TAO_DEBUG_LEADER_FOLLOWER)
  ACE_DEBUG ((LM_DEBUG,
              "TAO (%P|%t) LF::elect_new_leader_i - "
              "follower is %x\n",
              follower));
#endif /* TAO_DEBUG_LEADER_FOLLOWER */

  return follower->signal ();
}
ACE_INLINE int TAO_Leader_Follower::follower_available | ( | void | ) | const |
Checks if there are any followers available.
Definition at line 34 of file Leader_Follower.inl.
References ACE_Intrusive_List< T >::empty(), and follower_set_.
{
  return !this->follower_set_.empty ();
}
ACE_INLINE TAO_ORB_Core_TSS_Resources * TAO_Leader_Follower::get_tss_resources | ( | void | ) | const [private] |
Shortcut to obtain the TSS resources of the orb core.
Definition at line 28 of file Leader_Follower.inl.
References TAO_ORB_Core::get_tss_resources(), and orb_core_.
Referenced by is_client_leader_thread(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread(), set_client_leader_thread(), set_client_thread(), set_event_loop_thread(), and set_upcall_thread().
{
  return this->orb_core_->get_tss_resources ();
}
ACE_INLINE int TAO_Leader_Follower::has_clients | ( | void | ) | const |
Check if there are any client threads running.
Definition at line 194 of file Leader_Follower.inl.
References clients_.
Referenced by TAO_Thread_Lane_Resources::shutdown_reactor().
{
  return this->clients_;
}
ACE_INLINE int TAO_Leader_Follower::is_client_leader_thread | ( | void | ) | const |
Checks if we are a leader thread.
Definition at line 169 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, and get_tss_resources().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  return tss->client_leader_thread_ != 0;
}
ACE_INLINE int TAO_Leader_Follower::leader_available | ( | void | ) | const |
Is there any thread running as a leader?
Definition at line 145 of file Leader_Follower.inl.
References leaders_.
Referenced by wait_for_event().
{
  return this->leaders_ != 0;
}
ACE_INLINE TAO_SYNCH_MUTEX & TAO_Leader_Follower::lock | ( | void | ) |
Get a reference to the underlying mutex.
Definition at line 125 of file Leader_Follower.inl.
References lock_.
Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), TAO_LF_Strategy_Complete::set_event_loop_thread(), TAO_Thread_Lane_Resources::shutdown_reactor(), and TAO_LF_Event::state_changed().
{
  return this->lock_;
}
ACE_INLINE void TAO_Leader_Follower::no_leaders_available | ( | void | ) |
Called when we are out of leaders.
Definition at line 40 of file Leader_Follower.inl.
References new_leader_generator_, and TAO_New_Leader_Generator::no_leaders_available().
Referenced by elect_new_leader().
{
  if (this->new_leader_generator_)
    this->new_leader_generator_->no_leaders_available ();
}
ACE_Reactor * TAO_Leader_Follower::reactor | ( | void | ) |
Accessor to the reactor.
Definition at line 127 of file Leader_Follower.cpp.
References ACE_GUARD_RETURN, TAO_Resource_Factory::get_reactor(), orb_core_, reactor_, TAO_ORB_Core::resource_factory(), and TAO_SYNCH_MUTEX.
Referenced by TAO_ORB_Core::reactor(), TAO_Thread_Lane_Resources::shutdown_reactor(), and wait_for_event().
{
  if (this->reactor_ == 0)
    {
      ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
      if (this->reactor_ == 0)
        {
          // use GUI reactor factory if available
          if ( this->orb_core_->gui_resource_factory () )
            this->reactor_ =
              this->orb_core_->gui_resource_factory ()->get_reactor ();
          else
            this->reactor_ =
              this->orb_core_->resource_factory ()->get_reactor ();
        }
    }
  return this->reactor_;
}
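The accessor above creates the reactor lazily with a check, lock, check sequence. Under the C++11 memory model an unsynchronized first read of a plain pointer is formally a data race, so a present-day rendering of the same idea would typically make the pointer atomic. The following is a sketch of that variant only, not TAO code: `Reactor`, `ReactorHolder`, `make_reactor`, and `g_factory_calls` are hypothetical names used for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for ACE_Reactor.
struct Reactor {
  int id;
};

// Hypothetical factory used only to demonstrate the holder; it counts calls.
int g_factory_calls = 0;
Reactor* make_reactor() {
  ++g_factory_calls;
  static Reactor instance{1};
  return &instance;
}

// Lazily obtains a reactor from a factory at most once.
class ReactorHolder {
public:
  explicit ReactorHolder(Reactor* (*factory)()) : factory_(factory) {
    assert(factory != nullptr);
  }

  Reactor* reactor() {
    // Fast path: no lock once the pointer has been published.
    Reactor* r = reactor_.load(std::memory_order_acquire);
    if (r == nullptr) {
      std::lock_guard<std::mutex> guard(lock_);
      // Second check: another thread may have created it while we waited.
      r = reactor_.load(std::memory_order_relaxed);
      if (r == nullptr) {
        r = factory_();
        reactor_.store(r, std::memory_order_release);
      }
    }
    return r;
  }

private:
  Reactor* (*factory_)();
  std::mutex lock_;
  std::atomic<Reactor*> reactor_{nullptr};
};
```

Calling `reactor()` repeatedly returns the same pointer and invokes the factory once, which is the behavior the original accessor relies on.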
void TAO_Leader_Follower::release_follower | ( | TAO_LF_Follower * | ) |
The caller has finished using a follower.
Definition at line 59 of file Leader_Follower.cpp.
References follower_free_list_, and ACE_Intrusive_List< T >::push_front().
{
  this->follower_free_list_.push_front (follower);
}
ACE_INLINE void TAO_Leader_Follower::remove_follower | ( | TAO_LF_Follower * | follower | ) |
Removes a follower from the leader-follower set.
Definition at line 182 of file Leader_Follower.inl.
References follower_set_, and ACE_Intrusive_List< T >::remove().
{
  this->follower_set_.remove (follower);
}
ACE_INLINE void TAO_Leader_Follower::reset_client_leader_thread | ( | void | ) |
The current thread is no longer the leader thread in the client side leader-follower set.
Definition at line 160 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_.
Referenced by TAO_LF_Client_Leader_Thread_Helper::~TAO_LF_Client_Leader_Thread_Helper().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  --tss->client_leader_thread_;
  --this->leaders_;
  --this->client_thread_is_leader_;
}
void TAO_Leader_Follower::reset_client_thread | ( | void | ) |
A server thread has finished making a request.
Definition at line 171 of file Leader_Follower.cpp.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), and leaders_.
Referenced by TAO_LF_Client_Thread_Helper::~TAO_LF_Client_Thread_Helper().
{
  // If we were a leader thread or an event loop thread, take back
  // leadership.
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  if (tss->event_loop_thread_ ||
      tss->client_leader_thread_)
    {
      ++this->leaders_;
    }

  --this->clients_;
  if (this->clients_ == 0 &&
      this->orb_core_->has_shutdown ())
    {
      // The ORB has shutdown and we are the last client thread, we
      // must stop the reactor to ensure that any server threads go
      // away.
      this->orb_core_->reactor ()->end_reactor_event_loop ();
    }
}
ACE_INLINE void TAO_Leader_Follower::reset_event_loop_thread | ( | void | ) |
The current thread is not a server thread anymore; reset any flags and counters.
Definition at line 117 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), and reset_event_loop_thread_i().
Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  if (tss->event_loop_thread_ > 0)
    this->reset_event_loop_thread_i (tss);
}
ACE_INLINE void TAO_Leader_Follower::reset_event_loop_thread_i | ( | TAO_ORB_Core_TSS_Resources * | tss | ) | [private] |
Implements reset_event_loop_thread() once the TSS resources have been acquired. Also used by set_upcall_thread().
Definition at line 101 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, and leaders_.
Referenced by reset_event_loop_thread(), and set_upcall_thread().
{
  // Always decrement <event_loop_thread_>. If <event_loop_thread_>
  // reaches 0 and we are not a client leader, we are done with our
  // duties of running the event loop. Therefore, decrement the
  // leaders. Otherwise, we just got done with a nested call to the
  // event loop or a call to the event loop when we were the client
  // leader.
  --tss->event_loop_thread_;

  if (tss->event_loop_thread_ == 0 &&
      tss->client_leader_thread_ == 0)
    --this->leaders_;
}
ACE_INLINE ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & TAO_Leader_Follower::reverse_lock | ( | void | ) |
The Leader/Followers set mutex must be released during some long-running operations. This helper class simplifies the process of releasing and reacquiring said mutex.
Definition at line 188 of file Leader_Follower.inl.
References reverse_lock_.
{
  return this->reverse_lock_;
}
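The idea behind a reverse lock is that acquiring it releases the wrapped mutex and releasing it reacquires the mutex, so an ordinary scoped guard can be used to drop a lock for the duration of a block. A minimal sketch of the concept follows; `ReverseLock` here is a hypothetical analogue written against the standard lockable interface, not the ACE_Reverse_Lock implementation.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical analogue of a reverse lock: lock() and unlock() are swapped
// relative to the wrapped mutex.
template <typename Mutex>
class ReverseLock {
public:
  explicit ReverseLock(Mutex& m) : m_(m) {}

  // "Acquiring" the reverse lock releases the real mutex.
  void lock() { m_.unlock(); }

  // "Releasing" the reverse lock reacquires the real mutex.
  void unlock() { m_.lock(); }

private:
  Mutex& m_;
};
```

With this in place, `std::lock_guard<ReverseLock<M>>` constructed while the mutex is held releases it for the enclosing scope and reacquires it on scope exit, which is how wait_for_event() uses `reverse_lock()` around the reactor event loop.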
void TAO_Leader_Follower::set_client_leader_thread | ( | ACE_thread_t | thread_ID | ) |
Sets the thread ID of the leader thread in the leader-follower model.
ACE_INLINE void TAO_Leader_Follower::set_client_leader_thread | ( | void | ) |
The current thread has become the leader thread in the client side leader-follower set.
Definition at line 151 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_.
Referenced by TAO_LF_Client_Leader_Thread_Helper::TAO_LF_Client_Leader_Thread_Helper().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  ++this->leaders_;
  ++this->client_thread_is_leader_;
  ++tss->client_leader_thread_;
}
void TAO_Leader_Follower::set_client_thread | ( | void | ) |
A server thread is making a request.
Definition at line 147 of file Leader_Follower.cpp.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, TAO_Resource_Factory::drop_replies_during_shutdown(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), leaders_, orb_core_, and TAO_ORB_Core::resource_factory().
Referenced by TAO_LF_Client_Thread_Helper::TAO_LF_Client_Thread_Helper().
{
  // If we were a leader thread or an event loop thread, give up
  // leadership.
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
  if (tss->event_loop_thread_ ||
      tss->client_leader_thread_)
    {
      --this->leaders_;
    }

  if (this->clients_ == 0 &&
      this->orb_core_->has_shutdown () &&
      !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
    {
      // The ORB has shutdown and we are the first client after
      // that. This means that the reactor is disabled, we must
      // re-enable it if we want to receive any replies...
      this->orb_core_->reactor ()->reset_reactor_event_loop ();
    }
  ++this->clients_;
}
ACE_INLINE int TAO_Leader_Follower::set_event_loop_thread | ( | ACE_Time_Value * | max_wait_time | ) |
The current thread has become a server thread (i.e. called ORB::run); update any flags and counters.
Definition at line 68 of file Leader_Follower.inl.
References TAO_ORB_Core_TSS_Resources::client_leader_thread_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), leaders_, and wait_for_client_leader_to_complete().
Referenced by TAO_LF_Strategy_Complete::set_event_loop_thread().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();

  // Make sure that there is no other client thread running the show. If
  // we are the client thread running the show, then it is ok.
  if (this->client_thread_is_leader_ &&
      tss->client_leader_thread_ == 0)
    {
      int result =
        this->wait_for_client_leader_to_complete (max_wait_time);

      if (result != 0)
        return result;
    }

  // If <event_loop_thread_> == 0 and <client_leader_thread_> == 0, we
  // are running the event loop for the first time. Therefore,
  // increment the leaders. Otherwise, simply increment
  // <event_loop_thread_> since either (a) if <event_loop_thread_> !=
  // 0 this is a nested call to the event loop, or (b)
  // <client_leader_thread_> != 0 this is a call to the event loop
  // while we are a client leader.
  if (tss->event_loop_thread_ == 0 &&
      tss->client_leader_thread_ == 0)
    ++this->leaders_;

  ++tss->event_loop_thread_;

  return 0;
}
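The counter bookkeeping in set_event_loop_thread() and reset_event_loop_thread_i() is symmetric: the shared leader count changes only on the outermost entry or exit of a thread that is not already a client leader. A simplified sketch of just that bookkeeping is shown below. It deliberately omits the wait for a client leader and all locking, and `ThreadState`, `SharedState`, `enter_event_loop`, and `leave_event_loop` are hypothetical names, not TAO identifiers.

```cpp
#include <cassert>

// Hypothetical per-thread state standing in for TAO_ORB_Core_TSS_Resources.
struct ThreadState {
  int event_loop_thread = 0;     // nesting depth of event loop entries
  int client_leader_thread = 0;  // non-zero while this thread is a client leader
};

// Hypothetical shared state; in TAO the count is guarded by the
// Leader/Followers mutex.
struct SharedState {
  int leaders = 0;
};

// Mirrors the counter updates at the end of set_event_loop_thread().
void enter_event_loop(SharedState& shared, ThreadState& tss) {
  // Only a first, non-nested entry by a non-leader adds a new leader.
  if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
    ++shared.leaders;
  ++tss.event_loop_thread;
}

// Mirrors reset_event_loop_thread_i().
void leave_event_loop(SharedState& shared, ThreadState& tss) {
  assert(tss.event_loop_thread > 0);
  --tss.event_loop_thread;
  // Only the outermost exit by a non-leader removes a leader.
  if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
    --shared.leaders;
}
```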
ACE_INLINE void TAO_Leader_Follower::set_upcall_thread | ( | void | ) |
This thread is going to perform an upcall; it will no longer be an event loop thread.
Definition at line 131 of file Leader_Follower.inl.
References ACE_GUARD, elect_new_leader(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), reset_event_loop_thread_i(), and TAO_SYNCH_MUTEX.
Referenced by TAO_LF_Strategy_Complete::set_upcall_thread().
{
  TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();

  if (tss->event_loop_thread_ > 0)
    {
      ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock ());
      this->reset_event_loop_thread_i (tss);

      this->elect_new_leader ();
    }
}
int TAO_Leader_Follower::wait_for_client_leader_to_complete | ( | ACE_Time_Value * | max_wait_time | ) | [private] |
Wait for the client leader to complete.
Definition at line 81 of file Leader_Follower.cpp.
References ACE_ERROR, ACE_TEXT, event_loop_threads_waiting_, ACE_OS::gettimeofday(), LM_ERROR, and ACE_Countdown_Time::update().
Referenced by set_event_loop_thread().
{
  int result = 0;
  ACE_Countdown_Time countdown (max_wait_time);

  // Note that we are waiting.
  ++this->event_loop_threads_waiting_;

  while (this->client_thread_is_leader_ &&
         result != -1)
    {
      if (max_wait_time == 0)
        {
          if (this->event_loop_threads_condition_.wait () == -1)
            {
              ACE_ERROR ((LM_ERROR,
                          ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
                          ACE_TEXT ("Condition variable wait failed\n")));

              result = -1;
            }
        }
      else
        {
          countdown.update ();
          ACE_Time_Value tv = ACE_OS::gettimeofday ();
          tv += *max_wait_time;
          if (this->event_loop_threads_condition_.wait (&tv) == -1)
            {
              if (errno != ETIME)
                ACE_ERROR ((LM_ERROR,
                            ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
                            ACE_TEXT ("Condition variable wait failed\n")));

              result = -1;
            }
        }
    }

  // Reset waiting state.
  --this->event_loop_threads_waiting_;

  return result;
}
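The wait above follows a common shape: count the waiter, loop on a condition variable until the flag clears or the deadline passes, then uncount the waiter. A rough standard-library analogue is sketched below, assuming `std::condition_variable` in place of TAO_SYNCH_CONDITION and a relative timeout in place of ACE_Time_Value. `ClientLeaderGate` and its members are hypothetical names for illustration only.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical gate that event loop threads wait on while a client thread
// is the leader.
class ClientLeaderGate {
public:
  // Returns 0 once no client thread is leader, or -1 on timeout.
  // The caller must already hold `lock` on mutex().
  // A null max_wait means "wait without a time limit".
  int wait_until_clear(std::unique_lock<std::mutex>& lock,
                       const std::chrono::milliseconds* max_wait) {
    assert(lock.owns_lock());
    ++waiting_;  // note that we are waiting
    int result = 0;
    auto cleared = [this] { return !client_is_leader_; };
    if (max_wait == nullptr) {
      cond_.wait(lock, cleared);
    } else if (!cond_.wait_for(lock, *max_wait, cleared)) {
      result = -1;  // timed out while a client was still the leader
    }
    --waiting_;  // reset waiting state
    return result;
  }

  // Call with mutex() held. Clearing the flag wakes all waiters.
  void set_client_leader(bool is_leader) {
    client_is_leader_ = is_leader;
    if (!is_leader)
      cond_.notify_all();
  }

  std::mutex& mutex() { return mutex_; }
  int waiting() const { return waiting_; }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  bool client_is_leader_ = false;
  int waiting_ = 0;
};
```

The predicate overloads of `wait` and `wait_for` re-check the flag after every wakeup, which plays the role of the explicit `while` loop in the original.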
int TAO_Leader_Follower::wait_for_event | ( | TAO_LF_Event * | event, | |
TAO_Transport * | transport, | |||
ACE_Time_Value * | max_wait_time | |||
) |
Wait on the Leader/Followers loop until one event happens.
event | The event we wait for. The loop iterates until the event is successful or it fails due to a timeout, an error, or a closed connection. | |
transport | The transport attached to the event | |
max_wait_time | Limit the time spent on the loop | |
return | Returns -1 on error, 0 or non-zero value otherwise. |
There should be no reason to reset the value of result here. If there was an error in handle_events () that the leader saw, I (Bala) believe it should be propagated to the clients. result = 0;
Definition at line 194 of file Leader_Follower.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, TAO_LF_Event::error_detected(), ACE_OS::gettimeofday(), TAO_Transport::id(), TAO_LF_Event::keep_waiting(), leader_available(), TAO_LF_Event::LFS_TIMEOUT, LM_DEBUG, LM_ERROR, ACE_Reactor::owner(), reactor(), reactor_, ACE_Thread::self(), TAO_LF_Event::set_state(), TAO_LF_Event::successful(), TAO_debug_level, TAO_SYNCH_MUTEX, and ACE_Time_Value::zero.
Referenced by TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Wait_On_Leader_Follower::wait(), and TAO_LF_Connect_Strategy::wait_i().
{
  // Obtain the lock.
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);

  ACE_Countdown_Time countdown (max_wait_time);

  // Optimize the first iteration [no access to errno]
  int result = 1;

  // For some cases the transport may disappear like when waiting for
  // connection to be initiated or closed. So cache the id.
  // @@ NOTE: This is not completely safe either. We will be fine for
  // cases that don't access the id ie. when debug level is off but
  // with debugging level on we are on a sticky wicket. Hopefully none
  // of our users should run TAO with debugging enabled like they did
  // in PathFinder
  size_t t_id = 0;

  if (TAO_debug_level && transport != 0)
    {
      t_id = transport->id ();
    }

  {
    // Calls this->set_client_thread () on construction and
    // this->reset_client_thread () on destruction.
    TAO_LF_Client_Thread_Helper client_thread_helper (*this);
    ACE_UNUSED_ARG (client_thread_helper);

    // Check if there is a leader. Note that it cannot be us since we
    // gave up our leadership when we became a client.
    if (this->leader_available ())
      {
        // = Wait as a follower.

        // Grab a follower:
        TAO_LF_Follower_Auto_Ptr follower (*this);
        if (follower.get () == 0)
          return -1;

        if (TAO_debug_level >= 5)
          ACE_DEBUG ((LM_DEBUG,
                      "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                      " (follower), cond <%x>\n",
                      t_id, follower.get ()));

        // Bound the follower and the LF_Event, this is important to
        // get a signal when the event terminates
        TAO_LF_Event_Binder event_binder (event, follower.get ());

        while (event->keep_waiting () &&
               this->leader_available ())
          {
            // Add ourselves to the list, do it every time we wake up
            // from the CV loop. Because:
            //
            // - The leader thread could have elected us as the new
            //   leader.
            // - Before we can assume the role another thread becomes
            //   the leader
            // - But our condition variable could have been removed
            //   already, if we don't add it again we will never wake
            //   up.
            //
            // Notice that we can have spurious wake ups, in that case
            // adding the leader results in an error, that must be
            // ignored.
            // You may be thinking of not removing the condition
            // variable in the code that sends the signal, but
            // removing it here, that does not work either, in that
            // case the condition variable may be used twice:
            //
            // - Wake up because its reply arrived
            // - Wake up because it must become the leader
            //
            // but only the first one has any effect, so the leader is
            // lost.
            //

            TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);

            if (max_wait_time == 0)
              {
                if (follower->wait (max_wait_time) == -1)
                  {
                    if (TAO_debug_level >= 5)
                      ACE_DEBUG ((LM_DEBUG,
                                  "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
                                  " (follower) [no timer, cond failed]\n",
                                  t_id));

                    // @@ Michael: What is our error handling in this case?
                    //             We could be elected as leader and
                    //             no leader would come in?
                    return -1;
                  }
              }
            else
              {
                countdown.update ();
                ACE_Time_Value tv = ACE_OS::gettimeofday ();
                tv += *max_wait_time;
                if (follower->wait (&tv) == -1)
                  {
                    if (TAO_debug_level >= 5)
                      ACE_DEBUG ((LM_DEBUG,
                                  "TAO (%P|%t) - Leader_Follower[%d]::wait, "
                                  "(follower) [has timer, follower failed]\n",
                                  t_id ));

                    // If we have timed out set the state in the
                    // LF_Event. We call the non-locking,
                    // no-signalling method on LF_Event.
                    if (errno == ETIME)
                      // We have timed out
                      event->set_state (TAO_LF_Event::LFS_TIMEOUT);

                    if (!event->successful ())
                      {
                        // Remove follower can fail because either
                        // 1) the condition was satisfied (i.e. reply
                        //    received or queue drained), or
                        // 2) somebody elected us as leader, or
                        // 3) the connection got closed.
                        //
                        // Therefore:
                        // If remove_follower fails and the condition
                        // was not satisfied, we know that we got
                        // elected as a leader.
                        // But we got a timeout, so we cannot become
                        // the leader, therefore, we have to select a
                        // new leader.
                        //

                        if (this->elect_new_leader () == -1
                            && TAO_debug_level > 0)
                          {
                            ACE_ERROR ((LM_ERROR,
                                        "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
                                        "elect_new_leader failed\n",
                                        t_id ));
                          }
                      }


                    return -1;
                  }
              }
          }

        countdown.update ();

        // @@ Michael: This is an old comment why we do not want to
        //             remove the follower here.
        // We should not remove the follower here, we *must* remove it when
        // we signal it so the same condition is not signalled for
        // both wake up as a follower and as the next leader.

        if (TAO_debug_level >= 5)
          ACE_DEBUG ((LM_DEBUG,
                      "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                      " done (follower), successful %d\n",
                      t_id,
                      event->successful ()));

        // Now somebody woke us up to become a leader or to handle our
        // input. We are already removed from the follower queue.

        if (event->successful ())
          return 0;

        if (event->error_detected ())
          return -1;

        // FALLTHROUGH
        // We only get here if we woke up but the reply is not
        // complete yet, time to assume the leader role....
        // i.e. ACE_ASSERT (event->successful () == 0);
      }

    // = Leader Code.

    // The only way to reach this point is if we must become the
    // leader, because there is no leader or we have to update to a
    // leader or we are doing nested upcalls in this case we do
    // increase the refcount on the leader in TAO_ORB_Core.

    // Calls this->set_client_leader_thread () on
    // construction and this->reset_client_leader_thread ()
    // on destruction. Note that this may increase the refcount of
    // the leader.
    TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
    ACE_UNUSED_ARG (client_leader_thread_helper);

    {
      ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
                        this->reverse_lock (), -1);

      // Become owner of the reactor.
      ACE_Reactor *reactor = this->reactor_;
      reactor->owner (ACE_Thread::self ());

      // Run the reactor event loop.

      if (TAO_debug_level >= 5)
        ACE_DEBUG ((LM_DEBUG,
                    "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                    " (leader) enter reactor event loop\n",
                    t_id));

      // If we got our event, no need to run the event loop any
      // further.
      while (event->keep_waiting ())
        {
          // Run the event loop.
          result = reactor->handle_events (max_wait_time);

          // Did we timeout? If so, stop running the loop.
          if (result == 0 &&
              max_wait_time != 0 &&
              *max_wait_time == ACE_Time_Value::zero)
            break;

          // Other errors? If so, stop running the loop.
          if (result == -1)
            break;

          // Otherwise, keep going...
        }

      if (TAO_debug_level >= 5)
        ACE_DEBUG ((LM_DEBUG,
                    "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                    " (leader) exit reactor event loop\n",
                    t_id));
    }
  }
  //
  // End artificial scope for auto_ptr like helpers calling:
  // this->reset_client_thread () and (maybe)
  // this->reset_client_leader_thread ().
  //

  // Wake up the next leader, we cannot do that in handle_input,
  // because the woken up thread would try to get into handle_events,
  // which is at the time in handle_input still occupied. But do it
  // before checking the error in <result>, even if there is an error
  // in our input we should continue running the loop in another
  // thread.

  if (this->elect_new_leader () == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                       " failed to elect new leader\n",
                       t_id),
                      -1);

  if (result == -1 && !this->reactor_->reactor_event_loop_done ())
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
                       " handle_events failed\n",
                       t_id),
                      -1);

  // Return an error if there was a problem receiving the reply...
  if (max_wait_time != 0)
    {
      if (!event->successful ()
          && *max_wait_time == ACE_Time_Value::zero)
        {
          result = -1;
          errno = ETIME;
        }
      else if (event->error_detected ())
        {
          // If the time did not expire yet, but we get a failure,
          // e.g. the connections closed, we should still return an error.
          result = -1;
        }
    }
  else
    {
      /**
       * There should be no reason to reset the value of result
       * here. If there was an error in handle_events () that the
       * leader saw, I (Bala) believe it should be propagated to the
       * clients.
       * result = 0;
       */
      if (event->error_detected ())
        {
          result = -1;
        }
    }

  return result;
}
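wait_for_event() relies on scoped helper objects so that every early `return -1` still undoes the role change made on entry. The sketch below illustrates that RAII pattern with simplified bookkeeping. `LeaderFollowerCounts` and `ClientThreadGuard` are hypothetical names; the real helper is TAO_LF_Client_Thread_Helper, and the real set_client_thread() and reset_client_thread() also handle ORB shutdown, which is omitted here.

```cpp
#include <cassert>

// Hypothetical simplified state; mirrors only the leaders_/clients_ updates.
struct LeaderFollowerCounts {
  int leaders = 0;
  int clients = 0;
  bool thread_is_leader = false;  // stand-in for the per-thread TSS flags

  void set_client_thread() {
    if (thread_is_leader)
      --leaders;  // give up leadership while acting as a client
    ++clients;
  }

  void reset_client_thread() {
    if (thread_is_leader)
      ++leaders;  // take leadership back
    --clients;
    assert(clients >= 0);
  }
};

// Hypothetical guard: enters the client role on construction and leaves it
// on destruction, including on early returns.
class ClientThreadGuard {
public:
  explicit ClientThreadGuard(LeaderFollowerCounts& lf) : lf_(lf) {
    lf_.set_client_thread();
  }
  ~ClientThreadGuard() { lf_.reset_client_thread(); }

  ClientThreadGuard(const ClientThreadGuard&) = delete;
  ClientThreadGuard& operator=(const ClientThreadGuard&) = delete;

private:
  LeaderFollowerCounts& lf_;
};
```

Placing the guard inside an inner block, as the function above does with its artificial scope, ensures the counters are restored before elect_new_leader() runs.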
int TAO_Leader_Follower::client_thread_is_leader_ [private] |
Is a client thread the current leader?
Definition at line 249 of file Leader_Follower.h.
Referenced by reset_client_leader_thread(), and set_client_leader_thread().
int TAO_Leader_Follower::clients_ [private] |
Count the number of active clients. This is useful to know when to deactivate the reactor.
Definition at line 243 of file Leader_Follower.h.
Referenced by has_clients(), reset_client_thread(), and set_client_thread().
TAO_SYNCH_CONDITION TAO_Leader_Follower::event_loop_threads_condition_ [private] |
Condition variable for server threads waiting for the client leader to complete.
Definition at line 256 of file Leader_Follower.h.
Referenced by elect_new_leader().
int TAO_Leader_Follower::event_loop_threads_waiting_ [private] |
Are server threads waiting for the client leader to complete?
Definition at line 252 of file Leader_Follower.h.
Referenced by wait_for_client_leader_to_complete().
Follower_Set TAO_Leader_Follower::follower_free_list_ [private] |
Use a free list to allocate and release Follower objects.
Definition at line 231 of file Leader_Follower.h.
Referenced by release_follower().
Follower_Set TAO_Leader_Follower::follower_set_ [private] |
Definition at line 228 of file Leader_Follower.h.
Referenced by add_follower(), elect_new_leader_i(), follower_available(), and remove_follower().
int TAO_Leader_Follower::leaders_ [private] |
Count the number of active leaders. There could be many leaders in the thread pool (i.e. calling ORB::run), and the same leader could show up multiple times as it receives nested upcalls and sends more requests.
Definition at line 239 of file Leader_Follower.h.
Referenced by leader_available(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread_i(), set_client_leader_thread(), set_client_thread(), and set_event_loop_thread().
TAO_SYNCH_MUTEX TAO_Leader_Follower::lock_ [private] |
To synchronize access to the members.
Definition at line 221 of file Leader_Follower.h.
Referenced by lock().
TAO_New_Leader_Generator* TAO_Leader_Follower::new_leader_generator_ [private] |
Leader/Follower class uses this method to notify the system that we are out of leaders.
Definition at line 260 of file Leader_Follower.h.
Referenced by no_leaders_available().
TAO_ORB_Core* TAO_Leader_Follower::orb_core_ [private] |
The orb core.
Definition at line 218 of file Leader_Follower.h.
Referenced by get_tss_resources(), reactor(), and set_client_thread().
ACE_Reactor* TAO_Leader_Follower::reactor_ [private] |
The reactor.
Definition at line 246 of file Leader_Follower.h.
Referenced by reactor(), and wait_for_event().
ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_Leader_Follower::reverse_lock_ [private] |
Do protect the access to the following three members.
Definition at line 224 of file Leader_Follower.h.
Referenced by reverse_lock().