00001
00002
00003 #include "ace/Token.h"
00004
00005 #if !defined (__ACE_INLINE__)
00006 # include "ace/Token.inl"
00007 #endif
00008
00009 ACE_RCSID(ace, Token, "$Id: Token.cpp 83735 2008-11-14 09:41:52Z johnnyw $")
00010
00011 #if defined (ACE_HAS_THREADS)
00012
00013 #include "ace/Thread.h"
00014 #include "ace/Log_Msg.h"
00015
00016 #if defined (ACE_TOKEN_DEBUGGING)
00017
00018 #include "ace/streams.h"
00019 #endif
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00024
00025 void
00026 ACE_Token::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029 ACE_TRACE ("ACE_Token::dump");
00030
00031 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00032
00033 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00034
00035
00036 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nowner_ addr = %x"), &this->owner_));
00037 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nwaiters_ = %d"), this->waiters_));
00038 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nin_use_ = %d"), this->in_use_));
00039 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nnesting level = %d"), this->nesting_level_));
00040 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00041 #endif
00042 }
00043
00044 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00045 ACE_thread_t t_id)
00046 : next_ (0),
00047 thread_id_ (t_id),
00048 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00049 cv_ (0),
00050 #else
00051 cv_ (m),
00052 #endif
00053 runable_ (0)
00054 {
00055 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00056 ACE_UNUSED_ARG (m);
00057 #endif
00058
00059 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00060 }
00061
00062 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00063 ACE_thread_t t_id,
00064 ACE_Condition_Attributes &attributes)
00065 : next_ (0),
00066 thread_id_ (t_id),
00067 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00068 cv_ (0),
00069 #else
00070 cv_ (m, attributes),
00071 #endif
00072 runable_ (0)
00073 {
00074 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00075 ACE_UNUSED_ARG (m);
00076 ACE_UNUSED_ARG (attributes);
00077 #endif
00078
00079 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00080 }
00081
00082 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00083 : head_ (0),
00084 tail_ (0)
00085 {
00086 ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00087 }
00088
00089
00090
00091
00092 void
00093 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00094 {
00095 ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00096 ACE_Token_Queue_Entry *curr = 0;
00097 ACE_Token_Queue_Entry *prev = 0;
00098
00099 if (this->head_ == 0)
00100 return;
00101
00102 for (curr = this->head_;
00103 curr != 0 && curr != entry;
00104 curr = curr->next_)
00105 prev = curr;
00106
00107 if (curr == 0)
00108
00109 return;
00110 else if (prev == 0)
00111
00112 this->head_ = this->head_->next_;
00113 else
00114
00115 prev->next_ = curr->next_;
00116
00117
00118
00119 if (curr->next_ == 0)
00120 this->tail_ = prev;
00121 }
00122
00123
00124
00125
00126 void
00127 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00128 int requeue_position)
00129 {
00130 if (this->head_ == 0)
00131 {
00132
00133 this->head_ = &entry;
00134 this->tail_ = &entry;
00135 }
00136 else if (requeue_position == -1)
00137 {
00138
00139 this->tail_->next_ = &entry;
00140 this->tail_ = &entry;
00141 }
00142 else if (requeue_position == 0)
00143 {
00144
00145 entry.next_ = this->head_;
00146 this->head_ = &entry;
00147 }
00148 else
00149
00150 {
00151
00152
00153 ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00154 while (requeue_position-- && insert_after->next_ != 0)
00155 insert_after = insert_after->next_;
00156
00157 entry.next_ = insert_after->next_;
00158
00159 if (entry.next_ == 0)
00160 this->tail_ = &entry;
00161
00162 insert_after->next_ = &entry;
00163 }
00164 }
00165
00166 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00167 : lock_ (name, (ACE_mutexattr_t *) any),
00168 owner_ (ACE_OS::NULL_thread),
00169 in_use_ (0),
00170 waiters_ (0),
00171 nesting_level_ (0),
00172 attributes_ (USYNC_THREAD),
00173 queueing_strategy_ (FIFO)
00174 {
00175
00176 }
00177
00178 ACE_Token::~ACE_Token (void)
00179 {
00180 ACE_TRACE ("ACE_Token::~ACE_Token");
00181 }
00182
00183 int
00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00185 void *arg,
00186 ACE_Time_Value *timeout,
00187 ACE_Token_Op_Type op_type)
00188 {
00189 ACE_TRACE ("ACE_Token::shared_acquire");
00190 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00191
00192 #if defined (ACE_TOKEN_DEBUGGING)
00193 this->dump ();
00194 #endif
00195
00196 ACE_thread_t const thr_id = ACE_Thread::self ();
00197
00198
00199 if (!this->in_use_)
00200 {
00201
00202 this->in_use_ = op_type;
00203 this->owner_ = thr_id;
00204 return 0;
00205 }
00206
00207
00208
00209
00210
00211
00212 if (ACE_OS::thr_equal (thr_id, this->owner_))
00213 {
00214 ++this->nesting_level_;
00215 return 0;
00216 }
00217
00218
00219 if (timeout != 0 && *timeout == ACE_Time_Value::zero)
00220 {
00221 errno = ETIME;
00222 return -1;
00223 }
00224
00225
00226
00227
00228
00229
00230 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00231 ? &this->readers_
00232 : &this->writers_);
00233
00234
00235
00236 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00237 thr_id,
00238 this->attributes_);
00239 queue->insert_entry (my_entry, this->queueing_strategy_);
00240 ++this->waiters_;
00241
00242
00243
00244
00245 int ret = 0;
00246 if (sleep_hook_func)
00247 {
00248 (*sleep_hook_func) (arg);
00249 ++ret;
00250 }
00251 else
00252 {
00253
00254 this->sleep_hook ();
00255 ++ret;
00256 }
00257
00258 bool timed_out = false;
00259 bool error = false;
00260
00261
00262 do
00263 {
00264 int const result = my_entry.wait (timeout, this->lock_);
00265
00266 if (result == -1)
00267 {
00268
00269
00270 if (errno == EINTR)
00271 continue;
00272
00273 #if defined (ACE_TOKEN_DEBUGGING)
00274 cerr << '(' << ACE_Thread::self () << ')'
00275 << " acquire: "
00276 << (errno == ETIME ? "timed out" : "error occurred")
00277 << endl;
00278 #endif
00279
00280
00281
00282 if (errno == ETIME)
00283 timed_out = true;
00284 else
00285 error = true;
00286
00287
00288 break;
00289 }
00290 }
00291 while (!ACE_OS::thr_equal (thr_id, this->owner_));
00292
00293
00294 --this->waiters_;
00295 queue->remove_entry (&my_entry);
00296
00297 #if defined (ACE_TOKEN_DEBUGGING)
00298 ACE_DEBUG ((LM_DEBUG, "(%t) ACE_Token::shared_acquire (UNBLOCKED)\n"));
00299 #endif
00300
00301
00302 if (timed_out)
00303 {
00304
00305 if (my_entry.runable_)
00306 {
00307
00308 this->wakeup_next_waiter ();
00309 }
00310
00311
00312 return -1;
00313 }
00314 else if (error)
00315 {
00316
00317 return -1;
00318 }
00319
00320
00321 ACE_ASSERT (my_entry.runable_);
00322
00323 return ret;
00324 }
00325
00326
00327
00328
00329 void
00330 ACE_Token::sleep_hook (void)
00331 {
00332 ACE_TRACE ("ACE_Token::sleep_hook");
00333 }
00334
00335 int
00336 ACE_Token::acquire (ACE_Time_Value *timeout)
00337 {
00338 ACE_TRACE ("ACE_Token::acquire");
00339 return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00340 }
00341
00342
00343
00344
00345 int
00346 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00347 void *arg,
00348 ACE_Time_Value *timeout)
00349 {
00350 ACE_TRACE ("ACE_Token::acquire");
00351 return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00352 }
00353
00354
00355
00356 int
00357 ACE_Token::renew (int requeue_position,
00358 ACE_Time_Value *timeout)
00359 {
00360 ACE_TRACE ("ACE_Token::renew");
00361 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00362
00363 #if defined (ACE_TOKEN_DEBUGGING)
00364 this->dump ();
00365 #endif
00366
00367
00368
00369
00370
00371
00372 if (this->writers_.head_ == 0 &&
00373 (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00374 this->readers_.head_ == 0))
00375
00376 return 0;
00377
00378
00379
00380
00381 ACE_Token::ACE_Token_Queue *this_threads_queue =
00382 this->in_use_ == ACE_Token::READ_TOKEN ?
00383 &this->readers_ : &this->writers_;
00384
00385 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00386 this->owner_);
00387
00388 this_threads_queue->insert_entry (my_entry,
00389
00390
00391
00392 requeue_position == 0 ? 0 : this->queueing_strategy_);
00393 ++this->waiters_;
00394
00395
00396 int const save_nesting_level_ = this->nesting_level_;
00397
00398
00399 this->nesting_level_ = 0;
00400
00401
00402 this->wakeup_next_waiter ();
00403
00404 bool timed_out = false;
00405 bool error = false;
00406
00407
00408 do
00409 {
00410 int const result = my_entry.wait (timeout, this->lock_);
00411
00412 if (result == -1)
00413 {
00414
00415
00416 if (errno == EINTR)
00417 continue;
00418
00419 #if defined (ACE_TOKEN_DEBUGGING)
00420 cerr << '(' << ACE_Thread::self () << ')'
00421 << " renew: "
00422 << (errno == ETIME ? "timed out" : "error occurred")
00423 << endl;
00424 #endif
00425
00426
00427
00428 if (errno == ETIME)
00429 timed_out = true;
00430 else
00431 error = true;
00432
00433
00434 break;
00435 }
00436 }
00437 while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00438
00439
00440 --this->waiters_;
00441 this_threads_queue->remove_entry (&my_entry);
00442
00443 #if defined (ACE_TOKEN_DEBUGGING)
00444 ACE_DEBUG ((LM_DEBUG, "(%t) ACE_Token::renew (UNBLOCKED)\n"));
00445 #endif
00446
00447
00448 if (timed_out)
00449 {
00450
00451 if (my_entry.runable_)
00452 {
00453
00454 this->wakeup_next_waiter ();
00455 }
00456
00457
00458 return -1;
00459 }
00460 else if (error)
00461 {
00462
00463 return -1;
00464 }
00465
00466
00467 ACE_ASSERT (my_entry.runable_);
00468
00469
00470 this->nesting_level_ = save_nesting_level_;
00471
00472 return 0;
00473 }
00474
00475
00476
00477
00478 int
00479 ACE_Token::release (void)
00480 {
00481 ACE_TRACE ("ACE_Token::release");
00482 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00483
00484 #if defined (ACE_TOKEN_DEBUGGING)
00485 this->dump ();
00486 #endif
00487
00488
00489 if (this->nesting_level_ > 0)
00490 --this->nesting_level_;
00491 else
00492 {
00493
00494
00495
00496
00497
00498 this->wakeup_next_waiter ();
00499 }
00500
00501 return 0;
00502 }
00503
00504 void
00505 ACE_Token::wakeup_next_waiter (void)
00506 {
00507 ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00508
00509
00510 this->owner_ = ACE_OS::NULL_thread;
00511 this->in_use_ = 0;
00512
00513
00514 if (this->writers_.head_ == 0 &&
00515 this->readers_.head_ == 0)
00516 {
00517
00518 return;
00519 }
00520
00521
00522 ACE_Token_Queue *queue = 0;
00523
00524
00525 if (this->writers_.head_ != 0)
00526 {
00527 this->in_use_ = ACE_Token::WRITE_TOKEN;
00528 queue = &this->writers_;
00529 }
00530 else
00531 {
00532 this->in_use_ = ACE_Token::READ_TOKEN;
00533 queue = &this->readers_;
00534 }
00535
00536
00537 queue->head_->runable_ = 1;
00538 queue->head_->signal ();
00539
00540 this->owner_ = queue->head_->thread_id_;
00541 }
00542
00543 ACE_END_VERSIONED_NAMESPACE_DECL
00544
00545 #endif