00001
00002
00003 #include "ace/Countdown_Time.h"
00004 #include "ace/OS_NS_sys_time.h"
00005 #include "ace/Reactor.h"
00006
00007 #include "tao/Leader_Follower.h"
00008 #include "tao/LF_Follower_Auto_Ptr.h"
00009 #include "tao/LF_Follower_Auto_Adder.h"
00010 #include "tao/LF_Event_Binder.h"
00011 #include "tao/debug.h"
00012 #include "tao/Transport.h"
00013 #include "tao/GUIResource_Factory.h"
00014 #include "tao/ORB_Core.h"
00015
00016 #if !defined (__ACE_INLINE__)
00017 # include "tao/Leader_Follower.inl"
00018 #endif
00019
00020 ACE_RCSID (tao,
00021 Leader_Follower,
00022 "$Id: Leader_Follower.cpp 79159 2007-08-01 16:41:24Z wilsond $")
00023
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_Leader_Follower::~TAO_Leader_Follower (void)
00028 {
00029 while (!this->follower_free_list_.empty ())
00030 {
00031 TAO_LF_Follower *follower =
00032 this->follower_free_list_.pop_front ();
00033 delete follower;
00034 }
00035
00036
00037 if ( this->orb_core_->gui_resource_factory () )
00038 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00039 else
00040 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00041
00042 this->reactor_ = 0;
00043 }
00044
00045 TAO_LF_Follower *
00046 TAO_Leader_Follower::allocate_follower (void)
00047 {
00048 if (!this->follower_free_list_.empty ())
00049 return this->follower_free_list_.pop_front ();
00050
00051 TAO_LF_Follower* ptr = 0;
00052 ACE_NEW_RETURN (ptr,
00053 TAO_LF_Follower (*this),
00054 0);
00055 return ptr;
00056 }
00057
00058 void
00059 TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
00060 {
00061 this->follower_free_list_.push_front (follower);
00062 }
00063
00064 int
00065 TAO_Leader_Follower::elect_new_leader_i (void)
00066 {
00067 TAO_LF_Follower* const follower =
00068 this->follower_set_.head ();
00069
00070 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00071 ACE_DEBUG ((LM_DEBUG,
00072 "TAO (%P|%t) LF::elect_new_leader_i - "
00073 "follower is %x\n",
00074 follower));
00075 #endif
00076
00077 return follower->signal ();
00078 }
00079
00080 int
00081 TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
00082 {
00083 int result = 0;
00084 ACE_Countdown_Time countdown (max_wait_time);
00085
00086
00087 ++this->event_loop_threads_waiting_;
00088
00089 while (this->client_thread_is_leader_ &&
00090 result != -1)
00091 {
00092 if (max_wait_time == 0)
00093 {
00094 if (this->event_loop_threads_condition_.wait () == -1)
00095 {
00096 ACE_ERROR ((LM_ERROR,
00097 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00098 ACE_TEXT ("Condition variable wait failed\n")));
00099
00100 result = -1;
00101 }
00102 }
00103 else
00104 {
00105 countdown.update ();
00106 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00107 tv += *max_wait_time;
00108 if (this->event_loop_threads_condition_.wait (&tv) == -1)
00109 {
00110 if (errno != ETIME)
00111 ACE_ERROR ((LM_ERROR,
00112 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00113 ACE_TEXT ("Condition variable wait failed\n")));
00114
00115 result = -1;
00116 }
00117 }
00118 }
00119
00120
00121 --this->event_loop_threads_waiting_;
00122
00123 return result;
00124 }
00125
00126 ACE_Reactor *
00127 TAO_Leader_Follower::reactor (void)
00128 {
00129 if (this->reactor_ == 0)
00130 {
00131 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00132 if (this->reactor_ == 0)
00133 {
00134
00135 if ( this->orb_core_->gui_resource_factory () )
00136 this->reactor_ =
00137 this->orb_core_->gui_resource_factory ()->get_reactor ();
00138 else
00139 this->reactor_ =
00140 this->orb_core_->resource_factory ()->get_reactor ();
00141 }
00142 }
00143 return this->reactor_;
00144 }
00145
00146 void
00147 TAO_Leader_Follower::set_client_thread (void)
00148 {
00149
00150
00151 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00152 if (tss->event_loop_thread_ ||
00153 tss->client_leader_thread_)
00154 {
00155 --this->leaders_;
00156 }
00157
00158 if (this->clients_ == 0 &&
00159 this->orb_core_->has_shutdown () &&
00160 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00161 {
00162
00163
00164
00165 this->orb_core_->reactor ()->reset_reactor_event_loop ();
00166 }
00167 ++this->clients_;
00168 }
00169
00170 void
00171 TAO_Leader_Follower::reset_client_thread (void)
00172 {
00173
00174
00175 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00176 if (tss->event_loop_thread_ ||
00177 tss->client_leader_thread_)
00178 {
00179 ++this->leaders_;
00180 }
00181
00182 --this->clients_;
00183 if (this->clients_ == 0 &&
00184 this->orb_core_->has_shutdown ())
00185 {
00186
00187
00188
00189 this->orb_core_->reactor ()->end_reactor_event_loop ();
00190 }
00191 }
00192
00193 int
00194 TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
00195 TAO_Transport *transport,
00196 ACE_Time_Value *max_wait_time)
00197 {
00198
00199 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00200
00201 ACE_Countdown_Time countdown (max_wait_time);
00202
00203
00204 int result = 1;
00205
00206
00207
00208
00209
00210
00211
00212
00213 size_t t_id = 0;
00214
00215 if (TAO_debug_level && transport != 0)
00216 {
00217 t_id = transport->id ();
00218 }
00219
00220 {
00221
00222
00223 TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00224 ACE_UNUSED_ARG (client_thread_helper);
00225
00226
00227
00228 if (this->leader_available ())
00229 {
00230
00231
00232
00233 TAO_LF_Follower_Auto_Ptr follower (*this);
00234 if (follower.get () == 0)
00235 return -1;
00236
00237 if (TAO_debug_level >= 5)
00238 ACE_DEBUG ((LM_DEBUG,
00239 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00240 " (follower), cond <%x>\n",
00241 t_id, follower.get ()));
00242
00243
00244
00245 TAO_LF_Event_Binder event_binder (event, follower.get ());
00246
00247 while (event->keep_waiting () &&
00248 this->leader_available ())
00249 {
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00277
00278 if (max_wait_time == 0)
00279 {
00280 if (follower->wait (max_wait_time) == -1)
00281 {
00282 if (TAO_debug_level >= 5)
00283 ACE_DEBUG ((LM_DEBUG,
00284 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00285 " (follower) [no timer, cond failed]\n",
00286 t_id));
00287
00288
00289
00290
00291 return -1;
00292 }
00293 }
00294 else
00295 {
00296 countdown.update ();
00297 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00298 tv += *max_wait_time;
00299 if (follower->wait (&tv) == -1)
00300 {
00301 if (TAO_debug_level >= 5)
00302 ACE_DEBUG ((LM_DEBUG,
00303 "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00304 "(follower) [has timer, follower failed]\n",
00305 t_id ));
00306
00307
00308
00309
00310 if (errno == ETIME)
00311
00312 event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00313
00314 if (!event->successful ())
00315 {
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331 if (this->elect_new_leader () == -1
00332 && TAO_debug_level > 0)
00333 {
00334 ACE_ERROR ((LM_ERROR,
00335 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00336 "elect_new_leader failed\n",
00337 t_id ));
00338 }
00339 }
00340
00341
00342 return -1;
00343 }
00344 }
00345 }
00346
00347 countdown.update ();
00348
00349
00350
00351
00352
00353
00354
00355 if (TAO_debug_level >= 5)
00356 ACE_DEBUG ((LM_DEBUG,
00357 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00358 " done (follower), successful %d\n",
00359 t_id,
00360 event->successful ()));
00361
00362
00363
00364
00365 if (event->successful ())
00366 return 0;
00367
00368 if (event->error_detected ())
00369 return -1;
00370
00371
00372
00373
00374
00375 }
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00389 ACE_UNUSED_ARG (client_leader_thread_helper);
00390
00391 {
00392 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00393 this->reverse_lock (), -1);
00394
00395
00396 ACE_Reactor *reactor = this->reactor_;
00397 reactor->owner (ACE_Thread::self ());
00398
00399
00400
00401 if (TAO_debug_level >= 5)
00402 ACE_DEBUG ((LM_DEBUG,
00403 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00404 " (leader) enter reactor event loop\n",
00405 t_id));
00406
00407
00408
00409 while (event->keep_waiting ())
00410 {
00411
00412 result = reactor->handle_events (max_wait_time);
00413
00414
00415 if (result == 0 &&
00416 max_wait_time != 0 &&
00417 *max_wait_time == ACE_Time_Value::zero)
00418 break;
00419
00420
00421 if (result == -1)
00422 break;
00423
00424
00425 }
00426
00427 if (TAO_debug_level >= 5)
00428 ACE_DEBUG ((LM_DEBUG,
00429 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00430 " (leader) exit reactor event loop\n",
00431 t_id));
00432 }
00433 }
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447 if (this->elect_new_leader () == -1)
00448 ACE_ERROR_RETURN ((LM_ERROR,
00449 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00450 " failed to elect new leader\n",
00451 t_id),
00452 -1);
00453
00454 if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00455 ACE_ERROR_RETURN ((LM_ERROR,
00456 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00457 " handle_events failed\n",
00458 t_id),
00459 -1);
00460
00461
00462 if (max_wait_time != 0)
00463 {
00464 if (!event->successful ()
00465 && *max_wait_time == ACE_Time_Value::zero)
00466 {
00467 result = -1;
00468 errno = ETIME;
00469 }
00470 else if (event->error_detected ())
00471 {
00472
00473
00474 result = -1;
00475 }
00476 }
00477 else
00478 {
00479
00480
00481
00482
00483
00484
00485
00486 if (event->error_detected ())
00487 {
00488 result = -1;
00489 }
00490 }
00491
00492 return result;
00493 }
00494
00495 TAO_END_VERSIONED_NAMESPACE_DECL