Message_Queue.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #include "ace/Message_Queue.h"
00004 #include "ace/Log_Msg.h"
00005 
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue.inl"
00008 #endif /* __ACE_INLINE__ */
00009 
00010 
00011 ACE_RCSID (ace,
00012            Message_Queue,
00013            "$Id: Message_Queue.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00014 
00015 
00016 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
00019 {
00020 }
00021 
00022 int
00023 ACE_Message_Queue_Base::state (void)
00024 {
00025   ACE_TRACE ("ACE_Message_Queue_Base::state");
00026 
00027   return this->state_;
00028 }
00029 
00030 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00031 
00032 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00033   : max_cthrs_ (max_threads),
00034     cur_thrs_ (0),
00035     cur_bytes_ (0),
00036     cur_length_ (0),
00037     cur_count_ (0),
00038     completion_port_ (ACE_INVALID_HANDLE)
00039 {
00040   ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00041   this->open (max_threads);
00042 }
00043 
00044 int
00045 ACE_Message_Queue_NT::open (DWORD max_threads)
00046 {
00047   ACE_TRACE ("ACE_Message_Queue_NT::open");
00048   this->max_cthrs_ = max_threads;
00049   this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00050                                                      0,
00051                                                      ACE_Message_Queue_Base::ACTIVATED,
00052                                                      max_threads);
00053   return (this->completion_port_ == 0 ? -1 : 0);
00054 }
00055 
00056 int
00057 ACE_Message_Queue_NT::close (void)
00058 {
00059   ACE_TRACE ("ACE_Message_Queue_NT::close");
00060   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00061   this->deactivate ();
00062   return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00063 }
00064 
00065 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00066 {
00067   ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00068   this->close ();
00069 }
00070 
00071 int
00072 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00073                                ACE_Time_Value *)
00074 {
00075   ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00076   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00077   if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00078     {
00079       size_t const msize = new_item->total_size ();
00080       size_t const mlength = new_item->total_length ();
00081       // Note - we send ACTIVATED in the 3rd arg to tell the completion
00082       // routine it's _NOT_ being woken up because of deactivate().
00083       ULONG_PTR state_to_post;
00084       state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00085       if (::PostQueuedCompletionStatus (this->completion_port_,
00086                                         static_cast<DWORD> (msize),
00087                                         state_to_post,
00088                                         reinterpret_cast<LPOVERLAPPED> (new_item)))
00089         {
00090           // Update the states once I succeed.
00091           this->cur_bytes_ += msize;
00092           this->cur_length_ += mlength;
00093           return ACE_Utils::truncate_cast<int> (++this->cur_count_);
00094         }
00095     }
00096   else
00097     errno = ESHUTDOWN;
00098 
00099   // Fail to enqueue the message.
00100   return -1;
00101 }
00102 
00103 int
00104 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00105                                ACE_Time_Value *timeout)
00106 {
00107   ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00108 
00109   {
00110     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00111 
00112     // Make sure the MQ is not deactivated before proceeding.
00113     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00114       {
00115         errno = ESHUTDOWN;      // Operation on deactivated MQ not allowed.
00116         return -1;
00117       }
00118     else
00119       ++this->cur_thrs_;        // Increase the waiting thread count.
00120   }
00121 
00122   ULONG_PTR queue_state;
00123   DWORD msize;
00124   // Get a message from the completion port.
00125   int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00126                                           &msize,
00127                                           &queue_state,
00128                                           reinterpret_cast<LPOVERLAPPED *> (&first_item),
00129                                           (timeout == 0 ? INFINITE : timeout->msec ()));
00130   {
00131     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00132     --this->cur_thrs_;          // Decrease waiting thread count.
00133     if (retv)
00134       {
00135         if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00136           {                     // Really get a valid MB from the queue.
00137             --this->cur_count_;
00138             this->cur_bytes_ -= msize;
00139             this->cur_length_ -= first_item->total_length ();
00140             return ACE_Utils::truncate_cast<int> (this->cur_count_);
00141           }
00142         else                    // Woken up by deactivate () or pulse ().
00143             errno = ESHUTDOWN;
00144       }
00145   }
00146   return -1;
00147 }
00148 
00149 int
00150 ACE_Message_Queue_NT::deactivate (void)
00151 {
00152   ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00153   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00154 
00155   int const previous_state = this->state_;
00156   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00157     {
00158       this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00159 
00160       // Get the number of shutdown messages necessary to wake up all
00161       // waiting threads.
00162       DWORD cntr =
00163         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00164       while (cntr-- > 0)
00165         ::PostQueuedCompletionStatus (this->completion_port_,
00166                                       0,
00167                                       this->state_,
00168                                       0);
00169     }
00170   return previous_state;
00171 }
00172 
00173 int
00174 ACE_Message_Queue_NT::activate (void)
00175 {
00176   ACE_TRACE ("ACE_Message_Queue_NT::activate");
00177   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00178   int const previous_status = this->state_;
00179   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00180   return previous_status;
00181 }
00182 
00183 int
00184 ACE_Message_Queue_NT::pulse (void)
00185 {
00186   ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00187   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00188 
00189   int const previous_state = this->state_;
00190   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00191     {
00192       this->state_ = ACE_Message_Queue_Base::PULSED;
00193 
00194       // Get the number of shutdown messages necessary to wake up all
00195       // waiting threads.
00196 
00197       DWORD cntr =
00198         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00199       while (cntr-- > 0)
00200         ::PostQueuedCompletionStatus (this->completion_port_,
00201                                       0,
00202                                       this->state_,
00203                                       0);
00204     }
00205   return previous_state;
00206 }
00207 
00208 void
00209 ACE_Message_Queue_NT::dump (void) const
00210 {
00211 #if defined (ACE_HAS_DUMP)
00212   ACE_TRACE ("ACE_Message_Queue_NT::dump");
00213 
00214   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00215   switch (this->state_)
00216     {
00217     case ACE_Message_Queue_Base::ACTIVATED:
00218       ACE_DEBUG ((LM_DEBUG,
00219                   ACE_TEXT ("state = ACTIVATED\n")));
00220       break;
00221     case ACE_Message_Queue_Base::DEACTIVATED:
00222       ACE_DEBUG ((LM_DEBUG,
00223                   ACE_TEXT ("state = DEACTIVATED\n")));
00224       break;
00225     case ACE_Message_Queue_Base::PULSED:
00226       ACE_DEBUG ((LM_DEBUG,
00227                   ACE_TEXT ("state = PULSED\n")));
00228       break;
00229     }
00230 
00231   ACE_DEBUG ((LM_DEBUG,
00232               ACE_TEXT ("max_cthrs_ = %d\n")
00233               ACE_TEXT ("cur_thrs_ = %d\n")
00234               ACE_TEXT ("cur_bytes = %d\n")
00235               ACE_TEXT ("cur_length = %d\n")
00236               ACE_TEXT ("cur_count = %d\n")
00237               ACE_TEXT ("completion_port_ = %x\n"),
00238               this->max_cthrs_,
00239               this->cur_thrs_,
00240               this->cur_bytes_,
00241               this->cur_length_,
00242               this->cur_count_,
00243               this->completion_port_));
00244   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00245 #endif /* ACE_HAS_DUMP */
00246 }
00247 
00248 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
00249 
00250 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 12:05:31 2008 for ACE by doxygen 1.3.6