00001
00002
00003 #include "ace/Message_Queue.h"
00004 #include "ace/Message_Queue_NT.h"
00005 #include "ace/Log_Msg.h"
00006
00007 #if !defined (__ACE_INLINE__)
00008 #include "ace/Message_Queue_NT.inl"
00009 #endif
00010
00011 ACE_RCSID (ace,
00012 Message_Queue_NT,
00013 "$Id: Message_Queue_NT.cpp 88560 2010-01-15 05:02:05Z schmidt $")
00014
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00018
00019 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00020 : max_cthrs_ (max_threads),
00021 cur_thrs_ (0),
00022 cur_bytes_ (0),
00023 cur_length_ (0),
00024 cur_count_ (0),
00025 completion_port_ (ACE_INVALID_HANDLE)
00026 {
00027 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00028 this->open (max_threads);
00029 }
00030
00031 int
00032 ACE_Message_Queue_NT::open (DWORD max_threads)
00033 {
00034 ACE_TRACE ("ACE_Message_Queue_NT::open");
00035 this->max_cthrs_ = max_threads;
00036 this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00037 0,
00038 ACE_Message_Queue_Base::ACTIVATED,
00039 max_threads);
00040 return (this->completion_port_ == 0 ? -1 : 0);
00041 }
00042
00043 int
00044 ACE_Message_Queue_NT::close (void)
00045 {
00046 ACE_TRACE ("ACE_Message_Queue_NT::close");
00047 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00048 this->deactivate ();
00049 return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00050 }
00051
00052 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00053 {
00054 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00055 this->close ();
00056 }
00057
00058 int
00059 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00060 ACE_Time_Value *)
00061 {
00062 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00063 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00064 if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00065 {
00066 size_t const msize = new_item->total_size ();
00067 size_t const mlength = new_item->total_length ();
00068
00069
00070 ULONG_PTR state_to_post;
00071 state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00072 if (::PostQueuedCompletionStatus (this->completion_port_,
00073 static_cast<DWORD> (msize),
00074 state_to_post,
00075 reinterpret_cast<LPOVERLAPPED> (new_item)))
00076 {
00077
00078 this->cur_bytes_ += msize;
00079 this->cur_length_ += mlength;
00080 return ACE_Utils::truncate_cast<int> (++this->cur_count_);
00081 }
00082 }
00083 else
00084 errno = ESHUTDOWN;
00085
00086
00087 return -1;
00088 }
00089
00090 int
00091 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00092 ACE_Time_Value *timeout)
00093 {
00094 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00095
00096 {
00097 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00098
00099
00100 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00101 {
00102 errno = ESHUTDOWN;
00103 return -1;
00104 }
00105 else
00106 ++this->cur_thrs_;
00107 }
00108
00109 ULONG_PTR queue_state;
00110 DWORD msize;
00111
00112 int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00113 &msize,
00114 &queue_state,
00115 reinterpret_cast<LPOVERLAPPED *> (&first_item),
00116 (timeout == 0 ? INFINITE : timeout->msec ()));
00117 {
00118 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00119 --this->cur_thrs_;
00120 if (retv)
00121 {
00122 if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00123 {
00124 --this->cur_count_;
00125 this->cur_bytes_ -= msize;
00126 this->cur_length_ -= first_item->total_length ();
00127 return ACE_Utils::truncate_cast<int> (this->cur_count_);
00128 }
00129 else
00130 errno = ESHUTDOWN;
00131 }
00132 }
00133 return -1;
00134 }
00135
00136 int
00137 ACE_Message_Queue_NT::deactivate (void)
00138 {
00139 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00140 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00141
00142 int const previous_state = this->state_;
00143 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00144 {
00145 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00146
00147
00148
00149 DWORD cntr =
00150 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00151 while (cntr-- > 0)
00152 ::PostQueuedCompletionStatus (this->completion_port_,
00153 0,
00154 this->state_,
00155 0);
00156 }
00157 return previous_state;
00158 }
00159
00160 int
00161 ACE_Message_Queue_NT::activate (void)
00162 {
00163 ACE_TRACE ("ACE_Message_Queue_NT::activate");
00164 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00165 int const previous_status = this->state_;
00166 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00167 return previous_status;
00168 }
00169
00170 int
00171 ACE_Message_Queue_NT::pulse (void)
00172 {
00173 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00174 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00175
00176 int const previous_state = this->state_;
00177 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00178 {
00179 this->state_ = ACE_Message_Queue_Base::PULSED;
00180
00181
00182
00183
00184 DWORD cntr =
00185 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00186 while (cntr-- > 0)
00187 ::PostQueuedCompletionStatus (this->completion_port_,
00188 0,
00189 this->state_,
00190 0);
00191 }
00192 return previous_state;
00193 }
00194
00195 void
00196 ACE_Message_Queue_NT::dump (void) const
00197 {
00198 #if defined (ACE_HAS_DUMP)
00199 ACE_TRACE ("ACE_Message_Queue_NT::dump");
00200
00201 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00202 switch (this->state_)
00203 {
00204 case ACE_Message_Queue_Base::ACTIVATED:
00205 ACE_DEBUG ((LM_DEBUG,
00206 ACE_TEXT ("state = ACTIVATED\n")));
00207 break;
00208 case ACE_Message_Queue_Base::DEACTIVATED:
00209 ACE_DEBUG ((LM_DEBUG,
00210 ACE_TEXT ("state = DEACTIVATED\n")));
00211 break;
00212 case ACE_Message_Queue_Base::PULSED:
00213 ACE_DEBUG ((LM_DEBUG,
00214 ACE_TEXT ("state = PULSED\n")));
00215 break;
00216 }
00217
00218 ACE_DEBUG ((LM_DEBUG,
00219 ACE_TEXT ("max_cthrs_ = %d\n")
00220 ACE_TEXT ("cur_thrs_ = %d\n")
00221 ACE_TEXT ("cur_bytes = %d\n")
00222 ACE_TEXT ("cur_length = %d\n")
00223 ACE_TEXT ("cur_count = %d\n")
00224 ACE_TEXT ("completion_port_ = %x\n"),
00225 this->max_cthrs_,
00226 this->cur_thrs_,
00227 this->cur_bytes_,
00228 this->cur_length_,
00229 this->cur_count_,
00230 this->completion_port_));
00231 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00232 #endif
00233 }
00234
00235 #endif
00236
00237 ACE_END_VERSIONED_NAMESPACE_DECL