Message_Queue_Vx.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue_Vx.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #include "ace/Message_Queue_Vx.h"
00004 #include "ace/Log_Msg.h"
00005 
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue_Vx.inl"
00008 #endif /* __ACE_INLINE__ */
00009 
00010 ACE_RCSID (ace,
00011            Message_Queue_Vx,
00012            "$Id: Message_Queue_Vx.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00013 
00014 
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 #if defined (ACE_VXWORKS)
00018 
00019 ////////////////////////////////
00020 // class ACE_Message_Queue_Vx //
00021 ////////////////////////////////
00022 
00023 void
00024 ACE_Message_Queue_Vx::dump (void) const
00025 {
00026 #if defined (ACE_HAS_DUMP)
00027   ACE_TRACE ("ACE_Message_Queue_Vx::dump");
00028   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00029   switch (this->state_)
00030     {
00031     case ACE_Message_Queue_Base::ACTIVATED:
00032       ACE_DEBUG ((LM_DEBUG,
00033                   ACE_TEXT ("state = ACTIVATED\n")));
00034       break;
00035     case ACE_Message_Queue_Base::DEACTIVATED:
00036       ACE_DEBUG ((LM_DEBUG,
00037                   ACE_TEXT ("state = DEACTIVATED\n")));
00038       break;
00039     case ACE_Message_Queue_Base::PULSED:
00040       ACE_DEBUG ((LM_DEBUG,
00041                   ACE_TEXT ("state = PULSED\n")));
00042       break;
00043     }
00044   ACE_DEBUG ((LM_DEBUG,
00045               ACE_TEXT ("low_water_mark = %d\n")
00046               ACE_TEXT ("high_water_mark = %d\n")
00047               ACE_TEXT ("cur_bytes = %d\n")
00048               ACE_TEXT ("cur_length = %d\n")
00049               ACE_TEXT ("cur_count = %d\n")
00050               ACE_TEXT ("head_ = %u\n")
00051               ACE_TEXT ("MSG_Q_ID = %u\n"),
00052               this->low_water_mark_,
00053               this->high_water_mark_,
00054               this->cur_bytes_,
00055               this->cur_length_,
00056               this->cur_count_,
00057               this->head_,
00058               this->tail_));
00059   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00060 #endif /* ACE_HAS_DUMP */
00061 }
00062 
00063 ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
00064                                             size_t max_message_length,
00065                                             ACE_Notification_Strategy *ns)
00066   : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
00067     max_messages_ (static_cast<int> (max_messages)),
00068     max_message_length_ (static_cast<int> (max_message_length))
00069 {
00070   ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
00071 
00072   if (this->open (max_messages_, max_message_length_, ns) == -1)
00073     ACE_ERROR ((LM_ERROR, ACE_TEXT ("open")));
00074 }
00075 
00076 ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
00077 {
00078   ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
00079 
00080   if (this->tail_ != 0  &&  this->close () == -1)
00081     ACE_ERROR ((LM_ERROR, ACE_TEXT ("close")));
00082 }
00083 
00084 // Don't bother locking since if someone calls this function more than
00085 // once for the same queue, we're in bigger trouble than just
00086 // concurrency control!
00087 
00088 int
00089 ACE_Message_Queue_Vx::open (size_t max_messages,
00090                             size_t max_message_length,
00091                             ACE_Notification_Strategy *ns)
00092 {
00093   ACE_TRACE ("ACE_Message_Queue_Vx::open");
00094   this->high_water_mark_ = 0;
00095   this->low_water_mark_  = 0;
00096   this->cur_bytes_ = 0;
00097   this->cur_length_ = 0;
00098   this->cur_count_ = 0;
00099   this->head_ = 0;
00100   this->notification_strategy_ = ns;
00101   this->max_messages_ = static_cast<int> (max_messages);
00102   this->max_message_length_ = static_cast<int> (max_message_length);
00103 
00104   if (tail_)
00105     {
00106       // Had already created a msgQ, so delete it.
00107       close ();
00108       activate_i ();
00109     }
00110 
00111   return (this->tail_ =
00112           reinterpret_cast<ACE_Message_Block *> (
00113               ::msgQCreate (max_messages_,
00114                             max_message_length_,
00115                             MSG_Q_FIFO))) == 0 ? -1 : 0;
00116 }
00117 
00118 // Clean up the queue if we have not already done so!
00119 
00120 int
00121 ACE_Message_Queue_Vx::close (void)
00122 {
00123   ACE_TRACE ("ACE_Message_Queue_Vx::close");
00124   // Don't lock, because we don't have a lock.  It shouldn't be
00125   // necessary, anyways.
00126 
00127   this->deactivate_i ();
00128 
00129   // Don't bother to free up the remaining message on the list,
00130   // because we don't have any way to iterate over what's in the
00131   // queue.
00132 
00133   return ::msgQDelete (msgq ());
00134 }
00135 
00136 
00137 int
00138 ACE_Message_Queue_Vx::is_empty_i (void)
00139 {
00140   ACE_TRACE ("ACE_Message_Queue_Vx::is_empty_i");
00141   return ::msgQNumMsgs (msgq ()) == 0;
00142 }
00143 
00144 int
00145 ACE_Message_Queue_Vx::is_full_i (void)
00146 {
00147   ACE_TRACE ("ACE_Message_Queue_Vx::is_full_i");
00148   return ::msgQNumMsgs (msgq ()) >= max_messages_;
00149 }
00150 
00151 size_t
00152 ACE_Message_Queue_Vx::high_water_mark (void)
00153 {
00154   ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00155   ACE_NOTSUP_RETURN ((size_t) -1);
00156 }
00157 
00158 void
00159 ACE_Message_Queue_Vx::high_water_mark (size_t)
00160 {
00161   ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00162   ACE_NOTSUP;
00163 }
00164 
00165 size_t
00166 ACE_Message_Queue_Vx::low_water_mark (void)
00167 {
00168   ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00169   // Don't need to guard, because this is fixed.
00170 
00171   ACE_NOTSUP_RETURN ((size_t) -1);
00172 }
00173 
00174 void
00175 ACE_Message_Queue_Vx::low_water_mark (size_t)
00176 {
00177   ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00178   ACE_NOTSUP;
00179 }
00180 
00181 size_t
00182 ACE_Message_Queue_Vx::message_bytes (void)
00183 {
00184   ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00185   ACE_NOTSUP_RETURN ((size_t) -1);
00186 }
00187 
00188 size_t
00189 ACE_Message_Queue_Vx::message_length (void)
00190 {
00191   ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00192   ACE_NOTSUP_RETURN ((size_t) -1);
00193 }
00194 
00195 size_t
00196 ACE_Message_Queue_Vx::message_count (void)
00197 {
00198   ACE_TRACE ("ACE_Message_Queue_Vx::message_count");
00199   // Don't need to guard, because this is a system call.
00200 
00201   return ::msgQNumMsgs (msgq ());
00202 }
00203 
00204 void
00205 ACE_Message_Queue_Vx::message_bytes (size_t)
00206 {
00207   ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00208   ACE_NOTSUP;
00209 }
00210 
00211 void
00212 ACE_Message_Queue_Vx::message_length (size_t)
00213 {
00214   ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00215   ACE_NOTSUP;
00216 }
00217 
00218 int
00219 ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
00220 {
00221   // No-op.
00222   return 0;
00223 }
00224 
00225 int
00226 ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
00227 {
00228   // No-op.
00229   return 0;
00230 }
00231 
00232 int
00233 ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
00234 {
00235   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
00236 
00237   if (new_item == 0)
00238     return -1;
00239 
00240   // Don't try to send a composite message!!!!  Only the first
00241   // block will be sent.
00242 
00243   ++this->cur_count_;
00244 
00245   // Always use this method to actually send a message on the queue.
00246   if (::msgQSend (msgq (),
00247                   new_item->rd_ptr (),
00248                   new_item->size (),
00249                   WAIT_FOREVER,
00250                   MSG_PRI_NORMAL) == OK)
00251     return ::msgQNumMsgs (msgq ());
00252   else
00253     return -1;
00254 }
00255 
00256 int
00257 ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
00258 {
00259   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
00260 
00261   // Just delegate to enqueue_tail_i.
00262   return enqueue_tail_i (new_item);
00263 }
00264 
00265 int
00266 ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
00267 {
00268   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
00269 
00270   if (new_item == 0)
00271     return -1;
00272 
00273   if (this->head_ == 0)
00274     // Should always take this branch.
00275     return this->enqueue_head_i (new_item);
00276   else
00277     ACE_NOTSUP_RETURN (-1);
00278 }
00279 
00280 int
00281 ACE_Message_Queue_Vx::enqueue_deadline_i (ACE_Message_Block *new_item)
00282 {
00283   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_deadline_i");
00284 
00285   // Just delegate to enqueue_tail_i.
00286   return enqueue_tail_i (new_item);
00287 }
00288 
00289 // Actually get the first ACE_Message_Block (no locking, so must be
00290 // called with locks held).  This method assumes that the queue has at
00291 // least one item in it when it is called.
00292 
00293 int
00294 ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
00295 {
00296   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
00297 
00298   // We don't allocate a new Message_Block:  the caller must provide
00299   // it, and must ensure that it is big enough (without chaining).
00300 
00301   if (first_item == 0  ||  first_item->wr_ptr () == 0)
00302     return -1;
00303 
00304   if (::msgQReceive (msgq (),
00305                      first_item->wr_ptr (),
00306                      first_item->size (),
00307                      WAIT_FOREVER) == ERROR)
00308     return -1;
00309   else
00310     return ::msgQNumMsgs (msgq ());
00311 }
00312 
00313 int
00314 ACE_Message_Queue_Vx::dequeue_prio_i (ACE_Message_Block *& /*dequeued*/)
00315 {
00316   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_prio_i");
00317   ACE_NOTSUP_RETURN (-1);
00318 }
00319 
00320 int
00321 ACE_Message_Queue_Vx::dequeue_tail_i (ACE_Message_Block *& /*dequeued*/)
00322 {
00323   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_tail_i");
00324   ACE_NOTSUP_RETURN (-1);
00325 }
00326 
00327 int
00328 ACE_Message_Queue_Vx::dequeue_deadline_i (ACE_Message_Block *& /*dequeued*/)
00329 {
00330   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_deadline_i");
00331   ACE_NOTSUP_RETURN (-1);
00332 }
00333 
00334 // Take a look at the first item without removing it.
00335 
00336 int
00337 ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00338                                           ACE_Time_Value *tv)
00339 {
00340   // Always return here, and let the VxWorks message queue handle blocking.
00341   ACE_UNUSED_ARG (mon);
00342   ACE_UNUSED_ARG (tv);
00343 
00344   return 0;
00345 }
00346 
00347 int
00348 ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00349                                            ACE_Time_Value *tv)
00350 {
00351   // Always return here, and let the VxWorks message queue handle blocking.
00352   ACE_UNUSED_ARG (mon);
00353   ACE_UNUSED_ARG (tv);
00354 
00355   return 0;
00356 }
00357 
00358 #if ! defined (ACE_NEEDS_FUNC_DEFINITIONS)
00359 int
00360 ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
00361                                          ACE_Time_Value *)
00362 {
00363   ACE_NOTSUP_RETURN (-1);
00364 }
00365 #endif /* ! ACE_NEEDS_FUNC_DEFINITIONS */
00366 
00367 #endif /* ACE_VXWORKS */
00368 
00369 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 12:05:31 2008 for ACE by doxygen 1.3.6