TAO_Leader_Follower Class Reference

TAO_Leader_Follower.

#include <Leader_Follower.h>

Public Member Functions

 TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator=0)
 Constructor.

 ~TAO_Leader_Follower (void)
 Destructor.

int set_event_loop_thread (ACE_Time_Value *max_wait_time)
void reset_event_loop_thread (void)
void set_upcall_thread (void)
int leader_available (void) const
 Is there any thread running as a leader?

void set_client_thread (void)
 A server thread is making a request.

void reset_client_thread (void)
 A server thread has finished making a request.

int wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time)
 Wait on the Leader/Followers loop until one event happens.

void set_client_leader_thread (void)
void reset_client_leader_thread (void)
void set_client_leader_thread (ACE_thread_t thread_ID)
int is_client_leader_thread (void) const
 Checks whether the current thread is a client leader thread.

int elect_new_leader (void)
TAO_SYNCH_MUTEX & lock (void)
 Get a reference to the underlying mutex.

ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & reverse_lock (void)
int has_clients (void) const
 Check if there are any client threads running.

ACE_Reactor * reactor (void)
 Accessor to the reactor.

void no_leaders_available (void)
 Called when we are out of leaders.

Follower creation/destruction
The Leader/Followers set acts as a factory for the Follower objects. Followers are used to represent a thread blocked waiting in the Follower set.

The Leader/Followers abstraction keeps a list of the waiting followers, so it can wake up one when the leader thread stops handling events.

For performance reasons the Leader/Followers set uses a pool (or free list) to keep Follower objects that are not attached to any thread. It could be tempting to keep such followers in TSS, since a thread needs at most one such Follower object. However, that does not work with multiple Leader/Followers sets; consult this bug report for more details:

http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=296

TAO_LF_Follower * allocate_follower (void)
 Allocate a new follower to the caller.

void release_follower (TAO_LF_Follower *)
 The caller has finished using a follower.
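
The free-list idea described above can be sketched in standard C++. This is only an illustration under stated assumptions: Follower and FollowerPool are hypothetical stand-ins, std::vector replaces the intrusive list, and no locking is shown (the real class is used with its mutex held).

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Follower { };  // hypothetical stand-in for TAO_LF_Follower

class FollowerPool
{
public:
  // Reuse a pooled object when one exists, otherwise allocate a new one.
  std::unique_ptr<Follower> allocate ()
  {
    if (!free_list_.empty ())
      {
        std::unique_ptr<Follower> f = std::move (free_list_.back ());
        free_list_.pop_back ();
        return f;
      }
    return std::unique_ptr<Follower> (new Follower);
  }

  // Return an object to the pool instead of deleting it.
  void release (std::unique_ptr<Follower> f)
  {
    free_list_.push_back (std::move (f));
  }

  // Number of objects currently parked in the pool.
  std::size_t pooled () const { return free_list_.size (); }

private:
  std::vector<std::unique_ptr<Follower> > free_list_;
};
```

Because the pool belongs to one Leader/Followers set rather than to a thread, each set can hand out its own Follower objects, which is the property the TSS approach lacks.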

Follower Set Operations
void add_follower (TAO_LF_Follower *follower)
 Add a new follower to the set.

void remove_follower (TAO_LF_Follower *follower)
 Removes a follower from the leader-follower set.

int follower_available (void) const
 Checks if there are any followers available.


Private Types

typedef ACE_Intrusive_List< TAO_LF_Follower > Follower_Set
 Implement the Leader/Followers set using an intrusive list.


Private Member Functions

TAO_ORB_Core_TSS_Resources * get_tss_resources (void) const
 Shortcut to obtain the TSS resources of the orb core.

int wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
 Wait for the client leader to complete.

void reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss)
Follower Set Operations
int elect_new_leader_i (void)

Private Attributes

TAO_ORB_Core * orb_core_
 The orb core.

TAO_SYNCH_MUTEX lock_
 To synchronize access to the members.

ACE_Reverse_Lock< TAO_SYNCH_MUTEX > reverse_lock_
 To protect the access to the following three members.

Follower_Set follower_set_
Follower_Set follower_free_list_
 Use a free list to allocate and release Follower objects.

int leaders_
int clients_
ACE_Reactor * reactor_
 The reactor.

int client_thread_is_leader_
 Is a client thread the current leader?

int event_loop_threads_waiting_
 Are server threads waiting for the client leader to complete?

TAO_SYNCH_CONDITION event_loop_threads_condition_
TAO_New_Leader_Generator * new_leader_generator_

Detailed Description

TAO_Leader_Follower.

Definition at line 49 of file Leader_Follower.h.


Member Typedef Documentation

typedef ACE_Intrusive_List<TAO_LF_Follower> TAO_Leader_Follower::Follower_Set [private]
 

Implement the Leader/Followers set using an intrusive list.

Definition at line 227 of file Leader_Follower.h.
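
As an illustration of what "intrusive" means here, the following sketch keeps the list links inside the element itself, so adding or removing a follower needs no memory allocation. It is a minimal sketch in standard C++, not the ACE_Intrusive_List implementation; the Follower and FollowerList names and the id field are assumptions made for the example.

```cpp
#include <cassert>

// Hypothetical node type: the links live inside the element.
struct Follower
{
  Follower *prev = nullptr;
  Follower *next = nullptr;
  int id = 0;
};

// Minimal intrusive doubly linked list (illustrative only).
class FollowerList
{
public:
  bool empty () const { return head_ == nullptr; }
  Follower *head () const { return head_; }

  // Append f; no node is allocated because f carries its own links.
  void push_back (Follower *f)
  {
    f->next = nullptr;
    f->prev = tail_;
    if (tail_ != nullptr)
      tail_->next = f;
    else
      head_ = f;
    tail_ = f;
  }

  // Unlink f; the caller guarantees that f is currently in this list.
  void remove (Follower *f)
  {
    if (f->prev != nullptr)
      f->prev->next = f->next;
    else
      head_ = f->next;
    if (f->next != nullptr)
      f->next->prev = f->prev;
    else
      tail_ = f->prev;
    f->prev = nullptr;
    f->next = nullptr;
  }

private:
  Follower *head_ = nullptr;
  Follower *tail_ = nullptr;
};
```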


Constructor & Destructor Documentation

ACE_INLINE TAO_Leader_Follower::TAO_Leader_Follower (TAO_ORB_Core *orb_core, TAO_New_Leader_Generator *new_leader_generator = 0)
 

Constructor.

Definition at line 13 of file Leader_Follower.i.

00015   : orb_core_ (orb_core),
00016     reverse_lock_ (lock_),
00017     leaders_ (0),
00018     clients_ (0),
00019     reactor_ (0),
00020     client_thread_is_leader_ (0),
00021     event_loop_threads_waiting_ (0),
00022     event_loop_threads_condition_ (lock_),
00023     new_leader_generator_ (new_leader_generator)
00024 {
00025 }

TAO_Leader_Follower::~TAO_Leader_Follower (void)
 

Destructor.

Definition at line 27 of file Leader_Follower.cpp.

References ACE_Intrusive_List< T >::empty(), follower_free_list_, TAO_ORB_Core::gui_resource_factory(), ACE_Intrusive_List< T >::pop_front(), TAO_Resource_Factory::reclaim_reactor(), TAO::GUIResource_Factory::reclaim_reactor(), and TAO_ORB_Core::resource_factory().

00028 {
00029   while (!this->follower_free_list_.empty ())
00030     {
00031       TAO_LF_Follower *follower =
00032         this->follower_free_list_.pop_front ();
00033       delete follower;
00034     }
00035   // Hand the reactor back to the resource factory.
00036   // use GUI reactor factory if available
00037   if ( this->orb_core_->gui_resource_factory () )
00038     this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00039   else
00040     this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00041 
00042   this->reactor_ = 0;
00043 }


Member Function Documentation

ACE_INLINE void TAO_Leader_Follower::add_follower (TAO_LF_Follower *follower)
 

Add a new follower to the set.

Definition at line 176 of file Leader_Follower.i.

References follower_set_, and ACE_Intrusive_List< T >::push_back().

Referenced by TAO_LF_Follower_Auto_Adder::TAO_LF_Follower_Auto_Adder().

00177 {
00178   this->follower_set_.push_back (follower);
00179 }

TAO_LF_Follower * TAO_Leader_Follower::allocate_follower (void)
 

Allocate a new follower to the caller.

Definition at line 46 of file Leader_Follower.cpp.

References ACE_Intrusive_List< T >::empty(), follower_free_list_, ACE_Intrusive_List< T >::pop_front(), and TAO_LF_Follower.

00047 {
00048   if (!this->follower_free_list_.empty ())
00049     return this->follower_free_list_.pop_front ();
00050 
00051   return new TAO_LF_Follower (*this);
00052 }

ACE_INLINE int TAO_Leader_Follower::elect_new_leader (void)


A leader thread is relinquishing its role. Unless there are more leader threads running, pick a follower (if there is any) to play the leader role.

Definition at line 47 of file Leader_Follower.i.

References elect_new_leader_i(), event_loop_threads_condition_, event_loop_threads_waiting_, follower_available(), leaders_, and no_leaders_available().

Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), set_upcall_thread(), and wait_for_event().

00048 {
00049   if (this->leaders_ == 0)
00050     {
00051       if (this->event_loop_threads_waiting_)
00052         {
00053           return this->event_loop_threads_condition_.broadcast ();
00054         }
00055       else if (this->follower_available ())
00056         {
00057           return this->elect_new_leader_i ();
00058         }
00059       else
00060         {
00061           this->no_leaders_available ();
00062         }
00063     }
00064   return 0;
00065 }
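
The decision made in the listing above can be restated as a pure function, which makes the order of the checks easier to see. This is a sketch only: the Election enum, the State struct and decide() are hypothetical names introduced for the example, and the real method performs the actions (broadcast, signal, callback) rather than returning a label.

```cpp
#include <cassert>

// Hypothetical labels for the four possible outcomes.
enum class Election
{
  None,                  // a leader is still running, nothing to do
  WakeEventLoopThreads,  // broadcast on the event loop threads condition
  PromoteFollower,       // signal the follower at the head of the set
  RequestNewLeader       // call no_leaders_available ()
};

// Hypothetical snapshot of the members consulted by elect_new_leader ().
struct State
{
  int leaders;
  int event_loop_threads_waiting;
  bool follower_available;
};

Election decide (const State &s)
{
  if (s.leaders != 0)
    return Election::None;
  if (s.event_loop_threads_waiting != 0)
    return Election::WakeEventLoopThreads;
  if (s.follower_available)
    return Election::PromoteFollower;
  return Election::RequestNewLeader;
}
```

Note that waiting event loop threads take priority over followers, matching the order of the branches in the listing.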

int TAO_Leader_Follower::elect_new_leader_i (void) [private]


This is a helper routine for elect_new_leader(). After verifying that all the preconditions are satisfied, the Follower set is changed and the promoted Follower is signaled.

Definition at line 61 of file Leader_Follower.cpp.

References ACE_DEBUG, follower_set_, ACE_Intrusive_List< T >::head(), LM_DEBUG, and TAO_LF_Follower::signal().

Referenced by elect_new_leader().

00062 {
00063   TAO_LF_Follower* const follower =
00064     this->follower_set_.head ();
00065 
00066 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00067   ACE_DEBUG ((LM_DEBUG,
00068               "TAO (%P|%t) LF::elect_new_leader_i - "
00069               "follower is %x\n",
00070               follower));
00071 #endif /* TAO_DEBUG_LEADER_FOLLOWER */
00072 
00073   return follower->signal ();
00074 }

ACE_INLINE int TAO_Leader_Follower::follower_available (void) const
 

Checks if there are any followers available.

Returns:
1 if the follower set is not empty, 0 otherwise

Definition at line 34 of file Leader_Follower.i.

References ACE_Intrusive_List< T >::empty(), and follower_set_.

Referenced by elect_new_leader().

00035 {
00036   return !this->follower_set_.empty ();
00037 }

ACE_INLINE TAO_ORB_Core_TSS_Resources * TAO_Leader_Follower::get_tss_resources (void) const [private]
 

Shortcut to obtain the TSS resources of the orb core.

Definition at line 28 of file Leader_Follower.i.

References TAO_ORB_Core::get_tss_resources().

Referenced by is_client_leader_thread(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread(), set_client_leader_thread(), set_client_thread(), set_event_loop_thread(), and set_upcall_thread().

00029 {
00030   return this->orb_core_->get_tss_resources ();
00031 }

ACE_INLINE int TAO_Leader_Follower::has_clients (void) const
 

Check if there are any client threads running.

Definition at line 194 of file Leader_Follower.i.

References clients_.

Referenced by TAO_Thread_Lane_Resources::shutdown_reactor().

00195 {
00196   return this->clients_;
00197 }

ACE_INLINE int TAO_Leader_Follower::is_client_leader_thread (void) const


Checks whether the current thread is a client leader thread.

Definition at line 169 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, and get_tss_resources().

00170 {
00171   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00172   return tss->client_leader_thread_ != 0;
00173 }

ACE_INLINE int TAO_Leader_Follower::leader_available (void) const
 

Is there any thread running as a leader?

Definition at line 145 of file Leader_Follower.i.

References leaders_.

Referenced by wait_for_event().

00146 {
00147   return this->leaders_ != 0;
00148 }

ACE_INLINE TAO_SYNCH_MUTEX & TAO_Leader_Follower::lock (void)
 

Get a reference to the underlying mutex.

Definition at line 125 of file Leader_Follower.i.

Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread(), TAO_LF_Strategy_Complete::set_event_loop_thread(), TAO_Thread_Lane_Resources::shutdown_reactor(), and TAO_LF_Event::state_changed().

00126 {
00127   return this->lock_;
00128 }

ACE_INLINE void TAO_Leader_Follower::no_leaders_available (void)
 

Called when we are out of leaders.

Definition at line 40 of file Leader_Follower.i.

References new_leader_generator_, and TAO_New_Leader_Generator::no_leaders_available().

Referenced by elect_new_leader().

00041 {
00042   if (this->new_leader_generator_)
00043     this->new_leader_generator_->no_leaders_available ();
00044 }

ACE_Reactor * TAO_Leader_Follower::reactor (void)


Accessor to the reactor.

Definition at line 123 of file Leader_Follower.cpp.

References ACE_GUARD_RETURN, TAO_Resource_Factory::get_reactor(), TAO::GUIResource_Factory::get_reactor(), TAO_ORB_Core::gui_resource_factory(), TAO_ORB_Core::resource_factory(), and TAO_SYNCH_MUTEX.

Referenced by TAO_ORB_Core::reactor(), and TAO_Thread_Lane_Resources::shutdown_reactor().

00124 {
00125   if (this->reactor_ == 0)
00126     {
00127       ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00128       if (this->reactor_ == 0)
00129         {
00130           // use GUI reactor factory if available
00131           if ( this->orb_core_->gui_resource_factory () )
00132             this->reactor_ =
00133               this->orb_core_->gui_resource_factory ()->get_reactor ();
00134           else
00135             this->reactor_ =
00136               this->orb_core_->resource_factory ()->get_reactor ();
00137         }
00138     }
00139   return this->reactor_;
00140 }
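
The accessor above creates the reactor lazily: it checks the pointer, takes the lock, and checks again before asking the factory. A sketch of the same check-lock-check idea in standard C++ follows. It deliberately swaps in std::atomic for the pointer, which the listing does not use, so that the unlocked first read is well defined under the C++11 memory model. Reactor, LazyReactor, make_reactor and factory_calls are hypothetical names for the example.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

struct Reactor { };  // hypothetical stand-in for ACE_Reactor

int factory_calls = 0;  // counts how often the hypothetical factory runs

// Hypothetical factory; the real code asks a resource factory instead.
Reactor *make_reactor ()
{
  static Reactor instance;
  ++factory_calls;
  return &instance;
}

class LazyReactor
{
public:
  explicit LazyReactor (Reactor *(*factory) ()) : factory_ (factory) {}

  Reactor *get ()
  {
    // First check without the lock (assumption: atomic pointer).
    Reactor *r = reactor_.load (std::memory_order_acquire);
    if (r == nullptr)
      {
        std::lock_guard<std::mutex> guard (lock_);
        // Second check with the lock held: another thread may have won.
        r = reactor_.load (std::memory_order_relaxed);
        if (r == nullptr)
          {
            r = factory_ ();
            reactor_.store (r, std::memory_order_release);
          }
      }
    return r;
  }

private:
  Reactor *(*factory_) ();
  std::mutex lock_;
  std::atomic<Reactor *> reactor_ {nullptr};
};
```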

void TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
 

The caller has finished using a follower.

Definition at line 55 of file Leader_Follower.cpp.

References follower_free_list_, and ACE_Intrusive_List< T >::push_front().

Referenced by TAO_LF_Follower_Auto_Ptr::~TAO_LF_Follower_Auto_Ptr().

00056 {
00057   this->follower_free_list_.push_front (follower);
00058 }

ACE_INLINE void TAO_Leader_Follower::remove_follower (TAO_LF_Follower *follower)
 

Removes a follower from the leader-follower set.

Definition at line 182 of file Leader_Follower.i.

References follower_set_, and ACE_Intrusive_List< T >::remove().

Referenced by TAO_LF_Follower::signal(), and TAO_LF_Follower_Auto_Adder::~TAO_LF_Follower_Auto_Adder().

00183 {
00184   this->follower_set_.remove (follower);
00185 }

ACE_INLINE void TAO_Leader_Follower::reset_client_leader_thread (void)
 

The current thread is no longer the leader thread in the client side leader-follower set.

Definition at line 160 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_.

Referenced by TAO_LF_Client_Leader_Thread_Helper::~TAO_LF_Client_Leader_Thread_Helper().

00161 {
00162   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00163   --tss->client_leader_thread_;
00164   --this->leaders_;
00165   --this->client_thread_is_leader_;
00166 }

void TAO_Leader_Follower::reset_client_thread (void)


A server thread has finished making a request.

Definition at line 167 of file Leader_Follower.cpp.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, ACE_Reactor::end_reactor_event_loop(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, and TAO_ORB_Core::reactor().

Referenced by TAO_LF_Client_Thread_Helper::~TAO_LF_Client_Thread_Helper().

00168 {
00169   // If we were a leader thread or an event loop thread, take back
00170   // leadership.
00171   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00172   if (tss->event_loop_thread_ ||
00173       tss->client_leader_thread_)
00174     {
00175       ++this->leaders_;
00176     }
00177 
00178   --this->clients_;
00179   if (this->clients_ == 0 &&
00180       this->orb_core_->has_shutdown ())
00181     {
00182       // The ORB has shutdown and we are the last client thread, we
00183       // must stop the reactor to ensure that any server threads go
00184       // away.
00185       this->orb_core_->reactor ()->end_reactor_event_loop ();
00186     }
00187 }
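
set_client_thread() and reset_client_thread() are meant to be called in pairs; the references above show that a helper object calls them from its constructor and destructor. The following sketch shows that pairing with a scope guard. It is a simplified model: LeaderFollower and ClientThreadGuard are hypothetical names, a single bool replaces the per-thread TSS flags, and the ORB shutdown handling is left out.

```cpp
#include <cassert>

// Hypothetical, simplified model of the counters involved.
struct LeaderFollower
{
  int leaders = 0;
  int clients = 0;
  bool thread_is_leader = false;  // stands in for the TSS flags

  // Give up leadership (if held) while acting as a client.
  void set_client_thread ()
  {
    if (thread_is_leader)
      --leaders;
    ++clients;
  }

  // Take leadership back (if it was held) when the request is done.
  void reset_client_thread ()
  {
    if (thread_is_leader)
      ++leaders;
    --clients;
  }
};

// Scope guard: set on construction, reset on destruction.
class ClientThreadGuard
{
public:
  explicit ClientThreadGuard (LeaderFollower &lf) : lf_ (lf)
  {
    lf_.set_client_thread ();
  }
  ~ClientThreadGuard () { lf_.reset_client_thread (); }

  ClientThreadGuard (const ClientThreadGuard &) = delete;
  ClientThreadGuard &operator= (const ClientThreadGuard &) = delete;

private:
  LeaderFollower &lf_;
};
```

Using a guard means the counters are restored on every exit path, including the early returns that wait_for_event() contains.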

ACE_INLINE void TAO_Leader_Follower::reset_event_loop_thread (void)


The current thread is not a server thread anymore; reset any flags and counters.

Definition at line 117 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), and reset_event_loop_thread_i().

Referenced by TAO_LF_Strategy_Complete::reset_event_loop_thread().

00118 {
00119   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00120   if (tss->event_loop_thread_ > 0)
00121     this->reset_event_loop_thread_i (tss);
00122 }

ACE_INLINE void TAO_Leader_Follower::reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss) [private]


Implements the reset_event_loop_thread() method once the TSS resources have been acquired. Also used in set_upcall_thread().

Definition at line 101 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, and leaders_.

Referenced by reset_event_loop_thread(), and set_upcall_thread().

00102 {
00103   // Always decrement <event_loop_thread_>. If <event_loop_thread_>
00104   // reaches 0 and we are not a client leader, we are done with our
00105   // duties of running the event loop. Therefore, decrement the
00106   // leaders.  Otherwise, we just got done with a nested call to the
00107   // event loop or a call to the event loop when we were the client
00108   // leader.
00109   --tss->event_loop_thread_;
00110 
00111   if (tss->event_loop_thread_ == 0 &&
00112       tss->client_leader_thread_ == 0)
00113     --this->leaders_;
00114 }

ACE_INLINE ACE_Reverse_Lock< TAO_SYNCH_MUTEX > & TAO_Leader_Follower::reverse_lock (void)


The Leader/Followers set mutex must be released during some long running operations. This helper class simplifies the process of releasing and reacquiring said mutex.

Definition at line 188 of file Leader_Follower.i.

00189 {
00190   return this->reverse_lock_;
00191 }
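
The reverse lock idea can be sketched in standard C++ as an adapter whose lock() releases the wrapped mutex and whose unlock() reacquires it, so an ordinary scope guard on the adapter releases the mutex for the duration of a scope. This is an illustration, not the ACE_Reverse_Lock implementation; TracingLock, ReverseLock and demo() are hypothetical names, and TracingLock only records its state so the effect can be observed.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical lock that records whether it is held.
class TracingLock
{
public:
  void lock () { held_ = true; }
  void unlock () { held_ = false; }
  bool held () const { return held_; }

private:
  bool held_ = false;
};

// Adapter with inverted semantics: lock () releases, unlock () reacquires.
template <typename Lock>
class ReverseLock
{
public:
  explicit ReverseLock (Lock &lock) : lock_ (lock) {}
  void lock () { lock_.unlock (); }
  void unlock () { lock_.lock (); }

private:
  Lock &lock_;
};

// Returns true if the mutex was released inside the scope and held
// again after it.
bool demo ()
{
  TracingLock mutex;
  ReverseLock<TracingLock> reverse (mutex);
  mutex.lock ();
  bool released = false;
  {
    std::lock_guard<ReverseLock<TracingLock> > unguard (reverse);
    released = !mutex.held ();
  }
  const bool reacquired = mutex.held ();
  mutex.unlock ();
  return released && reacquired;
}
```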

void TAO_Leader_Follower::set_client_leader_thread (ACE_thread_t thread_ID)


Sets the thread ID of the leader thread in the leader-follower model.

ACE_INLINE void TAO_Leader_Follower::set_client_leader_thread (void)
 

The current thread has become the leader thread in the client side leader-follower set.

Definition at line 151 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, get_tss_resources(), and leaders_.

Referenced by TAO_LF_Client_Leader_Thread_Helper::TAO_LF_Client_Leader_Thread_Helper().

00152 {
00153   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00154   ++this->leaders_;
00155   ++this->client_thread_is_leader_;
00156   ++tss->client_leader_thread_;
00157 }

void TAO_Leader_Follower::set_client_thread (void)
 

A server thread is making a request.

Definition at line 143 of file Leader_Follower.cpp.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, clients_, TAO_Resource_Factory::drop_replies_during_shutdown(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), TAO_ORB_Core::has_shutdown(), leaders_, TAO_ORB_Core::reactor(), ACE_Reactor::reset_reactor_event_loop(), and TAO_ORB_Core::resource_factory().

Referenced by TAO_LF_Client_Thread_Helper::TAO_LF_Client_Thread_Helper().

00144 {
00145   // If we were a leader thread or an event loop thread, give up
00146   // leadership.
00147   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00148   if (tss->event_loop_thread_ ||
00149       tss->client_leader_thread_)
00150     {
00151       --this->leaders_;
00152     }
00153 
00154   if (this->clients_ == 0 &&
00155       this->orb_core_->has_shutdown () &&
00156       !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00157     {
00158       // The ORB has shutdown and we are the first client after
00159       // that. This means that the reactor is disabled, we must
00160       // re-enable it if we want to receive any replies...
00161       this->orb_core_->reactor ()->reset_reactor_event_loop ();
00162     }
00163   ++this->clients_;
00164 }

ACE_INLINE int TAO_Leader_Follower::set_event_loop_thread (ACE_Time_Value *max_wait_time)


The current thread has become a server thread (i.e. called ORB::run); update any flags and counters.

Definition at line 68 of file Leader_Follower.i.

References TAO_ORB_Core_TSS_Resources::client_leader_thread_, client_thread_is_leader_, TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), leaders_, and wait_for_client_leader_to_complete().

Referenced by TAO_LF_Strategy_Complete::set_event_loop_thread().

00069 {
00070   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00071 
00072   // Make sure that no other client thread is running the show.  If
00073   // we are the client thread running the show, then it is ok.
00074   if (this->client_thread_is_leader_ &&
00075       tss->client_leader_thread_ == 0)
00076     {
00077       int result =
00078         this->wait_for_client_leader_to_complete (max_wait_time);
00079 
00080       if (result != 0)
00081         return result;
00082     }
00083 
00084   // If <event_loop_thread_> == 0 and <client_leader_thread_> == 0, we
00085   // are running the event loop for the first time.  Therefore,
00086   // increment the leaders.  Otherwise, simply increment
00087   // <event_loop_thread_> since either (a) if <event_loop_thread_> !=
00088   // 0 this is a nested call to the event loop, or (b)
00089   // <client_leader_thread_> != 0 this is a call to the event loop
00090   // while we are a client leader.
00091   if (tss->event_loop_thread_ == 0 &&
00092       tss->client_leader_thread_ == 0)
00093     ++this->leaders_;
00094 
00095   ++tss->event_loop_thread_;
00096 
00097   return 0;
00098 }
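
The counter bookkeeping shared by set_event_loop_thread() and reset_event_loop_thread_i() can be modelled in a few lines. The sketch below is an assumption-laden simplification: ThreadState stands in for the per-thread TSS resources, Counters for the Leader/Followers object, and the wait for a client leader is omitted. It shows why nested calls to the event loop change the leader count only once.

```cpp
#include <cassert>

// Hypothetical stand-in for the per-thread TSS resources.
struct ThreadState
{
  int event_loop_thread = 0;
  int client_leader_thread = 0;
};

// Hypothetical stand-in for the shared leader counter.
struct Counters
{
  int leaders = 0;

  // Only the outermost entry of a thread that is not already a
  // client leader counts as a new leader.
  void enter_event_loop (ThreadState &tss)
  {
    if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
      ++leaders;
    ++tss.event_loop_thread;
  }

  // Mirror image: only the outermost exit gives the leader role back.
  void leave_event_loop (ThreadState &tss)
  {
    --tss.event_loop_thread;
    if (tss.event_loop_thread == 0 && tss.client_leader_thread == 0)
      --leaders;
  }
};
```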

ACE_INLINE void TAO_Leader_Follower::set_upcall_thread (void)


This thread is going to perform an upcall; it will no longer be an event loop thread.

Definition at line 131 of file Leader_Follower.i.

References ACE_GUARD, elect_new_leader(), TAO_ORB_Core_TSS_Resources::event_loop_thread_, get_tss_resources(), reset_event_loop_thread_i(), and TAO_SYNCH_MUTEX.

Referenced by TAO_LF_Strategy_Complete::set_upcall_thread().

00132 {
00133   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00134 
00135   if (tss->event_loop_thread_ > 0)
00136     {
00137       ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock ());
00138       this->reset_event_loop_thread_i (tss);
00139 
00140       this->elect_new_leader ();
00141     }
00142 }

int TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time) [private]
 

Wait for the client leader to complete.

Definition at line 77 of file Leader_Follower.cpp.

References ACE_ERROR, ACE_TEXT, client_thread_is_leader_, ETIME, event_loop_threads_condition_, event_loop_threads_waiting_, ACE_OS::gettimeofday(), LM_ERROR, and ACE_Countdown_Time::update().

Referenced by set_event_loop_thread().

00078 {
00079   int result = 0;
00080   ACE_Countdown_Time countdown (max_wait_time);
00081 
00082   // Note that we are waiting.
00083   ++this->event_loop_threads_waiting_;
00084 
00085   while (this->client_thread_is_leader_ &&
00086          result != -1)
00087     {
00088       if (max_wait_time == 0)
00089         {
00090           if (this->event_loop_threads_condition_.wait () == -1)
00091             {
00092               ACE_ERROR ((LM_ERROR,
00093                           ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00094                           ACE_TEXT ("Condition variable wait failed\n")));
00095 
00096               result = -1;
00097             }
00098         }
00099       else
00100         {
00101           countdown.update ();
00102           ACE_Time_Value tv = ACE_OS::gettimeofday ();
00103           tv += *max_wait_time;
00104           if (this->event_loop_threads_condition_.wait (&tv) == -1)
00105             {
00106               if (errno != ETIME)
00107                 ACE_ERROR ((LM_ERROR,
00108                             ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00109                             ACE_TEXT ("Condition variable wait failed\n")));
00110 
00111               result = -1;
00112             }
00113         }
00114     }
00115 
00116   // Reset waiting state.
00117   --this->event_loop_threads_waiting_;
00118 
00119   return result;
00120 }
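
The waiting pattern in the listing above (block on a condition until a flag clears, with an optional time limit) can be sketched with the standard library. This is not the TAO code: wait_until_idle and its parameters are hypothetical, the function takes the mutex itself whereas the listing runs with the lock already held, and a single deadline is computed up front instead of being recomputed on each iteration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Returns 0 once `busy` is false, -1 if the time limit expires first.
// A null max_wait means "wait without a time limit".
int wait_until_idle (std::mutex &m,
                     std::condition_variable &cv,
                     const bool &busy,
                     const std::chrono::milliseconds *max_wait)
{
  std::unique_lock<std::mutex> guard (m);

  if (max_wait == nullptr)
    {
      // The predicate form re-checks the flag after every wake up,
      // so spurious wake ups are harmless.
      cv.wait (guard, [&busy] { return !busy; });
      return 0;
    }

  const std::chrono::steady_clock::time_point deadline =
    std::chrono::steady_clock::now () + *max_wait;

  return cv.wait_until (guard, deadline, [&busy] { return !busy; }) ? 0 : -1;
}
```

A thread that clears the flag would do so with the same mutex held and then notify the condition variable, which corresponds to the broadcast issued by elect_new_leader().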

int TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, TAO_Transport *transport, ACE_Time_Value *max_wait_time)
 

Wait on the Leader/Followers loop until one event happens.

Parameters:
event The event we wait for; the loop iterates until the event is successful, or it fails due to a timeout, an error, or a closed connection.
transport The transport attached to the event.
max_wait_time Limit on the time spent in the loop.

Returns:
-1 on error, 0 or non-zero value otherwise.

Todo:
Document this better, split the Follower code to the TAO_LF_Follower class, we probably don't need the transport object.

There should be no reason to reset the value of result here. If there was an error in handle_events () that the leader saw, I (Bala) believe it should be propagated to the clients. result = 0;

Definition at line 190 of file Leader_Follower.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, elect_new_leader(), TAO_LF_Event::error_detected(), ETIME, TAO_LF_Follower_Auto_Ptr::get(), ACE_OS::gettimeofday(), ACE_Reactor::handle_events(), TAO_Transport::id(), TAO_LF_Event::keep_waiting(), leader_available(), LM_DEBUG, LM_ERROR, ACE_Reactor::owner(), ACE_Reactor::reactor_event_loop_done(), TAO_LF_Event::set_state(), TAO_LF_Event::successful(), TAO_debug_level, TAO_SYNCH_MUTEX, and ACE_Countdown_Time::update().

Referenced by TAO_Leader_Follower_Flushing_Strategy::flush_message(), TAO_Wait_On_Leader_Follower::wait(), and TAO_LF_Connect_Strategy::wait_i().

00193 {
00194   // Obtain the lock.
00195   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00196 
00197   ACE_Countdown_Time countdown (max_wait_time);
00198 
00199   // Optimize the first iteration [no access to errno]
00200   int result = 1;
00201 
00202   // For some cases the transport may disappear like when waiting for
00203   // connection to be initiated or closed. So cache the id.
00204   // @@ NOTE: This is not completely safe either. We will be fine for
00205   // cases that don't access the id i.e. when debug level is off but
00206   // with debugging level on we are on a sticky wicket. Hopefully none
00207   // of our users should run TAO with debugging enabled like they did
00208   // in PathFinder
00209   size_t t_id = 0;
00210 
00211   if (TAO_debug_level)
00212     {
00213       t_id = transport->id ();
00214     }
00215 
00216   {
00217     // Calls this->set_client_thread () on construction and
00218     // this->reset_client_thread () on destruction.
00219     TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00220     ACE_UNUSED_ARG (client_thread_helper);
00221 
00222     // Check if there is a leader.  Note that it cannot be us since we
00223     // gave up our leadership when we became a client.
00224     if (this->leader_available ())
00225       {
00226         // = Wait as a follower.
00227 
00228         // Grab a follower:
00229         TAO_LF_Follower_Auto_Ptr follower (*this);
00230         if (follower.get () == 0)
00231           return -1;
00232 
00233         if (TAO_debug_level >= 5)
00234           ACE_DEBUG ((LM_DEBUG,
00235                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00236                       " (follower), cond <%x>\n",
00237                       t_id, follower.get ()));
00238 
00239         // Bound the follower and the LF_Event, this is important to
00240         // get a signal when the event terminates
00241         TAO_LF_Event_Binder event_binder (event, follower.get ());
00242 
00243         while (event->keep_waiting () &&
00244                this->leader_available ())
00245           {
00246             // Add ourselves to the list, do it everytime we wake up
00247             // from the CV loop. Because:
00248             //
00249             // - The leader thread could have elected us as the new
00250             // leader.
00251             // - Before we can assume the role another thread becomes
00252             // the leader
00253             // - But our condition variable could have been removed
00254             // already, if we don't add it again we will never wake
00255             // up.
00256             //
00257             // Notice that we can have spurious wake ups, in that case
00258             // adding the leader results in an error, that must be
00259             // ignored.
00260             // You may be thinking of not removing the condition
00261             // variable in the code that sends the signal, but
00262             // removing it here, that does not work either, in that
00263             // case the condition variable may be used twice:
00264             //
00265             //  - Wake up because its reply arrived
00266             //  - Wake up because it must become the leader
00267             //
00268             // but only the first one has any effect, so the leader is
00269             // lost.
00270             //
00271 
00272             TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00273 
00274             if (max_wait_time == 0)
00275               {
00276                 if (follower->wait (max_wait_time) == -1)
00277                   {
00278                     if (TAO_debug_level >= 5)
00279                       ACE_DEBUG ((LM_DEBUG,
00280                                   "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00281                                   " (follower) [no timer, cond failed]\n",
00282                                   t_id));
00283 
00284                     // @@ Michael: What is our error handling in this case?
00285                     //             We could be elected as leader and
00286                     //             no leader would come in?
00287                     return -1;
00288                   }
00289               }
00290             else
00291               {
00292                 countdown.update ();
00293                 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00294                 tv += *max_wait_time;
00295                 if (follower->wait (&tv) == -1)
00296                   {
00297                     if (TAO_debug_level >= 5)
00298                       ACE_DEBUG ((LM_DEBUG,
00299                                   "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00300                                   "(follower) [has timer, follower failed]\n",
00301                                   t_id ));
00302 
00303                     // If we have timedout set the state in the
00304                     // LF_Event. We call the non-locking,
00305                     // no-signalling method on LF_Event.
00306                     if (errno == ETIME)
00307                         // We have timedout
00308                         event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00309 
00310                     if (!event->successful ())
00311                       {
00312                         // Remove follower can fail because either
00313                         // 1) the condition was satisfied (i.e. reply
00314                         //    received or queue drained), or
00315                         // 2) somebody elected us as leader, or
00316                         // 3) the connection got closed.
00317                         //
00318                         // Therefore:
00319                         // If remove_follower fails and the condition
00320                         // was not satisfied, we know that we got
00321                         // elected as a leader.
00322                         // But we got a timeout, so we cannot become
00323                         // the leader, therefore, we have to select a
00324                         // new leader.
00325                         //
00326 
00327                         if (this->elect_new_leader () == -1
00328                             && TAO_debug_level > 0)
00329                           {
00330                             ACE_ERROR ((LM_ERROR,
00331                                         "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00332                                         "elect_new_leader failed\n",
00333                                         t_id ));
00334                           }
00335                       }
00336 
00337 
00338                     return -1;
00339                   }
00340               }
00341           }
00342 
00343         countdown.update ();
00344 
00345         // @@ Michael: This is an old comment why we do not want to
00346         //             remove the follower here.
00347         // We should not remove the follower here, we *must* remove it when
00348         // we signal it so the same condition is not signalled for
00349         // both wake up as a follower and as the next leader.
00350 
00351         if (TAO_debug_level >= 5)
00352           ACE_DEBUG ((LM_DEBUG,
00353                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00354                       " done (follower), successful %d\n",
00355                       t_id,
00356                       event->successful ()));
00357 
00358         // Now somebody woke us up to become a leader or to handle our
00359         // input. We are already removed from the follower queue.
00360 
00361         if (event->successful ())
00362           return 0;
00363 
00364         if (event->error_detected ())
00365           return -1;
00366 
00367         // FALLTHROUGH
00368         // We only get here if we woke up but the reply is not
00369         // complete yet, time to assume the leader role....
00370         // i.e. ACE_ASSERT (event->successful () == 0);
00371       }
00372 
00373     // = Leader Code.
00374 
00375     // The only way to reach this point is if we must become the
00376     // leader: either there is no leader, or we have to update to a
00377     // leader, or we are doing nested upcalls. In the last case we
00378     // increase the refcount on the leader in TAO_ORB_Core.
00379 
00380     // Calls this->set_client_leader_thread () on
00381     // construction and this->reset_client_leader_thread ()
00382     // on destruction.  Note that this may increase the refcount of
00383     // the leader.
00384     TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00385     ACE_UNUSED_ARG (client_leader_thread_helper);
00386 
00387     {
00388       ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00389                         this->reverse_lock (), -1);
00390 
00391       // Become owner of the reactor.
00392       ACE_Reactor *reactor = this->reactor_;
00393       reactor->owner (ACE_Thread::self ());
00394 
00395       // Run the reactor event loop.
00396 
00397       if (TAO_debug_level >= 5)
00398         ACE_DEBUG ((LM_DEBUG,
00399                     "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00400                     " (leader) enter reactor event loop\n",
00401                     t_id));
00402 
00403       // If we got our event, no need to run the event loop any
00404       // further.
00405       while (event->keep_waiting ())
00406         {
00407           // Run the event loop.
00408           result = reactor->handle_events (max_wait_time);
00409 
00410           // Did we timeout? If so, stop running the loop.
00411           if (result == 0 &&
00412               max_wait_time != 0 &&
00413               *max_wait_time == ACE_Time_Value::zero)
00414             break;
00415 
00416           // Other errors? If so, stop running the loop.
00417           if (result == -1)
00418             break;
00419 
00420           // Otherwise, keep going...
00421         }
00422 
00423       if (TAO_debug_level >= 5)
00424         ACE_DEBUG ((LM_DEBUG,
00425                     "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00426                     " (leader) exit reactor event loop\n",
00427                     t_id));
00428     }
00429   }
00430   //
00431   // End artificial scope for auto_ptr like helpers calling:
00432   // this->reset_client_thread () and (maybe)
00433   // this->reset_client_leader_thread ().
00434   //
00435 
00436   // Wake up the next leader, we cannot do that in handle_input,
00437   // because the woken up thread would try to get into handle_events,
00438   // which is at the time in handle_input still occupied. But do it
00439   // before checking the error in <result>, even if there is an error
00440   // in our input we should continue running the loop in another
00441   // thread.
00442 
00443   if (this->elect_new_leader () == -1)
00444     ACE_ERROR_RETURN ((LM_ERROR,
00445                        "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00446                        " failed to elect new leader\n",
00447                        t_id),
00448                       -1);
00449 
00450   if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00451     ACE_ERROR_RETURN ((LM_ERROR,
00452                        "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00453                        " handle_events failed\n",
00454                        t_id),
00455                       -1);
00456 
00457   // Return an error if there was a problem receiving the reply...
00458   if (max_wait_time != 0)
00459     {
00460       if (!event->successful ()
00461           && *max_wait_time == ACE_Time_Value::zero)
00462         {
00463           result = -1;
00464           errno = ETIME;
00465         }
00466       else if (event->error_detected ())
00467         {
00468           // If the time did not expire yet, but we get a failure,
00469           // e.g. the connection closed, we should still return an error.
00470           result = -1;
00471         }
00472     }
00473   else
00474     {
00475       /**
00476        * There should be no reason to reset the value of result
00477        * here. If there was an error in handle_events () that the
00478        * leader saw, I (Bala) believe it should be propagated to the
00479        * clients.
00480        * result = 0;
00481        */
00482       if (event->error_detected ())
00483         {
00484           result = -1;
00485         }
00486     }
00487 
00488   return result;
00489 }
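
The tail of wait_for_event() folds the reactor result, the remaining timeout, and the event state into a single return value. The following is a minimal standalone sketch of that last decision only, assuming plain booleans in place of TAO_LF_Event and ACE_Time_Value. The names WaitOutcome and final_result are hypothetical and are not part of TAO; the earlier early returns (failed leader election, failed handle_events) are deliberately left out.

```cpp
#include <cerrno>

// Hypothetical stand-in for the state the listing reads from
// TAO_LF_Event and ACE_Time_Value; none of these names exist in TAO.
struct WaitOutcome
{
  int  reactor_result;  // last value returned by handle_events ()
  bool has_deadline;    // max_wait_time != 0
  bool time_exhausted;  // *max_wait_time == ACE_Time_Value::zero
  bool successful;      // event->successful ()
  bool error_detected;  // event->error_detected ()
};

// Mirrors the final if/else ladder of the listing: an exhausted
// timeout without a successful event yields -1 with errno set to
// ETIME, a detected error yields -1, and otherwise the reactor
// result is passed through unchanged.
inline int final_result (const WaitOutcome &o)
{
  int result = o.reactor_result;

  if (o.has_deadline)
    {
      if (!o.successful && o.time_exhausted)
        {
          result = -1;
          errno = ETIME;
        }
      else if (o.error_detected)
        result = -1;
    }
  else if (o.error_detected)
    result = -1;

  return result;
}
```

Note that in this sketch, as in the listing, a caller without a deadline still sees -1 when the event reports an error, and the reactor result is never reset to 0.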


Member Data Documentation

int TAO_Leader_Follower::client_thread_is_leader_ [private]
 

Is a client thread the current leader?

Definition at line 249 of file Leader_Follower.h.

Referenced by reset_client_leader_thread(), set_client_leader_thread(), set_event_loop_thread(), and wait_for_client_leader_to_complete().

int TAO_Leader_Follower::clients_ [private]
 

Count the number of active clients; this is useful to know when to deactivate the reactor.

Definition at line 243 of file Leader_Follower.h.

Referenced by has_clients(), reset_client_thread(), and set_client_thread().

TAO_SYNCH_CONDITION TAO_Leader_Follower::event_loop_threads_condition_ [private]
 

Condition variable for server threads waiting for the client leader to complete.

Definition at line 256 of file Leader_Follower.h.

Referenced by elect_new_leader(), and wait_for_client_leader_to_complete().

int TAO_Leader_Follower::event_loop_threads_waiting_ [private]
 

Are server threads waiting for the client leader to complete?

Definition at line 252 of file Leader_Follower.h.

Referenced by elect_new_leader(), and wait_for_client_leader_to_complete().
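
The pairing of a waiter count with a condition variable, as described for event_loop_threads_waiting_ and event_loop_threads_condition_, can be sketched with the standard library. This is an illustration under assumptions, not the TAO implementation: the class ClientLeaderGate and its member functions are hypothetical, and std::mutex and std::condition_variable stand in for TAO_SYNCH_MUTEX and TAO_SYNCH_CONDITION.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical names throughout; only the roles of the two members
// (a waiter count and a condition variable) come from the reference.
class ClientLeaderGate
{
public:
  // Called when a client thread takes or gives up the leader role.
  void set_client_leader (bool is_leader)
  {
    std::lock_guard<std::mutex> guard (lock_);
    client_thread_is_leader_ = is_leader;
    // Only broadcast when some event loop thread is actually waiting.
    if (!is_leader && event_loop_threads_waiting_ > 0)
      condition_.notify_all ();
  }

  // Blocks an event loop (server) thread until no client thread is
  // the leader. Returns false if the timeout expired first.
  bool wait_for_client_leader_to_complete (std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> guard (lock_);
    ++event_loop_threads_waiting_;
    const bool done = condition_.wait_for (
      guard, timeout, [this] { return !client_thread_is_leader_; });
    --event_loop_threads_waiting_;
    return done;
  }

  // Number of event loop threads currently blocked in the wait.
  int waiting () const
  {
    std::lock_guard<std::mutex> guard (lock_);
    return event_loop_threads_waiting_;
  }

private:
  mutable std::mutex lock_;
  std::condition_variable condition_;
  bool client_thread_is_leader_ = false;
  int event_loop_threads_waiting_ = 0;
};
```

Keeping the count separate from the condition variable lets the signalling side skip the broadcast entirely when no server thread is blocked.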

Follower_Set TAO_Leader_Follower::follower_free_list_ [private]
 

Use a free list to allocate and release Follower objects.

Definition at line 231 of file Leader_Follower.h.

Referenced by allocate_follower(), release_follower(), and ~TAO_Leader_Follower().
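
A free list of this kind can be sketched as follows. The types Follower and FollowerPool are hypothetical stand-ins, not the TAO classes, and the sketch assumes the caller already holds whatever lock protects the pool, so it does no locking of its own.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the per-thread follower object.
struct Follower
{
  int wakeups = 0;
};

// Hypothetical pool: idle followers are parked on a free list and
// handed out again before any new object is allocated.
class FollowerPool
{
public:
  // Reuse an idle follower if one exists, otherwise create one.
  std::unique_ptr<Follower> allocate ()
  {
    if (free_list_.empty ())
      return std::unique_ptr<Follower> (new Follower);

    std::unique_ptr<Follower> f = std::move (free_list_.back ());
    free_list_.pop_back ();
    return f;
  }

  // Park a follower that is no longer attached to any thread.
  void release (std::unique_ptr<Follower> f)
  {
    free_list_.push_back (std::move (f));
  }

  // Number of followers currently parked on the free list.
  std::size_t idle () const { return free_list_.size (); }

private:
  std::vector<std::unique_ptr<Follower>> free_list_;
};
```

Because the pool owns the parked objects, destroying the pool destroys any followers still on the free list, which matches the destructor being listed among the users of this member.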

Follower_Set TAO_Leader_Follower::follower_set_ [private]
 

Definition at line 228 of file Leader_Follower.h.

Referenced by add_follower(), elect_new_leader_i(), follower_available(), and remove_follower().

int TAO_Leader_Follower::leaders_ [private]
 

Count the number of active leaders. There could be many leaders in the thread pool (i.e. calling ORB::run), and the same leader could show up multiple times as it receives nested upcalls and sends more requests.

Definition at line 239 of file Leader_Follower.h.

Referenced by elect_new_leader(), leader_available(), reset_client_leader_thread(), reset_client_thread(), reset_event_loop_thread_i(), set_client_leader_thread(), set_client_thread(), and set_event_loop_thread().
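
Because the same thread can be counted more than once when it receives nested upcalls, a counter rather than a flag is needed. A minimal sketch of that idea, using hypothetical names (LeaderCount, LeaderScope) and assuming the caller serializes access:

```cpp
// Hypothetical counter; access is assumed to be serialized by the
// caller, as the lock is not part of this sketch.
class LeaderCount
{
public:
  void enter () { ++leaders_; }
  void leave () { --leaders_; }
  bool leader_available () const { return leaders_ != 0; }
  int count () const { return leaders_; }

private:
  int leaders_ = 0;
};

// Hypothetical scope helper: counts the thread as a leader for the
// lifetime of the object, so nested scopes stack correctly.
class LeaderScope
{
public:
  explicit LeaderScope (LeaderCount &c) : count_ (c) { count_.enter (); }
  ~LeaderScope () { count_.leave (); }

  LeaderScope (const LeaderScope &) = delete;
  LeaderScope &operator= (const LeaderScope &) = delete;

private:
  LeaderCount &count_;
};
```

With a plain boolean, leaving the inner (nested) scope would wrongly report that no leader is available while the outer scope is still running.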

TAO_SYNCH_MUTEX TAO_Leader_Follower::lock_ [private]
 

To synchronize access to the members.

Definition at line 221 of file Leader_Follower.h.

TAO_New_Leader_Generator* TAO_Leader_Follower::new_leader_generator_ [private]
 

The Leader/Follower class uses this object to notify the system that we are out of leaders.

Definition at line 260 of file Leader_Follower.h.

Referenced by no_leaders_available().
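
Since the constructor accepts a null generator by default, a caller-side null check is a natural way to forward the notification. The sketch below assumes that behaviour rather than quoting it from the TAO sources; NewLeaderGenerator, CountingGenerator and LeaderFollowerSketch are hypothetical names.

```cpp
// Hypothetical stand-in for the generator interface.
class NewLeaderGenerator
{
public:
  virtual ~NewLeaderGenerator () {}
  virtual void no_leaders_available () = 0;
};

// Hypothetical generator that only records how often it was asked.
class CountingGenerator : public NewLeaderGenerator
{
public:
  void no_leaders_available () override { ++calls; }
  int calls = 0;
};

// Hypothetical owner: forwards the notification only when a
// generator was supplied (assumption, not confirmed by the source).
class LeaderFollowerSketch
{
public:
  explicit LeaderFollowerSketch (NewLeaderGenerator *generator = nullptr)
    : new_leader_generator_ (generator)
  {
  }

  void no_leaders_available ()
  {
    if (new_leader_generator_ != nullptr)
      new_leader_generator_->no_leaders_available ();
  }

private:
  NewLeaderGenerator *new_leader_generator_;
};
```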

TAO_ORB_Core* TAO_Leader_Follower::orb_core_ [private]
 

The orb core.

Definition at line 218 of file Leader_Follower.h.

ACE_Reactor* TAO_Leader_Follower::reactor_ [private]
 

The reactor.

Definition at line 246 of file Leader_Follower.h.

ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_Leader_Follower::reverse_lock_ [private]
 

To protect access to the three members that follow it in the class declaration.

Definition at line 224 of file Leader_Follower.h.
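
A reverse lock inverts the usual guard semantics: acquiring it releases the underlying mutex, and releasing it acquires the mutex again. This is how the leader code can drop lock_ while it runs the reactor and pick it up again on scope exit. The sketch below shows the idea with standard C++ only; ReverseLock and FlagMutex are hypothetical, and FlagMutex is a toy that merely records whether it is held so the effect is observable.

```cpp
#include <mutex>

// Toy mutex that only records its state (hypothetical; not a real
// synchronization primitive and not thread-safe).
struct FlagMutex
{
  bool held = false;
  void lock () { held = true; }
  void unlock () { held = false; }
};

// Hypothetical adapter with the same idea as a reverse lock:
// lock () releases the wrapped mutex, unlock () acquires it.
template <typename Mutex>
class ReverseLock
{
public:
  explicit ReverseLock (Mutex &m) : mutex_ (m) {}

  void lock () { mutex_.unlock (); }
  void unlock () { mutex_.lock (); }

private:
  Mutex &mutex_;
};

// Runs `work` with the mutex temporarily released. The caller must
// hold `m` on entry and holds it again on return.
template <typename Mutex, typename Work>
void run_unlocked (Mutex &m, Work work)
{
  ReverseLock<Mutex> reverse (m);
  std::lock_guard<ReverseLock<Mutex>> release_scope (reverse);
  work ();
}
```

Using a guard over the reverse lock means the mutex is reacquired on every exit path from the scope, including early returns.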


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 12:21:57 2006 for TAO by doxygen 1.3.6