#include <Leader_Follower.h>

Public Member Functions | |
| TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator=0) | |
| Constructor. | |
| ~TAO_Leader_Follower (void) | |
| Destructor. | |
| int | set_event_loop_thread (ACE_Time_Value *max_wait_time) |
| void | reset_event_loop_thread (void) |
| void | set_upcall_thread (void) |
| int | leader_available (void) const |
| Is there any thread running as a leader? | |
| void | set_client_thread (void) |
| A server thread is making a request. | |
| void | reset_client_thread (void) |
| A server thread has finished making a request. | |
| int | wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time) |
| Wait on the Leader/Followers loop until one event happens. | |
| void | set_client_leader_thread (void) |
| void | reset_client_leader_thread (void) |
| void | set_client_leader_thread (ACE_thread_t thread_ID) |
| int | is_client_leader_thread (void) const |
| Checks if we are a leader thread. | |
| int | elect_new_leader (void) |
| TAO_SYNCH_MUTEX & | lock (void) |
| Get a reference to the underlying mutex. | |
| ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & | reverse_lock (void) |
| int | has_clients (void) const |
| Check if there are any client threads running. | |
| ACE_Reactor * | reactor (void) |
| Accessor to the reactor. | |
| void | no_leaders_available (void) |
| Called when we are out of leaders. | |
Follower creation/destructions | |
The Leader/Followers set acts as a factory for the Follower objects. A Follower represents a thread blocked waiting in the Follower set. The Leader/Followers abstraction keeps a list of the waiting followers, so it can wake one up when the leader thread stops handling events. For performance reasons the Leader/Followers set uses a pool (or free list) to keep Follower objects that are not attached to any thread. It is tempting to keep such followers in TSS, since a thread needs at most one Follower object at a time; however, that does not work with multiple Leader/Followers sets. Consult this bug report for more details: | |
| TAO_LF_Follower * | allocate_follower (void) |
| Allocate a new follower to the caller. | |
| void | release_follower (TAO_LF_Follower *) |
| The caller has finished using a follower. | |
Follower Set Operations | |
| void | add_follower (TAO_LF_Follower *follower) |
| Add a new follower to the set. | |
| void | remove_follower (TAO_LF_Follower *follower) |
| Removes a follower from the leader-follower set. | |
| int | follower_available (void) const |
| Checks if there are any followers available. | |
Private Types | |
| typedef ACE_Intrusive_List< TAO_LF_Follower > | Follower_Set |
| Implement the Leader/Followers set using an intrusive list. | |
Private Member Functions | |
| TAO_ORB_Core_TSS_Resources * | get_tss_resources (void) const |
| Shortcut to obtain the TSS resources of the orb core. | |
| int | wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time) |
| Wait for the client leader to complete. | |
| void | reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss) |
Follower Set Operations | |
| int | elect_new_leader_i (void) |
Private Attributes | |
| TAO_ORB_Core * | orb_core_ |
| The orb core. | |
| TAO_SYNCH_MUTEX | lock_ |
| To synchronize access to the members. | |
| ACE_Reverse_Lock< TAO_SYNCH_MUTEX > | reverse_lock_ |
| Used to protect access to the following three members. | |
| Follower_Set | follower_set_ |
| Follower_Set | follower_free_list_ |
| Use a free list to allocate and release Follower objects. | |
| int | leaders_ |
| int | clients_ |
| ACE_Reactor * | reactor_ |
| The reactor. | |
| int | client_thread_is_leader_ |
| Is a client thread the current leader? | |
| int | event_loop_threads_waiting_ |
| Are server threads waiting for the client leader to complete? | |
| TAO_SYNCH_CONDITION | event_loop_threads_condition_ |
| TAO_New_Leader_Generator * | new_leader_generator_ |
TAO_Leader_Follower
Definition at line 49 of file Leader_Follower.h.
typedef ACE_Intrusive_List< TAO_LF_Follower > TAO_Leader_Follower::Follower_Set
Implement the Leader/Followers set using an intrusive list.
Definition at line 227 of file Leader_Follower.h.
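As a rough illustration of what "intrusive" means here, the sketch below stores the list links inside the element itself, so adding or removing a follower never allocates memory. The names `Follower` and `FollowerList` are hypothetical stand-ins written in standard C++; this is not the ACE_Intrusive_List implementation, only a minimal model of the operations this page refers to (push_back, push_front, pop_front, remove, head, empty).

```cpp
#include <cassert>

// Hypothetical element type: the prev/next links live in the element.
struct Follower
{
  Follower *prev = nullptr;
  Follower *next = nullptr;
};

// Hypothetical doubly linked intrusive list; it never owns its elements.
class FollowerList
{
public:
  bool empty () const { return head_ == nullptr; }
  Follower *head () const { return head_; }

  void push_back (Follower *f)
  {
    assert (f != nullptr);
    f->next = nullptr;
    f->prev = tail_;
    if (tail_ != nullptr) tail_->next = f; else head_ = f;
    tail_ = f;
  }

  void push_front (Follower *f)
  {
    assert (f != nullptr);
    f->prev = nullptr;
    f->next = head_;
    if (head_ != nullptr) head_->prev = f; else tail_ = f;
    head_ = f;
  }

  // Precondition: f is currently linked into this list.
  void remove (Follower *f)
  {
    if (f->prev != nullptr) f->prev->next = f->next; else head_ = f->next;
    if (f->next != nullptr) f->next->prev = f->prev; else tail_ = f->prev;
    f->prev = nullptr;
    f->next = nullptr;
  }

  // Returns a null pointer when the list is empty.
  Follower *pop_front ()
  {
    Follower *f = head_;
    if (f != nullptr) remove (f);
    return f;
  }

private:
  Follower *head_ = nullptr;
  Follower *tail_ = nullptr;
};
```

Because the links are part of the element, removing a specific follower takes constant time, which suits a set whose members are removed individually when they are signaled.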
TAO_Leader_Follower::TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator=0)
Constructor.
Definition at line 13 of file Leader_Follower.inl.
00015 : orb_core_ (orb_core),
00016 reverse_lock_ (lock_),
00017 leaders_ (0),
00018 clients_ (0),
00019 reactor_ (0),
00020 client_thread_is_leader_ (0),
00021 event_loop_threads_waiting_ (0),
00022 event_loop_threads_condition_ (lock_),
00023 new_leader_generator_ (new_leader_generator)
00024 {
00025 }
TAO_Leader_Follower::~TAO_Leader_Follower (void)
Destructor.
Definition at line 27 of file Leader_Follower.cpp. References ACE_Intrusive_List< T >::empty(), follower_free_list_, TAO_ORB_Core::gui_resource_factory(), ACE_Intrusive_List< T >::pop_front(), TAO_Resource_Factory::reclaim_reactor(), TAO::GUIResource_Factory::reclaim_reactor(), and TAO_ORB_Core::resource_factory().
00028 {
00029 while (!this->follower_free_list_.empty ())
00030 {
00031 TAO_LF_Follower *follower =
00032 this->follower_free_list_.pop_front ();
00033 delete follower;
00034 }
00035 // Hand the reactor back to the resource factory.
00036 // use GUI reactor factory if available
00037 if ( this->orb_core_->gui_resource_factory () )
00038 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00039 else
00040 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00041
00042 this->reactor_ = 0;
00043 }
void TAO_Leader_Follower::add_follower (TAO_LF_Follower *follower)
Add a new follower to the set.
Definition at line 176 of file Leader_Follower.inl. References follower_set_, and ACE_Intrusive_List< T >::push_back(). Referenced by TAO_LF_Follower_Auto_Adder::TAO_LF_Follower_Auto_Adder().
00177 {
00178 this->follower_set_.push_back (follower);
00179 }
TAO_LF_Follower * TAO_Leader_Follower::allocate_follower (void)
Allocate a new follower to the caller.
Definition at line 46 of file Leader_Follower.cpp. References ACE_NEW_RETURN, ACE_Intrusive_List< T >::empty(), follower_free_list_, and ACE_Intrusive_List< T >::pop_front().
00047 {
00048 if (!this->follower_free_list_.empty ())
00049 return this->follower_free_list_.pop_front ();
00050
00051 TAO_LF_Follower* ptr = 0;
00052 ACE_NEW_RETURN (ptr,
00053 TAO_LF_Follower (*this),
00054 0);
00055 return ptr;
00056 }
int TAO_Leader_Follower::elect_new_leader (void)
A leader thread is relinquishing its role. Unless other leader threads are still running, pick a follower (if there is one) to take over the leader role.
Definition at line 47 of file Leader_Follower.inl. References elect_new_leader_i(), event_loop_threads_condition_, event_loop_threads_waiting_, follower_available(), leaders_, and no_leaders_available(). Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), set_upcall_thread(), and wait_for_event().
00048 {
00049 if (this->leaders_ == 0)
00050 {
00051 if (this->event_loop_threads_waiting_)
00052 {
00053 return this->event_loop_threads_condition_.broadcast ();
00054 }
00055 else if (this->follower_available ())
00056 {
00057 return this->elect_new_leader_i ();
00058 }
00059 else
00060 {
00061 this->no_leaders_available ();
00062 }
00063 }
00064 return 0;
00065 }
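The order of the checks in the listing above can be restated as a small pure function. This is only a sketch in standard C++: `LeaderState`, `Election` and `decide` are hypothetical names, and the real method acts on the object's members (broadcasting a condition variable or signaling a follower) rather than returning an enumerator.

```cpp
#include <cassert>

// Hypothetical labels for the four possible outcomes.
enum class Election
{
  LeaderStillRunning,    // leaders_ != 0: nothing to do
  WokeEventLoopThreads,  // server threads were waiting for the client leader
  PromotedFollower,      // a waiting follower is signaled to take over
  AskedForNewLeader      // nobody available: notify the new leader generator
};

// Hypothetical snapshot of the members consulted by the election.
struct LeaderState
{
  int leaders;
  int event_loop_threads_waiting;
  bool follower_available;
};

inline Election decide (const LeaderState &s)
{
  assert (s.leaders >= 0);
  if (s.leaders != 0)
    return Election::LeaderStillRunning;
  if (s.event_loop_threads_waiting != 0)
    return Election::WokeEventLoopThreads;
  if (s.follower_available)
    return Election::PromotedFollower;
  return Election::AskedForNewLeader;
}
```

Waiting event loop threads take priority over followers, which matches the order of the `if`/`else if` chain in the listing.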
int TAO_Leader_Follower::elect_new_leader_i (void)
Helper routine for elect_new_leader(): once all the preconditions have been verified, the Follower set is changed and the promoted Follower is signaled.
Definition at line 65 of file Leader_Follower.cpp. References ACE_DEBUG, follower_set_, ACE_Intrusive_List< T >::head(), LM_DEBUG, and TAO_LF_Follower::signal(). Referenced by elect_new_leader().
00066 {
00067 TAO_LF_Follower* const follower =
00068 this->follower_set_.head ();
00069
00070 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00071 ACE_DEBUG ((LM_DEBUG,
00072 "TAO (%P|%t) LF::elect_new_leader_i - "
00073 "follower is %x\n",
00074 follower));
00075 #endif /* TAO_DEBUG_LEADER_FOLLOWER */
00076
00077 return follower->signal ();
00078 }
int TAO_Leader_Follower::follower_available (void) const
Checks if there are any followers available.
Definition at line 34 of file Leader_Follower.inl. References ACE_Intrusive_List< T >::empty(), and follower_set_. Referenced by elect_new_leader().
00035 {
00036 return !this->follower_set_.empty ();
00037 }
TAO_ORB_Core_TSS_Resources * TAO_Leader_Follower::get_tss_resources (void) const
Shortcut to obtain the TSS resources of the orb core.
Definition at line 28 of file Leader_Follower.inl. References TAO_ORB_Core::get_tss_resources(). Referenced by is_client_leader_thread(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread(), set_client_leader_thread(), set_client_thread(), set_event_loop_thread(), and set_upcall_thread().
00029 {
00030 return this->orb_core_->get_tss_resources ();
00031 }
int TAO_Leader_Follower::has_clients (void) const
Check if there are any client threads running.
Definition at line 194 of file Leader_Follower.inl. References clients_. Referenced by TAO_Thread_Lane_Resources::shutdown_reactor().
00195 {
00196 return this->clients_;
00197 }
int TAO_Leader_Follower::is_client_leader_thread (void) const
Checks if we are a leader thread.
Definition at line 169 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, and get_tss_resources().
00170 {
00171 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00172 return tss->client_leader_thread_ != 0;
00173 }
int TAO_Leader_Follower::leader_available (void) const
Is there any thread running as a leader?
Definition at line 145 of file Leader_Follower.inl. References leaders_. Referenced by wait_for_event().
00146 {
00147 return this->leaders_ != 0;
00148 }
TAO_SYNCH_MUTEX & TAO_Leader_Follower::lock (void)
Get a reference to the underlying mutex.
Definition at line 125 of file Leader_Follower.inl. Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), TAO_LF_Strategy_Complete::set_event_loop_thread(), TAO_Thread_Lane_Resources::shutdown_reactor(), and TAO_LF_Event::state_changed().
00126 {
00127 return this->lock_;
00128 }
void TAO_Leader_Follower::no_leaders_available (void)
Called when we are out of leaders.
Definition at line 40 of file Leader_Follower.inl. References new_leader_generator_, and TAO_New_Leader_Generator::no_leaders_available(). Referenced by elect_new_leader().
00041 {
00042 if (this->new_leader_generator_)
00043 this->new_leader_generator_->no_leaders_available ();
00044 }
ACE_Reactor * TAO_Leader_Follower::reactor (void)
Accessor to the reactor.
Definition at line 127 of file Leader_Follower.cpp. References ACE_GUARD_RETURN, TAO_Resource_Factory::get_reactor(), TAO::GUIResource_Factory::get_reactor(), TAO_ORB_Core::gui_resource_factory(), TAO_ORB_Core::resource_factory(), and TAO_SYNCH_MUTEX. Referenced by TAO_ORB_Core::reactor(), and TAO_Thread_Lane_Resources::shutdown_reactor().
00128 {
00129 if (this->reactor_ == 0)
00130 {
00131 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00132 if (this->reactor_ == 0)
00133 {
00134 // use GUI reactor factory if available
00135 if ( this->orb_core_->gui_resource_factory () )
00136 this->reactor_ =
00137 this->orb_core_->gui_resource_factory ()->get_reactor ();
00138 else
00139 this->reactor_ =
00140 this->orb_core_->resource_factory ()->get_reactor ();
00141 }
00142 }
00143 return this->reactor_;
00144 }
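The listing above creates the reactor lazily: it tests the pointer, takes the lock, and tests again before asking a resource factory for the reactor. The sketch below shows the same double-checked shape in standard C++. It is an assumption-laden model, not the TAO code: `Reactor` and `LazyReactor` are hypothetical, `new Reactor` stands in for the factory call, and the pointer is a `std::atomic` so that the unlocked first check is well defined under the C++11 memory model.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

struct Reactor {};  // hypothetical stand-in for the reactor type

class LazyReactor
{
public:
  ~LazyReactor () { delete reactor_.load (); }

  Reactor *get ()
  {
    // First check without the lock: the common case after creation.
    Reactor *r = reactor_.load (std::memory_order_acquire);
    if (r == nullptr)
      {
        std::lock_guard<std::mutex> guard (lock_);
        // Second check under the lock: another thread may have won the race.
        r = reactor_.load (std::memory_order_relaxed);
        if (r == nullptr)
          {
            r = new Reactor;  // stands in for the resource factory call
            ++created_;
            reactor_.store (r, std::memory_order_release);
          }
      }
    assert (r != nullptr);
    return r;
  }

  // Number of reactors created so far (guarded by lock_ when written).
  int created () const { return created_; }

private:
  std::mutex lock_;
  std::atomic<Reactor *> reactor_ {nullptr};
  int created_ = 0;
};
```

The second check is what keeps two threads that both saw a null pointer from creating two reactors.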
void TAO_Leader_Follower::release_follower (TAO_LF_Follower *)
The caller has finished using a follower.
Definition at line 59 of file Leader_Follower.cpp. References follower_free_list_, and ACE_Intrusive_List< T >::push_front(). Referenced by TAO_LF_Follower_Auto_Ptr::~TAO_LF_Follower_Auto_Ptr().
00060 {
00061 this->follower_free_list_.push_front (follower);
00062 }
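Taken together, allocate_follower() and release_follower() behave like a free-list pool: released objects are kept for reuse and a new object is only allocated when the pool is empty. The following is a minimal sketch of that idea in standard C++ (C++14 for `std::make_unique`). `Follower` and `FollowerPool` are hypothetical names, and ownership is expressed with `std::unique_ptr` here instead of the raw pointers and intrusive list used in the listings.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Follower {};  // hypothetical stand-in for the follower type

class FollowerPool
{
public:
  // Reuse a pooled follower if one exists, otherwise allocate a new one.
  std::unique_ptr<Follower> allocate ()
  {
    if (!free_list_.empty ())
      {
        std::unique_ptr<Follower> f = std::move (free_list_.back ());
        free_list_.pop_back ();
        return f;
      }
    return std::make_unique<Follower> ();
  }

  // The caller is done with the follower; keep it for the next allocate().
  void release (std::unique_ptr<Follower> f)
  {
    assert (f != nullptr);
    free_list_.push_back (std::move (f));
  }

  std::size_t pooled () const { return free_list_.size (); }

private:
  // Followers still pooled at destruction are freed automatically.
  std::vector<std::unique_ptr<Follower>> free_list_;
};
```

A released follower is handed out again by the next allocation, so a steady stream of waits does not keep hitting the allocator.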
void TAO_Leader_Follower::remove_follower (TAO_LF_Follower *follower)
Removes a follower from the leader-follower set.
Definition at line 182 of file Leader_Follower.inl. References follower_set_, and ACE_Intrusive_List< T >::remove(). Referenced by TAO_LF_Follower::signal(), and TAO_LF_Follower_Auto_Adder::~TAO_LF_Follower_Auto_Adder().
00183 {
00184 this->follower_set_.remove (follower);
00185 }
void TAO_Leader_Follower::reset_client_leader_thread (void)
The current thread is no longer the leader thread in the client side leader-follower set.
Definition at line 160 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_. Referenced by TAO_LF_Client_Leader_Thread_Helper::~TAO_LF_Client_Leader_Thread_Helper().
00161 {
00162 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00163 --tss->client_leader_thread_;
00164 --this->leaders_;
00165 --this->client_thread_is_leader_;
00166 }
void TAO_Leader_Follower::reset_client_thread (void)
A server thread has finished making a request.
Definition at line 171 of file Leader_Follower.cpp. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, ACE_Reactor::end_reactor_event_loop(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, and TAO_ORB_Core::reactor(). Referenced by TAO_LF_Client_Thread_Helper::~TAO_LF_Client_Thread_Helper().
00172 {
00173 // If we were a leader thread or an event loop thread, take back
00174 // leadership.
00175 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00176 if (tss->event_loop_thread_ ||
00177 tss->client_leader_thread_)
00178 {
00179 ++this->leaders_;
00180 }
00181
00182 --this->clients_;
00183 if (this->clients_ == 0 &&
00184 this->orb_core_->has_shutdown ())
00185 {
00186 // The ORB has shutdown and we are the last client thread, we
00187 // must stop the reactor to ensure that any server threads go
00188 // away.
00189 this->orb_core_->reactor ()->end_reactor_event_loop ();
00190 }
00191 }
void TAO_Leader_Follower::reset_event_loop_thread (void)
The current thread is not a server thread anymore; reset any flags and counters.
Definition at line 117 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), and reset_event_loop_thread_i(). Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread().
00118 {
00119 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00120 if (tss->event_loop_thread_ > 0)
00121 this->reset_event_loop_thread_i (tss);
00122 }
void TAO_Leader_Follower::reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss)
Implements the reset_event_loop_thread() method once the TSS resources have been acquired. Also used in set_upcall_thread().
Definition at line 101 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, and leaders_. Referenced by reset_event_loop_thread(), and set_upcall_thread().
00102 {
00103 // Always decrement <event_loop_thread_>. If <event_loop_thread_>
00104 // reaches 0 and we are not a client leader, we are done with our
00105 // duties of running the event loop. Therefore, decrement the
00106 // leaders. Otherwise, we just got done with a nested call to the
00107 // event loop or a call to the event loop when we were the client
00108 // leader.
00109 --tss->event_loop_thread_;
00110
00111 if (tss->event_loop_thread_ == 0 &&
00112 tss->client_leader_thread_ == 0)
00113 --this->leaders_;
00114 }
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & TAO_Leader_Follower::reverse_lock (void)
The Leader/Followers set mutex must be released during some long-running operations. This helper class simplifies the process of releasing and reacquiring said mutex.
Definition at line 188 of file Leader_Follower.inl.
00189 {
00190 return this->reverse_lock_;
00191 }
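The idea of a reverse lock is that "acquiring" it releases the underlying mutex and "releasing" it reacquires the mutex, so an ordinary scoped guard can be used to drop a lock for the duration of a block. The sketch below models that in standard C++; `ReverseLock` and `TracingMutex` are hypothetical names written for this illustration and are not the ACE_Reverse_Lock implementation.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical adapter: lock() and unlock() are swapped on purpose.
template <typename Mutex>
class ReverseLock
{
public:
  explicit ReverseLock (Mutex &m) : mutex_ (m) {}
  void lock () { mutex_.unlock (); }   // "acquire" releases the mutex
  void unlock () { mutex_.lock (); }   // "release" reacquires the mutex
private:
  Mutex &mutex_;
};

// Hypothetical single-threaded mutex that records whether it is held,
// used only to make the effect observable.
class TracingMutex
{
public:
  void lock () { assert (!held_); held_ = true; }
  void unlock () { assert (held_); held_ = false; }
  bool held () const { return held_; }
private:
  bool held_ = false;
};
```

With these two pieces, `std::lock_guard<ReverseLock<TracingMutex>>` constructed while the mutex is held releases it for the enclosing scope and takes it back on scope exit, including on early return.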
void TAO_Leader_Follower::set_client_leader_thread (ACE_thread_t thread_ID)
Sets the thread ID of the leader thread in the leader-follower model.
void TAO_Leader_Follower::set_client_leader_thread (void)
The current thread has become the leader thread in the client side leader-follower set.
Definition at line 151 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_. Referenced by TAO_LF_Client_Leader_Thread_Helper::TAO_LF_Client_Leader_Thread_Helper().
00152 {
00153 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00154 ++this->leaders_;
00155 ++this->client_thread_is_leader_;
00156 ++tss->client_leader_thread_;
00157 }
void TAO_Leader_Follower::set_client_thread (void)
A server thread is making a request.
Definition at line 147 of file Leader_Follower.cpp. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, TAO_Resource_Factory::drop_replies_during_shutdown(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, TAO_ORB_Core::reactor(), ACE_Reactor::reset_reactor_event_loop(), and TAO_ORB_Core::resource_factory(). Referenced by TAO_LF_Client_Thread_Helper::TAO_LF_Client_Thread_Helper().
00148 {
00149 // If we were a leader thread or an event loop thread, give up
00150 // leadership.
00151 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00152 if (tss->event_loop_thread_ ||
00153 tss->client_leader_thread_)
00154 {
00155 --this->leaders_;
00156 }
00157
00158 if (this->clients_ == 0 &&
00159 this->orb_core_->has_shutdown () &&
00160 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00161 {
00162 // The ORB has shutdown and we are the first client after
00163 // that. This means that the reactor is disabled, we must
00164 // re-enable it if we want to receive any replies...
00165 this->orb_core_->reactor ()->reset_reactor_event_loop ();
00166 }
00167 ++this->clients_;
00168 }
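set_client_thread() and reset_client_thread() are symmetric: a thread that was counted as a leader gives up that count while it acts as a client and takes it back afterwards. The sketch below models only that bookkeeping in standard C++. `ThreadFlags` and `Counters` are hypothetical names, and the ORB shutdown handling (re-enabling or ending the reactor event loop) is deliberately left out.

```cpp
#include <cassert>

// Hypothetical stand-in for the per-thread (TSS) counters.
struct ThreadFlags
{
  int event_loop_thread = 0;
  int client_leader_thread = 0;
};

// Hypothetical model of the shared counters; shutdown handling omitted.
struct Counters
{
  int leaders = 0;
  int clients = 0;

  void set_client_thread (const ThreadFlags &tss)
  {
    // A leader or event loop thread gives up leadership while waiting.
    if (tss.event_loop_thread || tss.client_leader_thread)
      --leaders;
    ++clients;
  }

  void reset_client_thread (const ThreadFlags &tss)
  {
    assert (clients > 0);
    // The same thread takes leadership back once the request is done.
    if (tss.event_loop_thread || tss.client_leader_thread)
      ++leaders;
    --clients;
  }
};
```

A plain client thread (both flags zero) only changes the client count, so the leader count is untouched by threads that never ran the event loop.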
int TAO_Leader_Follower::set_event_loop_thread (ACE_Time_Value *max_wait_time)
The current thread has become a server thread (i.e. called ORB::run); update any flags and counters.
Definition at line 68 of file Leader_Follower.inl. References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), leaders_, and wait_for_client_leader_to_complete(). Referenced by TAO_LF_Strategy_Complete::set_event_loop_thread().
00069 {
00070 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00071
00072 // Make sure that no other client thread is running the show. If
00073 // we are the client thread running the show, then it is ok.
00074 if (this->client_thread_is_leader_ &&
00075 tss->client_leader_thread_ == 0)
00076 {
00077 int result =
00078 this->wait_for_client_leader_to_complete (max_wait_time);
00079
00080 if (result != 0)
00081 return result;
00082 }
00083
00084 // If <event_loop_thread_> == 0 and <client_leader_thread_> == 0, we
00085 // are running the event loop for the first time. Therefore,
00086 // increment the leaders. Otherwise, simply increment
00087 // <event_loop_thread_> since either (a) if <event_loop_thread_> !=
00088 // 0 this is a nested call to the event loop, or (b)
00089 // <client_leader_thread_> != 0 this is a call to the event loop
00090 // while we are a client leader.
00091 if (tss->event_loop_thread_ == 0 &&
00092 tss->client_leader_thread_ == 0)
00093 ++this->leaders_;
00094
00095 ++tss->event_loop_thread_;
00096
00097 return 0;
00098 }
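The counting rule shared by set_event_loop_thread() and reset_event_loop_thread_i() is that the per-thread counter always changes, while the shared leader count changes only on the outermost entry or exit of a thread that is not already a client leader. A minimal standard C++ model of that rule follows; `ThreadState` and `LeaderCount` are hypothetical names, and the wait for a client leader to complete is not modeled.

```cpp
#include <cassert>

// Hypothetical stand-in for the per-thread (TSS) counters.
struct ThreadState
{
  int event_loop_thread = 0;
  int client_leader_thread = 0;
};

class LeaderCount
{
public:
  void enter_event_loop (ThreadState &tss)
  {
    // Only the first, non-nested entry by a non-leader adds a leader.
    if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
      ++leaders_;
    ++tss.event_loop_thread;
  }

  void leave_event_loop (ThreadState &tss)
  {
    assert (tss.event_loop_thread > 0);
    --tss.event_loop_thread;
    // Only the outermost exit by a non-leader removes the leader again.
    if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
      --leaders_;
  }

  int leaders () const { return leaders_; }

private:
  int leaders_ = 0;
};
```

Nested calls to the event loop on the same thread therefore count as one leader, not several.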
void TAO_Leader_Follower::set_upcall_thread (void)
This thread is going to perform an upcall; it will no longer be an event loop thread.
Definition at line 131 of file Leader_Follower.inl. References ACE_GUARD, elect_new_leader(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), reset_event_loop_thread_i(), and TAO_SYNCH_MUTEX. Referenced by TAO_LF_Strategy_Complete::set_upcall_thread().
00132 {
00133 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00134
00135 if (tss->event_loop_thread_ > 0)
00136 {
00137 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock ());
00138 this->reset_event_loop_thread_i (tss);
00139
00140 this->elect_new_leader ();
00141 }
00142 }
int TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
Wait for the client leader to complete.
Definition at line 81 of file Leader_Follower.cpp. References ACE_ERROR, ACE_TEXT, client_thread_is_leader_, ETIME, event_loop_threads_condition_, event_loop_threads_waiting_, ACE_OS::gettimeofday(), LM_ERROR, and ACE_Countdown_Time::update(). Referenced by set_event_loop_thread().
00082 {
00083 int result = 0;
00084 ACE_Countdown_Time countdown (max_wait_time);
00085
00086 // Note that we are waiting.
00087 ++this->event_loop_threads_waiting_;
00088
00089 while (this->client_thread_is_leader_ &&
00090 result != -1)
00091 {
00092 if (max_wait_time == 0)
00093 {
00094 if (this->event_loop_threads_condition_.wait () == -1)
00095 {
00096 ACE_ERROR ((LM_ERROR,
00097 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00098 ACE_TEXT ("Condition variable wait failed\n")));
00099
00100 result = -1;
00101 }
00102 }
00103 else
00104 {
00105 countdown.update ();
00106 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00107 tv += *max_wait_time;
00108 if (this->event_loop_threads_condition_.wait (&tv) == -1)
00109 {
00110 if (errno != ETIME)
00111 ACE_ERROR ((LM_ERROR,
00112 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00113 ACE_TEXT ("Condition variable wait failed\n")));
00114
00115 result = -1;
00116 }
00117 }
00118 }
00119
00120 // Reset waiting state.
00121 --this->event_loop_threads_waiting_;
00122
00123 return result;
00124 }
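The waiting pattern in the listing above (loop on a predicate, wait on a condition variable, treat a null timeout as "wait forever", return -1 on timeout) can be sketched with the standard library as follows. This is a model under stated assumptions, not the TAO code: `ClientLeaderGate` is a hypothetical class, it takes its own lock instead of being called with the lock already held, and it computes one absolute deadline up front rather than recomputing it on every iteration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

class ClientLeaderGate
{
public:
  // Returns 0 once no client thread is the leader, -1 on timeout.
  // A null max_wait means "wait indefinitely".
  int wait_for_client_leader (const std::chrono::milliseconds *max_wait)
  {
    std::unique_lock<std::mutex> guard (lock_);
    ++waiting_;  // note that we are waiting
    auto done = [this] { return !client_is_leader_; };
    int result = 0;
    if (max_wait == nullptr)
      cond_.wait (guard, done);
    else
      {
        const auto deadline = std::chrono::steady_clock::now () + *max_wait;
        // wait_until returns the predicate value; false means timed out.
        if (!cond_.wait_until (guard, deadline, done))
          result = -1;
      }
    --waiting_;  // reset waiting state
    assert (waiting_ >= 0);
    return result;
  }

  void set_client_leader (bool is_leader)
  {
    {
      std::lock_guard<std::mutex> guard (lock_);
      client_is_leader_ = is_leader;
    }
    cond_.notify_all ();  // wake every waiting event loop thread
  }

private:
  std::mutex lock_;
  std::condition_variable cond_;
  bool client_is_leader_ = false;
  int waiting_ = 0;
};
```

The predicate overloads of `wait` and `wait_until` re-check the condition after every wakeup, which covers spurious wakeups in the same way the `while` loop in the listing does.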
int TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time)
Wait on the Leader/Followers loop until one event happens.
There should be no reason to reset the value of result here. If there was an error in handle_events () that the leader saw, I (Bala) believe it should be propagated to the clients. result = 0;
Definition at line 194 of file Leader_Follower.cpp. References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, elect_new_leader(), TAO_LF_Event::error_detected(), ETIME, TAO_LF_Follower_Auto_Ptr::get(), ACE_OS::gettimeofday(), ACE_Reactor::handle_events(), TAO_Transport::id(), TAO_LF_Event::keep_waiting(), leader_available(), LM_DEBUG, LM_ERROR, ACE_Reactor::owner(), ACE_Reactor::reactor_event_loop_done(), TAO_LF_Event::set_state(), TAO_LF_Event::successful(), TAO_debug_level, TAO_SYNCH_MUTEX, and ACE_Countdown_Time::update(). Referenced by TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Wait_On_Leader_Follower::wait(), and TAO_LF_Connect_Strategy::wait_i().
00197 {
00198 // Obtain the lock.
00199 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00200
00201 ACE_Countdown_Time countdown (max_wait_time);
00202
00203 // Optimize the first iteration [no access to errno]
00204 int result = 1;
00205
00206 // For some cases the transport may disappear like when waiting for
00207 // connection to be initiated or closed. So cache the id.
00208 // @@ NOTE: This is not completely safe either. We will be fine for
00209 // cases that don't access the id ie. when debug level is off but
00210 // with debugging level on we are on a sticky wicket. Hopefully none
00211 // of our users should run TAO with debugging enabled like they did
00212 // in PathFinder
00213 size_t t_id = 0;
00214
00215 if (TAO_debug_level && transport != 0)
00216 {
00217 t_id = transport->id ();
00218 }
00219
00220 {
00221 // Calls this->set_client_thread () on construction and
00222 // this->reset_client_thread () on destruction.
00223 TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00224 ACE_UNUSED_ARG (client_thread_helper);
00225
00226 // Check if there is a leader. Note that it cannot be us since we
00227 // gave up our leadership when we became a client.
00228 if (this->leader_available ())
00229 {
00230 // = Wait as a follower.
00231
00232 // Grab a follower:
00233 TAO_LF_Follower_Auto_Ptr follower (*this);
00234 if (follower.get () == 0)
00235 return -1;
00236
00237 if (TAO_debug_level >= 5)
00238 ACE_DEBUG ((LM_DEBUG,
00239 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00240 " (follower), cond <%x>\n",
00241 t_id, follower.get ()));
00242
00243 // Bound the follower and the LF_Event, this is important to
00244 // get a signal when the event terminates
00245 TAO_LF_Event_Binder event_binder (event, follower.get ());
00246
00247 while (event->keep_waiting () &&
00248 this->leader_available ())
00249 {
00250 // Add ourselves to the list, do it every time we wake up
00251 // from the CV loop. Because:
00252 //
00253 // - The leader thread could have elected us as the new
00254 // leader.
00255 // - Before we can assume the role another thread becomes
00256 // the leader
00257 // - But our condition variable could have been removed
00258 // already, if we don't add it again we will never wake
00259 // up.
00260 //
00261 // Notice that we can have spurious wake ups, in that case
00262 // adding the leader results in an error, that must be
00263 // ignored.
00264 // You may be thinking of not removing the condition
00265 // variable in the code that sends the signal, but
00266 // removing it here, that does not work either, in that
00267 // case the condition variable may be used twice:
00268 //
00269 // - Wake up because its reply arrived
00270 // - Wake up because it must become the leader
00271 //
00272 // but only the first one has any effect, so the leader is
00273 // lost.
00274 //
00275
00276 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00277
00278 if (max_wait_time == 0)
00279 {
00280 if (follower->wait (max_wait_time) == -1)
00281 {
00282 if (TAO_debug_level >= 5)
00283 ACE_DEBUG ((LM_DEBUG,
00284 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00285 " (follower) [no timer, cond failed]\n",
00286 t_id));
00287
00288 // @@ Michael: What is our error handling in this case?
00289 // We could be elected as leader and
00290 // no leader would come in?
00291 return -1;
00292 }
00293 }
00294 else
00295 {
00296 countdown.update ();
00297 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00298 tv += *max_wait_time;
00299 if (follower->wait (&tv) == -1)
00300 {
00301 if (TAO_debug_level >= 5)
00302 ACE_DEBUG ((LM_DEBUG,
00303 "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00304 "(follower) [has timer, follower failed]\n",
00305 t_id ));
00306
00307 // If we have timed out set the state in the
00308 // LF_Event. We call the non-locking,
00309 // no-signalling method on LF_Event.
00310 if (errno == ETIME)
00311 // We have timed out
00312 event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00313
00314 if (!event->successful ())
00315 {
00316 // Remove follower can fail because either
00317 // 1) the condition was satisfied (i.e. reply
00318 // received or queue drained), or
00319 // 2) somebody elected us as leader, or
00320 // 3) the connection got closed.
00321 //
00322 // Therefore:
00323 // If remove_follower fails and the condition
00324 // was not satisfied, we know that we got
00325 // elected as a leader.
00326 // But we got a timeout, so we cannot become
00327 // the leader, therefore, we have to select a
00328 // new leader.
00329 //
00330
00331 if (this->elect_new_leader () == -1
00332 && TAO_debug_level > 0)
00333 {
00334 ACE_ERROR ((LM_ERROR,
00335 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00336 "elect_new_leader failed\n",
00337 t_id ));
00338 }
00339 }
00340
00341
00342 return -1;
00343 }
00344 }
00345 }
00346
00347 countdown.update ();
00348
00349 // @@ Michael: This is an old comment why we do not want to
00350 // remove the follower here.
00351 // We should not remove the follower here, we *must* remove it when
00352 // we signal it so the same condition is not signalled for
00353 // both wake up as a follower and as the next leader.
00354
00355 if (TAO_debug_level >= 5)
00356 ACE_DEBUG ((LM_DEBUG,
00357 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00358 " done (follower), successful %d\n",
00359 t_id,
00360 event->successful ()));
00361
00362 // Now somebody woke us up to become a leader or to handle our
00363 // input. We are already removed from the follower queue.
00364
00365 if (event->successful ())
00366 return 0;
00367
00368 if (event->error_detected ())
00369 return -1;
00370
00371 // FALLTHROUGH
00372 // We only get here if we woke up but the reply is not
00373 // complete yet, time to assume the leader role....
00374 // i.e. ACE_ASSERT (event->successful () == 0);
00375 }
00376
00377 // = Leader Code.
00378
00379 // The only way to reach this point is if we must become the
00380 // leader, because there is no leader or we have to update to a
00381 // leader or we are doing nested upcalls in this case we do
00382 // increase the refcount on the leader in TAO_ORB_Core.
00383
00384 // Calls this->set_client_leader_thread () on
00385 // construction and this->reset_client_leader_thread ()
00386 // on destruction. Note that this may increase the refcount of
00387 // the leader.
00388 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00389 ACE_UNUSED_ARG (client_leader_thread_helper);
00390
00391 {
00392 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00393 this->reverse_lock (), -1);
00394
00395 // Become owner of the reactor.
00396 ACE_Reactor *reactor = this->reactor_;
00397 reactor->owner (ACE_Thread::self ());
00398
00399 // Run the reactor event loop.
00400
00401 if (TAO_debug_level >= 5)
00402 ACE_DEBUG ((LM_DEBUG,
00403 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00404 " (leader) enter reactor event loop\n",
00405 t_id));
00406
00407 // If we got our event, no need to run the event loop any
00408 // further.
00409 while (event->keep_waiting ())
00410 {
00411 // Run the event loop.
00412 result = reactor->handle_events (max_wait_time);
00413
00414 // Did we timeout? If so, stop running the loop.
00415 if (result == 0 &&
00416 max_wait_time != 0 &&
00417 *max_wait_time == ACE_Time_Value::zero)
00418 break;
00419
00420 // Other errors? If so, stop running the loop.
00421 if (result == -1)
00422 break;
00423
00424 // Otherwise, keep going...
00425 }
00426
00427 if (TAO_debug_level >= 5)
00428 ACE_DEBUG ((LM_DEBUG,
00429 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00430 " (leader) exit reactor event loop\n",
00431 t_id));
00432 }
00433 }
00434 //
00435 // End artificial scope for auto_ptr like helpers calling:
00436 // this->reset_client_thread () and (maybe)
00437 // this->reset_client_leader_thread ().
00438 //
00439
00440 // Wake up the next leader, we cannot do that in handle_input,
00441 // because the woken up thread would try to get into handle_events,
00442 // which is at the time in handle_input still occupied. But do it
00443 // before checking the error in <result>, even if there is an error
00444 // in our input we should continue running the loop in another
00445 // thread.
00446
00447 if (this->elect_new_leader () == -1)
00448 ACE_ERROR_RETURN ((LM_ERROR,
00449 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00450 " failed to elect new leader\n",
00451 t_id),
00452 -1);
00453
00454 if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00455 ACE_ERROR_RETURN ((LM_ERROR,
00456 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00457 " handle_events failed\n",
00458 t_id),
00459 -1);
00460
00461 // Return an error if there was a problem receiving the reply...
00462 if (max_wait_time != 0)
00463 {
00464 if (!event->successful ()
00465 && *max_wait_time == ACE_Time_Value::zero)
00466 {
00467 result = -1;
00468 errno = ETIME;
00469 }
00470 else if (event->error_detected ())
00471 {
00472 // If the time did not expire yet, but we get a failure,
00473 // e.g. the connections closed, we should still return an error.
00474 result = -1;
00475 }
00476 }
00477 else
00478 {
00479 /**
00480 * There should be no reason to reset the value of result
00481 * here. If there was an error in handle_events () that the
00482 * leader saw, I (Bala) believe it should be propagated to the
00483 * clients.
00484 * result = 0;
00485 */
00486 if (event->error_detected ())
00487 {
00488 result = -1;
00489 }
00490 }
00491
00492 return result;
00493 }
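The listing above relies on scoped helper objects that call a "set" method on construction and the matching "reset" method on destruction, so every early `return -1` still undoes the bookkeeping. The sketch below shows that RAII shape in standard C++. `LeaderFollowerModel`, `ClientThreadHelper` and `wait_sketch` are hypothetical names for illustration; they model the pattern and are not the TAO helper classes.

```cpp
#include <cassert>

// Hypothetical model exposing only the client counter.
class LeaderFollowerModel
{
public:
  void set_client_thread () { ++clients_; }
  void reset_client_thread () { assert (clients_ > 0); --clients_; }
  int clients () const { return clients_; }
private:
  int clients_ = 0;
};

// Calls set_client_thread() on construction and reset_client_thread()
// on destruction, whichever way the enclosing scope is left.
class ClientThreadHelper
{
public:
  explicit ClientThreadHelper (LeaderFollowerModel &lf) : lf_ (lf)
  {
    lf_.set_client_thread ();
  }
  ~ClientThreadHelper () { lf_.reset_client_thread (); }
  ClientThreadHelper (const ClientThreadHelper &) = delete;
  ClientThreadHelper &operator= (const ClientThreadHelper &) = delete;
private:
  LeaderFollowerModel &lf_;
};

// Returns -1 on the early-exit path, otherwise the client count seen
// while the helper is alive.
inline int wait_sketch (LeaderFollowerModel &lf, bool fail_early)
{
  ClientThreadHelper helper (lf);
  if (fail_early)
    return -1;  // the destructor still runs here
  return lf.clients ();
}
```

The artificial scope in the listing serves the same purpose: it ends the helpers' lifetime before the next leader is elected.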
int TAO_Leader_Follower::client_thread_is_leader_
Is a client thread the current leader?
Definition at line 249 of file Leader_Follower.h. Referenced by reset_client_leader_thread(), set_client_leader_thread(), set_event_loop_thread(), and wait_for_client_leader_to_complete().
int TAO_Leader_Follower::clients_
Count the number of active clients; this is useful to know when to deactivate the reactor.
Definition at line 243 of file Leader_Follower.h. Referenced by has_clients(), reset_client_thread(), and set_client_thread().
TAO_SYNCH_CONDITION TAO_Leader_Follower::event_loop_threads_condition_
Condition variable for server threads waiting for the client leader to complete.
Definition at line 256 of file Leader_Follower.h. Referenced by elect_new_leader(), and wait_for_client_leader_to_complete().
int TAO_Leader_Follower::event_loop_threads_waiting_
Are server threads waiting for the client leader to complete?
Definition at line 252 of file Leader_Follower.h. Referenced by elect_new_leader(), and wait_for_client_leader_to_complete().
Follower_Set TAO_Leader_Follower::follower_free_list_
Use a free list to allocate and release Follower objects.
Definition at line 231 of file Leader_Follower.h. Referenced by allocate_follower(), release_follower(), and ~TAO_Leader_Follower().
Follower_Set TAO_Leader_Follower::follower_set_
Definition at line 228 of file Leader_Follower.h. Referenced by add_follower(), elect_new_leader_i(), follower_available(), and remove_follower().
int TAO_Leader_Follower::leaders_
Count the number of active leaders. There could be many leaders in the thread pool (i.e. calling ORB::run), and the same leader could show up multiple times as it receives nested upcalls and sends more requests.
Definition at line 239 of file Leader_Follower.h. Referenced by elect_new_leader(), leader_available(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread_i(), set_client_leader_thread(), set_client_thread(), and set_event_loop_thread().
TAO_SYNCH_MUTEX TAO_Leader_Follower::lock_
To synchronize access to the members.
Definition at line 221 of file Leader_Follower.h.
TAO_New_Leader_Generator * TAO_Leader_Follower::new_leader_generator_
Leader/Follower class uses this method to notify the system that we are out of leaders.
Definition at line 260 of file Leader_Follower.h. Referenced by no_leaders_available().
TAO_ORB_Core * TAO_Leader_Follower::orb_core_
The orb core.
Definition at line 218 of file Leader_Follower.h.
ACE_Reactor * TAO_Leader_Follower::reactor_
The reactor.
Definition at line 246 of file Leader_Follower.h.
ACE_Reverse_Lock< TAO_SYNCH_MUTEX > TAO_Leader_Follower::reverse_lock_
Used to protect access to the following three members.
Definition at line 224 of file Leader_Follower.h.