00001
00002
00003 #include "ace/Countdown_Time.h"
00004 #include "ace/OS_NS_sys_time.h"
00005 #include "ace/Reactor.h"
00006
00007 #include "tao/Leader_Follower.h"
00008 #include "tao/LF_Follower_Auto_Ptr.h"
00009 #include "tao/LF_Follower_Auto_Adder.h"
00010 #include "tao/LF_Event_Binder.h"
00011 #include "tao/debug.h"
00012 #include "tao/Transport.h"
00013 #include "tao/GUIResource_Factory.h"
00014 #include "tao/ORB_Core.h"
00015
00016 #if !defined (__ACE_INLINE__)
00017 # include "tao/Leader_Follower.inl"
00018 #endif
00019
00020 ACE_RCSID (tao,
00021 Leader_Follower,
00022 "$Id: Leader_Follower.cpp 85406 2009-05-20 09:07:56Z johnnyw $")
00023
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_Leader_Follower::~TAO_Leader_Follower (void)
00028 {
00029 while (!this->follower_free_list_.empty ())
00030 {
00031 TAO_LF_Follower *follower = this->follower_free_list_.pop_front ();
00032 delete follower;
00033 }
00034
00035
00036 if ( this->orb_core_->gui_resource_factory () )
00037 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00038 else
00039 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00040
00041 this->reactor_ = 0;
00042 }
00043
00044 TAO_LF_Follower *
00045 TAO_Leader_Follower::allocate_follower (void)
00046 {
00047 if (!this->follower_free_list_.empty ())
00048 return this->follower_free_list_.pop_front ();
00049
00050 TAO_LF_Follower* ptr = 0;
00051 ACE_NEW_RETURN (ptr,
00052 TAO_LF_Follower (*this),
00053 0);
00054 return ptr;
00055 }
00056
00057 void
00058 TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
00059 {
00060 this->follower_free_list_.push_front (follower);
00061 }
00062
00063 int
00064 TAO_Leader_Follower::elect_new_leader_i (void)
00065 {
00066 TAO_LF_Follower* const follower = this->follower_set_.head ();
00067
00068 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00069 ACE_DEBUG ((LM_DEBUG,
00070 "TAO (%P|%t) LF::elect_new_leader_i - "
00071 "follower is %x\n",
00072 follower));
00073 #endif
00074
00075 return follower->signal ();
00076 }
00077
00078 int
00079 TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
00080 {
00081 int result = 0;
00082 ACE_Countdown_Time countdown (max_wait_time);
00083
00084
00085 ++this->event_loop_threads_waiting_;
00086
00087 while (this->client_thread_is_leader_ && result != -1)
00088 {
00089 if (max_wait_time == 0)
00090 {
00091 if (this->event_loop_threads_condition_.wait () == -1)
00092 {
00093 ACE_ERROR ((LM_ERROR,
00094 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::")
00095 ACE_TEXT ("wait_for_client_leader_to_complete - ")
00096 ACE_TEXT ("Condition variable wait failed\n")));
00097
00098 result = -1;
00099 }
00100 }
00101 else
00102 {
00103 countdown.update ();
00104 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00105 tv += *max_wait_time;
00106 if (this->event_loop_threads_condition_.wait (&tv) == -1)
00107 {
00108 if (errno != ETIME)
00109 ACE_ERROR ((LM_ERROR,
00110 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::")
00111 ACE_TEXT ("wait_for_client_leader_to_complete - ")
00112 ACE_TEXT ("Condition variable wait failed\n")));
00113
00114 result = -1;
00115 }
00116 }
00117 }
00118
00119
00120 --this->event_loop_threads_waiting_;
00121
00122 return result;
00123 }
00124
00125 ACE_Reactor *
00126 TAO_Leader_Follower::reactor (void)
00127 {
00128 if (this->reactor_ == 0)
00129 {
00130 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00131 if (this->reactor_ == 0)
00132 {
00133
00134 if ( this->orb_core_->gui_resource_factory () )
00135 this->reactor_ =
00136 this->orb_core_->gui_resource_factory ()->get_reactor ();
00137 else
00138 this->reactor_ =
00139 this->orb_core_->resource_factory ()->get_reactor ();
00140 }
00141 }
00142 return this->reactor_;
00143 }
00144
00145 void
00146 TAO_Leader_Follower::set_client_thread (void)
00147 {
00148
00149
00150 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00151 if (tss->event_loop_thread_ ||
00152 tss->client_leader_thread_)
00153 {
00154 --this->leaders_;
00155 }
00156
00157 if (this->clients_ == 0 &&
00158 this->orb_core_->has_shutdown () &&
00159 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00160 {
00161
00162
00163
00164 this->orb_core_->reactor ()->reset_reactor_event_loop ();
00165 }
00166 ++this->clients_;
00167 }
00168
00169 void
00170 TAO_Leader_Follower::reset_client_thread (void)
00171 {
00172
00173
00174 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00175 if (tss->event_loop_thread_ ||
00176 tss->client_leader_thread_)
00177 {
00178 ++this->leaders_;
00179 }
00180
00181 --this->clients_;
00182 if (this->clients_ == 0 &&
00183 this->orb_core_->has_shutdown ())
00184 {
00185
00186
00187
00188 this->orb_core_->reactor ()->end_reactor_event_loop ();
00189 }
00190 }
00191
00192 int
00193 TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
00194 TAO_Transport *transport,
00195 ACE_Time_Value *max_wait_time)
00196 {
00197
00198 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00199
00200 ACE_Countdown_Time countdown (max_wait_time);
00201
00202
00203 int result = 1;
00204
00205
00206
00207
00208
00209
00210
00211
00212 size_t t_id = 0;
00213
00214 if (TAO_debug_level && transport != 0)
00215 {
00216 t_id = transport->id ();
00217 }
00218
00219 {
00220
00221
00222 TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00223 ACE_UNUSED_ARG (client_thread_helper);
00224
00225
00226
00227 if (this->leader_available ())
00228 {
00229
00230
00231
00232 TAO_LF_Follower_Auto_Ptr follower (*this);
00233 if (follower.get () == 0)
00234 return -1;
00235
00236 if (TAO_debug_level >= 5)
00237 ACE_DEBUG ((LM_DEBUG,
00238 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00239 " (follower), cond <%x>\n",
00240 t_id, follower.get ()));
00241
00242
00243
00244 TAO_LF_Event_Binder event_binder (event, follower.get ());
00245
00246 while (event->keep_waiting () &&
00247 this->leader_available ())
00248 {
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00276
00277 if (max_wait_time == 0)
00278 {
00279 if (follower->wait (max_wait_time) == -1)
00280 {
00281 if (TAO_debug_level >= 5)
00282 ACE_DEBUG ((LM_DEBUG,
00283 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00284 " (follower) [no timer, cond failed]\n",
00285 t_id));
00286
00287
00288
00289
00290 return -1;
00291 }
00292 }
00293 else
00294 {
00295 countdown.update ();
00296 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00297 tv += *max_wait_time;
00298 if (follower->wait (&tv) == -1)
00299 {
00300 if (TAO_debug_level >= 5)
00301 ACE_DEBUG ((LM_DEBUG,
00302 "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00303 "(follower) [has timer, follower failed]\n",
00304 t_id ));
00305
00306
00307
00308
00309 if (errno == ETIME)
00310
00311 event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00312
00313 if (!event->successful ())
00314 {
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330 if (this->elect_new_leader () == -1
00331 && TAO_debug_level > 0)
00332 {
00333 ACE_ERROR ((LM_ERROR,
00334 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00335 "elect_new_leader failed\n",
00336 t_id ));
00337 }
00338 }
00339
00340
00341 return -1;
00342 }
00343 }
00344 }
00345
00346 countdown.update ();
00347
00348
00349
00350
00351
00352
00353
00354 if (TAO_debug_level >= 5)
00355 ACE_DEBUG ((LM_DEBUG,
00356 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00357 " done (follower), successful %d\n",
00358 t_id,
00359 event->successful ()));
00360
00361
00362
00363
00364 if (event->successful ())
00365 return 0;
00366
00367 if (event->error_detected ())
00368 return -1;
00369
00370
00371
00372
00373
00374 }
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00388 ACE_UNUSED_ARG (client_leader_thread_helper);
00389
00390 {
00391 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00392 this->reverse_lock (), -1);
00393
00394
00395 ACE_Reactor *reactor = this->reactor_;
00396 reactor->owner (ACE_Thread::self ());
00397
00398
00399
00400 if (TAO_debug_level >= 5)
00401 ACE_DEBUG ((LM_DEBUG,
00402 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00403 " (leader) enter reactor event loop\n",
00404 t_id));
00405
00406
00407
00408 while (event->keep_waiting ())
00409 {
00410
00411 result = reactor->handle_events (max_wait_time);
00412
00413
00414 if (result == 0 &&
00415 max_wait_time != 0 &&
00416 *max_wait_time == ACE_Time_Value::zero)
00417 break;
00418
00419
00420 if (result == -1)
00421 break;
00422
00423
00424 }
00425
00426 if (TAO_debug_level >= 5)
00427 ACE_DEBUG ((LM_DEBUG,
00428 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00429 " (leader) exit reactor event loop\n",
00430 t_id));
00431 }
00432 }
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 if (this->elect_new_leader () == -1)
00447 ACE_ERROR_RETURN ((LM_ERROR,
00448 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00449 " failed to elect new leader\n",
00450 t_id),
00451 -1);
00452
00453 if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00454 ACE_ERROR_RETURN ((LM_ERROR,
00455 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00456 " handle_events failed\n",
00457 t_id),
00458 -1);
00459
00460
00461 if (max_wait_time != 0)
00462 {
00463 if (!event->successful ()
00464 && *max_wait_time == ACE_Time_Value::zero)
00465 {
00466 result = -1;
00467 errno = ETIME;
00468 }
00469 else if (event->error_detected ())
00470 {
00471
00472
00473 result = -1;
00474 }
00475 }
00476 else
00477 {
00478
00479
00480
00481
00482
00483
00484
00485 if (event->error_detected ())
00486 {
00487 result = -1;
00488 }
00489 }
00490
00491 return result;
00492 }
00493
00494 TAO_END_VERSIONED_NAMESPACE_DECL