Message_Queue.cpp

Go to the documentation of this file.
00001 // Message_Queue.cpp,v 4.76 2006/05/30 10:57:22 jwillemsen Exp
00002 
00003 #include "ace/Message_Queue.h"
00004 #include "ace/Log_Msg.h"
00005 
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue.inl"
00008 #endif /* __ACE_INLINE__ */
00009 
00010 
00011 ACE_RCSID (ace,
00012            Message_Queue,
00013            "Message_Queue.cpp,v 4.76 2006/05/30 10:57:22 jwillemsen Exp")
00014 
00015 
00016 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
00019 {
00020 }
00021 
00022 int
00023 ACE_Message_Queue_Base::state (void)
00024 {
00025   ACE_TRACE ("ACE_Message_Queue_Base::state");
00026 
00027   return this->state_;
00028 }
00029 
00030 #if defined (ACE_VXWORKS)
00031 
00032 ////////////////////////////////
00033 // class ACE_Message_Queue_Vx //
00034 ////////////////////////////////
00035 
00036 void
00037 ACE_Message_Queue_Vx::dump (void) const
00038 {
00039 #if defined (ACE_HAS_DUMP)
00040   ACE_TRACE ("ACE_Message_Queue_Vx::dump");
00041   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00042   switch (this->state_)
00043     {
00044     case ACE_Message_Queue_Base::ACTIVATED:
00045       ACE_DEBUG ((LM_DEBUG,
00046                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00047       break;
00048     case ACE_Message_Queue_Base::DEACTIVATED:
00049       ACE_DEBUG ((LM_DEBUG,
00050                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00051       break;
00052     case ACE_Message_Queue_Base::PULSED:
00053       ACE_DEBUG ((LM_DEBUG,
00054                   ACE_LIB_TEXT ("state = PULSED\n")));
00055       break;
00056     }
00057   ACE_DEBUG ((LM_DEBUG,
00058               ACE_LIB_TEXT ("low_water_mark = %d\n")
00059               ACE_LIB_TEXT ("high_water_mark = %d\n")
00060               ACE_LIB_TEXT ("cur_bytes = %d\n")
00061               ACE_LIB_TEXT ("cur_length = %d\n")
00062               ACE_LIB_TEXT ("cur_count = %d\n")
00063               ACE_LIB_TEXT ("head_ = %u\n")
00064               ACE_LIB_TEXT ("MSG_Q_ID = %u\n"),
00065               this->low_water_mark_,
00066               this->high_water_mark_,
00067               this->cur_bytes_,
00068               this->cur_length_,
00069               this->cur_count_,
00070               this->head_,
00071               this->tail_));
00072   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00073 #endif /* ACE_HAS_DUMP */
00074 }
00075 
00076 ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
00077                                             size_t max_message_length,
00078                                             ACE_Notification_Strategy *ns)
00079   : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
00080     max_messages_ (static_cast<int> (max_messages)),
00081     max_message_length_ (static_cast<int> (max_message_length))
00082 {
00083   ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
00084 
00085   if (this->open (max_messages_, max_message_length_, ns) == -1)
00086     ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("open")));
00087 }
00088 
00089 ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
00090 {
00091   ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
00092 
00093   if (this->tail_ != 0  &&  this->close () == -1)
00094     ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("close")));
00095 }
00096 
00097 // Don't bother locking since if someone calls this function more than
00098 // once for the same queue, we're in bigger trouble than just
00099 // concurrency control!
00100 
00101 int
00102 ACE_Message_Queue_Vx::open (size_t max_messages,
00103                             size_t max_message_length,
00104                             ACE_Notification_Strategy *ns)
00105 {
00106   ACE_TRACE ("ACE_Message_Queue_Vx::open");
00107   this->high_water_mark_ = 0;
00108   this->low_water_mark_  = 0;
00109   this->cur_bytes_ = 0;
00110   this->cur_length_ = 0;
00111   this->cur_count_ = 0;
00112   this->head_ = 0;
00113   this->notification_strategy_ = ns;
00114   this->max_messages_ = static_cast<int> (max_messages);
00115   this->max_message_length_ = static_cast<int> (max_message_length);
00116 
00117   if (tail_)
00118     {
00119       // Had already created a msgQ, so delete it.
00120       close ();
00121       activate_i ();
00122     }
00123 
00124   return (this->tail_ =
00125           reinterpret_cast<ACE_Message_Block *> (
00126               ::msgQCreate (max_messages_,
00127                             max_message_length_,
00128                             MSG_Q_FIFO))) == 0 ? -1 : 0;
00129 }
00130 
00131 // Clean up the queue if we have not already done so!
00132 
00133 int
00134 ACE_Message_Queue_Vx::close (void)
00135 {
00136   ACE_TRACE ("ACE_Message_Queue_Vx::close");
00137   // Don't lock, because we don't have a lock.  It shouldn't be
00138   // necessary, anyways.
00139 
00140   this->deactivate_i ();
00141 
00142   // Don't bother to free up the remaining message on the list,
00143   // because we don't have any way to iterate over what's in the
00144   // queue.
00145 
00146   return ::msgQDelete (msgq ());
00147 }
00148 
00149 
00150 int
00151 ACE_Message_Queue_Vx::is_empty_i (void)
00152 {
00153   ACE_TRACE ("ACE_Message_Queue_Vx::is_empty_i");
00154   return ::msgQNumMsgs (msgq ()) == 0;
00155 }
00156 
00157 int
00158 ACE_Message_Queue_Vx::is_full_i (void)
00159 {
00160   ACE_TRACE ("ACE_Message_Queue_Vx::is_full_i");
00161   return ::msgQNumMsgs (msgq ()) >= max_messages_;
00162 }
00163 
00164 size_t
00165 ACE_Message_Queue_Vx::high_water_mark (void)
00166 {
00167   ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00168   ACE_NOTSUP_RETURN ((size_t) -1);
00169 }
00170 
00171 void
00172 ACE_Message_Queue_Vx::high_water_mark (size_t)
00173 {
00174   ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00175   ACE_NOTSUP;
00176 }
00177 
00178 size_t
00179 ACE_Message_Queue_Vx::low_water_mark (void)
00180 {
00181   ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00182   // Don't need to guard, because this is fixed.
00183 
00184   ACE_NOTSUP_RETURN ((size_t) -1);
00185 }
00186 
00187 void
00188 ACE_Message_Queue_Vx::low_water_mark (size_t)
00189 {
00190   ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00191   ACE_NOTSUP;
00192 }
00193 
00194 size_t
00195 ACE_Message_Queue_Vx::message_bytes (void)
00196 {
00197   ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00198   ACE_NOTSUP_RETURN ((size_t) -1);
00199 }
00200 
00201 size_t
00202 ACE_Message_Queue_Vx::message_length (void)
00203 {
00204   ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00205   ACE_NOTSUP_RETURN ((size_t) -1);
00206 }
00207 
00208 size_t
00209 ACE_Message_Queue_Vx::message_count (void)
00210 {
00211   ACE_TRACE ("ACE_Message_Queue_Vx::message_count");
00212   // Don't need to guard, because this is a system call.
00213 
00214   return ::msgQNumMsgs (msgq ());
00215 }
00216 
00217 void
00218 ACE_Message_Queue_Vx::message_bytes (size_t)
00219 {
00220   ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00221   ACE_NOTSUP;
00222 }
00223 
00224 void
00225 ACE_Message_Queue_Vx::message_length (size_t)
00226 {
00227   ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00228   ACE_NOTSUP;
00229 }
00230 
00231 int
00232 ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
00233 {
00234   // No-op.
00235   return 0;
00236 }
00237 
00238 int
00239 ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
00240 {
00241   // No-op.
00242   return 0;
00243 }
00244 
00245 int
00246 ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
00247 {
00248   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
00249 
00250   if (new_item == 0)
00251     return -1;
00252 
00253   // Don't try to send a composite message!!!!  Only the first
00254   // block will be sent.
00255 
00256   ++this->cur_count_;
00257 
00258   // Always use this method to actually send a message on the queue.
00259   if (::msgQSend (msgq (),
00260                   new_item->rd_ptr (),
00261                   new_item->size (),
00262                   WAIT_FOREVER,
00263                   MSG_PRI_NORMAL) == OK)
00264     return ::msgQNumMsgs (msgq ());
00265   else
00266     return -1;
00267 }
00268 
00269 int
00270 ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
00271 {
00272   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
00273 
00274   // Just delegate to enqueue_tail_i.
00275   return enqueue_tail_i (new_item);
00276 }
00277 
00278 int
00279 ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
00280 {
00281   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
00282 
00283   if (new_item == 0)
00284     return -1;
00285 
00286   if (this->head_ == 0)
00287     // Should always take this branch.
00288     return this->enqueue_head_i (new_item);
00289   else
00290     ACE_NOTSUP_RETURN (-1);
00291 }
00292 
00293 int
00294 ACE_Message_Queue_Vx::enqueue_deadline_i (ACE_Message_Block *new_item)
00295 {
00296   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_deadline_i");
00297 
00298   // Just delegate to enqueue_tail_i.
00299   return enqueue_tail_i (new_item);
00300 }
00301 
00302 // Actually get the first ACE_Message_Block (no locking, so must be
00303 // called with locks held).  This method assumes that the queue has at
00304 // least one item in it when it is called.
00305 
00306 int
00307 ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
00308 {
00309   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
00310 
00311   // We don't allocate a new Message_Block:  the caller must provide
00312   // it, and must ensure that it is big enough (without chaining).
00313 
00314   if (first_item == 0  ||  first_item->wr_ptr () == 0)
00315     return -1;
00316 
00317   if (::msgQReceive (msgq (),
00318                      first_item->wr_ptr (),
00319                      first_item->size (),
00320                      WAIT_FOREVER) == ERROR)
00321     return -1;
00322   else
00323     return ::msgQNumMsgs (msgq ());
00324 }
00325 
00326 int
00327 ACE_Message_Queue_Vx::dequeue_prio_i (ACE_Message_Block *& /*dequeued*/)
00328 {
00329   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_prio_i");
00330   ACE_NOTSUP_RETURN (-1);
00331 }
00332 
00333 int
00334 ACE_Message_Queue_Vx::dequeue_tail_i (ACE_Message_Block *& /*dequeued*/)
00335 {
00336   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_tail_i");
00337   ACE_NOTSUP_RETURN (-1);
00338 }
00339 
00340 int
00341 ACE_Message_Queue_Vx::dequeue_deadline_i (ACE_Message_Block *& /*dequeued*/)
00342 {
00343   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_deadline_i");
00344   ACE_NOTSUP_RETURN (-1);
00345 }
00346 
00347 // Take a look at the first item without removing it.
00348 
00349 int
00350 ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00351                                           ACE_Time_Value *tv)
00352 {
00353   // Always return here, and let the VxWorks message queue handle blocking.
00354   ACE_UNUSED_ARG (mon);
00355   ACE_UNUSED_ARG (tv);
00356 
00357   return 0;
00358 }
00359 
00360 int
00361 ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00362                                            ACE_Time_Value *tv)
00363 {
00364   // Always return here, and let the VxWorks message queue handle blocking.
00365   ACE_UNUSED_ARG (mon);
00366   ACE_UNUSED_ARG (tv);
00367 
00368   return 0;
00369 }
00370 
00371 #if ! defined (ACE_NEEDS_FUNC_DEFINITIONS)
00372 int
00373 ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
00374                                          ACE_Time_Value *)
00375 {
00376   ACE_NOTSUP_RETURN (-1);
00377 }
00378 #endif /* ! ACE_NEEDS_FUNC_DEFINITIONS */
00379 
00380 #endif /* ACE_VXWORKS */
00381 
00382 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00383 
00384 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00385   : max_cthrs_ (max_threads),
00386     cur_thrs_ (0),
00387     cur_bytes_ (0),
00388     cur_length_ (0),
00389     cur_count_ (0),
00390     completion_port_ (ACE_INVALID_HANDLE)
00391 {
00392   ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00393   this->open (max_threads);
00394 }
00395 
00396 int
00397 ACE_Message_Queue_NT::open (DWORD max_threads)
00398 {
00399   ACE_TRACE ("ACE_Message_Queue_NT::open");
00400   this->max_cthrs_ = max_threads;
00401   this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00402                                                      0,
00403                                                      ACE_Message_Queue_Base::ACTIVATED,
00404                                                      max_threads);
00405   return (this->completion_port_ == 0 ? -1 : 0);
00406 }
00407 
00408 int
00409 ACE_Message_Queue_NT::close (void)
00410 {
00411   ACE_TRACE ("ACE_Message_Queue_NT::close");
00412   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00413   this->deactivate ();
00414   return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00415 }
00416 
00417 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00418 {
00419   ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00420   this->close ();
00421 }
00422 
00423 int
00424 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00425                                ACE_Time_Value *)
00426 {
00427   ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00428   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00429   if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00430     {
00431       size_t const msize = new_item->total_size ();
00432       size_t const mlength = new_item->total_length ();
00433       // Note - we send ACTIVATED in the 3rd arg to tell the completion
00434       // routine it's _NOT_ being woken up because of deactivate().
00435       ULONG_PTR state_to_post;
00436       state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00437       if (::PostQueuedCompletionStatus (this->completion_port_,
00438                                         static_cast<DWORD> (msize),
00439                                         state_to_post,
00440                                         reinterpret_cast<LPOVERLAPPED> (new_item)))
00441         {
00442           // Update the states once I succeed.
00443           this->cur_bytes_ += msize;
00444           this->cur_length_ += mlength;
00445           return ACE_Utils::Truncate (++this->cur_count_);
00446         }
00447     }
00448   else
00449     errno = ESHUTDOWN;
00450 
00451   // Fail to enqueue the message.
00452   return -1;
00453 }
00454 
00455 int
00456 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00457                                ACE_Time_Value *timeout)
00458 {
00459   ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00460 
00461   {
00462     ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00463 
00464     // Make sure the MQ is not deactivated before proceeding.
00465     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00466       {
00467         errno = ESHUTDOWN;      // Operation on deactivated MQ not allowed.
00468         return -1;
00469       }
00470     else
00471       ++this->cur_thrs_;        // Increase the waiting thread count.
00472   }
00473 
00474   ULONG_PTR queue_state;
00475   DWORD msize;
00476   // Get a message from the completion port.
00477   int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00478                                           &msize,
00479                                           &queue_state,
00480                                           reinterpret_cast<LPOVERLAPPED *> (&first_item),
00481                                           (timeout == 0 ? INFINITE : timeout->msec ()));
00482   {
00483     ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00484     --this->cur_thrs_;          // Decrease waiting thread count.
00485     if (retv)
00486       {
00487         if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00488           {                     // Really get a valid MB from the queue.
00489             --this->cur_count_;
00490             this->cur_bytes_ -= msize;
00491             this->cur_length_ -= first_item->total_length ();
00492             return ACE_Utils::Truncate (this->cur_count_);
00493           }
00494         else                    // Woken up by deactivate () or pulse ().
00495             errno = ESHUTDOWN;
00496       }
00497   }
00498   return -1;
00499 }
00500 
00501 int
00502 ACE_Message_Queue_NT::deactivate (void)
00503 {
00504   ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00505   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00506 
00507   int const previous_state = this->state_;
00508   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00509     {
00510       this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00511 
00512       // Get the number of shutdown messages necessary to wake up all
00513       // waiting threads.
00514       DWORD cntr =
00515         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00516       while (cntr-- > 0)
00517         ::PostQueuedCompletionStatus (this->completion_port_,
00518                                       0,
00519                                       this->state_,
00520                                       0);
00521     }
00522   return previous_state;
00523 }
00524 
00525 int
00526 ACE_Message_Queue_NT::activate (void)
00527 {
00528   ACE_TRACE ("ACE_Message_Queue_NT::activate");
00529   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00530   int const previous_status = this->state_;
00531   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00532   return previous_status;
00533 }
00534 
00535 int
00536 ACE_Message_Queue_NT::pulse (void)
00537 {
00538   ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00539   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00540 
00541   int const previous_state = this->state_;
00542   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00543     {
00544       this->state_ = ACE_Message_Queue_Base::PULSED;
00545 
00546       // Get the number of shutdown messages necessary to wake up all
00547       // waiting threads.
00548 
00549       DWORD cntr =
00550         this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00551       while (cntr-- > 0)
00552         ::PostQueuedCompletionStatus (this->completion_port_,
00553                                       0,
00554                                       this->state_,
00555                                       0);
00556     }
00557   return previous_state;
00558 }
00559 
00560 void
00561 ACE_Message_Queue_NT::dump (void) const
00562 {
00563 #if defined (ACE_HAS_DUMP)
00564   ACE_TRACE ("ACE_Message_Queue_NT::dump");
00565 
00566   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00567   switch (this->state_)
00568     {
00569     case ACE_Message_Queue_Base::ACTIVATED:
00570       ACE_DEBUG ((LM_DEBUG,
00571                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00572       break;
00573     case ACE_Message_Queue_Base::DEACTIVATED:
00574       ACE_DEBUG ((LM_DEBUG,
00575                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00576       break;
00577     case ACE_Message_Queue_Base::PULSED:
00578       ACE_DEBUG ((LM_DEBUG,
00579                   ACE_LIB_TEXT ("state = PULSED\n")));
00580       break;
00581     }
00582 
00583   ACE_DEBUG ((LM_DEBUG,
00584               ACE_LIB_TEXT ("max_cthrs_ = %d\n")
00585               ACE_LIB_TEXT ("cur_thrs_ = %d\n")
00586               ACE_LIB_TEXT ("cur_bytes = %d\n")
00587               ACE_LIB_TEXT ("cur_length = %d\n")
00588               ACE_LIB_TEXT ("cur_count = %d\n")
00589               ACE_LIB_TEXT ("completion_port_ = %x\n"),
00590               this->max_cthrs_,
00591               this->cur_thrs_,
00592               this->cur_bytes_,
00593               this->cur_length_,
00594               this->cur_count_,
00595               this->completion_port_));
00596   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00597 #endif /* ACE_HAS_DUMP */
00598 }
00599 
00600 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
00601 
00602 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 09:41:56 2006 for ACE by doxygen 1.3.6