00001
00002
00003 #include "ace/Message_Queue.h"
00004 #include "ace/Log_Msg.h"
00005
00006 #if !defined (__ACE_INLINE__)
00007 #include "ace/Message_Queue.inl"
00008 #endif
00009
00010
00011 ACE_RCSID (ace,
00012 Message_Queue,
00013 "Message_Queue.cpp,v 4.76 2006/05/30 10:57:22 jwillemsen Exp")
00014
00015
00016 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
00019 {
00020 }
00021
00022 int
00023 ACE_Message_Queue_Base::state (void)
00024 {
00025 ACE_TRACE ("ACE_Message_Queue_Base::state");
00026
00027 return this->state_;
00028 }
00029
00030 #if defined (ACE_VXWORKS)
00031
00032
00033
00034
00035
00036 void
00037 ACE_Message_Queue_Vx::dump (void) const
00038 {
00039 #if defined (ACE_HAS_DUMP)
00040 ACE_TRACE ("ACE_Message_Queue_Vx::dump");
00041 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00042 switch (this->state_)
00043 {
00044 case ACE_Message_Queue_Base::ACTIVATED:
00045 ACE_DEBUG ((LM_DEBUG,
00046 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00047 break;
00048 case ACE_Message_Queue_Base::DEACTIVATED:
00049 ACE_DEBUG ((LM_DEBUG,
00050 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00051 break;
00052 case ACE_Message_Queue_Base::PULSED:
00053 ACE_DEBUG ((LM_DEBUG,
00054 ACE_LIB_TEXT ("state = PULSED\n")));
00055 break;
00056 }
00057 ACE_DEBUG ((LM_DEBUG,
00058 ACE_LIB_TEXT ("low_water_mark = %d\n")
00059 ACE_LIB_TEXT ("high_water_mark = %d\n")
00060 ACE_LIB_TEXT ("cur_bytes = %d\n")
00061 ACE_LIB_TEXT ("cur_length = %d\n")
00062 ACE_LIB_TEXT ("cur_count = %d\n")
00063 ACE_LIB_TEXT ("head_ = %u\n")
00064 ACE_LIB_TEXT ("MSG_Q_ID = %u\n"),
00065 this->low_water_mark_,
00066 this->high_water_mark_,
00067 this->cur_bytes_,
00068 this->cur_length_,
00069 this->cur_count_,
00070 this->head_,
00071 this->tail_));
00072 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00073 #endif
00074 }
00075
00076 ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
00077 size_t max_message_length,
00078 ACE_Notification_Strategy *ns)
00079 : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
00080 max_messages_ (static_cast<int> (max_messages)),
00081 max_message_length_ (static_cast<int> (max_message_length))
00082 {
00083 ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
00084
00085 if (this->open (max_messages_, max_message_length_, ns) == -1)
00086 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("open")));
00087 }
00088
00089 ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
00090 {
00091 ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
00092
00093 if (this->tail_ != 0 && this->close () == -1)
00094 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("close")));
00095 }
00096
00097
00098
00099
00100
00101 int
00102 ACE_Message_Queue_Vx::open (size_t max_messages,
00103 size_t max_message_length,
00104 ACE_Notification_Strategy *ns)
00105 {
00106 ACE_TRACE ("ACE_Message_Queue_Vx::open");
00107 this->high_water_mark_ = 0;
00108 this->low_water_mark_ = 0;
00109 this->cur_bytes_ = 0;
00110 this->cur_length_ = 0;
00111 this->cur_count_ = 0;
00112 this->head_ = 0;
00113 this->notification_strategy_ = ns;
00114 this->max_messages_ = static_cast<int> (max_messages);
00115 this->max_message_length_ = static_cast<int> (max_message_length);
00116
00117 if (tail_)
00118 {
00119
00120 close ();
00121 activate_i ();
00122 }
00123
00124 return (this->tail_ =
00125 reinterpret_cast<ACE_Message_Block *> (
00126 ::msgQCreate (max_messages_,
00127 max_message_length_,
00128 MSG_Q_FIFO))) == 0 ? -1 : 0;
00129 }
00130
00131
00132
00133 int
00134 ACE_Message_Queue_Vx::close (void)
00135 {
00136 ACE_TRACE ("ACE_Message_Queue_Vx::close");
00137
00138
00139
00140 this->deactivate_i ();
00141
00142
00143
00144
00145
00146 return ::msgQDelete (msgq ());
00147 }
00148
00149
00150 int
00151 ACE_Message_Queue_Vx::is_empty_i (void)
00152 {
00153 ACE_TRACE ("ACE_Message_Queue_Vx::is_empty_i");
00154 return ::msgQNumMsgs (msgq ()) == 0;
00155 }
00156
00157 int
00158 ACE_Message_Queue_Vx::is_full_i (void)
00159 {
00160 ACE_TRACE ("ACE_Message_Queue_Vx::is_full_i");
00161 return ::msgQNumMsgs (msgq ()) >= max_messages_;
00162 }
00163
00164 size_t
00165 ACE_Message_Queue_Vx::high_water_mark (void)
00166 {
00167 ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00168 ACE_NOTSUP_RETURN ((size_t) -1);
00169 }
00170
00171 void
00172 ACE_Message_Queue_Vx::high_water_mark (size_t)
00173 {
00174 ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
00175 ACE_NOTSUP;
00176 }
00177
00178 size_t
00179 ACE_Message_Queue_Vx::low_water_mark (void)
00180 {
00181 ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00182
00183
00184 ACE_NOTSUP_RETURN ((size_t) -1);
00185 }
00186
00187 void
00188 ACE_Message_Queue_Vx::low_water_mark (size_t)
00189 {
00190 ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
00191 ACE_NOTSUP;
00192 }
00193
00194 size_t
00195 ACE_Message_Queue_Vx::message_bytes (void)
00196 {
00197 ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00198 ACE_NOTSUP_RETURN ((size_t) -1);
00199 }
00200
00201 size_t
00202 ACE_Message_Queue_Vx::message_length (void)
00203 {
00204 ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00205 ACE_NOTSUP_RETURN ((size_t) -1);
00206 }
00207
00208 size_t
00209 ACE_Message_Queue_Vx::message_count (void)
00210 {
00211 ACE_TRACE ("ACE_Message_Queue_Vx::message_count");
00212
00213
00214 return ::msgQNumMsgs (msgq ());
00215 }
00216
00217 void
00218 ACE_Message_Queue_Vx::message_bytes (size_t)
00219 {
00220 ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
00221 ACE_NOTSUP;
00222 }
00223
00224 void
00225 ACE_Message_Queue_Vx::message_length (size_t)
00226 {
00227 ACE_TRACE ("ACE_Message_Queue_Vx::message_length");
00228 ACE_NOTSUP;
00229 }
00230
00231 int
00232 ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
00233 {
00234
00235 return 0;
00236 }
00237
00238 int
00239 ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
00240 {
00241
00242 return 0;
00243 }
00244
00245 int
00246 ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
00247 {
00248 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
00249
00250 if (new_item == 0)
00251 return -1;
00252
00253
00254
00255
00256 ++this->cur_count_;
00257
00258
00259 if (::msgQSend (msgq (),
00260 new_item->rd_ptr (),
00261 new_item->size (),
00262 WAIT_FOREVER,
00263 MSG_PRI_NORMAL) == OK)
00264 return ::msgQNumMsgs (msgq ());
00265 else
00266 return -1;
00267 }
00268
00269 int
00270 ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
00271 {
00272 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
00273
00274
00275 return enqueue_tail_i (new_item);
00276 }
00277
00278 int
00279 ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
00280 {
00281 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
00282
00283 if (new_item == 0)
00284 return -1;
00285
00286 if (this->head_ == 0)
00287
00288 return this->enqueue_head_i (new_item);
00289 else
00290 ACE_NOTSUP_RETURN (-1);
00291 }
00292
00293 int
00294 ACE_Message_Queue_Vx::enqueue_deadline_i (ACE_Message_Block *new_item)
00295 {
00296 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_deadline_i");
00297
00298
00299 return enqueue_tail_i (new_item);
00300 }
00301
00302
00303
00304
00305
00306 int
00307 ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
00308 {
00309 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
00310
00311
00312
00313
00314 if (first_item == 0 || first_item->wr_ptr () == 0)
00315 return -1;
00316
00317 if (::msgQReceive (msgq (),
00318 first_item->wr_ptr (),
00319 first_item->size (),
00320 WAIT_FOREVER) == ERROR)
00321 return -1;
00322 else
00323 return ::msgQNumMsgs (msgq ());
00324 }
00325
00326 int
00327 ACE_Message_Queue_Vx::dequeue_prio_i (ACE_Message_Block *& )
00328 {
00329 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_prio_i");
00330 ACE_NOTSUP_RETURN (-1);
00331 }
00332
00333 int
00334 ACE_Message_Queue_Vx::dequeue_tail_i (ACE_Message_Block *& )
00335 {
00336 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_tail_i");
00337 ACE_NOTSUP_RETURN (-1);
00338 }
00339
00340 int
00341 ACE_Message_Queue_Vx::dequeue_deadline_i (ACE_Message_Block *& )
00342 {
00343 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_deadline_i");
00344 ACE_NOTSUP_RETURN (-1);
00345 }
00346
00347
00348
00349 int
00350 ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00351 ACE_Time_Value *tv)
00352 {
00353
00354 ACE_UNUSED_ARG (mon);
00355 ACE_UNUSED_ARG (tv);
00356
00357 return 0;
00358 }
00359
00360 int
00361 ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00362 ACE_Time_Value *tv)
00363 {
00364
00365 ACE_UNUSED_ARG (mon);
00366 ACE_UNUSED_ARG (tv);
00367
00368 return 0;
00369 }
00370
00371 #if ! defined (ACE_NEEDS_FUNC_DEFINITIONS)
00372 int
00373 ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
00374 ACE_Time_Value *)
00375 {
00376 ACE_NOTSUP_RETURN (-1);
00377 }
00378 #endif
00379
00380 #endif
00381
00382 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00383
00384 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00385 : max_cthrs_ (max_threads),
00386 cur_thrs_ (0),
00387 cur_bytes_ (0),
00388 cur_length_ (0),
00389 cur_count_ (0),
00390 completion_port_ (ACE_INVALID_HANDLE)
00391 {
00392 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00393 this->open (max_threads);
00394 }
00395
00396 int
00397 ACE_Message_Queue_NT::open (DWORD max_threads)
00398 {
00399 ACE_TRACE ("ACE_Message_Queue_NT::open");
00400 this->max_cthrs_ = max_threads;
00401 this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00402 0,
00403 ACE_Message_Queue_Base::ACTIVATED,
00404 max_threads);
00405 return (this->completion_port_ == 0 ? -1 : 0);
00406 }
00407
00408 int
00409 ACE_Message_Queue_NT::close (void)
00410 {
00411 ACE_TRACE ("ACE_Message_Queue_NT::close");
00412 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00413 this->deactivate ();
00414 return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00415 }
00416
00417 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00418 {
00419 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00420 this->close ();
00421 }
00422
00423 int
00424 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00425 ACE_Time_Value *)
00426 {
00427 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00428 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00429 if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00430 {
00431 size_t const msize = new_item->total_size ();
00432 size_t const mlength = new_item->total_length ();
00433
00434
00435 ULONG_PTR state_to_post;
00436 state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00437 if (::PostQueuedCompletionStatus (this->completion_port_,
00438 static_cast<DWORD> (msize),
00439 state_to_post,
00440 reinterpret_cast<LPOVERLAPPED> (new_item)))
00441 {
00442
00443 this->cur_bytes_ += msize;
00444 this->cur_length_ += mlength;
00445 return ACE_Utils::Truncate (++this->cur_count_);
00446 }
00447 }
00448 else
00449 errno = ESHUTDOWN;
00450
00451
00452 return -1;
00453 }
00454
00455 int
00456 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00457 ACE_Time_Value *timeout)
00458 {
00459 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00460
00461 {
00462 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00463
00464
00465 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00466 {
00467 errno = ESHUTDOWN;
00468 return -1;
00469 }
00470 else
00471 ++this->cur_thrs_;
00472 }
00473
00474 ULONG_PTR queue_state;
00475 DWORD msize;
00476
00477 int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00478 &msize,
00479 &queue_state,
00480 reinterpret_cast<LPOVERLAPPED *> (&first_item),
00481 (timeout == 0 ? INFINITE : timeout->msec ()));
00482 {
00483 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00484 --this->cur_thrs_;
00485 if (retv)
00486 {
00487 if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00488 {
00489 --this->cur_count_;
00490 this->cur_bytes_ -= msize;
00491 this->cur_length_ -= first_item->total_length ();
00492 return ACE_Utils::Truncate (this->cur_count_);
00493 }
00494 else
00495 errno = ESHUTDOWN;
00496 }
00497 }
00498 return -1;
00499 }
00500
00501 int
00502 ACE_Message_Queue_NT::deactivate (void)
00503 {
00504 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00505 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00506
00507 int const previous_state = this->state_;
00508 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00509 {
00510 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00511
00512
00513
00514 DWORD cntr =
00515 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00516 while (cntr-- > 0)
00517 ::PostQueuedCompletionStatus (this->completion_port_,
00518 0,
00519 this->state_,
00520 0);
00521 }
00522 return previous_state;
00523 }
00524
00525 int
00526 ACE_Message_Queue_NT::activate (void)
00527 {
00528 ACE_TRACE ("ACE_Message_Queue_NT::activate");
00529 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00530 int const previous_status = this->state_;
00531 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00532 return previous_status;
00533 }
00534
00535 int
00536 ACE_Message_Queue_NT::pulse (void)
00537 {
00538 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00539 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00540
00541 int const previous_state = this->state_;
00542 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00543 {
00544 this->state_ = ACE_Message_Queue_Base::PULSED;
00545
00546
00547
00548
00549 DWORD cntr =
00550 this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
00551 while (cntr-- > 0)
00552 ::PostQueuedCompletionStatus (this->completion_port_,
00553 0,
00554 this->state_,
00555 0);
00556 }
00557 return previous_state;
00558 }
00559
00560 void
00561 ACE_Message_Queue_NT::dump (void) const
00562 {
00563 #if defined (ACE_HAS_DUMP)
00564 ACE_TRACE ("ACE_Message_Queue_NT::dump");
00565
00566 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00567 switch (this->state_)
00568 {
00569 case ACE_Message_Queue_Base::ACTIVATED:
00570 ACE_DEBUG ((LM_DEBUG,
00571 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00572 break;
00573 case ACE_Message_Queue_Base::DEACTIVATED:
00574 ACE_DEBUG ((LM_DEBUG,
00575 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00576 break;
00577 case ACE_Message_Queue_Base::PULSED:
00578 ACE_DEBUG ((LM_DEBUG,
00579 ACE_LIB_TEXT ("state = PULSED\n")));
00580 break;
00581 }
00582
00583 ACE_DEBUG ((LM_DEBUG,
00584 ACE_LIB_TEXT ("max_cthrs_ = %d\n")
00585 ACE_LIB_TEXT ("cur_thrs_ = %d\n")
00586 ACE_LIB_TEXT ("cur_bytes = %d\n")
00587 ACE_LIB_TEXT ("cur_length = %d\n")
00588 ACE_LIB_TEXT ("cur_count = %d\n")
00589 ACE_LIB_TEXT ("completion_port_ = %x\n"),
00590 this->max_cthrs_,
00591 this->cur_thrs_,
00592 this->cur_bytes_,
00593 this->cur_length_,
00594 this->cur_count_,
00595 this->completion_port_));
00596 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00597 #endif
00598 }
00599
00600 #endif
00601
00602 ACE_END_VERSIONED_NAMESPACE_DECL