00001
00002
00003 #include "ace/Token.h"
00004
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Token.inl"
00007 #endif
00008
00009 ACE_RCSID(ace, Token, "Token.cpp,v 4.44 2006/04/19 11:54:56 jwillemsen Exp")
00010
00011 #if defined (ACE_HAS_THREADS)
00012
00013 #include "ace/Thread.h"
00014 #include "ace/Log_Msg.h"
00015
00016 #if defined (DEBUGGING)
00017
00018 #include "ace/streams.h"
00019 #endif
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00024
00025 void
00026 ACE_Token::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029 ACE_TRACE ("ACE_Token::dump");
00030
00031 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00032
00033 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00034
00035
00036 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nowner_ addr = %x"), &this->owner_));
00037 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nwaiters_ = %d"), this->waiters_));
00038 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nin_use_ = %d"), this->in_use_));
00039 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nnesting level = %d"), this->nesting_level_));
00040 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00041 #endif
00042 }
00043
00044 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00045 ACE_thread_t t_id)
00046 : next_ (0),
00047 thread_id_ (t_id),
00048 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00049 cv_ (0),
00050 #else
00051 cv_ (m),
00052 #endif
00053 runable_ (0)
00054 {
00055 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00056 ACE_UNUSED_ARG (m);
00057 #endif
00058
00059 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00060 }
00061
00062 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00063 ACE_thread_t t_id,
00064 ACE_Condition_Attributes &attributes)
00065 : next_ (0),
00066 thread_id_ (t_id),
00067 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00068 cv_ (0),
00069 #else
00070 cv_ (m, attributes),
00071 #endif
00072 runable_ (0)
00073 {
00074 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00075 ACE_UNUSED_ARG (m);
00076 ACE_UNUSED_ARG (attributes);
00077 #endif
00078
00079 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00080 }
00081
00082 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00083 : head_ (0),
00084 tail_ (0)
00085 {
00086 ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00087 }
00088
00089
00090
00091
00092 void
00093 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00094 {
00095 ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00096 ACE_Token_Queue_Entry *curr = 0;
00097 ACE_Token_Queue_Entry *prev = 0;
00098
00099 if (this->head_ == 0)
00100 return;
00101
00102 for (curr = this->head_;
00103 curr != 0 && curr != entry;
00104 curr = curr->next_)
00105 prev = curr;
00106
00107 if (curr == 0)
00108
00109 return;
00110 else if (prev == 0)
00111
00112 this->head_ = this->head_->next_;
00113 else
00114
00115 prev->next_ = curr->next_;
00116
00117
00118
00119 if (curr->next_ == 0)
00120 this->tail_ = prev;
00121 }
00122
00123
00124
00125
00126 void
00127 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00128 int requeue_position)
00129 {
00130 if (this->head_ == 0)
00131 {
00132
00133 this->head_ = &entry;
00134 this->tail_ = &entry;
00135 }
00136 else if (requeue_position == -1)
00137 {
00138
00139 this->tail_->next_ = &entry;
00140 this->tail_ = &entry;
00141 }
00142 else if (requeue_position == 0)
00143 {
00144
00145 entry.next_ = this->head_;
00146 this->head_ = &entry;
00147 }
00148 else
00149
00150 {
00151
00152
00153 ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00154 while (requeue_position-- && insert_after->next_ != 0)
00155 insert_after = insert_after->next_;
00156
00157 entry.next_ = insert_after->next_;
00158
00159 if (entry.next_ == 0)
00160 this->tail_ = &entry;
00161
00162 insert_after->next_ = &entry;
00163 }
00164 }
00165
00166 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00167 : lock_ (name, (ACE_mutexattr_t *) any),
00168 owner_ (ACE_OS::NULL_thread),
00169 in_use_ (0),
00170 waiters_ (0),
00171 nesting_level_ (0),
00172 attributes_ (USYNC_THREAD),
00173 queueing_strategy_ (FIFO)
00174 {
00175
00176 }
00177
00178 ACE_Token::~ACE_Token (void)
00179 {
00180 ACE_TRACE ("ACE_Token::~ACE_Token");
00181 }
00182
00183 int
00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00185 void *arg,
00186 ACE_Time_Value *timeout,
00187 ACE_Token_Op_Type op_type)
00188 {
00189 ACE_TRACE ("ACE_Token::shared_acquire");
00190 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00191
00192 #if defined (DEBUGGING)
00193 this->dump ();
00194 #endif
00195
00196 ACE_thread_t thr_id = ACE_Thread::self ();
00197
00198
00199 if (!this->in_use_)
00200 {
00201
00202 this->in_use_ = op_type;
00203 this->owner_ = thr_id;
00204 return 0;
00205 }
00206
00207
00208
00209
00210
00211
00212 if (ACE_OS::thr_equal (thr_id, this->owner_))
00213 {
00214 ++this->nesting_level_;
00215 return 0;
00216 }
00217
00218
00219 if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00220 {
00221 errno = ETIME;
00222 return -1;
00223 }
00224
00225
00226
00227
00228
00229
00230 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00231 ? &this->readers_
00232 : &this->writers_);
00233
00234
00235
00236 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00237 thr_id,
00238 this->attributes_);
00239 queue->insert_entry (my_entry, this->queueing_strategy_);
00240 ++this->waiters_;
00241
00242
00243
00244
00245 int ret = 0;
00246 if (sleep_hook_func)
00247 {
00248 (*sleep_hook_func) (arg);
00249 ++ret;
00250 }
00251 else
00252 {
00253
00254 this->sleep_hook ();
00255 ++ret;
00256 }
00257
00258 int timed_out = 0;
00259 int error = 0;
00260
00261
00262 do
00263 {
00264 int result = my_entry.wait (timeout,
00265 this->lock_);
00266
00267 if (result == -1)
00268 {
00269
00270
00271 if (errno == EINTR)
00272 continue;
00273
00274 #if defined (DEBUGGING)
00275 cerr << '(' << ACE_Thread::self () << ')'
00276 << " acquire: "
00277 << (errno == ETIME ? "timed out" : "error occurred")
00278 << endl;
00279 #endif
00280
00281
00282
00283 if (errno == ETIME)
00284 timed_out = 1;
00285 else
00286 error = 1;
00287
00288
00289 break;
00290 }
00291 }
00292 while (!ACE_OS::thr_equal (thr_id, this->owner_));
00293
00294
00295 --this->waiters_;
00296 queue->remove_entry (&my_entry);
00297
00298 #if defined (DEBUGGING)
00299 cerr << '(' << ACE_Thread::self () << ')'
00300 << " acquire (UNBLOCKED)" << endl;
00301 #endif
00302
00303
00304 if (timed_out)
00305 {
00306
00307 if (my_entry.runable_)
00308 {
00309
00310 this->wakeup_next_waiter ();
00311 }
00312
00313
00314 return -1;
00315 }
00316 else if (error)
00317 {
00318
00319 return -1;
00320 }
00321
00322
00323 ACE_ASSERT (my_entry.runable_);
00324
00325 return ret;
00326 }
00327
00328
00329
00330
00331 void
00332 ACE_Token::sleep_hook (void)
00333 {
00334 ACE_TRACE ("ACE_Token::sleep_hook");
00335 }
00336
00337 int
00338 ACE_Token::acquire (ACE_Time_Value *timeout)
00339 {
00340 ACE_TRACE ("ACE_Token::acquire");
00341 return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00342 }
00343
00344
00345
00346
00347 int
00348 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00349 void *arg,
00350 ACE_Time_Value *timeout)
00351 {
00352 ACE_TRACE ("ACE_Token::acquire");
00353 return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00354 }
00355
00356
00357
00358 int
00359 ACE_Token::renew (int requeue_position,
00360 ACE_Time_Value *timeout)
00361 {
00362 ACE_TRACE ("ACE_Token::renew");
00363 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00364
00365 #if defined (DEBUGGING)
00366 this->dump ();
00367 #endif
00368
00369
00370
00371
00372
00373
00374 if (this->writers_.head_ == 0 &&
00375 (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00376 this->readers_.head_ == 0))
00377
00378 return 0;
00379
00380
00381
00382
00383 ACE_Token::ACE_Token_Queue *this_threads_queue =
00384 this->in_use_ == ACE_Token::READ_TOKEN ?
00385 &this->readers_ : &this->writers_;
00386
00387 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00388 this->owner_);
00389
00390 this_threads_queue->insert_entry (my_entry,
00391
00392
00393
00394 requeue_position == 0 ? 0 : this->queueing_strategy_);
00395 ++this->waiters_;
00396
00397
00398 int const save_nesting_level_ = this->nesting_level_;
00399
00400
00401 this->nesting_level_ = 0;
00402
00403
00404 this->wakeup_next_waiter ();
00405
00406 int timed_out = 0;
00407 int error = 0;
00408
00409
00410 do
00411 {
00412 int result = my_entry.wait (timeout,
00413 this->lock_);
00414
00415 if (result == -1)
00416 {
00417
00418
00419 if (errno == EINTR)
00420 continue;
00421
00422 #if defined (DEBUGGING)
00423 cerr << '(' << ACE_Thread::self () << ')'
00424 << " renew: "
00425 << (errno == ETIME ? "timed out" : "error occurred")
00426 << endl;
00427 #endif
00428
00429
00430
00431 if (errno == ETIME)
00432 timed_out = 1;
00433 else
00434 error = 1;
00435
00436
00437 break;
00438 }
00439 }
00440 while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00441
00442
00443 --this->waiters_;
00444 this_threads_queue->remove_entry (&my_entry);
00445
00446 #if defined (DEBUGGING)
00447 cerr << '(' << ACE_Thread::self () << ')'
00448 << " acquire (UNBLOCKED)" << endl;
00449 #endif
00450
00451
00452 if (timed_out)
00453 {
00454
00455 if (my_entry.runable_)
00456 {
00457
00458 this->wakeup_next_waiter ();
00459 }
00460
00461
00462 return -1;
00463 }
00464 else if (error)
00465 {
00466
00467 return -1;
00468 }
00469
00470
00471 ACE_ASSERT (my_entry.runable_);
00472
00473
00474 this->nesting_level_ = save_nesting_level_;
00475
00476 return 0;
00477 }
00478
00479
00480
00481
00482 int
00483 ACE_Token::release (void)
00484 {
00485 ACE_TRACE ("ACE_Token::release");
00486 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00487
00488
00489
00490 #if defined (DEBUGGING)
00491 this->dump ();
00492 #endif
00493
00494
00495 if (this->nesting_level_ > 0)
00496 --this->nesting_level_;
00497 else
00498 {
00499
00500
00501
00502
00503
00504 this->wakeup_next_waiter ();
00505 }
00506
00507 return 0;
00508 }
00509
00510 void
00511 ACE_Token::wakeup_next_waiter (void)
00512 {
00513 ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00514
00515
00516 this->owner_ = ACE_OS::NULL_thread;
00517 this->in_use_ = 0;
00518
00519
00520 if (this->writers_.head_ == 0 &&
00521 this->readers_.head_ == 0)
00522 {
00523
00524 return;
00525 }
00526
00527
00528 ACE_Token_Queue *queue = 0;
00529
00530
00531 if (this->writers_.head_ != 0)
00532 {
00533 this->in_use_ = ACE_Token::WRITE_TOKEN;
00534 queue = &this->writers_;
00535 }
00536 else
00537 {
00538 this->in_use_ = ACE_Token::READ_TOKEN;
00539 queue = &this->readers_;
00540 }
00541
00542
00543 queue->head_->runable_ = 1;
00544 queue->head_->signal ();
00545
00546 this->owner_ = queue->head_->thread_id_;
00547 }
00548
00549 ACE_END_VERSIONED_NAMESPACE_DECL
00550
00551 #endif