Token.cpp

Go to the documentation of this file.
00001 // Token.cpp,v 4.44 2006/04/19 11:54:56 jwillemsen Exp
00002 
00003 #include "ace/Token.h"
00004 
00005 #if !defined (__ACE_INLINE__)
00006 #include "ace/Token.inl"
00007 #endif /* __ACE_INLINE__ */
00008 
00009 ACE_RCSID(ace, Token, "Token.cpp,v 4.44 2006/04/19 11:54:56 jwillemsen Exp")
00010 
00011 #if defined (ACE_HAS_THREADS)
00012 
00013 #include "ace/Thread.h"
00014 #include "ace/Log_Msg.h"
00015 
00016 #if defined (DEBUGGING)
00017 // FUZZ: disable check_for_streams_include
00018 #include "ace/streams.h"
00019 #endif /* DEBUGGING */
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00024 
00025 void
00026 ACE_Token::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029   ACE_TRACE ("ACE_Token::dump");
00030 
00031   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00032 
00033   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00034   // @@ Is there a portable way to do this?
00035   // ACE_DEBUG ((LM_DEBUG, "\nowner_ = %d", (long) this->owner_));
00036   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nowner_ addr = %x"), &this->owner_));
00037   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nwaiters_ = %d"), this->waiters_));
00038   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nin_use_ = %d"), this->in_use_));
00039   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nnesting level = %d"), this->nesting_level_));
00040   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00041 #endif /* ACE_HAS_DUMP */
00042 }
00043 
00044 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00045                                                          ACE_thread_t t_id)
00046   : next_ (0),
00047     thread_id_ (t_id),
00048 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00049     cv_ (0),
00050 #else
00051     cv_ (m),
00052 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00053     runable_ (0)
00054 {
00055 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00056   ACE_UNUSED_ARG (m);
00057 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00058 
00059   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00060 }
00061 
00062 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00063                                                          ACE_thread_t t_id,
00064                                                          ACE_Condition_Attributes &attributes)
00065   : next_ (0),
00066     thread_id_ (t_id),
00067 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00068     cv_ (0),
00069 #else
00070     cv_ (m, attributes),
00071 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00072     runable_ (0)
00073 {
00074 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00075   ACE_UNUSED_ARG (m);
00076   ACE_UNUSED_ARG (attributes);
00077 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00078 
00079   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00080 }
00081 
00082 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00083   : head_ (0),
00084     tail_ (0)
00085 {
00086   ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00087 }
00088 
00089 //
00090 // Remove an entry from the list.  Must be called with locks held.
00091 //
00092 void
00093 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00094 {
00095   ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00096   ACE_Token_Queue_Entry *curr = 0;
00097   ACE_Token_Queue_Entry *prev = 0;
00098 
00099   if (this->head_ == 0)
00100     return;
00101 
00102   for (curr = this->head_;
00103        curr != 0 && curr != entry;
00104        curr = curr->next_)
00105     prev = curr;
00106 
00107   if (curr == 0)
00108     // Didn't find the entry...
00109     return;
00110   else if (prev == 0)
00111     // Delete at the head.
00112     this->head_ = this->head_->next_;
00113   else
00114     // Delete in the middle.
00115     prev->next_ = curr->next_;
00116 
00117   // We need to update the tail of the list if we've deleted the last
00118   // entry.
00119   if (curr->next_ == 0)
00120     this->tail_ = prev;
00121 }
00122 
00123 //
00124 // Add an entry into the list.  Must be called with locks held.
00125 //
00126 void
00127 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00128                                           int requeue_position)
00129 {
00130   if (this->head_ == 0)
00131     {
00132       // No other threads - just add me
00133       this->head_ = &entry;
00134       this->tail_ = &entry;
00135     }
00136   else if (requeue_position == -1)
00137     {
00138       // Insert at the end of the queue.
00139       this->tail_->next_ = &entry;
00140       this->tail_ = &entry;
00141     }
00142   else if (requeue_position == 0)
00143     {
00144       // Insert at head of queue.
00145       entry.next_ = this->head_;
00146       this->head_ = &entry;
00147     }
00148   else
00149     // Insert in the middle of the queue somewhere.
00150     {
00151       // Determine where our thread should go in the queue of waiters.
00152 
00153       ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00154       while (requeue_position-- && insert_after->next_ != 0)
00155         insert_after = insert_after->next_;
00156 
00157       entry.next_ = insert_after->next_;
00158 
00159       if (entry.next_ == 0)
00160         this->tail_ = &entry;
00161 
00162       insert_after->next_ = &entry;
00163     }
00164 }
00165 
00166 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00167   : lock_ (name, (ACE_mutexattr_t *) any),
00168     owner_ (ACE_OS::NULL_thread),
00169     in_use_ (0),
00170     waiters_ (0),
00171     nesting_level_ (0),
00172     attributes_ (USYNC_THREAD),
00173     queueing_strategy_ (FIFO)
00174 {
00175 //  ACE_TRACE ("ACE_Token::ACE_Token");
00176 }
00177 
00178 ACE_Token::~ACE_Token (void)
00179 {
00180   ACE_TRACE ("ACE_Token::~ACE_Token");
00181 }
00182 
00183 int
00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00185                            void *arg,
00186                            ACE_Time_Value *timeout,
00187                            ACE_Token_Op_Type op_type)
00188 {
00189   ACE_TRACE ("ACE_Token::shared_acquire");
00190   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00191 
00192 #if defined (DEBUGGING)
00193   this->dump ();
00194 #endif /* DEBUGGING */
00195 
00196   ACE_thread_t thr_id = ACE_Thread::self ();
00197 
00198   // Nobody holds the token.
00199   if (!this->in_use_)
00200     {
00201       // Its mine!
00202       this->in_use_ = op_type;
00203       this->owner_ = thr_id;
00204       return 0;
00205     }
00206 
00207   //
00208   // Someone already holds the token.
00209   //
00210 
00211   // Check if it is us.
00212   if (ACE_OS::thr_equal (thr_id, this->owner_))
00213     {
00214       ++this->nesting_level_;
00215       return 0;
00216     }
00217 
00218   // Do a quick check for "polling" behavior.
00219   if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00220     {
00221       errno = ETIME;
00222       return -1;
00223     }
00224 
00225   //
00226   // We've got to sleep until we get the token.
00227   //
00228 
00229   // Which queue we should end up in...
00230   ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00231                             ? &this->readers_
00232                             : &this->writers_);
00233 
00234   // Allocate queue entry on stack.  This works since we don't exit
00235   // this method's activation record until we've got the token.
00236   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00237                                              thr_id,
00238                                              this->attributes_);
00239   queue->insert_entry (my_entry, this->queueing_strategy_);
00240   ++this->waiters_;
00241 
00242   // Execute appropriate <sleep_hook> callback.  (@@ should these
00243   // methods return a success/failure status, and if so, what should
00244   // we do with it?)
00245   int ret = 0;
00246   if (sleep_hook_func)
00247     {
00248       (*sleep_hook_func) (arg);
00249       ++ret;
00250     }
00251   else
00252     {
00253       // Execute virtual method.
00254       this->sleep_hook ();
00255       ++ret;
00256     }
00257 
00258   int timed_out = 0;
00259   int error = 0;
00260 
00261   // Sleep until we've got the token (ignore signals).
00262   do
00263     {
00264       int result = my_entry.wait (timeout,
00265                                   this->lock_);
00266 
00267       if (result == -1)
00268         {
00269           // Note, this should obey whatever thread-specific interrupt
00270           // policy is currently in place...
00271           if (errno == EINTR)
00272             continue;
00273 
00274 #if defined (DEBUGGING)
00275           cerr << '(' << ACE_Thread::self () << ')'
00276                << " acquire: "
00277                << (errno == ETIME ? "timed out" : "error occurred")
00278                << endl;
00279 #endif /* DEBUGGING */
00280 
00281           // We come here if a timeout occurs or some serious
00282           // ACE_Condition object error.
00283           if (errno == ETIME)
00284             timed_out = 1;
00285           else
00286             error = 1;
00287 
00288           // Stop the loop.
00289           break;
00290         }
00291     }
00292   while (!ACE_OS::thr_equal (thr_id, this->owner_));
00293 
00294   // Do this always and irrespective of the result of wait().
00295   --this->waiters_;
00296   queue->remove_entry (&my_entry);
00297 
00298 #if defined (DEBUGGING)
00299   cerr << '(' << ACE_Thread::self () << ')'
00300        << " acquire (UNBLOCKED)" << endl;
00301 #endif /* DEBUGGING */
00302 
00303   // If timeout occured
00304   if (timed_out)
00305     {
00306       // This thread was still selected to own the token.
00307       if (my_entry.runable_)
00308         {
00309           // Wakeup next waiter since this thread timed out.
00310           this->wakeup_next_waiter ();
00311         }
00312 
00313       // Return error.
00314      return -1;
00315     }
00316   else if (error)
00317     {
00318       // Return error.
00319       return -1;
00320     }
00321 
00322   // If this is a normal wakeup, this thread should be runnable.
00323   ACE_ASSERT (my_entry.runable_);
00324 
00325   return ret;
00326 }
00327 
00328 // By default this is a no-op.
00329 
00330 /* virtual */
00331 void
00332 ACE_Token::sleep_hook (void)
00333 {
00334   ACE_TRACE ("ACE_Token::sleep_hook");
00335 }
00336 
00337 int
00338 ACE_Token::acquire (ACE_Time_Value *timeout)
00339 {
00340   ACE_TRACE ("ACE_Token::acquire");
00341   return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00342 }
00343 
00344 // Acquire the token, sleeping until it is obtained or until <timeout>
00345 // expires.
00346 
00347 int
00348 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00349                     void *arg,
00350                     ACE_Time_Value *timeout)
00351 {
00352   ACE_TRACE ("ACE_Token::acquire");
00353   return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00354 }
00355 
00356 // Try to renew the token.
00357 
00358 int
00359 ACE_Token::renew (int requeue_position,
00360                   ACE_Time_Value *timeout)
00361 {
00362   ACE_TRACE ("ACE_Token::renew");
00363   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00364 
00365 #if defined (DEBUGGING)
00366   this->dump ();
00367 #endif /* DEBUGGING */
00368   // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
00369 
00370   // Check to see if there are any waiters worth giving up the lock
00371   // for.
00372 
00373   // If no writers and either we are a writer or there are no readers.
00374   if (this->writers_.head_ == 0 &&
00375       (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00376        this->readers_.head_ == 0))
00377     // Immediate return.
00378     return 0;
00379 
00380   // We've got to sleep until we get the token again.
00381 
00382   // Determine which queue should this thread go to.
00383   ACE_Token::ACE_Token_Queue *this_threads_queue =
00384     this->in_use_ == ACE_Token::READ_TOKEN ?
00385     &this->readers_ : &this->writers_;
00386 
00387   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00388                                              this->owner_);
00389 
00390   this_threads_queue->insert_entry (my_entry,
00391                                     // if requeue_position == 0 then we want to go next,
00392                                     // otherwise use the queueing strategy, which might also
00393                                     // happen to be 0.
00394                                     requeue_position == 0 ? 0 : this->queueing_strategy_);
00395   ++this->waiters_;
00396 
00397   // Remember nesting level...
00398   int const save_nesting_level_ = this->nesting_level_;
00399 
00400   // Reset state for new owner.
00401   this->nesting_level_ = 0;
00402 
00403   // Wakeup waiter.
00404   this->wakeup_next_waiter ();
00405 
00406   int timed_out = 0;
00407   int error = 0;
00408 
00409   // Sleep until we've got the token (ignore signals).
00410   do
00411     {
00412       int result = my_entry.wait (timeout,
00413                                   this->lock_);
00414 
00415       if (result == -1)
00416         {
00417           // Note, this should obey whatever thread-specific interrupt
00418           // policy is currently in place...
00419           if (errno == EINTR)
00420             continue;
00421 
00422 #if defined (DEBUGGING)
00423           cerr << '(' << ACE_Thread::self () << ')'
00424                << " renew: "
00425                << (errno == ETIME ? "timed out" : "error occurred")
00426                << endl;
00427 #endif /* DEBUGGING */
00428 
00429           // We come here if a timeout occurs or some serious
00430           // ACE_Condition object error.
00431           if (errno == ETIME)
00432             timed_out = 1;
00433           else
00434             error = 1;
00435 
00436           // Stop the loop.
00437           break;
00438         }
00439     }
00440   while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00441 
00442   // Do this always and irrespective of the result of wait().
00443   --this->waiters_;
00444   this_threads_queue->remove_entry (&my_entry);
00445 
00446 #if defined (DEBUGGING)
00447   cerr << '(' << ACE_Thread::self () << ')'
00448        << " acquire (UNBLOCKED)" << endl;
00449 #endif /* DEBUGGING */
00450 
00451   // If timeout occured
00452   if (timed_out)
00453     {
00454       // This thread was still selected to own the token.
00455       if (my_entry.runable_)
00456         {
00457           // Wakeup next waiter since this thread timed out.
00458           this->wakeup_next_waiter ();
00459         }
00460 
00461       // Return error.
00462      return -1;
00463     }
00464   else if (error)
00465     {
00466       // Return error.
00467       return -1;
00468     }
00469 
00470   // If this is a normal wakeup, this thread should be runnable.
00471   ACE_ASSERT (my_entry.runable_);
00472 
00473   // Reinstate nesting level.
00474   this->nesting_level_ = save_nesting_level_;
00475 
00476   return 0;
00477 }
00478 
00479 // Release the current holder of the token (which had
00480 // better be the caller's thread!).
00481 
00482 int
00483 ACE_Token::release (void)
00484 {
00485   ACE_TRACE ("ACE_Token::release");
00486   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00487 
00488   // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
00489 
00490 #if defined (DEBUGGING)
00491   this->dump ();
00492 #endif /* DEBUGGING */
00493 
00494   // Nested release...
00495   if (this->nesting_level_ > 0)
00496     --this->nesting_level_;
00497   else
00498     {
00499       //
00500       // Regular release...
00501       //
00502 
00503       // Wakeup waiter.
00504       this->wakeup_next_waiter ();
00505     }
00506 
00507   return 0;
00508 }
00509 
00510 void
00511 ACE_Token::wakeup_next_waiter (void)
00512 {
00513   ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00514 
00515   // Reset state for new owner.
00516   this->owner_ = ACE_OS::NULL_thread;
00517   this->in_use_ = 0;
00518 
00519   // Any waiters...
00520   if (this->writers_.head_ == 0 &&
00521       this->readers_.head_ == 0)
00522     {
00523       // No more waiters...
00524       return;
00525     }
00526 
00527   // Wakeup next waiter.
00528   ACE_Token_Queue *queue = 0;
00529 
00530   // Writer threads get priority to run first.
00531   if (this->writers_.head_ != 0)
00532     {
00533       this->in_use_ = ACE_Token::WRITE_TOKEN;
00534       queue = &this->writers_;
00535     }
00536   else
00537     {
00538       this->in_use_ = ACE_Token::READ_TOKEN;
00539       queue = &this->readers_;
00540     }
00541 
00542   // Wake up waiter and make it runable.
00543   queue->head_->runable_ = 1;
00544   queue->head_->signal ();
00545 
00546   this->owner_ = queue->head_->thread_id_;
00547 }
00548 
00549 ACE_END_VERSIONED_NAMESPACE_DECL
00550 
00551 #endif /* ACE_HAS_THREADS */

Generated on Thu Nov 9 09:42:08 2006 for ACE by doxygen 1.3.6