00001
00002
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005
00006
00007
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00013 #include "ace/Message_Queue_NT.h"
00014 #endif
00015
00016 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00017 # pragma once
00018 #endif
00019
00020 #include "ace/Notification_Strategy.h"
00021 #include "ace/Truncate.h"
00022
00023 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00024 #include "ace/OS_NS_stdio.h"
00025 #include "ace/OS_NS_unistd.h"
00026 #include "ace/Monitor_Size.h"
00027 #endif
00028
00029 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00032 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00033 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00034 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N)
00035
00036 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00037 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00038 {
00039 #if defined (ACE_HAS_DUMP)
00040 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00041
00042 this->queue_.dump ();
00043 #endif
00044 }
00045
00046 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00047 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00048 {
00049 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00050
00051 this->queue_.message_bytes (new_value);
00052 }
00053
00054 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00055 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00056 {
00057 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00058
00059 this->queue_.message_length (new_value);
00060 }
00061
00062 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00063 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t high_water_mark,
00064 size_t low_water_mark,
00065 ACE_Notification_Strategy *ns)
00066 {
00067 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00068
00069 if (this->queue_.open (high_water_mark, low_water_mark, ns) == -1)
00070 ACE_ERROR ((LM_ERROR,
00071 ACE_TEXT ("ACE_Message_Queue_Ex")));
00072 }
00073
00074 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00075 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00076 {
00077 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00078 }
00079
00080 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00081 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00082 size_t lwm,
00083 ACE_Notification_Strategy *ns)
00084 {
00085 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00086
00087 return this->queue_.open (hwm, lwm, ns);
00088 }
00089
00090
00091
00092 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00093 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00094 {
00095 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00096
00097 return this->queue_.close ();
00098 }
00099
00100 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00101 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00102 {
00103 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00104
00105 return this->queue_.flush ();
00106 }
00107
00108 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00109 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00110 {
00111 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00112
00113 return this->queue_.flush_i ();
00114 }
00115
00116
00117
00118 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00119 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00120 ACE_Time_Value *timeout)
00121 {
00122 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00123
00124 ACE_Message_Block *mb = 0;
00125
00126 int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00127
00128 if (cur_count != -1)
00129 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00130
00131 return cur_count;
00132 }
00133
00134 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00135 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00136 ACE_Time_Value *timeout)
00137 {
00138 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00139
00140 ACE_Message_Block *mb = 0;
00141
00142 ACE_NEW_RETURN (mb,
00143 ACE_Message_Block ((char *) new_item,
00144 sizeof (*new_item),
00145 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00146 -1);
00147
00148 int const result = this->queue_.enqueue_head (mb, timeout);
00149 if (result == -1)
00150
00151 mb->release ();
00152 return result;
00153 }
00154
00155
00156
00157
00158
00159 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00160 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00161 ACE_Time_Value *timeout)
00162 {
00163 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00164
00165 return this->enqueue_prio (new_item, timeout);
00166 }
00167
00168 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00169 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00170 ACE_Time_Value *timeout,
00171 unsigned long priority)
00172 {
00173 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00174
00175 ACE_Message_Block *mb = 0;
00176
00177 ACE_NEW_RETURN (mb,
00178 ACE_Message_Block ((char *) new_item,
00179 sizeof (*new_item),
00180 priority),
00181 -1);
00182
00183 int const result = this->queue_.enqueue_prio (mb, timeout);
00184 if (result == -1)
00185
00186 mb->release ();
00187
00188 return result;
00189 }
00190
00191 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00192 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00193 ACE_Time_Value *timeout)
00194 {
00195 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00196
00197 ACE_Message_Block *mb = 0;
00198
00199 ACE_NEW_RETURN (mb,
00200 ACE_Message_Block ((char *) new_item,
00201 sizeof (*new_item),
00202 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00203 -1);
00204
00205 int const result = this->queue_.enqueue_deadline (mb, timeout);
00206 if (result == -1)
00207
00208 mb->release ();
00209
00210 return result;
00211 }
00212
00213
00214
00215
00216 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00217 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00218 ACE_Time_Value *timeout)
00219 {
00220 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00221
00222 ACE_Message_Block *mb = 0;
00223
00224 ACE_NEW_RETURN (mb,
00225 ACE_Message_Block ((char *) new_item,
00226 sizeof (*new_item),
00227 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00228 -1);
00229
00230 int const result = this->queue_.enqueue_tail (mb, timeout);
00231 if (result == -1)
00232
00233 mb->release ();
00234 return result;
00235 }
00236
00237
00238
00239
00240
00241 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00242 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00243 ACE_Time_Value *timeout)
00244 {
00245 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00246
00247 ACE_Message_Block *mb = 0;
00248
00249 int const cur_count = this->queue_.dequeue_head (mb, timeout);
00250
00251
00252 if (cur_count != -1)
00253 {
00254 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00255
00256 mb->release ();
00257 }
00258
00259 return cur_count;
00260 }
00261
00262
00263
00264
00265
00266 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00267 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00268 ACE_Time_Value *timeout)
00269 {
00270 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00271
00272 ACE_Message_Block *mb = 0;
00273
00274 int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00275
00276
00277 if (cur_count != -1)
00278 {
00279 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00280
00281 mb->release ();
00282 }
00283
00284 return cur_count;
00285 }
00286
00287
00288
00289
00290
00291 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00292 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00293 ACE_Time_Value *timeout)
00294 {
00295 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00296
00297 ACE_Message_Block *mb = 0;
00298
00299 int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00300
00301
00302 if (cur_count != -1)
00303 {
00304 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00305
00306 mb->release ();
00307 }
00308
00309 return cur_count;
00310 }
00311
00312
00313
00314
00315
00316 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00317 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00318 ACE_Time_Value *timeout)
00319 {
00320 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00321
00322 ACE_Message_Block *mb = 0;
00323
00324 int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00325
00326
00327 if (cur_count != -1)
00328 {
00329 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00330
00331 mb->release ();
00332 }
00333
00334 return cur_count;
00335 }
00336
00337 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00338 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00339 {
00340 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00341
00342 return this->queue_.notify ();
00343 }
00344
00345 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00346 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00347 ACE_Message_Queue_Ex_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE> & queue)
00348 : iter_ (queue.queue_)
00349 {
00350
00351 }
00352
00353 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00354 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00355 next (ACE_MESSAGE_TYPE *&entry)
00356 {
00357 ACE_Message_Block * mb = 0;
00358 int retval = this->iter_.next (mb);
00359
00360 if (retval == 1)
00361 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00362
00363 return retval;
00364 }
00365
00366 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00367 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::done (void) const
00368 {
00369 return this->iter_.done ();
00370 }
00371
00372 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00373 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::advance (void)
00374 {
00375 return this->iter_.advance ();
00376 }
00377
00378 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00379 ACE_Message_Queue_Ex_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00380 {
00381 this->iter_.dump ();
00382 }
00383
00384 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_Iterator)
00385
00386 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00387 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00388 ACE_Message_Queue_Ex_Reverse_Iterator (ACE_Message_Queue_Ex <ACE_MESSAGE_TYPE, ACE_SYNCH_USE> & queue)
00389 : iter_ (queue.queue_)
00390 {
00391
00392 }
00393
00394 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00395 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::
00396 next (ACE_MESSAGE_TYPE *&entry)
00397 {
00398 ACE_Message_Block * mb = 0;
00399 int retval = this->iter_.next (mb);
00400
00401 if (retval == 1)
00402 entry = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00403
00404 return retval;
00405 }
00406
00407 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00408 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::done (void) const
00409 {
00410 return this->iter_.done ();
00411 }
00412
00413 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00414 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::advance (void)
00415 {
00416 return this->iter_.advance ();
00417 }
00418
00419 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00420 ACE_Message_Queue_Ex_Reverse_Iterator<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00421 {
00422 this->iter_.dump ();
00423 }
00424
00425 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_Reverse_Iterator)
00426
00427 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00428 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N
00429 (size_t high_water_mark,
00430 size_t low_water_mark,
00431 ACE_Notification_Strategy *ns):
00432 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark,
00433 low_water_mark,
00434 ns)
00435 {
00436 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N");
00437 }
00438
00439 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00440 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void)
00441 {
00442 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N");
00443 }
00444
00445 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00446 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head
00447 (ACE_MESSAGE_TYPE *new_item,
00448 ACE_Time_Value *timeout)
00449 {
00450 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00451
00452
00453
00454 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00455 if (0 == mb)
00456 {
00457 return -1;
00458 }
00459
00460 int result = this->queue_.enqueue_head (mb, timeout);
00461 if (-1 == result)
00462 {
00463
00464 mb->release ();
00465 }
00466 return result;
00467 }
00468
00469 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00470 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail
00471 (ACE_MESSAGE_TYPE *new_item,
00472 ACE_Time_Value *timeout)
00473 {
00474 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00475
00476
00477
00478 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00479 if (0 == mb)
00480 {
00481 return -1;
00482 }
00483
00484 int result = this->queue_.enqueue_tail (mb, timeout);
00485 if (-1 == result)
00486 {
00487
00488 mb->release ();
00489 }
00490 return result;
00491 }
00492
00493 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block *
00494 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i
00495 (ACE_MESSAGE_TYPE *new_item)
00496 {
00497 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i");
00498
00499
00500 ACE_Message_Block *mb_head = 0;
00501
00502 ACE_NEW_RETURN (mb_head,
00503 ACE_Message_Block ((char *) new_item,
00504 sizeof (*new_item),
00505 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00506 0);
00507
00508
00509 ACE_Message_Block *mb_tail = mb_head;
00510
00511
00512 for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
00513 {
00514 ACE_Message_Block *mb_temp = 0;
00515 ACE_NEW_NORETURN (mb_temp,
00516 ACE_Message_Block ((char *) pobj,
00517 sizeof (*pobj),
00518 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY));
00519 if (mb_temp == 0)
00520 {
00521 mb_head->release ();
00522 mb_head = 0;
00523 break;
00524 }
00525
00526 mb_tail->next (mb_temp);
00527 mb_tail = mb_temp;
00528 }
00529
00530 return mb_head;
00531 }
00532
00533 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00534
00535 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00536 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00537 ACE_Time_Value *timeout)
00538 {
00539 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00540
00541 return this->dequeue_head (first_item, timeout);
00542 }
00543
00544 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00545 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00546 {
00547 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00548
00549 return this->queue_.notification_strategy ();
00550 }
00551
00552 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00553 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00554 {
00555 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00556
00557 this->queue_.notification_strategy (s);
00558 }
00559
00560
00561
00562 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00563 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00564 {
00565 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00566
00567 return this->queue_.is_empty ();
00568 }
00569
00570
00571
00572 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00573 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00574 {
00575 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00576
00577 return this->queue_.is_full ();
00578 }
00579
00580 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00581 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00582 {
00583 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00584
00585 return this->queue_.high_water_mark ();
00586 }
00587
00588 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00589 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00590 {
00591 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00592
00593 this->queue_.high_water_mark (hwm);
00594 }
00595
00596 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00597 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00598 {
00599 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00600
00601 return this->queue_.low_water_mark ();
00602 }
00603
00604 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00605 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00606 {
00607 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00608
00609 this->queue_.low_water_mark (lwm);
00610 }
00611
00612 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00613 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00614 {
00615 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00616
00617 return this->queue_.message_bytes ();
00618 }
00619
00620 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00621 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00622 {
00623 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00624
00625 return this->queue_.message_length ();
00626 }
00627
00628 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00629 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00630 {
00631 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00632
00633 return this->queue_.message_count ();
00634 }
00635
00636 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00637 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00638 {
00639 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00640
00641 return this->queue_.deactivate ();
00642 }
00643
00644 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00645 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00646 {
00647 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00648
00649 return this->queue_.activate ();
00650 }
00651
00652 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00653 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00654 {
00655 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00656
00657 return this->queue_.pulse ();
00658 }
00659
00660 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00661 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00662 {
00663 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00664
00665 return this->queue_.deactivated ();
00666 }
00667
00668 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00669 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00670 {
00671 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00672
00673 return this->queue_.state ();
00674 }
00675
00676 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00677 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00678 {
00679 return this->queue_.lock ();
00680 }
00681
00682 template <ACE_SYNCH_DECL>
00683 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00684 : queue_ (q),
00685 curr_ (q.head_)
00686 {
00687 }
00688
00689 template <ACE_SYNCH_DECL> int
00690 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00691 {
00692 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00693
00694 if (this->curr_ != 0)
00695 {
00696 entry = this->curr_;
00697 return 1;
00698 }
00699
00700 return 0;
00701 }
00702
00703 template <ACE_SYNCH_DECL> int
00704 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00705 {
00706 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00707
00708 return this->curr_ == 0;
00709 }
00710
00711 template <ACE_SYNCH_DECL> int
00712 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00713 {
00714 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00715
00716 if (this->curr_)
00717 this->curr_ = this->curr_->next ();
00718 return this->curr_ != 0;
00719 }
00720
00721 template <ACE_SYNCH_DECL> void
00722 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00723 {
00724 #if defined (ACE_HAS_DUMP)
00725 #endif
00726 }
00727
00728 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00729
00730 template <ACE_SYNCH_DECL>
00731 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00732 : queue_ (q),
00733 curr_ (queue_.tail_)
00734 {
00735 }
00736
00737 template <ACE_SYNCH_DECL> int
00738 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00739 {
00740 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00741
00742 if (this->curr_ != 0)
00743 {
00744 entry = this->curr_;
00745 return 1;
00746 }
00747
00748 return 0;
00749 }
00750
00751 template <ACE_SYNCH_DECL> int
00752 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00753 {
00754 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00755
00756 return this->curr_ == 0;
00757 }
00758
00759 template <ACE_SYNCH_DECL> int
00760 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00761 {
00762 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00763
00764 if (this->curr_)
00765 this->curr_ = this->curr_->prev ();
00766 return this->curr_ != 0;
00767 }
00768
00769 template <ACE_SYNCH_DECL> void
00770 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00771 {
00772 #if defined (ACE_HAS_DUMP)
00773 #endif
00774 }
00775
00776 template <ACE_SYNCH_DECL> int
00777 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00778 ACE_Time_Value *timeout)
00779 {
00780 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00781 return this->dequeue_head (first_item, timeout);
00782 }
00783
00784 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00785 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00786 {
00787 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00788
00789 return this->notification_strategy_;
00790 }
00791
00792 template <ACE_SYNCH_DECL> void
00793 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00794 {
00795 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00796
00797 this->notification_strategy_ = s;
00798 }
00799
00800
00801
00802 template <ACE_SYNCH_DECL> bool
00803 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00804 {
00805 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00806 return this->tail_ == 0;
00807 }
00808
00809
00810
00811 template <ACE_SYNCH_DECL> bool
00812 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00813 {
00814 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00815 return this->cur_bytes_ >= this->high_water_mark_;
00816 }
00817
00818
00819
00820 template <ACE_SYNCH_DECL> bool
00821 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00822 {
00823 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00824 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00825
00826 return this->is_empty_i ();
00827 }
00828
00829
00830
00831 template <ACE_SYNCH_DECL> bool
00832 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00833 {
00834 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00835 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00836
00837 return this->is_full_i ();
00838 }
00839
00840 template <ACE_SYNCH_DECL> size_t
00841 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00842 {
00843 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00844 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00845
00846 return this->high_water_mark_;
00847 }
00848
00849 template <ACE_SYNCH_DECL> void
00850 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00851 {
00852 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00853 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00854
00855 this->high_water_mark_ = hwm;
00856 }
00857
00858 template <ACE_SYNCH_DECL> size_t
00859 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00860 {
00861 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00862 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00863
00864 return this->low_water_mark_;
00865 }
00866
00867 template <ACE_SYNCH_DECL> void
00868 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00869 {
00870 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00871 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00872
00873 this->low_water_mark_ = lwm;
00874 }
00875
00876 template <ACE_SYNCH_DECL> size_t
00877 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00878 {
00879 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00880 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00881
00882 return this->cur_bytes_;
00883 }
00884
00885 template <ACE_SYNCH_DECL> size_t
00886 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00887 {
00888 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00889 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00890
00891 return this->cur_length_;
00892 }
00893
00894 template <ACE_SYNCH_DECL> size_t
00895 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00896 {
00897 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00898 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00899
00900 return this->cur_count_;
00901 }
00902
00903 template <ACE_SYNCH_DECL> int
00904 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00905 {
00906 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00907 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00908
00909 return this->deactivate_i (0);
00910 }
00911
00912 template <ACE_SYNCH_DECL> int
00913 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00914 {
00915 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00916 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00917
00918 return this->activate_i ();
00919 }
00920
00921 template <ACE_SYNCH_DECL> int
00922 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00923 {
00924 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00925 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00926
00927 return this->deactivate_i (1);
00928 }
00929
00930 template <ACE_SYNCH_DECL> int
00931 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00932 {
00933 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00934
00935 return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00936 }
00937
00938 template <ACE_SYNCH_DECL> int
00939 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00940 {
00941 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00942
00943 return this->state_;
00944 }
00945
00946 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00947 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00948 {
00949 return this->lock_;
00950 }
00951
00952 template <ACE_SYNCH_DECL> void
00953 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00954 {
00955 #if defined (ACE_HAS_DUMP)
00956 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00957 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00958 switch (this->state_)
00959 {
00960 case ACE_Message_Queue_Base::ACTIVATED:
00961 ACE_DEBUG ((LM_DEBUG,
00962 ACE_TEXT ("state = ACTIVATED\n")));
00963 break;
00964 case ACE_Message_Queue_Base::DEACTIVATED:
00965 ACE_DEBUG ((LM_DEBUG,
00966 ACE_TEXT ("state = DEACTIVATED\n")));
00967 break;
00968 case ACE_Message_Queue_Base::PULSED:
00969 ACE_DEBUG ((LM_DEBUG,
00970 ACE_TEXT ("state = PULSED\n")));
00971 break;
00972 }
00973 ACE_DEBUG ((LM_DEBUG,
00974 ACE_TEXT ("low_water_mark = %d\n")
00975 ACE_TEXT ("high_water_mark = %d\n")
00976 ACE_TEXT ("cur_bytes = %d\n")
00977 ACE_TEXT ("cur_length = %d\n")
00978 ACE_TEXT ("cur_count = %d\n")
00979 ACE_TEXT ("head_ = %u\n")
00980 ACE_TEXT ("tail_ = %u\n"),
00981 this->low_water_mark_,
00982 this->high_water_mark_,
00983 this->cur_bytes_,
00984 this->cur_length_,
00985 this->cur_count_,
00986 this->head_,
00987 this->tail_));
00988 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond:\n")));
00989 not_full_cond_.dump ();
00990 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond:\n")));
00991 not_empty_cond_.dump ();
00992 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00993 #endif
00994 }
00995
00996 template <ACE_SYNCH_DECL> void
00997 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00998 {
00999 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
01000 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
01001
01002 this->cur_bytes_ = new_value;
01003 }
01004
01005 template <ACE_SYNCH_DECL> void
01006 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
01007 {
01008 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
01009 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
01010
01011 this->cur_length_ = new_value;
01012 }
01013
01014 template <ACE_SYNCH_DECL>
01015 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
01016 size_t lwm,
01017 ACE_Notification_Strategy *ns)
01018 : not_empty_cond_ (lock_)
01019 , not_full_cond_ (lock_)
01020 {
01021 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
01022
01023 if (this->open (hwm, lwm, ns) == -1)
01024 ACE_ERROR ((LM_ERROR,
01025 ACE_TEXT ("open")));
01026
01027 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01028 ACE_NEW (this->monitor_,
01029 ACE::Monitor_Control::Size_Monitor);
01030
01031
01032 char pid_buf[sizeof (int) + 1];
01033 ACE_OS::sprintf (pid_buf, "%d", ACE_OS::getpid ());
01034 pid_buf[sizeof (int)] = '\0';
01035
01036 const int addr_nibbles = 2 * sizeof (ptrdiff_t);
01037 char addr_buf[addr_nibbles + 1];
01038 ACE_OS::sprintf (addr_buf, "%p", this);
01039 addr_buf[addr_nibbles] = '\0';
01040
01041 ACE_CString name_str ("Message_Queue_");
01042 name_str += pid_buf;
01043 name_str += '_';
01044 name_str += addr_buf;
01045 this->monitor_->name (name_str.c_str ());
01046 this->monitor_->add_to_registry ();
01047 #endif
01048 }
01049
01050 template <ACE_SYNCH_DECL>
01051 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
01052 {
01053 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
01054 if (this->head_ != 0 && this->close () == -1)
01055 ACE_ERROR ((LM_ERROR,
01056 ACE_TEXT ("close")));
01057
01058 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01059 this->monitor_->remove_from_registry ();
01060 this->monitor_->remove_ref ();
01061 #endif
01062 }
01063
01064 template <ACE_SYNCH_DECL> int
01065 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
01066 {
01067 int number_flushed = 0;
01068
01069
01070
01071 for (this->tail_ = 0; this->head_ != 0; )
01072 {
01073 ++number_flushed;
01074
01075 size_t mb_bytes = 0;
01076 size_t mb_length = 0;
01077 this->head_->total_size_and_length (mb_bytes,
01078 mb_length);
01079
01080 this->cur_bytes_ -= mb_bytes;
01081 this->cur_length_ -= mb_length;
01082 --this->cur_count_;
01083
01084 ACE_Message_Block *temp = this->head_;
01085 this->head_ = this->head_->next ();
01086
01087
01088
01089 temp->release ();
01090 }
01091
01092 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01093
01094 if (number_flushed > 0)
01095 {
01096 this->monitor_->receive (this->cur_length_);
01097 }
01098 #endif
01099
01100 return number_flushed;
01101 }
01102
01103
01104
01105
01106
01107 template <ACE_SYNCH_DECL> int
01108 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
01109 size_t lwm,
01110 ACE_Notification_Strategy *ns)
01111 {
01112 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
01113 this->high_water_mark_ = hwm;
01114 this->low_water_mark_ = lwm;
01115 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01116 this->cur_bytes_ = 0;
01117 this->cur_length_ = 0;
01118 this->cur_count_ = 0;
01119 this->tail_ = 0;
01120 this->head_ = 0;
01121 this->notification_strategy_ = ns;
01122 return 0;
01123 }
01124
01125
01126
01127
01128 template <ACE_SYNCH_DECL> int
01129 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
01130 {
01131 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
01132 int const previous_state = this->state_;
01133
01134 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
01135 {
01136
01137 this->not_empty_cond_.broadcast ();
01138 this->not_full_cond_.broadcast ();
01139
01140 if (pulse)
01141 this->state_ = ACE_Message_Queue_Base::PULSED;
01142 else
01143 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
01144 }
01145
01146 return previous_state;
01147 }
01148
01149 template <ACE_SYNCH_DECL> int
01150 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
01151 {
01152 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
01153 int const previous_state = this->state_;
01154 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01155 return previous_state;
01156 }
01157
01158 template <ACE_SYNCH_DECL> int
01159 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
01160 {
01161 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
01162 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01163
01164
01165 return this->flush_i ();
01166 }
01167
01168
01169
01170 template <ACE_SYNCH_DECL> int
01171 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
01172 {
01173 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
01174 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01175
01176
01177
01178 this->deactivate_i ();
01179
01180
01181 return this->flush_i ();
01182 }
01183
01184 template <ACE_SYNCH_DECL> int
01185 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
01186 {
01187 if (this->not_full_cond_.signal () != 0)
01188 return -1;
01189 return 0;
01190 }
01191
01192 template <ACE_SYNCH_DECL> int
01193 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
01194 {
01195
01196 if (this->not_empty_cond_.signal () != 0)
01197 return -1;
01198 return 0;
01199 }
01200
01201
01202
01203
01204 template <ACE_SYNCH_DECL> int
01205 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
01206 {
01207 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
01208
01209 if (new_item == 0)
01210 return -1;
01211
01212
01213
01214
01215
01216
01217 ACE_Message_Block *seq_tail = new_item;
01218 ++this->cur_count_;
01219 new_item->total_size_and_length (this->cur_bytes_,
01220 this->cur_length_);
01221 while (seq_tail->next () != 0)
01222 {
01223 seq_tail->next ()->prev (seq_tail);
01224 seq_tail = seq_tail->next ();
01225 ++this->cur_count_;
01226 seq_tail->total_size_and_length (this->cur_bytes_,
01227 this->cur_length_);
01228 }
01229
01230
01231 if (this->tail_ == 0)
01232 {
01233 this->head_ = new_item;
01234 this->tail_ = seq_tail;
01235
01236 new_item->prev (0);
01237 }
01238
01239 else
01240 {
01241
01242 this->tail_->next (new_item);
01243 new_item->prev (this->tail_);
01244 this->tail_ = seq_tail;
01245 }
01246
01247 if (this->signal_dequeue_waiters () == -1)
01248 return -1;
01249 else
01250 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01251 }
01252
01253
01254
01255 template <ACE_SYNCH_DECL> int
01256 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01257 {
01258 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01259
01260 if (new_item == 0)
01261 return -1;
01262
01263
01264
01265
01266
01267
01268 ACE_Message_Block *seq_tail = new_item;
01269 ++this->cur_count_;
01270 new_item->total_size_and_length (this->cur_bytes_,
01271 this->cur_length_);
01272 while (seq_tail->next () != 0)
01273 {
01274 seq_tail->next ()->prev (seq_tail);
01275 seq_tail = seq_tail->next ();
01276 ++this->cur_count_;
01277 seq_tail->total_size_and_length (this->cur_bytes_,
01278 this->cur_length_);
01279 }
01280
01281 new_item->prev (0);
01282 seq_tail->next (this->head_);
01283
01284 if (this->head_ != 0)
01285 this->head_->prev (seq_tail);
01286 else
01287 this->tail_ = seq_tail;
01288
01289 this->head_ = new_item;
01290
01291 if (this->signal_dequeue_waiters () == -1)
01292 return -1;
01293 else
01294 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01295 }
01296
01297
01298
01299
01300 template <ACE_SYNCH_DECL> int
01301 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01302 {
01303 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01304
01305 if (new_item == 0)
01306 return -1;
01307
01308
01309
01310
01311
01312 new_item->next (0);
01313
01314 if (this->head_ == 0)
01315
01316
01317 return this->enqueue_head_i (new_item);
01318 else
01319 {
01320 ACE_Message_Block *temp = 0;
01321
01322
01323
01324
01325
01326 for (temp = this->tail_;
01327 temp != 0;
01328 temp = temp->prev ())
01329 if (temp->msg_priority () >= new_item->msg_priority ())
01330
01331
01332 break;
01333
01334 if (temp == 0)
01335
01336
01337
01338 return this->enqueue_head_i (new_item);
01339 else if (temp->next () == 0)
01340
01341
01342
01343 return this->enqueue_tail_i (new_item);
01344 else
01345 {
01346
01347
01348
01349
01350 new_item->prev (temp);
01351 new_item->next (temp->next ());
01352 temp->next ()->prev (new_item);
01353 temp->next (new_item);
01354 }
01355 }
01356
01357
01358 new_item->total_size_and_length (this->cur_bytes_,
01359 this->cur_length_);
01360 ++this->cur_count_;
01361
01362 if (this->signal_dequeue_waiters () == -1)
01363 return -1;
01364 else
01365 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01366 }
01367
01368
01369
01370
01371 template <ACE_SYNCH_DECL> int
01372 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01373 {
01374 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01375 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01376
01377 if (new_item == 0)
01378 return -1;
01379
01380
01381
01382
01383
01384 new_item->next (0);
01385
01386 if (this->head_ == 0)
01387
01388
01389 return this->enqueue_head_i (new_item);
01390 else
01391 {
01392 ACE_Message_Block *temp = 0;
01393
01394
01395
01396
01397
01398 for (temp = this->head_;
01399 temp != 0;
01400 temp = temp->next ())
01401 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01402
01403
01404 break;
01405
01406 if (temp == 0 || temp->next () == 0)
01407
01408
01409
01410 return this->enqueue_tail_i (new_item);
01411 else
01412 {
01413
01414
01415
01416
01417 new_item->prev (temp);
01418 new_item->next (temp->next ());
01419 temp->next ()->prev (new_item);
01420 temp->next (new_item);
01421 }
01422 }
01423
01424
01425 new_item->total_size_and_length (this->cur_bytes_,
01426 this->cur_length_);
01427 ++this->cur_count_;
01428
01429 if (this->signal_dequeue_waiters () == -1)
01430 return -1;
01431 else
01432 return this->cur_count_;
01433 #else
01434 return this->enqueue_tail_i (new_item);
01435 #endif
01436 }
01437
01438
01439
01440
01441
01442 template <ACE_SYNCH_DECL> int
01443 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01444 {
01445 if (this->head_ ==0)
01446 ACE_ERROR_RETURN ((LM_ERROR,
01447 ACE_TEXT ("Attempting to dequeue from empty queue")),
01448 -1);
01449 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01450 first_item = this->head_;
01451 this->head_ = this->head_->next ();
01452
01453 if (this->head_ == 0)
01454 this->tail_ = 0;
01455 else
01456
01457 this->head_->prev (0);
01458
01459 size_t mb_bytes = 0;
01460 size_t mb_length = 0;
01461 first_item->total_size_and_length (mb_bytes,
01462 mb_length);
01463
01464 this->cur_bytes_ -= mb_bytes;
01465 this->cur_length_ -= mb_length;
01466 --this->cur_count_;
01467
01468 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01469 this->head_ = this->tail_ = 0;
01470
01471
01472 first_item->prev (0);
01473 first_item->next (0);
01474
01475 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01476 this->monitor_->receive (this->cur_length_);
01477 #endif
01478
01479
01480
01481 if (this->cur_bytes_ <= this->low_water_mark_
01482 && this->signal_enqueue_waiters () == -1)
01483 return -1;
01484 else
01485 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01486 }
01487
01488
01489
01490
01491
01492
01493 template <ACE_SYNCH_DECL> int
01494 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01495 {
01496 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01497
01498 if (this->head_ == 0)
01499 return -1;
01500
01501
01502
01503 ACE_Message_Block *chosen = 0;
01504 u_long priority = ULONG_MAX;
01505
01506 for (ACE_Message_Block *temp = this->tail_;
01507 temp != 0;
01508 temp = temp->prev ())
01509 {
01510
01511
01512 if (temp->msg_priority () <= priority)
01513 {
01514 priority = temp->msg_priority ();
01515 chosen = temp;
01516 }
01517 }
01518
01519
01520
01521 if (chosen == 0)
01522 chosen = this->head_;
01523
01524
01525
01526 if (chosen->prev () == 0)
01527 this->head_ = chosen->next ();
01528 else
01529 chosen->prev ()->next (chosen->next ());
01530
01531 if (chosen->next () == 0)
01532 this->tail_ = chosen->prev ();
01533 else
01534 chosen->next ()->prev (chosen->prev ());
01535
01536
01537 dequeued = chosen;
01538
01539 size_t mb_bytes = 0;
01540 size_t mb_length = 0;
01541 dequeued->total_size_and_length (mb_bytes,
01542 mb_length);
01543
01544 this->cur_bytes_ -= mb_bytes;
01545 this->cur_length_ -= mb_length;
01546 --this->cur_count_;
01547
01548 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01549 this->head_ = this->tail_ = 0;
01550
01551
01552 dequeued->prev (0);
01553 dequeued->next (0);
01554
01555
01556
01557 if (this->cur_bytes_ <= this->low_water_mark_
01558 && this->signal_enqueue_waiters () == -1)
01559 return -1;
01560 else
01561 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01562 }
01563
01564
01565
01566
01567
01568 template <ACE_SYNCH_DECL> int
01569 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01570 {
01571 if (this->head_ == 0)
01572 ACE_ERROR_RETURN ((LM_ERROR,
01573 ACE_TEXT ("Attempting to dequeue from empty queue")),
01574 -1);
01575 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01576 dequeued = this->tail_;
01577 if (this->tail_->prev () == 0)
01578 {
01579 this->head_ = 0;
01580 this->tail_ = 0;
01581 }
01582 else
01583 {
01584 this->tail_->prev ()->next (0);
01585 this->tail_ = this->tail_->prev ();
01586 }
01587
01588 size_t mb_bytes = 0;
01589 size_t mb_length = 0;
01590 dequeued->total_size_and_length (mb_bytes,
01591 mb_length);
01592
01593 this->cur_bytes_ -= mb_bytes;
01594 this->cur_length_ -= mb_length;
01595 --this->cur_count_;
01596
01597 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01598 this->head_ = this->tail_ = 0;
01599
01600
01601 dequeued->prev (0);
01602 dequeued->next (0);
01603
01604
01605
01606 if (this->cur_bytes_ <= this->low_water_mark_
01607 && this->signal_enqueue_waiters () == -1)
01608 return -1;
01609 else
01610 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01611 }
01612
01613
01614
01615
01616
01617 template <ACE_SYNCH_DECL> int
01618 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01619 {
01620 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01621 if (this->head_ == 0)
01622 ACE_ERROR_RETURN ((LM_ERROR,
01623 ACE_TEXT ("Attempting to dequeue from empty queue")),
01624 -1);
01625 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01626
01627
01628 ACE_Message_Block* chosen = 0;
01629 ACE_Time_Value deadline = ACE_Time_Value::max_time;
01630 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01631 if (temp->msg_deadline_time () < deadline)
01632 {
01633 deadline = temp->msg_deadline_time ();
01634 chosen = temp;
01635 }
01636
01637
01638
01639 if (chosen == 0)
01640 chosen = this->head_;
01641
01642
01643
01644 if (chosen->prev () == 0)
01645 this->head_ = chosen->next ();
01646 else
01647 chosen->prev ()->next (chosen->next ());
01648
01649 if (chosen->next () == 0)
01650 this->tail_ = chosen->prev ();
01651 else
01652 chosen->next ()->prev (chosen->prev ());
01653
01654
01655 dequeued = chosen;
01656
01657 size_t mb_bytes = 0;
01658 size_t mb_length = 0;
01659 dequeued->total_size_and_length (mb_bytes,
01660 mb_length);
01661
01662 this->cur_bytes_ -= mb_bytes;
01663 this->cur_length_ -= mb_length;
01664 --this->cur_count_;
01665
01666 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01667 this->head_ = this->tail_ = 0;
01668
01669
01670 dequeued->prev (0);
01671 dequeued->next (0);
01672
01673
01674
01675 if (this->cur_bytes_ <= this->low_water_mark_
01676 && this->signal_enqueue_waiters () == -1)
01677 return -1;
01678 else
01679 return this->cur_count_;
01680 #else
01681 return this->dequeue_head_i (dequeued);
01682 #endif
01683 }
01684
01685
01686
01687 template <ACE_SYNCH_DECL> int
01688 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01689 ACE_Time_Value *timeout)
01690 {
01691 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01692 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01693
01694 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01695 {
01696 errno = ESHUTDOWN;
01697 return -1;
01698 }
01699
01700
01701
01702 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01703 return -1;
01704
01705 first_item = this->head_;
01706 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01707 }
01708
01709 template <ACE_SYNCH_DECL> int
01710 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01711 ACE_Time_Value *timeout)
01712 {
01713 int result = 0;
01714
01715
01716
01717 while (this->is_full_i ())
01718 {
01719 if (this->not_full_cond_.wait (timeout) == -1)
01720 {
01721 if (errno == ETIME)
01722 errno = EWOULDBLOCK;
01723 result = -1;
01724 break;
01725 }
01726 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01727 {
01728 errno = ESHUTDOWN;
01729 result = -1;
01730 break;
01731 }
01732 }
01733 return result;
01734 }
01735
01736 template <ACE_SYNCH_DECL> int
01737 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01738 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01739 {
01740 int result = 0;
01741
01742
01743
01744 while (this->is_empty_i ())
01745 {
01746 if (this->not_empty_cond_.wait (timeout) == -1)
01747 {
01748 if (errno == ETIME)
01749 errno = EWOULDBLOCK;
01750 result = -1;
01751 break;
01752 }
01753 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01754 {
01755 errno = ESHUTDOWN;
01756 result = -1;
01757 break;
01758 }
01759 }
01760 return result;
01761 }
01762
01763
01764
01765
01766 template <ACE_SYNCH_DECL> int
01767 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01768 ACE_Time_Value *timeout)
01769 {
01770 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01771 int queue_count = 0;
01772 ACE_Notification_Strategy *notifier = 0;
01773 {
01774 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01775
01776 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01777 {
01778 errno = ESHUTDOWN;
01779 return -1;
01780 }
01781
01782 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01783 return -1;
01784
01785 queue_count = this->enqueue_head_i (new_item);
01786 if (queue_count == -1)
01787 return -1;
01788
01789 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01790 this->monitor_->receive (this->cur_length_);
01791 #endif
01792 notifier = this->notification_strategy_;
01793 }
01794
01795 if (0 != notifier)
01796 notifier->notify();
01797 return queue_count;
01798 }
01799
01800
01801
01802
01803
01804 template <ACE_SYNCH_DECL> int
01805 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01806 ACE_Time_Value *timeout)
01807 {
01808 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01809 int queue_count = 0;
01810 ACE_Notification_Strategy *notifier = 0;
01811 {
01812 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01813
01814 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01815 {
01816 errno = ESHUTDOWN;
01817 return -1;
01818 }
01819
01820 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01821 return -1;
01822
01823 queue_count = this->enqueue_i (new_item);
01824
01825 if (queue_count == -1)
01826 return -1;
01827
01828 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01829 this->monitor_->receive (this->cur_length_);
01830 #endif
01831 notifier = this->notification_strategy_;
01832 }
01833 if (0 != notifier)
01834 notifier->notify ();
01835 return queue_count;
01836 }
01837
01838
01839
01840
01841
01842 template <ACE_SYNCH_DECL> int
01843 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01844 ACE_Time_Value *timeout)
01845 {
01846 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01847 int queue_count = 0;
01848 ACE_Notification_Strategy *notifier = 0;
01849 {
01850 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01851
01852 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01853 {
01854 errno = ESHUTDOWN;
01855 return -1;
01856 }
01857
01858 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01859 return -1;
01860
01861 queue_count = this->enqueue_deadline_i (new_item);
01862
01863 if (queue_count == -1)
01864 return -1;
01865
01866 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01867 this->monitor_->receive (this->cur_length_);
01868 #endif
01869 notifier = this->notification_strategy_;
01870 }
01871 if (0 != notifier)
01872 notifier->notify ();
01873 return queue_count;
01874 }
01875
01876 template <ACE_SYNCH_DECL> int
01877 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01878 ACE_Time_Value *timeout)
01879 {
01880 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01881 return this->enqueue_prio (new_item, timeout);
01882 }
01883
01884
01885
01886
01887 template <ACE_SYNCH_DECL> int
01888 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01889 ACE_Time_Value *timeout)
01890 {
01891 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01892 int queue_count = 0;
01893 ACE_Notification_Strategy *notifier = 0;
01894 {
01895 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01896
01897 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01898 {
01899 errno = ESHUTDOWN;
01900 return -1;
01901 }
01902
01903 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01904 return -1;
01905
01906 queue_count = this->enqueue_tail_i (new_item);
01907
01908 if (queue_count == -1)
01909 return -1;
01910
01911 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01912 this->monitor_->receive (this->cur_length_);
01913 #endif
01914 notifier = this->notification_strategy_;
01915 }
01916 if (0 != notifier)
01917 notifier->notify ();
01918 return queue_count;
01919 }
01920
01921
01922
01923
01924
01925 template <ACE_SYNCH_DECL> int
01926 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01927 ACE_Time_Value *timeout)
01928 {
01929 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01930 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01931
01932 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01933 {
01934 errno = ESHUTDOWN;
01935 return -1;
01936 }
01937
01938 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01939 return -1;
01940
01941 return this->dequeue_head_i (first_item);
01942 }
01943
01944
01945
01946
01947
01948 template <ACE_SYNCH_DECL> int
01949 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01950 ACE_Time_Value *timeout)
01951 {
01952 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01953 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01954
01955 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01956 {
01957 errno = ESHUTDOWN;
01958 return -1;
01959 }
01960
01961 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01962 return -1;
01963
01964 return this->dequeue_prio_i (dequeued);
01965 }
01966
01967
01968
01969
01970
01971 template <ACE_SYNCH_DECL> int
01972 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01973 ACE_Time_Value *timeout)
01974 {
01975 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01976 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01977
01978 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01979 {
01980 errno = ESHUTDOWN;
01981 return -1;
01982 }
01983
01984 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01985 return -1;
01986
01987 return this->dequeue_tail_i (dequeued);
01988 }
01989
01990
01991
01992
01993
01994 template <ACE_SYNCH_DECL> int
01995 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01996 ACE_Time_Value *timeout)
01997 {
01998 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01999 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02000
02001 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02002 {
02003 errno = ESHUTDOWN;
02004 return -1;
02005 }
02006
02007 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
02008 return -1;
02009
02010 return this->dequeue_deadline_i (dequeued);
02011 }
02012
02013 template <ACE_SYNCH_DECL> int
02014 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
02015 {
02016 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
02017
02018
02019 if (this->notification_strategy_ == 0)
02020 return 0;
02021 else
02022 return this->notification_strategy_->notify ();
02023 }
02024
02025
02026 template <ACE_SYNCH_DECL>
02027 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
02028 size_t hwm,
02029 size_t lwm,
02030 ACE_Notification_Strategy *ns)
02031 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02032 pending_head_ (0),
02033 pending_tail_ (0),
02034 late_head_ (0),
02035 late_tail_ (0),
02036 beyond_late_head_ (0),
02037 beyond_late_tail_ (0),
02038 message_strategy_ (message_strategy)
02039 {
02040
02041
02042
02043 }
02044
02045
02046
02047 template <ACE_SYNCH_DECL>
02048 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
02049 {
02050 delete &this->message_strategy_;
02051 }
02052
02053 template <ACE_SYNCH_DECL> int
02054 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
02055 ACE_Message_Block *&list_tail,
02056 u_int status_flags)
02057 {
02058
02059 list_head = 0;
02060 list_tail = 0;
02061
02062
02063 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02064
02065
02066 int result = this->refresh_queue (current_time);
02067 if (result < 0)
02068 return result;
02069
02070 if (ACE_BIT_ENABLED (status_flags,
02071 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
02072 && this->pending_head_
02073 && this->pending_tail_)
02074 {
02075
02076 if (this->pending_head_->prev ())
02077 {
02078 this->tail_ = this->pending_head_->prev ();
02079 this->pending_head_->prev ()->next (0);
02080 }
02081 else
02082 {
02083
02084 this->head_ = 0;
02085 this->tail_ = 0;
02086 }
02087
02088
02089 list_head = this->pending_head_;
02090 list_tail = this->pending_tail_;
02091
02092
02093 this->pending_head_->prev (0);
02094 this->pending_head_ = 0;
02095 this->pending_tail_ = 0;
02096 }
02097
02098 if (ACE_BIT_ENABLED (status_flags,
02099 (u_int) ACE_Dynamic_Message_Strategy::LATE)
02100 && this->late_head_
02101 && this->late_tail_)
02102 {
02103
02104
02105 if (this->late_tail_->next ())
02106 this->late_tail_->next ()->prev (this->late_head_->prev ());
02107 else
02108 this->tail_ = this->late_head_->prev ();
02109
02110 if (this->late_head_->prev ())
02111 this->late_head_->prev ()->next (this->late_tail_->next ());
02112 else
02113 this->head_ = this->late_tail_->next ();
02114
02115
02116 this->late_head_->prev (list_tail);
02117 if (list_tail)
02118 list_tail->next (this->late_head_);
02119 else
02120 list_head = this->late_head_;
02121
02122 list_tail = this->late_tail_;
02123
02124 this->late_tail_->next (0);
02125 this->late_head_ = 0;
02126 this->late_tail_ = 0;
02127 }
02128
02129 if (ACE_BIT_ENABLED (status_flags,
02130 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
02131 && this->beyond_late_head_
02132 && this->beyond_late_tail_)
02133 {
02134
02135 if (this->beyond_late_tail_->next ())
02136 {
02137 this->head_ = this->beyond_late_tail_->next ();
02138 this->beyond_late_tail_->next ()->prev (0);
02139 }
02140 else
02141 {
02142
02143 this->head_ = 0;
02144 this->tail_ = 0;
02145 }
02146
02147
02148
02149 if (list_tail)
02150 {
02151 this->beyond_late_head_->prev (list_tail);
02152 list_tail->next (this->beyond_late_head_);
02153 }
02154 else
02155 list_head = this->beyond_late_head_;
02156
02157 list_tail = this->beyond_late_tail_;
02158
02159 this->beyond_late_tail_->next (0);
02160 this->beyond_late_head_ = 0;
02161 this->beyond_late_tail_ = 0;
02162 }
02163
02164
02165 ACE_Message_Block *temp1;
02166
02167 for (temp1 = list_head;
02168 temp1 != 0;
02169 temp1 = temp1->next ())
02170 {
02171 --this->cur_count_;
02172
02173 size_t mb_bytes = 0;
02174 size_t mb_length = 0;
02175 temp1->total_size_and_length (mb_bytes,
02176 mb_length);
02177
02178 this->cur_bytes_ -= mb_bytes;
02179 this->cur_length_ -= mb_length;
02180 }
02181
02182 return result;
02183 }
02184
02185
02186
02187
02188
02189
02190
02191
02192
02193 template <ACE_SYNCH_DECL> int
02194 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
02195 ACE_Time_Value *timeout)
02196 {
02197 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02198
02199 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02200
02201 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02202 {
02203 errno = ESHUTDOWN;
02204 return -1;
02205 }
02206
02207 int result;
02208
02209
02210 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02211
02212
02213 result = this->refresh_queue (current_time);
02214 if (result < 0)
02215 return result;
02216
02217
02218 result = this->wait_not_empty_cond (ace_mon, timeout);
02219 if (result == -1)
02220 return result;
02221
02222
02223
02224
02225 result = this->dequeue_head_i (first_item);
02226
02227 return result;
02228 }
02229
02230
02231
02232
02233 template <ACE_SYNCH_DECL> void
02234 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
02235 {
02236 #if defined (ACE_HAS_DUMP)
02237 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02238 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02239
02240 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class):\n")));
02241 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02242
02243 ACE_DEBUG ((LM_DEBUG,
02244 ACE_TEXT ("pending_head_ = %u\n")
02245 ACE_TEXT ("pending_tail_ = %u\n")
02246 ACE_TEXT ("late_head_ = %u\n")
02247 ACE_TEXT ("late_tail_ = %u\n")
02248 ACE_TEXT ("beyond_late_head_ = %u\n")
02249 ACE_TEXT ("beyond_late_tail_ = %u\n"),
02250 this->pending_head_,
02251 this->pending_tail_,
02252 this->late_head_,
02253 this->late_tail_,
02254 this->beyond_late_head_,
02255 this->beyond_late_tail_));
02256
02257 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ :\n")));
02258 message_strategy_.dump ();
02259
02260 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02261 #endif
02262 }
02263
02264
02265 template <ACE_SYNCH_DECL> int
02266 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02267 {
02268 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02269
02270 if (new_item == 0)
02271 {
02272 return -1;
02273 }
02274
02275 int result = 0;
02276
02277
02278 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02279
02280
02281
02282 result = this->refresh_queue (current_time);
02283
02284 if (result < 0)
02285 {
02286 return result;
02287 }
02288
02289
02290 switch (message_strategy_.priority_status (*new_item,
02291 current_time))
02292 {
02293 case ACE_Dynamic_Message_Strategy::PENDING:
02294 if (this->pending_tail_ == 0)
02295 {
02296
02297
02298
02299 pending_head_ = new_item;
02300 pending_tail_ = pending_head_;
02301 return this->enqueue_tail_i (new_item);
02302 }
02303 else
02304 {
02305
02306
02307 result = sublist_enqueue_i (new_item,
02308 current_time,
02309 this->pending_head_,
02310 this->pending_tail_,
02311 ACE_Dynamic_Message_Strategy::PENDING);
02312 }
02313 break;
02314
02315 case ACE_Dynamic_Message_Strategy::LATE:
02316 if (this->late_tail_ == 0)
02317 {
02318 late_head_ = new_item;
02319 late_tail_ = late_head_;
02320
02321 if (this->pending_head_ == 0)
02322
02323
02324
02325 return this->enqueue_tail_i (new_item);
02326 else if (this->beyond_late_tail_ == 0)
02327
02328
02329 return this->enqueue_head_i (new_item);
02330 else
02331 {
02332
02333
02334
02335 this->beyond_late_tail_->next (new_item);
02336 new_item->prev (this->beyond_late_tail_);
02337 this->pending_head_->prev (new_item);
02338 new_item->next (this->pending_head_);
02339 }
02340 }
02341 else
02342 {
02343
02344
02345 result = sublist_enqueue_i (new_item,
02346 current_time,
02347 this->late_head_,
02348 this->late_tail_,
02349 ACE_Dynamic_Message_Strategy::LATE);
02350 }
02351 break;
02352
02353 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02354 if (this->beyond_late_tail_ == 0)
02355 {
02356
02357
02358
02359 beyond_late_head_ = new_item;
02360 beyond_late_tail_ = beyond_late_head_;
02361 return this->enqueue_head_i (new_item);
02362 }
02363 else
02364 {
02365
02366
02367
02368 if (this->beyond_late_tail_->next ())
02369 {
02370 this->beyond_late_tail_->next ()->prev (new_item);
02371 }
02372 else
02373 {
02374 this->tail_ = new_item;
02375 }
02376
02377 new_item->next (this->beyond_late_tail_->next ());
02378 this->beyond_late_tail_->next (new_item);
02379 new_item->prev (this->beyond_late_tail_);
02380 this->beyond_late_tail_ = new_item;
02381 }
02382
02383 break;
02384
02385
02386 default:
02387 result = -1;
02388 break;
02389 }
02390
02391 if (result < 0)
02392 {
02393 return result;
02394 }
02395
02396 size_t mb_bytes = 0;
02397 size_t mb_length = 0;
02398 new_item->total_size_and_length (mb_bytes,
02399 mb_length);
02400 this->cur_bytes_ += mb_bytes;
02401 this->cur_length_ += mb_length;
02402 ++this->cur_count_;
02403
02404 if (this->signal_dequeue_waiters () == -1)
02405 {
02406 return -1;
02407 }
02408 else
02409 {
02410 return ACE_Utils::truncate_cast<int> (this->cur_count_);
02411 }
02412 }
02413
02414
02415
02416
02417
02418
02419
02420 template <ACE_SYNCH_DECL> int
02421 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02422 const ACE_Time_Value ¤t_time,
02423 ACE_Message_Block *&sublist_head,
02424 ACE_Message_Block *&sublist_tail,
02425 ACE_Dynamic_Message_Strategy::Priority_Status status)
02426 {
02427 int result = 0;
02428 ACE_Message_Block *current_item = 0;
02429
02430
02431
02432 for (current_item = sublist_tail;
02433 current_item;
02434 current_item = current_item->prev ())
02435 {
02436 if (message_strategy_.priority_status (*current_item, current_time) == status)
02437 {
02438 if (current_item->msg_priority () >= new_item->msg_priority ())
02439 break;
02440 }
02441 else
02442 {
02443 sublist_head = new_item;
02444 break;
02445 }
02446 }
02447
02448 if (current_item == 0)
02449 {
02450
02451
02452 new_item->prev (0);
02453 new_item->next (this->head_);
02454 if (this->head_ != 0)
02455 this->head_->prev (new_item);
02456 else
02457 {
02458 this->tail_ = new_item;
02459 sublist_tail = new_item;
02460 }
02461 this->head_ = new_item;
02462 sublist_head = new_item;
02463 }
02464 else
02465 {
02466
02467 new_item->next (current_item->next ());
02468 new_item->prev (current_item);
02469
02470 if (current_item->next ())
02471 current_item->next ()->prev (new_item);
02472 else
02473 this->tail_ = new_item;
02474
02475 current_item->next (new_item);
02476
02477
02478
02479 if (current_item == sublist_tail)
02480 sublist_tail = new_item;
02481 }
02482
02483 return result;
02484 }
02485
02486
02487
02488
02489 template <ACE_SYNCH_DECL> int
02490 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02491 {
02492 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02493
02494 int result = 0;
02495 int last_in_subqueue = 0;
02496
02497
02498 if (this->pending_head_)
02499 {
02500 first_item = this->pending_head_;
02501
02502 if (0 == this->pending_head_->prev ())
02503 this->head_ = this->pending_head_->next ();
02504 else
02505 this->pending_head_->prev ()->next (this->pending_head_->next ());
02506
02507 if (0 == this->pending_head_->next ())
02508 {
02509 this->tail_ = this->pending_head_->prev ();
02510 this->pending_head_ = 0;
02511 this->pending_tail_ = 0;
02512 }
02513 else
02514 {
02515 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02516 this->pending_head_ = this->pending_head_->next ();
02517 }
02518
02519 first_item->prev (0);
02520 first_item->next (0);
02521 }
02522
02523
02524 else if (this->late_head_)
02525 {
02526 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02527
02528 first_item = this->late_head_;
02529
02530 if (0 == this->late_head_->prev ())
02531 this->head_ = this->late_head_->next ();
02532 else
02533 this->late_head_->prev ()->next (this->late_head_->next ());
02534
02535 if (0 == this->late_head_->next ())
02536 this->tail_ = this->late_head_->prev ();
02537 else
02538 {
02539 this->late_head_->next ()->prev (this->late_head_->prev ());
02540 this->late_head_ = this->late_head_->next ();
02541 }
02542
02543 if (last_in_subqueue)
02544 {
02545 this->late_head_ = 0;
02546 this->late_tail_ = 0;
02547 }
02548
02549 first_item->prev (0);
02550 first_item->next (0);
02551 }
02552
02553 else if (this->beyond_late_head_)
02554 {
02555 last_in_subqueue =
02556 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02557
02558 first_item = this->beyond_late_head_;
02559 this->head_ = this->beyond_late_head_->next ();
02560
02561 if (0 == this->beyond_late_head_->next ())
02562 {
02563 this->tail_ = this->beyond_late_head_->prev ();
02564 }
02565 else
02566 {
02567 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02568 this->beyond_late_head_ = this->beyond_late_head_->next ();
02569 }
02570
02571 if (last_in_subqueue)
02572 {
02573 this->beyond_late_head_ = 0;
02574 this->beyond_late_tail_ = 0;
02575 }
02576
02577 first_item->prev (0);
02578 first_item->next (0);
02579 }
02580 else
02581 {
02582
02583 first_item = 0;
02584 result = -1;
02585 }
02586
02587 if (result < 0)
02588 {
02589 return result;
02590 }
02591
02592 size_t mb_bytes = 0;
02593 size_t mb_length = 0;
02594 first_item->total_size_and_length (mb_bytes,
02595 mb_length);
02596
02597 this->cur_bytes_ -= mb_bytes;
02598 this->cur_length_ -= mb_length;
02599 --this->cur_count_;
02600
02601
02602
02603 if (this->cur_bytes_ <= this->low_water_mark_
02604 && this->signal_enqueue_waiters () == -1)
02605 {
02606 return -1;
02607 }
02608 else
02609 {
02610 return ACE_Utils::truncate_cast<int> (this->cur_count_);
02611 }
02612 }
02613
02614
02615
02616
02617
02618
02619
02620 template <ACE_SYNCH_DECL> int
02621 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time)
02622 {
02623 int result;
02624
02625 result = refresh_pending_queue (current_time);
02626
02627 if (result != -1)
02628 result = refresh_late_queue (current_time);
02629
02630 return result;
02631 }
02632
02633
02634
02635
02636 template <ACE_SYNCH_DECL> int
02637 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time)
02638 {
02639 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02640
02641
02642 if (this->pending_head_)
02643 {
02644 current_status = message_strategy_.priority_status (*this->pending_head_,
02645 current_time);
02646 switch (current_status)
02647 {
02648 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02649
02650
02651 this->beyond_late_head_ = this->head_;
02652
02653
02654
02655 this->late_head_ = 0;
02656 this->late_tail_ = 0;
02657
02658
02659 do
02660 {
02661 this->pending_head_ = this->pending_head_->next ();
02662
02663 if (this->pending_head_)
02664 current_status = message_strategy_.priority_status (*this->pending_head_,
02665 current_time);
02666 else
02667 break;
02668
02669 }
02670 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02671
02672 if (this->pending_head_)
02673 {
02674
02675 this->beyond_late_tail_ = this->pending_head_->prev ();
02676
02677 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02678
02679 break;
02680 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02681 {
02682
02683 ACE_ERROR_RETURN ((LM_ERROR,
02684 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02685 (int) current_status),
02686 -1);
02687 }
02688
02689 }
02690 else
02691 {
02692
02693
02694 this->beyond_late_tail_ = this->tail_;
02695 this->pending_head_ = 0;
02696 this->pending_tail_ = 0;
02697 break;
02698 }
02699
02700 case ACE_Dynamic_Message_Strategy::LATE:
02701
02702
02703
02704 if (this->late_head_ == 0)
02705 this->late_head_ = this->pending_head_;
02706
02707
02708 do
02709 {
02710 this->pending_head_ = this->pending_head_->next ();
02711
02712 if (this->pending_head_)
02713 current_status = message_strategy_.priority_status (*this->pending_head_,
02714 current_time);
02715 else
02716 break;
02717
02718 }
02719 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02720
02721 if (this->pending_head_)
02722 {
02723 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02724
02725 ACE_ERROR_RETURN((LM_ERROR,
02726 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02727 (int) current_status),
02728 -1);
02729
02730
02731 this->late_tail_ = this->pending_head_->prev ();
02732 }
02733 else
02734 {
02735
02736 this->late_tail_ = this->tail_;
02737 this->pending_head_ = 0;
02738 this->pending_tail_ = 0;
02739 }
02740
02741 break;
02742 case ACE_Dynamic_Message_Strategy::PENDING:
02743
02744 break;
02745 default:
02746
02747 ACE_ERROR_RETURN((LM_ERROR,
02748 ACE_TEXT ("Unknown message priority status [%d]"),
02749 (int) current_status),
02750 -1);
02751 }
02752 }
02753 return 0;
02754 }
02755
02756
02757
02758
02759 template <ACE_SYNCH_DECL> int
02760 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time)
02761 {
02762 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02763
02764 if (this->late_head_)
02765 {
02766 current_status = message_strategy_.priority_status (*this->late_head_,
02767 current_time);
02768 switch (current_status)
02769 {
02770 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02771
02772
02773
02774 this->beyond_late_head_ = this->head_;
02775
02776
02777 do
02778 {
02779 this->late_head_ = this->late_head_->next ();
02780
02781 if (this->late_head_)
02782 current_status = message_strategy_.priority_status (*this->late_head_,
02783 current_time);
02784 else
02785 break;
02786
02787 }
02788 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02789
02790 if (this->late_head_)
02791 {
02792
02793 this->beyond_late_tail_ = this->late_head_->prev ();
02794
02795 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02796 {
02797
02798 this->late_head_ = 0;
02799 this->late_tail_ = 0;
02800 }
02801 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02802
02803 ACE_ERROR_RETURN ((LM_ERROR,
02804 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02805 (int) current_status),
02806 -1);
02807 }
02808 else
02809 {
02810
02811 this->beyond_late_tail_ = this->tail_;
02812 this->late_head_ = 0;
02813 this->late_tail_ = 0;
02814 }
02815
02816 break;
02817
02818 case ACE_Dynamic_Message_Strategy::LATE:
02819
02820 break;
02821
02822 case ACE_Dynamic_Message_Strategy::PENDING:
02823
02824 ACE_ERROR_RETURN ((LM_ERROR,
02825 ACE_TEXT ("Unexpected message priority status ")
02826 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02827 (int) current_status),
02828 -1);
02829 default:
02830
02831 ACE_ERROR_RETURN ((LM_ERROR,
02832 ACE_TEXT ("Unknown message priority status [%d]"),
02833 (int) current_status),
02834 -1);
02835 }
02836 }
02837
02838 return 0;
02839 }
02840
02841
02842
02843
02844 template <ACE_SYNCH_DECL> int
02845 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02846 ACE_Time_Value *timeout)
02847 {
02848 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02849 timeout);
02850 }
02851
02852
02853
02854
02855 template <ACE_SYNCH_DECL> int
02856 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02857 ACE_Time_Value *timeout)
02858 {
02859 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02860 return this->enqueue_prio (new_item, timeout);
02861 }
02862
02863
02864
02865
02866
02867
02868 template <ACE_SYNCH_DECL> int
02869 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02870 ACE_Time_Value *timeout)
02871 {
02872 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02873 return this->enqueue_prio (new_item, timeout);
02874 }
02875
02876
02877
02878
02879
02880
02881 template <ACE_SYNCH_DECL>
02882 ACE_Message_Queue<ACE_SYNCH_USE> *
02883 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02884 size_t lwm,
02885 ACE_Notification_Strategy *ns)
02886 {
02887 ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02888
02889 ACE_NEW_RETURN (tmp,
02890 ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02891 0);
02892 return tmp;
02893 }
02894
02895
02896
02897 template <ACE_SYNCH_DECL>
02898 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02899 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02900 size_t lwm,
02901 ACE_Notification_Strategy *ns,
02902 u_long static_bit_field_mask,
02903 u_long static_bit_field_shift,
02904 u_long dynamic_priority_max,
02905 u_long dynamic_priority_offset)
02906 {
02907 ACE_Deadline_Message_Strategy *adms = 0;
02908
02909 ACE_NEW_RETURN (adms,
02910 ACE_Deadline_Message_Strategy (static_bit_field_mask,
02911 static_bit_field_shift,
02912 dynamic_priority_max,
02913 dynamic_priority_offset),
02914 0);
02915
02916 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02917 ACE_NEW_RETURN (tmp,
02918 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02919 0);
02920 return tmp;
02921 }
02922
02923
02924
02925
02926 template <ACE_SYNCH_DECL>
02927 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02928 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02929 size_t lwm,
02930 ACE_Notification_Strategy *ns,
02931 u_long static_bit_field_mask,
02932 u_long static_bit_field_shift,
02933 u_long dynamic_priority_max,
02934 u_long dynamic_priority_offset)
02935 {
02936 ACE_Laxity_Message_Strategy *alms = 0;
02937
02938 ACE_NEW_RETURN (alms,
02939 ACE_Laxity_Message_Strategy (static_bit_field_mask,
02940 static_bit_field_shift,
02941 dynamic_priority_max,
02942 dynamic_priority_offset),
02943 0);
02944
02945 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02946 ACE_NEW_RETURN (tmp,
02947 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02948 0);
02949 return tmp;
02950 }
02951
02952
02953
02954
02955 #if defined (ACE_VXWORKS)
02956
02957
02958 template <ACE_SYNCH_DECL>
02959 ACE_Message_Queue_Vx *
02960 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02961 size_t max_message_length,
02962 ACE_Notification_Strategy *ns)
02963 {
02964 ACE_Message_Queue_Vx *tmp = 0;
02965
02966 ACE_NEW_RETURN (tmp,
02967 ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02968 0);
02969 return tmp;
02970 }
02971 #endif
02972
02973 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
02974
02975 template <ACE_SYNCH_DECL>
02976 ACE_Message_Queue_NT *
02977 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02978 {
02979 ACE_Message_Queue_NT *tmp = 0;
02980
02981 ACE_NEW_RETURN (tmp,
02982 ACE_Message_Queue_NT (max_threads),
02983 0);
02984 return tmp;
02985 }
02986
02987 #endif
02988
02989 ACE_END_VERSIONED_NAMESPACE_DECL
02990
02991 #endif