00001
00002
00003 #include "ace/Countdown_Time.h"
00004 #include "ace/OS_NS_sys_time.h"
00005 #include "ace/Reactor.h"
00006
00007 #include "tao/Leader_Follower.h"
00008 #include "tao/LF_Follower_Auto_Ptr.h"
00009 #include "tao/LF_Follower_Auto_Adder.h"
00010 #include "tao/LF_Event_Binder.h"
00011 #include "tao/debug.h"
00012 #include "tao/Transport.h"
00013 #include "tao/GUIResource_Factory.h"
00014 #include "tao/ORB_Core.h"
00015
00016 #if !defined (__ACE_INLINE__)
00017 # include "tao/Leader_Follower.i"
00018 #endif
00019
00020 ACE_RCSID (tao,
00021 Leader_Follower,
00022 "Leader_Follower.cpp,v 1.38 2006/04/19 08:53:39 jwillemsen Exp")
00023
00024
00025 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 TAO_Leader_Follower::~TAO_Leader_Follower (void)
00028 {
00029 while (!this->follower_free_list_.empty ())
00030 {
00031 TAO_LF_Follower *follower =
00032 this->follower_free_list_.pop_front ();
00033 delete follower;
00034 }
00035
00036
00037 if ( this->orb_core_->gui_resource_factory () )
00038 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
00039 else
00040 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
00041
00042 this->reactor_ = 0;
00043 }
00044
00045 TAO_LF_Follower *
00046 TAO_Leader_Follower::allocate_follower (void)
00047 {
00048 if (!this->follower_free_list_.empty ())
00049 return this->follower_free_list_.pop_front ();
00050
00051 return new TAO_LF_Follower (*this);
00052 }
00053
00054 void
00055 TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
00056 {
00057 this->follower_free_list_.push_front (follower);
00058 }
00059
00060 int
00061 TAO_Leader_Follower::elect_new_leader_i (void)
00062 {
00063 TAO_LF_Follower* const follower =
00064 this->follower_set_.head ();
00065
00066 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
00067 ACE_DEBUG ((LM_DEBUG,
00068 "TAO (%P|%t) LF::elect_new_leader_i - "
00069 "follower is %x\n",
00070 follower));
00071 #endif
00072
00073 return follower->signal ();
00074 }
00075
00076 int
00077 TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
00078 {
00079 int result = 0;
00080 ACE_Countdown_Time countdown (max_wait_time);
00081
00082
00083 ++this->event_loop_threads_waiting_;
00084
00085 while (this->client_thread_is_leader_ &&
00086 result != -1)
00087 {
00088 if (max_wait_time == 0)
00089 {
00090 if (this->event_loop_threads_condition_.wait () == -1)
00091 {
00092 ACE_ERROR ((LM_ERROR,
00093 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00094 ACE_TEXT ("Condition variable wait failed\n")));
00095
00096 result = -1;
00097 }
00098 }
00099 else
00100 {
00101 countdown.update ();
00102 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00103 tv += *max_wait_time;
00104 if (this->event_loop_threads_condition_.wait (&tv) == -1)
00105 {
00106 if (errno != ETIME)
00107 ACE_ERROR ((LM_ERROR,
00108 ACE_TEXT ("TAO (%P|%t): TAO_Leader_Follower::wait_for_client_leader_to_complete - ")
00109 ACE_TEXT ("Condition variable wait failed\n")));
00110
00111 result = -1;
00112 }
00113 }
00114 }
00115
00116
00117 --this->event_loop_threads_waiting_;
00118
00119 return result;
00120 }
00121
00122 ACE_Reactor *
00123 TAO_Leader_Follower::reactor (void)
00124 {
00125 if (this->reactor_ == 0)
00126 {
00127 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), 0);
00128 if (this->reactor_ == 0)
00129 {
00130
00131 if ( this->orb_core_->gui_resource_factory () )
00132 this->reactor_ =
00133 this->orb_core_->gui_resource_factory ()->get_reactor ();
00134 else
00135 this->reactor_ =
00136 this->orb_core_->resource_factory ()->get_reactor ();
00137 }
00138 }
00139 return this->reactor_;
00140 }
00141
00142 void
00143 TAO_Leader_Follower::set_client_thread (void)
00144 {
00145
00146
00147 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00148 if (tss->event_loop_thread_ ||
00149 tss->client_leader_thread_)
00150 {
00151 --this->leaders_;
00152 }
00153
00154 if (this->clients_ == 0 &&
00155 this->orb_core_->has_shutdown () &&
00156 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
00157 {
00158
00159
00160
00161 this->orb_core_->reactor ()->reset_reactor_event_loop ();
00162 }
00163 ++this->clients_;
00164 }
00165
00166 void
00167 TAO_Leader_Follower::reset_client_thread (void)
00168 {
00169
00170
00171 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
00172 if (tss->event_loop_thread_ ||
00173 tss->client_leader_thread_)
00174 {
00175 ++this->leaders_;
00176 }
00177
00178 --this->clients_;
00179 if (this->clients_ == 0 &&
00180 this->orb_core_->has_shutdown ())
00181 {
00182
00183
00184
00185 this->orb_core_->reactor ()->end_reactor_event_loop ();
00186 }
00187 }
00188
00189 int
00190 TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
00191 TAO_Transport *transport,
00192 ACE_Time_Value *max_wait_time)
00193 {
00194
00195 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
00196
00197 ACE_Countdown_Time countdown (max_wait_time);
00198
00199
00200 int result = 1;
00201
00202
00203
00204
00205
00206
00207
00208
00209 size_t t_id = 0;
00210
00211 if (TAO_debug_level)
00212 {
00213 t_id = transport->id ();
00214 }
00215
00216 {
00217
00218
00219 TAO_LF_Client_Thread_Helper client_thread_helper (*this);
00220 ACE_UNUSED_ARG (client_thread_helper);
00221
00222
00223
00224 if (this->leader_available ())
00225 {
00226
00227
00228
00229 TAO_LF_Follower_Auto_Ptr follower (*this);
00230 if (follower.get () == 0)
00231 return -1;
00232
00233 if (TAO_debug_level >= 5)
00234 ACE_DEBUG ((LM_DEBUG,
00235 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00236 " (follower), cond <%x>\n",
00237 t_id, follower.get ()));
00238
00239
00240
00241 TAO_LF_Event_Binder event_binder (event, follower.get ());
00242
00243 while (event->keep_waiting () &&
00244 this->leader_available ())
00245 {
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
00273
00274 if (max_wait_time == 0)
00275 {
00276 if (follower->wait (max_wait_time) == -1)
00277 {
00278 if (TAO_debug_level >= 5)
00279 ACE_DEBUG ((LM_DEBUG,
00280 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00281 " (follower) [no timer, cond failed]\n",
00282 t_id));
00283
00284
00285
00286
00287 return -1;
00288 }
00289 }
00290 else
00291 {
00292 countdown.update ();
00293 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00294 tv += *max_wait_time;
00295 if (follower->wait (&tv) == -1)
00296 {
00297 if (TAO_debug_level >= 5)
00298 ACE_DEBUG ((LM_DEBUG,
00299 "TAO (%P|%t) - Leader_Follower[%d]::wait, "
00300 "(follower) [has timer, follower failed]\n",
00301 t_id ));
00302
00303
00304
00305
00306 if (errno == ETIME)
00307
00308 event->set_state (TAO_LF_Event::LFS_TIMEOUT);
00309
00310 if (!event->successful ())
00311 {
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 if (this->elect_new_leader () == -1
00328 && TAO_debug_level > 0)
00329 {
00330 ACE_ERROR ((LM_ERROR,
00331 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, "
00332 "elect_new_leader failed\n",
00333 t_id ));
00334 }
00335 }
00336
00337
00338 return -1;
00339 }
00340 }
00341 }
00342
00343 countdown.update ();
00344
00345
00346
00347
00348
00349
00350
00351 if (TAO_debug_level >= 5)
00352 ACE_DEBUG ((LM_DEBUG,
00353 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00354 " done (follower), successful %d\n",
00355 t_id,
00356 event->successful ()));
00357
00358
00359
00360
00361 if (event->successful ())
00362 return 0;
00363
00364 if (event->error_detected ())
00365 return -1;
00366
00367
00368
00369
00370
00371 }
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
00385 ACE_UNUSED_ARG (client_leader_thread_helper);
00386
00387 {
00388 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
00389 this->reverse_lock (), -1);
00390
00391
00392 ACE_Reactor *reactor = this->reactor_;
00393 reactor->owner (ACE_Thread::self ());
00394
00395
00396
00397 if (TAO_debug_level >= 5)
00398 ACE_DEBUG ((LM_DEBUG,
00399 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00400 " (leader) enter reactor event loop\n",
00401 t_id));
00402
00403
00404
00405 while (event->keep_waiting ())
00406 {
00407
00408 result = reactor->handle_events (max_wait_time);
00409
00410
00411 if (result == 0 &&
00412 max_wait_time != 0 &&
00413 *max_wait_time == ACE_Time_Value::zero)
00414 break;
00415
00416
00417 if (result == -1)
00418 break;
00419
00420
00421 }
00422
00423 if (TAO_debug_level >= 5)
00424 ACE_DEBUG ((LM_DEBUG,
00425 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00426 " (leader) exit reactor event loop\n",
00427 t_id));
00428 }
00429 }
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443 if (this->elect_new_leader () == -1)
00444 ACE_ERROR_RETURN ((LM_ERROR,
00445 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00446 " failed to elect new leader\n",
00447 t_id),
00448 -1);
00449
00450 if (result == -1 && !this->reactor_->reactor_event_loop_done ())
00451 ACE_ERROR_RETURN ((LM_ERROR,
00452 "TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,"
00453 " handle_events failed\n",
00454 t_id),
00455 -1);
00456
00457
00458 if (max_wait_time != 0)
00459 {
00460 if (!event->successful ()
00461 && *max_wait_time == ACE_Time_Value::zero)
00462 {
00463 result = -1;
00464 errno = ETIME;
00465 }
00466 else if (event->error_detected ())
00467 {
00468
00469
00470 result = -1;
00471 }
00472 }
00473 else
00474 {
00475
00476
00477
00478
00479
00480
00481
00482 if (event->error_detected ())
00483 {
00484 result = -1;
00485 }
00486 }
00487
00488 return result;
00489 }
00490
00491 TAO_END_VERSIONED_NAMESPACE_DECL