00001
00002
00003 #include "ace/Message_Queue.h"
00004 #include "ace/Log_Msg.h"
00005
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue.inl"
00008 #endif
00009
00010
00011 ACE_RCSID (ace,
00012 Message_Queue,
00013 "$Id: Message_Queue.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00014
00015
00016 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
00019 {
00020 }
00021
00022 int
00023 ACE_Message_Queue_Base::state (void)
00024 {
00025 ACE_TRACE ("ACE_Message_Queue_Base::state");
00026
00027 return this->state_;
00028 }
00029
00030 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00031
00032 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00033 : max_cthrs_ (max_threads),
00034 cur_thrs_ (0),
00035 cur_bytes_ (0),
00036 cur_length_ (0),
00037 cur_count_ (0),
00038 completion_port_ (ACE_INVALID_HANDLE)
00039 {
00040 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00041 this->open (max_threads);
00042 }
00043
00044 int
00045 ACE_Message_Queue_NT::open (DWORD max_threads)
00046 {
00047 ACE_TRACE ("ACE_Message_Queue_NT::open");
00048 this->max_cthrs_ = max_threads;
00049 this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00050 0,
00051 ACE_Message_Queue_Base::ACTIVATED,
00052 max_threads);
00053 return (this->completion_port_ == 0 ? -1 : 0);
00054 }
00055
00056 int
00057 ACE_Message_Queue_NT::close (void)
00058 {
00059 ACE_TRACE ("ACE_Message_Queue_NT::close");
00060 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00061 this->deactivate ();
00062 return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00063 }
00064
00065 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00066 {
00067 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00068 this->close ();
00069 }
00070
00071 int
00072 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00073 ACE_Time_Value *)
00074 {
00075 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00076 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00077 if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00078 {
00079 size_t const msize = new_item->total_size ();
00080 size_t const mlength = new_item->total_length ();
00081
00082
00083 ULONG_PTR state_to_post;
00084 state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00085 if (::PostQueuedCompletionStatus (this->completion_port_,
00086 static_cast<DWORD> (msize),
00087 state_to_post,
00088 reinterpret_cast<LPOVERLAPPED> (new_item)))
00089 {
00090
00091 this->cur_bytes_ += msize;
00092 this->cur_length_ += mlength;
00093 return ACE_Utils::truncate_cast<int> (++this->cur_count_);
00094 }
00095 }
00096 else
00097 errno = ESHUTDOWN;
00098
00099
00100 return -1;
00101 }
00102
00103 int
00104 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00105 ACE_Time_Value *timeout)
00106 {
00107 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00108
00109 {
00110 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00111
00112
00113 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00114 {
00115 errno = ESHUTDOWN;
00116 return -1;
00117 }
00118 else
00119 ++this->cur_thrs_;
00120 }
00121
00122 ULONG_PTR queue_state;
00123 DWORD msize;
00124
00125 int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00126 &msize,
00127 &queue_state,
00128 reinterpret_cast<LPOVERLAPPED *> (&first_item),
00129 (timeout == 0 ? INFINITE : timeout->msec ()));
00130 {
00131 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00132 --this->cur_thrs_;
00133 if (retv)
00134 {
00135 if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00136 {
00137 --this->cur_count_;
00138 this->cur_bytes_ -= msize;
00139 this->cur_length_ -= first_item->total_length ();
00140 return ACE_Utils::truncate_cast<int> (this->cur_count_);
00141 }
00142 else
00143 errno = ESHUTDOWN;
00144 }
00145 }
00146 return -1;
00147 }
00148
00149 int
00150 ACE_Message_Queue_NT::deactivate (void)
00151 {
00152 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00153 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00154
00155 int const previous_state = this->state_;
00156 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00157 {
00158 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00159
00160
00161
00162 DWORD cntr =
00163 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00164 while (cntr-- > 0)
00165 ::PostQueuedCompletionStatus (this->completion_port_,
00166 0,
00167 this->state_,
00168 0);
00169 }
00170 return previous_state;
00171 }
00172
00173 int
00174 ACE_Message_Queue_NT::activate (void)
00175 {
00176 ACE_TRACE ("ACE_Message_Queue_NT::activate");
00177 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00178 int const previous_status = this->state_;
00179 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00180 return previous_status;
00181 }
00182
00183 int
00184 ACE_Message_Queue_NT::pulse (void)
00185 {
00186 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00187 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00188
00189 int const previous_state = this->state_;
00190 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00191 {
00192 this->state_ = ACE_Message_Queue_Base::PULSED;
00193
00194
00195
00196
00197 DWORD cntr =
00198 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00199 while (cntr-- > 0)
00200 ::PostQueuedCompletionStatus (this->completion_port_,
00201 0,
00202 this->state_,
00203 0);
00204 }
00205 return previous_state;
00206 }
00207
00208 void
00209 ACE_Message_Queue_NT::dump (void) const
00210 {
00211 #if defined (ACE_HAS_DUMP)
00212 ACE_TRACE ("ACE_Message_Queue_NT::dump");
00213
00214 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00215 switch (this->state_)
00216 {
00217 case ACE_Message_Queue_Base::ACTIVATED:
00218 ACE_DEBUG ((LM_DEBUG,
00219 ACE_TEXT ("state = ACTIVATED\n")));
00220 break;
00221 case ACE_Message_Queue_Base::DEACTIVATED:
00222 ACE_DEBUG ((LM_DEBUG,
00223 ACE_TEXT ("state = DEACTIVATED\n")));
00224 break;
00225 case ACE_Message_Queue_Base::PULSED:
00226 ACE_DEBUG ((LM_DEBUG,
00227 ACE_TEXT ("state = PULSED\n")));
00228 break;
00229 }
00230
00231 ACE_DEBUG ((LM_DEBUG,
00232 ACE_TEXT ("max_cthrs_ = %d\n")
00233 ACE_TEXT ("cur_thrs_ = %d\n")
00234 ACE_TEXT ("cur_bytes = %d\n")
00235 ACE_TEXT ("cur_length = %d\n")
00236 ACE_TEXT ("cur_count = %d\n")
00237 ACE_TEXT ("completion_port_ = %x\n"),
00238 this->max_cthrs_,
00239 this->cur_thrs_,
00240 this->cur_bytes_,
00241 this->cur_length_,
00242 this->cur_count_,
00243 this->completion_port_));
00244 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00245 #endif
00246 }
00247
00248 #endif
00249
00250 ACE_END_VERSIONED_NAMESPACE_DECL