Message_Queue_NT.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue_NT.cpp 80826 2008-03-04 14:51:23Z wotte $
00002 
00003 #include "ace/Message_Queue_NT.h"
00004 #include "ace/Log_Msg.h"
00005 
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue_NT.inl"
00008 #endif /* __ACE_INLINE__ */
00009 
00010 ACE_RCSID (ace,
00011            Message_Queue_NT,
00012            "$Id: Message_Queue_NT.cpp 80826 2008-03-04 14:51:23Z wotte $")
00013 
00014 
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00018 
00019 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00020   : max_cthrs_ (max_threads),
00021     cur_thrs_ (0),
00022     cur_bytes_ (0),
00023     cur_length_ (0),
00024     cur_count_ (0),
00025     completion_port_ (ACE_INVALID_HANDLE)
00026 {
00027   ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00028   this->open (max_threads);
00029 }
00030 
00031 int
00032 ACE_Message_Queue_NT::open (DWORD max_threads)
00033 {
00034   ACE_TRACE ("ACE_Message_Queue_NT::open");
00035   this->max_cthrs_ = max_threads;
00036   this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00037                                                      0,
00038                                                      ACE_Message_Queue_Base::ACTIVATED,
00039                                                      max_threads);
00040   return (this->completion_port_ == 0 ? -1 : 0);
00041 }
00042 
00043 int
00044 ACE_Message_Queue_NT::close (void)
00045 {
00046   ACE_TRACE ("ACE_Message_Queue_NT::close");
00047   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00048   this->deactivate ();
00049   return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00050 }
00051 
00052 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00053 {
00054   ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00055   this->close ();
00056 }
00057 
00058 int
00059 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00060                                ACE_Time_Value *)
00061 {
00062   ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00063   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00064   if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00065     {
00066       size_t const msize = new_item->total_size ();
00067       size_t const mlength = new_item->total_length ();
00068       // Note - we send ACTIVATED in the 3rd arg to tell the completion
00069       // routine it's _NOT_ being woken up because of deactivate().
00070       ULONG_PTR state_to_post;
00071       state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00072       if (::PostQueuedCompletionStatus (this->completion_port_,
00073                                         static_cast<DWORD> (msize),
00074                                         state_to_post,
00075                                         reinterpret_cast<LPOVERLAPPED> (new_item)))
00076         {
00077           // Update the states once I succeed.
00078           this->cur_bytes_ += msize;
00079           this->cur_length_ += mlength;
00080           return ACE_Utils::truncate_cast<int> (++this->cur_count_);
00081         }
00082     }
00083   else
00084     errno = ESHUTDOWN;
00085 
00086   // Fail to enqueue the message.
00087   return -1;
00088 }
00089 
00090 int
00091 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00092                                ACE_Time_Value *timeout)
00093 {
00094   ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00095 
00096   {
00097     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00098 
00099     // Make sure the MQ is not deactivated before proceeding.
00100     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00101       {
00102         errno = ESHUTDOWN;      // Operation on deactivated MQ not allowed.
00103         return -1;
00104       }
00105     else
00106       ++this->cur_thrs_;        // Increase the waiting thread count.
00107   }
00108 
00109   ULONG_PTR queue_state;
00110   DWORD msize;
00111   // Get a message from the completion port.
00112   int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00113                                           &msize,
00114                                           &queue_state,
00115                                           reinterpret_cast<LPOVERLAPPED *> (&first_item),
00116                                           (timeout == 0 ? INFINITE : timeout->msec ()));
00117   {
00118     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00119     --this->cur_thrs_;          // Decrease waiting thread count.
00120     if (retv)
00121       {
00122         if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00123           {                     // Really get a valid MB from the queue.
00124             --this->cur_count_;
00125             this->cur_bytes_ -= msize;
00126             this->cur_length_ -= first_item->total_length ();
00127             return ACE_Utils::truncate_cast<int> (this->cur_count_);
00128           }
00129         else                    // Woken up by deactivate () or pulse ().
00130             errno = ESHUTDOWN;
00131       }
00132   }
00133   return -1;
00134 }
00135 
00136 int
00137 ACE_Message_Queue_NT::deactivate (void)
00138 {
00139   ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00140   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00141 
00142   int const previous_state = this->state_;
00143   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00144     {
00145       this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00146 
00147       // Get the number of shutdown messages necessary to wake up all
00148       // waiting threads.
00149       DWORD cntr =
00150         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00151       while (cntr-- > 0)
00152         ::PostQueuedCompletionStatus (this->completion_port_,
00153                                       0,
00154                                       this->state_,
00155                                       0);
00156     }
00157   return previous_state;
00158 }
00159 
00160 int
00161 ACE_Message_Queue_NT::activate (void)
00162 {
00163   ACE_TRACE ("ACE_Message_Queue_NT::activate");
00164   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00165   int const previous_status = this->state_;
00166   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00167   return previous_status;
00168 }
00169 
00170 int
00171 ACE_Message_Queue_NT::pulse (void)
00172 {
00173   ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00174   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00175 
00176   int const previous_state = this->state_;
00177   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00178     {
00179       this->state_ = ACE_Message_Queue_Base::PULSED;
00180 
00181       // Get the number of shutdown messages necessary to wake up all
00182       // waiting threads.
00183 
00184       DWORD cntr =
00185         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00186       while (cntr-- > 0)
00187         ::PostQueuedCompletionStatus (this->completion_port_,
00188                                       0,
00189                                       this->state_,
00190                                       0);
00191     }
00192   return previous_state;
00193 }
00194 
00195 void
00196 ACE_Message_Queue_NT::dump (void) const
00197 {
00198 #if defined (ACE_HAS_DUMP)
00199   ACE_TRACE ("ACE_Message_Queue_NT::dump");
00200 
00201   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00202   switch (this->state_)
00203     {
00204     case ACE_Message_Queue_Base::ACTIVATED:
00205       ACE_DEBUG ((LM_DEBUG,
00206                   ACE_TEXT ("state = ACTIVATED\n")));
00207       break;
00208     case ACE_Message_Queue_Base::DEACTIVATED:
00209       ACE_DEBUG ((LM_DEBUG,
00210                   ACE_TEXT ("state = DEACTIVATED\n")));
00211       break;
00212     case ACE_Message_Queue_Base::PULSED:
00213       ACE_DEBUG ((LM_DEBUG,
00214                   ACE_TEXT ("state = PULSED\n")));
00215       break;
00216     }
00217 
00218   ACE_DEBUG ((LM_DEBUG,
00219               ACE_TEXT ("max_cthrs_ = %d\n")
00220               ACE_TEXT ("cur_thrs_ = %d\n")
00221               ACE_TEXT ("cur_bytes = %d\n")
00222               ACE_TEXT ("cur_length = %d\n")
00223               ACE_TEXT ("cur_count = %d\n")
00224               ACE_TEXT ("completion_port_ = %x\n"),
00225               this->max_cthrs_,
00226               this->cur_thrs_,
00227               this->cur_bytes_,
00228               this->cur_length_,
00229               this->cur_count_,
00230               this->completion_port_));
00231   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00232 #endif /* ACE_HAS_DUMP */
00233 }
00234 
00235 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
00236 
00237 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:18:40 2010 for ACE by  doxygen 1.4.7