Leader_Follower.cpp

Go to the documentation of this file.
00001 // $Id: Leader_Follower.cpp 79159 2007-08-01 16:41:24Z wilsond $
00002 
00003 #include "ace/Countdown_Time.h"
00004 #include "ace/OS_NS_sys_time.h"
00005 #include "ace/Reactor.h"
00006 
00007 #include "tao/Leader_Follower.h"
00008 #include "tao/LF_Follower_Auto_Ptr.h"
00009 #include "tao/LF_Follower_Auto_Adder.h"
00010 #include "tao/LF_Event_Binder.h"
00011 #include "tao/debug.h"
00012 #include "tao/Transport.h"
00013 #include "tao/GUIResource_Factory.h"
00014 #include "tao/ORB_Core.h"
00015 
00016 #if !defined (__ACE_INLINE__)
00017 # include "tao/Leader_Follower.inl"
00018 #endif /* ! __ACE_INLINE__ */
00019 
00020 ACE_RCSID (tao,
00021            Leader_Follower,
00022            "$Id: Leader_Follower.cpp 79159 2007-08-01 16:41:24Z wilsond $")
00023 
00024 
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 TAO_Leader_Follower::~TAO_Leader_Follower (void)
00028 {
00029   while (!this->follower_free_list_.empty ())
00030     {
00031       TAO_LF_Follower *follower =
00032         this->follower_free_list_.pop_front ();
00033       delete follower;
00034     }
00035   // Hand the reactor back to the resource factory.
00036   // use GUI reactor factory if available
00037   if ( this->orb_core_->gui_resource_factory () )
00038     this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00039   else
00040     this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00041 
00042   this->reactor_ = 0;
00043 }
00044 
00045 TAO_LF_Follower *
00046 TAO_Leader_Follower::allocate_follower (void)
00047 {
00048   if (!this->follower_free_list_.empty ())
00049     return this->follower_free_list_.pop_front ();
00050 
00051   TAO_LF_Follower* ptr = 0;
00052   ACE_NEW_RETURN (ptr,
00053                   TAO_LF_Follower (*this),
00054                   0);
00055   return ptr;
00056 }
00057 
00058 void
00059 TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
00060 {
00061   this->follower_free_list_.push_front (follower);
00062 }
00063 
00064 int
00065 TAO_Leader_Follower::elect_new_leader_i (void)
00066 {
00067   TAO_LF_Follower* const follower =
00068     this->follower_set_.head ();
00069 
00070 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00071   ACE_DEBUG ((LM_DEBUG,
00072               "TAO (%P|%t) LF::elect_new_leader_i - "
00073               "follower is %x\n",
00074               follower));
00075 #endif /* TAO_DEBUG_LEADER_FOLLOWER */
00076 
00077   return follower->signal ();
00078 }
00079 
00080 int
00081 TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
00082 {
00083   int result = 0;
00084   ACE_Countdown_Time countdown (max_wait_time);
00085 
00086   // Note that we are waiting.
00087   ++this->event_loop_threads_waiting_;
00088 
00089   while (this->client_thread_is_leader_ &&
00090          result != -1)
00091     {
00092       if (max_wait_time == 0)
00093         {
00094           if (this->event_loop_threads_condition_.wait () == -1)
00095             {
00096               ACE_ERROR ((LM_ERROR,
00097                           ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00098                           ACE_TEXT ("Condition variable wait failed\n")));
00099 
00100               result = -1;
00101             }
00102         }
00103       else
00104         {
00105           countdown.update ();
00106           ACE_Time_Value tv = ACE_OS::gettimeofday ();
00107           tv += *max_wait_time;
00108           if (this->event_loop_threads_condition_.wait (&tv) == -1)
00109             {
00110               if (errno != ETIME)
00111                 ACE_ERROR ((LM_ERROR,
00112                             ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00113                             ACE_TEXT ("Condition variable wait failed\n")));
00114 
00115               result = -1;
00116             }
00117         }
00118     }
00119 
00120   // Reset waiting state.
00121   --this->event_loop_threads_waiting_;
00122 
00123   return result;
00124 }
00125 
00126 ACE_Reactor *
00127 TAO_Leader_Follower::reactor (void)
00128 {
00129   if (this->reactor_ == 0)
00130     {
00131       ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00132       if (this->reactor_ == 0)
00133         {
00134           // use GUI reactor factory if available
00135           if ( this->orb_core_->gui_resource_factory () )
00136             this->reactor_ =
00137               this->orb_core_->gui_resource_factory ()->get_reactor ();
00138           else
00139             this->reactor_ =
00140               this->orb_core_->resource_factory ()->get_reactor ();
00141         }
00142     }
00143   return this->reactor_;
00144 }
00145 
00146 void
00147 TAO_Leader_Follower::set_client_thread (void)
00148 {
00149   // If we were a leader thread or an event loop thread, give up
00150   // leadership.
00151   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00152   if (tss->event_loop_thread_ ||
00153       tss->client_leader_thread_)
00154     {
00155       --this->leaders_;
00156     }
00157 
00158   if (this->clients_ == 0 &&
00159       this->orb_core_->has_shutdown () &&
00160       !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00161     {
00162       // The ORB has shutdown and we are the first client after
00163       // that. This means that the reactor is disabled, we must
00164       // re-enable it if we want to receive any replys...
00165       this->orb_core_->reactor ()->reset_reactor_event_loop ();
00166     }
00167   ++this->clients_;
00168 }
00169 
00170 void
00171 TAO_Leader_Follower::reset_client_thread (void)
00172 {
00173   // If we were a leader thread or an event loop thread, take back
00174   // leadership.
00175   TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00176   if (tss->event_loop_thread_ ||
00177       tss->client_leader_thread_)
00178     {
00179       ++this->leaders_;
00180     }
00181 
00182   --this->clients_;
00183   if (this->clients_ == 0 &&
00184       this->orb_core_->has_shutdown ())
00185     {
00186       // The ORB has shutdown and we are the last client thread, we
00187       // must stop the reactor to ensure that any server threads go
00188       // away.
00189       this->orb_core_->reactor ()->end_reactor_event_loop ();
00190     }
00191 }
00192 
00193 int
00194 TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
00195                                      TAO_Transport *transport,
00196                                      ACE_Time_Value *max_wait_time)
00197 {
00198   // Obtain the lock.
00199   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00200 
00201   ACE_Countdown_Time countdown (max_wait_time);
00202 
00203   // Optmize the first iteration [no access to errno]
00204   int result = 1;
00205 
00206   // For some cases the transport may dissappear like when waiting for
00207   // connection to be initiated or closed. So cache the id.
00208   // @@ NOTE: This is not completely safe either. We will be fine for
00209   // cases that dont access the id ie. when debug level is off but
00210   // with debugging level on we are on a sticky wicket. Hopefully none
00211   // of our users should run TAO with debugging enabled like they did
00212   // in PathFinder
00213   size_t t_id = 0;
00214 
00215   if (TAO_debug_level && transport != 0)
00216     {
00217       t_id = transport->id ();
00218     }
00219 
00220   {
00221     // Calls this->set_client_thread () on construction and
00222     // this->reset_client_thread () on destruction.
00223     TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00224     ACE_UNUSED_ARG (client_thread_helper);
00225 
00226     // Check if there is a leader.  Note that it cannot be us since we
00227     // gave up our leadership when we became a client.
00228     if (this->leader_available ())
00229       {
00230         // = Wait as a follower.
00231 
00232         // Grab a follower:
00233         TAO_LF_Follower_Auto_Ptr follower (*this);
00234         if (follower.get () == 0)
00235           return -1;
00236 
00237         if (TAO_debug_level >= 5)
00238           ACE_DEBUG ((LM_DEBUG,
00239                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00240                       " (follower), cond <%x>\n",
00241                       t_id, follower.get ()));
00242 
00243         // Bound the follower and the LF_Event, this is important to
00244         // get a signal when the event terminates
00245         TAO_LF_Event_Binder event_binder (event, follower.get ());
00246 
00247         while (event->keep_waiting () &&
00248                this->leader_available ())
00249           {
00250             // Add ourselves to the list, do it everytime we wake up
00251             // from the CV loop. Because:
00252             //
00253             // - The leader thread could have elected us as the new
00254             // leader.
00255             // - Before we can assume the role another thread becomes
00256             // the leader
00257             // - But our condition variable could have been removed
00258             // already, if we don't add it again we will never wake
00259             // up.
00260             //
00261             // Notice that we can have spurious wake ups, in that case
00262             // adding the leader results in an error, that must be
00263             // ignored.
00264             // You may be thinking of not removing the condition
00265             // variable in the code that sends the signal, but
00266             // removing it here, that does not work either, in that
00267             // case the condition variable may be used twice:
00268             //
00269             //  - Wake up because its reply arrived
00270             //  - Wake up because it must become the leader
00271             //
00272             // but only the first one has any effect, so the leader is
00273             // lost.
00274             //
00275 
00276             TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00277 
00278             if (max_wait_time == 0)
00279               {
00280                 if (follower->wait (max_wait_time) == -1)
00281                   {
00282                     if (TAO_debug_level >= 5)
00283                       ACE_DEBUG ((LM_DEBUG,
00284                                   "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00285                                   " (follower) [no timer, cond failed]\n",
00286                                   t_id));
00287 
00288                     // @@ Michael: What is our error handling in this case?
00289                     //             We could be elected as leader and
00290                     //             no leader would come in?
00291                     return -1;
00292                   }
00293               }
00294             else
00295               {
00296                 countdown.update ();
00297                 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00298                 tv += *max_wait_time;
00299                 if (follower->wait (&tv) == -1)
00300                   {
00301                     if (TAO_debug_level >= 5)
00302                       ACE_DEBUG ((LM_DEBUG,
00303                                   "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00304                                   "(follower) [has timer, follower failed]\n",
00305                                   t_id ));
00306 
00307                     // If we have timedout set the state in the
00308                     // LF_Event. We call the non-locking,
00309                     // no-signalling method on LF_Event.
00310                     if (errno == ETIME)
00311                         // We have timedout
00312                         event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00313 
00314                     if (!event->successful ())
00315                       {
00316                         // Remove follower can fail because either
00317                         // 1) the condition was satisfied (i.e. reply
00318                         //    received or queue drained), or
00319                         // 2) somebody elected us as leader, or
00320                         // 3) the connection got closed.
00321                         //
00322                         // Therefore:
00323                         // If remove_follower fails and the condition
00324                         // was not satisfied, we know that we got
00325                         // elected as a leader.
00326                         // But we got a timeout, so we cannot become
00327                         // the leader, therefore, we have to select a
00328                         // new leader.
00329                         //
00330 
00331                         if (this->elect_new_leader () == -1
00332                             && TAO_debug_level > 0)
00333                           {
00334                             ACE_ERROR ((LM_ERROR,
00335                                         "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00336                                         "elect_new_leader failed\n",
00337                                         t_id ));
00338                           }
00339                       }
00340 
00341 
00342                     return -1;
00343                   }
00344               }
00345           }
00346 
00347         countdown.update ();
00348 
00349         // @@ Michael: This is an old comment why we do not want to
00350         //             remove the follower here.
00351         // We should not remove the follower here, we *must* remove it when
00352         // we signal it so the same condition is not signalled for
00353         // both wake up as a follower and as the next leader.
00354 
00355         if (TAO_debug_level >= 5)
00356           ACE_DEBUG ((LM_DEBUG,
00357                       "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00358                       " done (follower), successful %d\n",
00359                       t_id,
00360                       event->successful ()));
00361 
00362         // Now somebody woke us up to become a leader or to handle our
00363         // input. We are already removed from the follower queue.
00364 
00365         if (event->successful ())
00366           return 0;
00367 
00368         if (event->error_detected ())
00369           return -1;
00370 
00371         // FALLTHROUGH
00372         // We only get here if we woke up but the reply is not
00373         // complete yet, time to assume the leader role....
00374         // i.e. ACE_ASSERT (event->successful () == 0);
00375       }
00376 
00377     // = Leader Code.
00378 
00379     // The only way to reach this point is if we must become the
00380     // leader, because there is no leader or we have to update to a
00381     // leader or we are doing nested upcalls in this case we do
00382     // increase the refcount on the leader in TAO_ORB_Core.
00383 
00384     // Calls this->set_client_leader_thread () on
00385     // construction and this->reset_client_leader_thread ()
00386     // on destruction.  Note that this may increase the refcount of
00387     // the leader.
00388     TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00389     ACE_UNUSED_ARG (client_leader_thread_helper);
00390 
00391     {
00392       ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00393                         this->reverse_lock (), -1);
00394 
00395       // Become owner of the reactor.
00396       ACE_Reactor *reactor = this->reactor_;
00397       reactor->owner (ACE_Thread::self ());
00398 
00399       // Run the reactor event loop.
00400 
00401       if (TAO_debug_level >= 5)
00402         ACE_DEBUG ((LM_DEBUG,
00403                     "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00404                     " (leader) enter reactor event loop\n",
00405                     t_id));
00406 
00407       // If we got our event, no need to run the event loop any
00408       // further.
00409       while (event->keep_waiting ())
00410         {
00411           // Run the event loop.
00412           result = reactor->handle_events (max_wait_time);
00413 
00414           // Did we timeout? If so, stop running the loop.
00415           if (result == 0 &&
00416               max_wait_time != 0 &&
00417               *max_wait_time == ACE_Time_Value::zero)
00418             break;
00419 
00420           // Other errors? If so, stop running the loop.
00421           if (result == -1)
00422             break;
00423 
00424           // Otherwise, keep going...
00425         }
00426 
00427       if (TAO_debug_level >= 5)
00428         ACE_DEBUG ((LM_DEBUG,
00429                     "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00430                     " (leader) exit reactor event loop\n",
00431                     t_id));
00432     }
00433   }
00434   //
00435   // End artificial scope for auto_ptr like helpers calling:
00436   // this->reset_client_thread () and (maybe)
00437   // this->reset_client_leader_thread ().
00438   //
00439 
00440   // Wake up the next leader, we cannot do that in handle_input,
00441   // because the woken up thread would try to get into handle_events,
00442   // which is at the time in handle_input still occupied. But do it
00443   // before checking the error in <result>, even if there is an error
00444   // in our input we should continue running the loop in another
00445   // thread.
00446 
00447   if (this->elect_new_leader () == -1)
00448     ACE_ERROR_RETURN ((LM_ERROR,
00449                        "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00450                        " failed to elect new leader\n",
00451                        t_id),
00452                       -1);
00453 
00454   if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00455     ACE_ERROR_RETURN ((LM_ERROR,
00456                        "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00457                        " handle_events failed\n",
00458                        t_id),
00459                       -1);
00460 
00461   // Return an error if there was a problem receiving the reply...
00462   if (max_wait_time != 0)
00463     {
00464       if (!event->successful ()
00465           && *max_wait_time == ACE_Time_Value::zero)
00466         {
00467           result = -1;
00468           errno = ETIME;
00469         }
00470       else if (event->error_detected ())
00471         {
00472           // If the time did not expire yet, but we get a failure,
00473           // e.g. the connections closed, we should still return an error.
00474           result = -1;
00475         }
00476     }
00477   else
00478     {
00479       /**
00480        * There should be no reason to reset the value of result
00481        * here. If there was an error in handle_events () that the
00482        * leader saw, I (Bala) beleave it should be propogated to the
00483        * clients.
00484        * result = 0;
00485        */
00486       if (event->error_detected ())
00487         {
00488           result = -1;
00489         }
00490     }
00491 
00492   return result;
00493 }
00494 
00495 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:37:52 2010 for TAO by doxygen 1.4.7