00001 
00002 
00003 #include "ace/Token.h"
00004 
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Token.inl"
00007 #endif 
00008 
00009 ACE_RCSID(ace, Token, "Token.cpp,v 4.44 2006/04/19 11:54:56 jwillemsen Exp")
00010 
00011 #if defined (ACE_HAS_THREADS)
00012 
00013 #include "ace/Thread.h"
00014 #include "ace/Log_Msg.h"
00015 
00016 #if defined (DEBUGGING)
00017 
00018 #include "ace/streams.h"
00019 #endif 
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00024 
00025 void
00026 ACE_Token::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029   ACE_TRACE ("ACE_Token::dump");
00030 
00031   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00032 
00033   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00034   
00035   
00036   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nowner_ addr = %x"), &this->owner_));
00037   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nwaiters_ = %d"), this->waiters_));
00038   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nin_use_ = %d"), this->in_use_));
00039   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nnesting level = %d"), this->nesting_level_));
00040   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00041 #endif 
00042 }
00043 
00044 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00045                                                          ACE_thread_t t_id)
00046   : next_ (0),
00047     thread_id_ (t_id),
00048 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00049     cv_ (0),
00050 #else
00051     cv_ (m),
00052 #endif 
00053     runable_ (0)
00054 {
00055 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00056   ACE_UNUSED_ARG (m);
00057 #endif 
00058 
00059   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00060 }
00061 
00062 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00063                                                          ACE_thread_t t_id,
00064                                                          ACE_Condition_Attributes &attributes)
00065   : next_ (0),
00066     thread_id_ (t_id),
00067 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00068     cv_ (0),
00069 #else
00070     cv_ (m, attributes),
00071 #endif 
00072     runable_ (0)
00073 {
00074 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00075   ACE_UNUSED_ARG (m);
00076   ACE_UNUSED_ARG (attributes);
00077 #endif 
00078 
00079   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00080 }
00081 
00082 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00083   : head_ (0),
00084     tail_ (0)
00085 {
00086   ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00087 }
00088 
00089 
00090 
00091 
00092 void
00093 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00094 {
00095   ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00096   ACE_Token_Queue_Entry *curr = 0;
00097   ACE_Token_Queue_Entry *prev = 0;
00098 
00099   if (this->head_ == 0)
00100     return;
00101 
00102   for (curr = this->head_;
00103        curr != 0 && curr != entry;
00104        curr = curr->next_)
00105     prev = curr;
00106 
00107   if (curr == 0)
00108     
00109     return;
00110   else if (prev == 0)
00111     
00112     this->head_ = this->head_->next_;
00113   else
00114     
00115     prev->next_ = curr->next_;
00116 
00117   
00118   
00119   if (curr->next_ == 0)
00120     this->tail_ = prev;
00121 }
00122 
00123 
00124 
00125 
00126 void
00127 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00128                                           int requeue_position)
00129 {
00130   if (this->head_ == 0)
00131     {
00132       
00133       this->head_ = &entry;
00134       this->tail_ = &entry;
00135     }
00136   else if (requeue_position == -1)
00137     {
00138       
00139       this->tail_->next_ = &entry;
00140       this->tail_ = &entry;
00141     }
00142   else if (requeue_position == 0)
00143     {
00144       
00145       entry.next_ = this->head_;
00146       this->head_ = &entry;
00147     }
00148   else
00149     
00150     {
00151       
00152 
00153       ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00154       while (requeue_position-- && insert_after->next_ != 0)
00155         insert_after = insert_after->next_;
00156 
00157       entry.next_ = insert_after->next_;
00158 
00159       if (entry.next_ == 0)
00160         this->tail_ = &entry;
00161 
00162       insert_after->next_ = &entry;
00163     }
00164 }
00165 
00166 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00167   : lock_ (name, (ACE_mutexattr_t *) any),
00168     owner_ (ACE_OS::NULL_thread),
00169     in_use_ (0),
00170     waiters_ (0),
00171     nesting_level_ (0),
00172     attributes_ (USYNC_THREAD),
00173     queueing_strategy_ (FIFO)
00174 {
00175 
00176 }
00177 
00178 ACE_Token::~ACE_Token (void)
00179 {
00180   ACE_TRACE ("ACE_Token::~ACE_Token");
00181 }
00182 
00183 int
00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00185                            void *arg,
00186                            ACE_Time_Value *timeout,
00187                            ACE_Token_Op_Type op_type)
00188 {
00189   ACE_TRACE ("ACE_Token::shared_acquire");
00190   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00191 
00192 #if defined (DEBUGGING)
00193   this->dump ();
00194 #endif 
00195 
00196   ACE_thread_t thr_id = ACE_Thread::self ();
00197 
00198   
00199   if (!this->in_use_)
00200     {
00201       
00202       this->in_use_ = op_type;
00203       this->owner_ = thr_id;
00204       return 0;
00205     }
00206 
00207   
00208   
00209   
00210 
00211   
00212   if (ACE_OS::thr_equal (thr_id, this->owner_))
00213     {
00214       ++this->nesting_level_;
00215       return 0;
00216     }
00217 
00218   
00219   if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00220     {
00221       errno = ETIME;
00222       return -1;
00223     }
00224 
00225   
00226   
00227   
00228 
00229   
00230   ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00231                             ? &this->readers_
00232                             : &this->writers_);
00233 
00234   
00235   
00236   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00237                                              thr_id,
00238                                              this->attributes_);
00239   queue->insert_entry (my_entry, this->queueing_strategy_);
00240   ++this->waiters_;
00241 
00242   
00243   
00244   
00245   int ret = 0;
00246   if (sleep_hook_func)
00247     {
00248       (*sleep_hook_func) (arg);
00249       ++ret;
00250     }
00251   else
00252     {
00253       
00254       this->sleep_hook ();
00255       ++ret;
00256     }
00257 
00258   int timed_out = 0;
00259   int error = 0;
00260 
00261   
00262   do
00263     {
00264       int result = my_entry.wait (timeout,
00265                                   this->lock_);
00266 
00267       if (result == -1)
00268         {
00269           
00270           
00271           if (errno == EINTR)
00272             continue;
00273 
00274 #if defined (DEBUGGING)
00275           cerr << '(' << ACE_Thread::self () << ')'
00276                << " acquire: "
00277                << (errno == ETIME ? "timed out" : "error occurred")
00278                << endl;
00279 #endif 
00280 
00281           
00282           
00283           if (errno == ETIME)
00284             timed_out = 1;
00285           else
00286             error = 1;
00287 
00288           
00289           break;
00290         }
00291     }
00292   while (!ACE_OS::thr_equal (thr_id, this->owner_));
00293 
00294   
00295   --this->waiters_;
00296   queue->remove_entry (&my_entry);
00297 
00298 #if defined (DEBUGGING)
00299   cerr << '(' << ACE_Thread::self () << ')'
00300        << " acquire (UNBLOCKED)" << endl;
00301 #endif 
00302 
00303   
00304   if (timed_out)
00305     {
00306       
00307       if (my_entry.runable_)
00308         {
00309           
00310           this->wakeup_next_waiter ();
00311         }
00312 
00313       
00314      return -1;
00315     }
00316   else if (error)
00317     {
00318       
00319       return -1;
00320     }
00321 
00322   
00323   ACE_ASSERT (my_entry.runable_);
00324 
00325   return ret;
00326 }
00327 
00328 
00329 
00330 
00331 void
00332 ACE_Token::sleep_hook (void)
00333 {
00334   ACE_TRACE ("ACE_Token::sleep_hook");
00335 }
00336 
00337 int
00338 ACE_Token::acquire (ACE_Time_Value *timeout)
00339 {
00340   ACE_TRACE ("ACE_Token::acquire");
00341   return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00342 }
00343 
00344 
00345 
00346 
00347 int
00348 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00349                     void *arg,
00350                     ACE_Time_Value *timeout)
00351 {
00352   ACE_TRACE ("ACE_Token::acquire");
00353   return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00354 }
00355 
00356 
00357 
00358 int
00359 ACE_Token::renew (int requeue_position,
00360                   ACE_Time_Value *timeout)
00361 {
00362   ACE_TRACE ("ACE_Token::renew");
00363   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00364 
00365 #if defined (DEBUGGING)
00366   this->dump ();
00367 #endif 
00368   
00369 
00370   
00371   
00372 
00373   
00374   if (this->writers_.head_ == 0 &&
00375       (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00376        this->readers_.head_ == 0))
00377     
00378     return 0;
00379 
00380   
00381 
00382   
00383   ACE_Token::ACE_Token_Queue *this_threads_queue =
00384     this->in_use_ == ACE_Token::READ_TOKEN ?
00385     &this->readers_ : &this->writers_;
00386 
00387   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00388                                              this->owner_);
00389 
00390   this_threads_queue->insert_entry (my_entry,
00391                                     
00392                                     
00393                                     
00394                                     requeue_position == 0 ? 0 : this->queueing_strategy_);
00395   ++this->waiters_;
00396 
00397   
00398   int const save_nesting_level_ = this->nesting_level_;
00399 
00400   
00401   this->nesting_level_ = 0;
00402 
00403   
00404   this->wakeup_next_waiter ();
00405 
00406   int timed_out = 0;
00407   int error = 0;
00408 
00409   
00410   do
00411     {
00412       int result = my_entry.wait (timeout,
00413                                   this->lock_);
00414 
00415       if (result == -1)
00416         {
00417           
00418           
00419           if (errno == EINTR)
00420             continue;
00421 
00422 #if defined (DEBUGGING)
00423           cerr << '(' << ACE_Thread::self () << ')'
00424                << " renew: "
00425                << (errno == ETIME ? "timed out" : "error occurred")
00426                << endl;
00427 #endif 
00428 
00429           
00430           
00431           if (errno == ETIME)
00432             timed_out = 1;
00433           else
00434             error = 1;
00435 
00436           
00437           break;
00438         }
00439     }
00440   while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00441 
00442   
00443   --this->waiters_;
00444   this_threads_queue->remove_entry (&my_entry);
00445 
00446 #if defined (DEBUGGING)
00447   cerr << '(' << ACE_Thread::self () << ')'
00448        << " acquire (UNBLOCKED)" << endl;
00449 #endif 
00450 
00451   
00452   if (timed_out)
00453     {
00454       
00455       if (my_entry.runable_)
00456         {
00457           
00458           this->wakeup_next_waiter ();
00459         }
00460 
00461       
00462      return -1;
00463     }
00464   else if (error)
00465     {
00466       
00467       return -1;
00468     }
00469 
00470   
00471   ACE_ASSERT (my_entry.runable_);
00472 
00473   
00474   this->nesting_level_ = save_nesting_level_;
00475 
00476   return 0;
00477 }
00478 
00479 
00480 
00481 
00482 int
00483 ACE_Token::release (void)
00484 {
00485   ACE_TRACE ("ACE_Token::release");
00486   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00487 
00488   
00489 
00490 #if defined (DEBUGGING)
00491   this->dump ();
00492 #endif 
00493 
00494   
00495   if (this->nesting_level_ > 0)
00496     --this->nesting_level_;
00497   else
00498     {
00499       
00500       
00501       
00502 
00503       
00504       this->wakeup_next_waiter ();
00505     }
00506 
00507   return 0;
00508 }
00509 
00510 void
00511 ACE_Token::wakeup_next_waiter (void)
00512 {
00513   ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00514 
00515   
00516   this->owner_ = ACE_OS::NULL_thread;
00517   this->in_use_ = 0;
00518 
00519   
00520   if (this->writers_.head_ == 0 &&
00521       this->readers_.head_ == 0)
00522     {
00523       
00524       return;
00525     }
00526 
00527   
00528   ACE_Token_Queue *queue = 0;
00529 
00530   
00531   if (this->writers_.head_ != 0)
00532     {
00533       this->in_use_ = ACE_Token::WRITE_TOKEN;
00534       queue = &this->writers_;
00535     }
00536   else
00537     {
00538       this->in_use_ = ACE_Token::READ_TOKEN;
00539       queue = &this->readers_;
00540     }
00541 
00542   
00543   queue->head_->runable_ = 1;
00544   queue->head_->signal ();
00545 
00546   this->owner_ = queue->head_->thread_id_;
00547 }
00548 
00549 ACE_END_VERSIONED_NAMESPACE_DECL
00550 
00551 #endif