00001
00002
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005
00006
00007
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif
00015
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00022 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00024
00025 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00026 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00030
00031 this->queue_.dump ();
00032 #endif
00033 }
00034
00035 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00036 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00037 {
00038 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00039
00040 this->queue_.message_bytes (new_value);
00041 }
00042
00043 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00044 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00045 {
00046 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00047
00048 this->queue_.message_length (new_value);
00049 }
00050
00051 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00052 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00053 size_t lwm,
00054 ACE_Notification_Strategy *ns)
00055 {
00056 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00057
00058 if (this->queue_.open (hwm, lwm, ns) == -1)
00059 ACE_ERROR ((LM_ERROR,
00060 ACE_LIB_TEXT ("ACE_Message_Queue_Ex")));
00061 }
00062
00063 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00064 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00065 {
00066 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00067 }
00068
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00071 size_t lwm,
00072 ACE_Notification_Strategy *ns)
00073 {
00074 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00075
00076 return this->queue_.open (hwm, lwm, ns);
00077 }
00078
00079
00080
00081 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00082 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00083 {
00084 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00085
00086 return this->queue_.close ();
00087 }
00088
00089 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00090 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00091 {
00092 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00093
00094 return this->queue_.flush ();
00095 }
00096
00097 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00098 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00099 {
00100 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00101
00102 return this->queue_.flush_i ();
00103 }
00104
00105
00106
00107 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00108 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00109 ACE_Time_Value *timeout)
00110 {
00111 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00112
00113 ACE_Message_Block *mb = 0;
00114
00115 int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00116
00117 if (cur_count != -1)
00118 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00119
00120 return cur_count;
00121 }
00122
00123 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00124 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00125 ACE_Time_Value *timeout)
00126 {
00127 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00128
00129 ACE_Message_Block *mb = 0;
00130
00131 ACE_NEW_RETURN (mb,
00132 ACE_Message_Block ((char *) new_item,
00133 sizeof (*new_item),
00134 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00135 -1);
00136
00137 int const result = this->queue_.enqueue_head (mb, timeout);
00138 if (result == -1)
00139
00140 mb->release ();
00141 return result;
00142 }
00143
00144
00145
00146
00147
00148 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00149 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00150 ACE_Time_Value *timeout)
00151 {
00152 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00153
00154 return this->enqueue_prio (new_item, timeout);
00155 }
00156
00157 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00158 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00159 ACE_Time_Value *timeout)
00160 {
00161 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00162
00163 ACE_Message_Block *mb = 0;
00164
00165 ACE_NEW_RETURN (mb,
00166 ACE_Message_Block ((char *) new_item,
00167 sizeof (*new_item),
00168 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00169 -1);
00170
00171 int const result = this->queue_.enqueue_prio (mb, timeout);
00172 if (result == -1)
00173
00174 mb->release ();
00175
00176 return result;
00177 }
00178
00179 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00180 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00181 ACE_Time_Value *timeout)
00182 {
00183 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00184
00185 ACE_Message_Block *mb = 0;
00186
00187 ACE_NEW_RETURN (mb,
00188 ACE_Message_Block ((char *) new_item,
00189 sizeof (*new_item),
00190 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00191 -1);
00192
00193 int const result = this->queue_.enqueue_deadline (mb, timeout);
00194 if (result == -1)
00195
00196 mb->release ();
00197
00198 return result;
00199 }
00200
00201
00202
00203
00204 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00205 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00206 ACE_Time_Value *timeout)
00207 {
00208 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00209
00210 ACE_Message_Block *mb = 0;
00211
00212 ACE_NEW_RETURN (mb,
00213 ACE_Message_Block ((char *) new_item,
00214 sizeof (*new_item),
00215 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00216 -1);
00217
00218 int const result = this->queue_.enqueue_tail (mb, timeout);
00219 if (result == -1)
00220
00221 mb->release ();
00222 return result;
00223 }
00224
00225
00226
00227
00228
00229 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00230 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00231 ACE_Time_Value *timeout)
00232 {
00233 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00234
00235 ACE_Message_Block *mb = 0;
00236
00237 int const cur_count = this->queue_.dequeue_head (mb, timeout);
00238
00239
00240 if (cur_count != -1)
00241 {
00242 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00243
00244 mb->release ();
00245 }
00246
00247 return cur_count;
00248 }
00249
00250
00251
00252
00253
00254 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00255 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00256 ACE_Time_Value *timeout)
00257 {
00258 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00259
00260 ACE_Message_Block *mb = 0;
00261
00262 int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00263
00264
00265 if (cur_count != -1)
00266 {
00267 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00268
00269 mb->release ();
00270 }
00271
00272 return cur_count;
00273 }
00274
00275
00276
00277
00278
00279 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00280 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00281 ACE_Time_Value *timeout)
00282 {
00283 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00284
00285 ACE_Message_Block *mb = 0;
00286
00287 int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00288
00289
00290 if (cur_count != -1)
00291 {
00292 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00293
00294 mb->release ();
00295 }
00296
00297 return cur_count;
00298 }
00299
00300
00301
00302
00303
00304 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00305 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00306 ACE_Time_Value *timeout)
00307 {
00308 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00309
00310 ACE_Message_Block *mb = 0;
00311
00312 int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00313
00314
00315 if (cur_count != -1)
00316 {
00317 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00318
00319 mb->release ();
00320 }
00321
00322 return cur_count;
00323 }
00324
00325 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00326 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00327 {
00328 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00329
00330 return this->queue_.notify ();
00331 }
00332
00333
00334 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00335
00336 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00337 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00338 ACE_Time_Value *timeout)
00339 {
00340 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00341
00342 return this->dequeue_head (first_item, timeout);
00343 }
00344
00345 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00346 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00347 {
00348 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00349
00350 return this->queue_.notification_strategy ();
00351 }
00352
00353 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00354 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00355 {
00356 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00357
00358 this->queue_.notification_strategy (s);
00359 }
00360
00361
00362
00363 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00364 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00365 {
00366 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00367
00368 return this->queue_.is_empty ();
00369 }
00370
00371
00372
00373 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00374 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00375 {
00376 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00377
00378 return this->queue_.is_full ();
00379 }
00380
00381 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00382 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00383 {
00384 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00385
00386 return this->queue_.high_water_mark ();
00387 }
00388
00389 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00390 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00391 {
00392 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00393
00394 this->queue_.high_water_mark (hwm);
00395 }
00396
00397 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00398 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00399 {
00400 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00401
00402 return this->queue_.low_water_mark ();
00403 }
00404
00405 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00406 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00407 {
00408 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00409
00410 this->queue_.low_water_mark (lwm);
00411 }
00412
00413 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00414 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00415 {
00416 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00417
00418 return this->queue_.message_bytes ();
00419 }
00420
00421 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00422 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00423 {
00424 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00425
00426 return this->queue_.message_length ();
00427 }
00428
00429 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00430 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00431 {
00432 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00433
00434 return this->queue_.message_count ();
00435 }
00436
00437 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00438 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00439 {
00440 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00441
00442 return this->queue_.deactivate ();
00443 }
00444
00445 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00446 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00447 {
00448 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00449
00450 return this->queue_.activate ();
00451 }
00452
00453 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00454 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00455 {
00456 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00457
00458 return this->queue_.pulse ();
00459 }
00460
00461 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00462 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00463 {
00464 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00465
00466 return this->queue_.deactivated ();
00467 }
00468
00469 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00470 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00471 {
00472 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00473
00474 return this->queue_.state ();
00475 }
00476
00477 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00478 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00479 {
00480 return this->queue_.lock ();
00481 }
00482
00483 template <ACE_SYNCH_DECL>
00484 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00485 : queue_ (q),
00486 curr_ (q.head_)
00487 {
00488 }
00489
00490 template <ACE_SYNCH_DECL> int
00491 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00492 {
00493 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00494
00495 if (this->curr_ != 0)
00496 {
00497 entry = this->curr_;
00498 return 1;
00499 }
00500
00501 return 0;
00502 }
00503
00504 template <ACE_SYNCH_DECL> int
00505 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00506 {
00507 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00508
00509 return this->curr_ == 0;
00510 }
00511
00512 template <ACE_SYNCH_DECL> int
00513 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00514 {
00515 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00516
00517 if (this->curr_)
00518 this->curr_ = this->curr_->next ();
00519 return this->curr_ != 0;
00520 }
00521
00522 template <ACE_SYNCH_DECL> void
00523 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00524 {
00525 #if defined (ACE_HAS_DUMP)
00526 #endif
00527 }
00528
00529 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00530
00531 template <ACE_SYNCH_DECL>
00532 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00533 : queue_ (q),
00534 curr_ (queue_.tail_)
00535 {
00536 }
00537
00538 template <ACE_SYNCH_DECL> int
00539 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00540 {
00541 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00542
00543 if (this->curr_ != 0)
00544 {
00545 entry = this->curr_;
00546 return 1;
00547 }
00548
00549 return 0;
00550 }
00551
00552 template <ACE_SYNCH_DECL> int
00553 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00554 {
00555 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00556
00557 return this->curr_ == 0;
00558 }
00559
00560 template <ACE_SYNCH_DECL> int
00561 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00562 {
00563 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00564
00565 if (this->curr_)
00566 this->curr_ = this->curr_->prev ();
00567 return this->curr_ != 0;
00568 }
00569
00570 template <ACE_SYNCH_DECL> void
00571 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00572 {
00573 #if defined (ACE_HAS_DUMP)
00574 #endif
00575 }
00576
00577 template <ACE_SYNCH_DECL> int
00578 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00579 ACE_Time_Value *timeout)
00580 {
00581 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00582 return this->dequeue_head (first_item, timeout);
00583 }
00584
00585 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00586 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00587 {
00588 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00589
00590 return this->notification_strategy_;
00591 }
00592
00593 template <ACE_SYNCH_DECL> void
00594 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00595 {
00596 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00597
00598 this->notification_strategy_ = s;
00599 }
00600
00601
00602
00603 template <ACE_SYNCH_DECL> int
00604 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00605 {
00606 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00607 return this->tail_ == 0;
00608 }
00609
00610
00611
00612 template <ACE_SYNCH_DECL> int
00613 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00614 {
00615 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00616 return this->cur_bytes_ >= this->high_water_mark_;
00617 }
00618
00619
00620
00621 template <ACE_SYNCH_DECL> int
00622 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00623 {
00624 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00625 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00626
00627 return this->is_empty_i ();
00628 }
00629
00630
00631
00632 template <ACE_SYNCH_DECL> int
00633 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00634 {
00635 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00636 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00637
00638 return this->is_full_i ();
00639 }
00640
00641 template <ACE_SYNCH_DECL> size_t
00642 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00643 {
00644 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00645 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00646
00647 return this->high_water_mark_;
00648 }
00649
00650 template <ACE_SYNCH_DECL> void
00651 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00652 {
00653 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00654 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00655
00656 this->high_water_mark_ = hwm;
00657 }
00658
00659 template <ACE_SYNCH_DECL> size_t
00660 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00661 {
00662 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00663 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00664
00665 return this->low_water_mark_;
00666 }
00667
00668 template <ACE_SYNCH_DECL> void
00669 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00670 {
00671 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00672 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00673
00674 this->low_water_mark_ = lwm;
00675 }
00676
00677 template <ACE_SYNCH_DECL> size_t
00678 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00679 {
00680 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00681 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00682
00683 return this->cur_bytes_;
00684 }
00685
00686 template <ACE_SYNCH_DECL> size_t
00687 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00688 {
00689 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00690 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00691
00692 return this->cur_length_;
00693 }
00694
00695 template <ACE_SYNCH_DECL> size_t
00696 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00697 {
00698 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00699 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00700
00701 return this->cur_count_;
00702 }
00703
00704 template <ACE_SYNCH_DECL> int
00705 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00706 {
00707 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00708 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00709
00710 return this->deactivate_i (0);
00711 }
00712
00713 template <ACE_SYNCH_DECL> int
00714 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00715 {
00716 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00717 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00718
00719 return this->activate_i ();
00720 }
00721
00722 template <ACE_SYNCH_DECL> int
00723 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00724 {
00725 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00726 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00727
00728 return this->deactivate_i (1);
00729 }
00730
00731 template <ACE_SYNCH_DECL> int
00732 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00733 {
00734 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00735
00736 return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00737 }
00738
00739 template <ACE_SYNCH_DECL> int
00740 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00741 {
00742 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00743
00744 return this->state_;
00745 }
00746
00747 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00748 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00749 {
00750 return this->lock_;
00751 }
00752
00753 template <ACE_SYNCH_DECL> void
00754 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00755 {
00756 #if defined (ACE_HAS_DUMP)
00757 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00758 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00759 switch (this->state_)
00760 {
00761 case ACE_Message_Queue_Base::ACTIVATED:
00762 ACE_DEBUG ((LM_DEBUG,
00763 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00764 break;
00765 case ACE_Message_Queue_Base::DEACTIVATED:
00766 ACE_DEBUG ((LM_DEBUG,
00767 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00768 break;
00769 case ACE_Message_Queue_Base::PULSED:
00770 ACE_DEBUG ((LM_DEBUG,
00771 ACE_LIB_TEXT ("state = PULSED\n")));
00772 break;
00773 }
00774 ACE_DEBUG ((LM_DEBUG,
00775 ACE_LIB_TEXT ("low_water_mark = %d\n")
00776 ACE_LIB_TEXT ("high_water_mark = %d\n")
00777 ACE_LIB_TEXT ("cur_bytes = %d\n")
00778 ACE_LIB_TEXT ("cur_length = %d\n")
00779 ACE_LIB_TEXT ("cur_count = %d\n")
00780 ACE_LIB_TEXT ("head_ = %u\n")
00781 ACE_LIB_TEXT ("tail_ = %u\n"),
00782 this->low_water_mark_,
00783 this->high_water_mark_,
00784 this->cur_bytes_,
00785 this->cur_length_,
00786 this->cur_count_,
00787 this->head_,
00788 this->tail_));
00789 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_full_cond: \n")));
00790 not_full_cond_.dump ();
00791 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_empty_cond: \n")));
00792 not_empty_cond_.dump ();
00793 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00794 #endif
00795 }
00796
00797 template <ACE_SYNCH_DECL> void
00798 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00799 {
00800 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00801 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00802
00803 this->cur_bytes_ = new_value;
00804 }
00805
00806 template <ACE_SYNCH_DECL> void
00807 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00808 {
00809 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00810 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00811
00812 this->cur_length_ = new_value;
00813 }
00814
00815 template <ACE_SYNCH_DECL>
00816 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00817 size_t lwm,
00818 ACE_Notification_Strategy *ns)
00819 : not_empty_cond_ (this->lock_),
00820 not_full_cond_ (this->lock_)
00821 {
00822 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00823
00824 if (this->open (hwm, lwm, ns) == -1)
00825 ACE_ERROR ((LM_ERROR,
00826 ACE_LIB_TEXT ("open")));
00827 }
00828
00829 template <ACE_SYNCH_DECL>
00830 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00831 {
00832 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00833 if (this->head_ != 0 && this->close () == -1)
00834 ACE_ERROR ((LM_ERROR,
00835 ACE_LIB_TEXT ("close")));
00836 }
00837
00838 template <ACE_SYNCH_DECL> int
00839 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00840 {
00841 int number_flushed = 0;
00842
00843
00844
00845 for (this->tail_ = 0; this->head_ != 0; )
00846 {
00847 ++number_flushed;
00848
00849 size_t mb_bytes = 0;
00850 size_t mb_length = 0;
00851 this->head_->total_size_and_length (mb_bytes,
00852 mb_length);
00853
00854 this->cur_bytes_ -= mb_bytes;
00855 this->cur_length_ -= mb_length;
00856 --this->cur_count_;
00857
00858 ACE_Message_Block *temp = this->head_;
00859 this->head_ = this->head_->next ();
00860
00861
00862
00863 temp->release ();
00864 }
00865
00866 return number_flushed;
00867 }
00868
00869
00870
00871
00872
00873 template <ACE_SYNCH_DECL> int
00874 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
00875 size_t lwm,
00876 ACE_Notification_Strategy *ns)
00877 {
00878 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
00879 this->high_water_mark_ = hwm;
00880 this->low_water_mark_ = lwm;
00881 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00882 this->cur_bytes_ = 0;
00883 this->cur_length_ = 0;
00884 this->cur_count_ = 0;
00885 this->tail_ = 0;
00886 this->head_ = 0;
00887 this->notification_strategy_ = ns;
00888 return 0;
00889 }
00890
00891
00892
00893
00894 template <ACE_SYNCH_DECL> int
00895 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
00896 {
00897 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
00898 int const previous_state = this->state_;
00899
00900 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00901 {
00902
00903 this->not_empty_cond_.broadcast ();
00904 this->not_full_cond_.broadcast ();
00905
00906 if (pulse)
00907 this->state_ = ACE_Message_Queue_Base::PULSED;
00908 else
00909 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00910 }
00911 return previous_state;
00912 }
00913
00914 template <ACE_SYNCH_DECL> int
00915 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
00916 {
00917 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
00918 int const previous_state = this->state_;
00919 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00920 return previous_state;
00921 }
00922
00923 template <ACE_SYNCH_DECL> int
00924 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
00925 {
00926 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
00927 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00928
00929
00930 return this->flush_i ();
00931 }
00932
00933
00934
00935 template <ACE_SYNCH_DECL> int
00936 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
00937 {
00938 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
00939 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00940
00941 int const result = this->deactivate_i ();
00942
00943
00944 this->flush_i ();
00945
00946 return result;
00947 }
00948
00949 template <ACE_SYNCH_DECL> int
00950 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
00951 {
00952 if (this->not_full_cond_.signal () != 0)
00953 return -1;
00954 return 0;
00955 }
00956
00957 template <ACE_SYNCH_DECL> int
00958 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
00959 {
00960
00961 if (this->not_empty_cond_.signal () != 0)
00962 return -1;
00963 return 0;
00964 }
00965
00966
00967
00968
00969 template <ACE_SYNCH_DECL> int
00970 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
00971 {
00972 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
00973
00974 if (new_item == 0)
00975 return -1;
00976
00977
00978
00979
00980
00981
00982 ACE_Message_Block *seq_tail = new_item;
00983 ++this->cur_count_;
00984 new_item->total_size_and_length (this->cur_bytes_,
00985 this->cur_length_);
00986 while (seq_tail->next () != 0)
00987 {
00988 seq_tail->next ()->prev (seq_tail);
00989 seq_tail = seq_tail->next ();
00990 ++this->cur_count_;
00991 seq_tail->total_size_and_length (this->cur_bytes_,
00992 this->cur_length_);
00993 }
00994
00995
00996 if (this->tail_ == 0)
00997 {
00998 this->head_ = new_item;
00999 this->tail_ = seq_tail;
01000
01001 new_item->prev (0);
01002 }
01003
01004 else
01005 {
01006
01007 this->tail_->next (new_item);
01008 new_item->prev (this->tail_);
01009 this->tail_ = seq_tail;
01010 }
01011
01012 if (this->signal_dequeue_waiters () == -1)
01013 return -1;
01014 else
01015 return ACE_Utils::Truncate (this->cur_count_);
01016 }
01017
01018
01019
01020 template <ACE_SYNCH_DECL> int
01021 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01022 {
01023 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01024
01025 if (new_item == 0)
01026 return -1;
01027
01028
01029
01030
01031
01032
01033 ACE_Message_Block *seq_tail = new_item;
01034 ++this->cur_count_;
01035 new_item->total_size_and_length (this->cur_bytes_,
01036 this->cur_length_);
01037 while (seq_tail->next () != 0)
01038 {
01039 seq_tail->next ()->prev (seq_tail);
01040 seq_tail = seq_tail->next ();
01041 ++this->cur_count_;
01042 seq_tail->total_size_and_length (this->cur_bytes_,
01043 this->cur_length_);
01044 }
01045
01046 new_item->prev (0);
01047 seq_tail->next (this->head_);
01048
01049 if (this->head_ != 0)
01050 this->head_->prev (seq_tail);
01051 else
01052 this->tail_ = seq_tail;
01053
01054 this->head_ = new_item;
01055
01056 if (this->signal_dequeue_waiters () == -1)
01057 return -1;
01058 else
01059 return ACE_Utils::Truncate (this->cur_count_);
01060 }
01061
01062
01063
01064
01065 template <ACE_SYNCH_DECL> int
01066 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01067 {
01068 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01069
01070 if (new_item == 0)
01071 return -1;
01072
01073
01074
01075
01076
01077 new_item->next (0);
01078
01079 if (this->head_ == 0)
01080
01081
01082 return this->enqueue_head_i (new_item);
01083 else
01084 {
01085 ACE_Message_Block *temp = 0;
01086
01087
01088
01089
01090
01091 for (temp = this->tail_;
01092 temp != 0;
01093 temp = temp->prev ())
01094 if (temp->msg_priority () >= new_item->msg_priority ())
01095
01096
01097 break;
01098
01099 if (temp == 0)
01100
01101
01102
01103 return this->enqueue_head_i (new_item);
01104 else if (temp->next () == 0)
01105
01106
01107
01108 return this->enqueue_tail_i (new_item);
01109 else
01110 {
01111
01112
01113
01114
01115 new_item->prev (temp);
01116 new_item->next (temp->next ());
01117 temp->next ()->prev (new_item);
01118 temp->next (new_item);
01119 }
01120 }
01121
01122
01123 new_item->total_size_and_length (this->cur_bytes_,
01124 this->cur_length_);
01125 ++this->cur_count_;
01126
01127 if (this->signal_dequeue_waiters () == -1)
01128 return -1;
01129 else
01130 return ACE_Utils::Truncate (this->cur_count_);
01131 }
01132
01133
01134
01135
01136 template <ACE_SYNCH_DECL> int
01137 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01138 {
01139 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01140 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01141
01142 if (new_item == 0)
01143 return -1;
01144
01145
01146
01147
01148
01149 new_item->next (0);
01150
01151 if (this->head_ == 0)
01152
01153
01154 return this->enqueue_head_i (new_item);
01155 else
01156 {
01157 ACE_Message_Block *temp = 0;
01158
01159
01160
01161
01162
01163 for (temp = this->head_;
01164 temp != 0;
01165 temp = temp->next ())
01166 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01167
01168
01169 break;
01170
01171 if (temp == 0 || temp->next () == 0)
01172
01173
01174
01175 return this->enqueue_tail_i (new_item);
01176 else
01177 {
01178
01179
01180
01181
01182 new_item->prev (temp);
01183 new_item->next (temp->next ());
01184 temp->next ()->prev (new_item);
01185 temp->next (new_item);
01186 }
01187 }
01188
01189
01190 new_item->total_size_and_length (this->cur_bytes_,
01191 this->cur_length_);
01192 ++this->cur_count_;
01193
01194 if (this->signal_dequeue_waiters () == -1)
01195 return -1;
01196 else
01197 return this->cur_count_;
01198 #else
01199 return this->enqueue_tail_i (new_item);
01200 #endif
01201 }
01202
01203
01204
01205
01206
01207 template <ACE_SYNCH_DECL> int
01208 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01209 {
01210 if (this->head_ ==0)
01211 ACE_ERROR_RETURN ((LM_ERROR,
01212 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01213 -1);
01214 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01215 first_item = this->head_;
01216 this->head_ = this->head_->next ();
01217
01218 if (this->head_ == 0)
01219 this->tail_ = 0;
01220 else
01221
01222 this->head_->prev (0);
01223
01224 size_t mb_bytes = 0;
01225 size_t mb_length = 0;
01226 first_item->total_size_and_length (mb_bytes,
01227 mb_length);
01228
01229 this->cur_bytes_ -= mb_bytes;
01230 this->cur_length_ -= mb_length;
01231 --this->cur_count_;
01232
01233 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01234 this->head_ = this->tail_ = 0;
01235
01236
01237 first_item->prev (0);
01238 first_item->next (0);
01239
01240
01241
01242 if (this->cur_bytes_ <= this->low_water_mark_
01243 && this->signal_enqueue_waiters () == -1)
01244 return -1;
01245 else
01246 return ACE_Utils::Truncate (this->cur_count_);
01247 }
01248
01249
01250
01251
01252
01253
01254 template <ACE_SYNCH_DECL> int
01255 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01256 {
01257 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01258
01259 if (this->head_ == 0)
01260 return -1;
01261
01262
01263
01264 ACE_Message_Block *chosen = 0;
01265 u_long priority = ULONG_MAX;
01266
01267 for (ACE_Message_Block *temp = this->tail_;
01268 temp != 0;
01269 temp = temp->prev ())
01270 {
01271
01272
01273 if (temp->msg_priority () <= priority)
01274 {
01275 priority = temp->msg_priority ();
01276 chosen = temp;
01277 }
01278 }
01279
01280
01281
01282 if (chosen == 0)
01283 chosen = this->head_;
01284
01285
01286
01287 if (chosen->prev () == 0)
01288 this->head_ = chosen->next ();
01289 else
01290 chosen->prev ()->next (chosen->next ());
01291
01292 if (chosen->next () == 0)
01293 this->tail_ = chosen->prev ();
01294 else
01295 chosen->next ()->prev (chosen->prev ());
01296
01297
01298 dequeued = chosen;
01299
01300 size_t mb_bytes = 0;
01301 size_t mb_length = 0;
01302 dequeued->total_size_and_length (mb_bytes,
01303 mb_length);
01304
01305 this->cur_bytes_ -= mb_bytes;
01306 this->cur_length_ -= mb_length;
01307 --this->cur_count_;
01308
01309 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01310 this->head_ = this->tail_ = 0;
01311
01312
01313 dequeued->prev (0);
01314 dequeued->next (0);
01315
01316
01317
01318 if (this->cur_bytes_ <= this->low_water_mark_
01319 && this->signal_enqueue_waiters () == -1)
01320 return -1;
01321 else
01322 return ACE_Utils::Truncate (this->cur_count_);
01323 }
01324
01325
01326
01327
01328
01329 template <ACE_SYNCH_DECL> int
01330 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01331 {
01332 if (this->head_ == 0)
01333 ACE_ERROR_RETURN ((LM_ERROR,
01334 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01335 -1);
01336 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01337 dequeued = this->tail_;
01338 if (this->tail_->prev () == 0)
01339 {
01340 this->head_ = 0;
01341 this->tail_ = 0;
01342 }
01343 else
01344 {
01345 this->tail_->prev ()->next (0);
01346 this->tail_ = this->tail_->prev ();
01347 }
01348
01349 size_t mb_bytes = 0;
01350 size_t mb_length = 0;
01351 dequeued->total_size_and_length (mb_bytes,
01352 mb_length);
01353
01354 this->cur_bytes_ -= mb_bytes;
01355 this->cur_length_ -= mb_length;
01356 --this->cur_count_;
01357
01358 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01359 this->head_ = this->tail_ = 0;
01360
01361
01362 dequeued->prev (0);
01363 dequeued->next (0);
01364
01365
01366
01367 if (this->cur_bytes_ <= this->low_water_mark_
01368 && this->signal_enqueue_waiters () == -1)
01369 return -1;
01370 else
01371 return ACE_Utils::Truncate (this->cur_count_);
01372 }
01373
01374
01375
01376
01377
01378 template <ACE_SYNCH_DECL> int
01379 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01380 {
01381 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01382 if (this->head_ == 0)
01383 ACE_ERROR_RETURN ((LM_ERROR,
01384 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01385 -1);
01386 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01387
01388
01389 ACE_Message_Block* chosen = 0;
01390 ACE_Time_Value deadline = ACE_Time_Value::max_time;
01391 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01392 if (temp->msg_deadline_time () < deadline)
01393 {
01394 deadline = temp->msg_deadline_time ();
01395 chosen = temp;
01396 }
01397
01398
01399
01400 if (chosen == 0)
01401 chosen = this->head_;
01402
01403
01404
01405 if (chosen->prev () == 0)
01406 this->head_ = chosen->next ();
01407 else
01408 chosen->prev ()->next (chosen->next ());
01409
01410 if (chosen->next () == 0)
01411 this->tail_ = chosen->prev ();
01412 else
01413 chosen->next ()->prev (chosen->prev ());
01414
01415
01416 dequeued = chosen;
01417
01418 size_t mb_bytes = 0;
01419 size_t mb_length = 0;
01420 dequeued->total_size_and_length (mb_bytes,
01421 mb_length);
01422
01423 this->cur_bytes_ -= mb_bytes;
01424 this->cur_length_ -= mb_length;
01425 --this->cur_count_;
01426
01427 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01428 this->head_ = this->tail_ = 0;
01429
01430
01431 dequeued->prev (0);
01432 dequeued->next (0);
01433
01434
01435
01436 if (this->cur_bytes_ <= this->low_water_mark_
01437 && this->signal_enqueue_waiters () == -1)
01438 return -1;
01439 else
01440 return this->cur_count_;
01441 #else
01442 return this->dequeue_head_i (dequeued);
01443 #endif
01444 }
01445
01446
01447
01448 template <ACE_SYNCH_DECL> int
01449 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01450 ACE_Time_Value *timeout)
01451 {
01452 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01453 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01454
01455 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01456 {
01457 errno = ESHUTDOWN;
01458 return -1;
01459 }
01460
01461
01462
01463 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01464 return -1;
01465
01466 first_item = this->head_;
01467 return ACE_Utils::Truncate (this->cur_count_);
01468 }
01469
01470 template <ACE_SYNCH_DECL> int
01471 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01472 ACE_Time_Value *timeout)
01473 {
01474 int result = 0;
01475
01476
01477
01478 while (this->is_full_i ())
01479 {
01480 if (this->not_full_cond_.wait (timeout) == -1)
01481 {
01482 if (errno == ETIME)
01483 errno = EWOULDBLOCK;
01484 result = -1;
01485 break;
01486 }
01487 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01488 {
01489 errno = ESHUTDOWN;
01490 result = -1;
01491 break;
01492 }
01493 }
01494 return result;
01495 }
01496
01497 template <ACE_SYNCH_DECL> int
01498 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01499 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01500 {
01501 int result = 0;
01502
01503
01504
01505 while (this->is_empty_i ())
01506 {
01507 if (this->not_empty_cond_.wait (timeout) == -1)
01508 {
01509 if (errno == ETIME)
01510 errno = EWOULDBLOCK;
01511 result = -1;
01512 break;
01513 }
01514 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01515 {
01516 errno = ESHUTDOWN;
01517 result = -1;
01518 break;
01519 }
01520 }
01521 return result;
01522 }
01523
01524
01525
01526
01527 template <ACE_SYNCH_DECL> int
01528 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01529 ACE_Time_Value *timeout)
01530 {
01531 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01532 int queue_count = 0;
01533 {
01534 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01535
01536 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01537 {
01538 errno = ESHUTDOWN;
01539 return -1;
01540 }
01541
01542 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01543 return -1;
01544
01545 queue_count = this->enqueue_head_i (new_item);
01546
01547 if (queue_count == -1)
01548 return -1;
01549
01550 this->notify ();
01551 }
01552 return queue_count;
01553 }
01554
01555
01556
01557
01558
01559 template <ACE_SYNCH_DECL> int
01560 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01561 ACE_Time_Value *timeout)
01562 {
01563 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01564 int queue_count = 0;
01565 {
01566 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01567
01568 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01569 {
01570 errno = ESHUTDOWN;
01571 return -1;
01572 }
01573
01574 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01575 return -1;
01576
01577 queue_count = this->enqueue_i (new_item);
01578
01579 if (queue_count == -1)
01580 return -1;
01581
01582 this->notify ();
01583 }
01584 return queue_count;
01585 }
01586
01587
01588
01589
01590
01591 template <ACE_SYNCH_DECL> int
01592 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01593 ACE_Time_Value *timeout)
01594 {
01595 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01596 int queue_count = 0;
01597 {
01598 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01599
01600 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01601 {
01602 errno = ESHUTDOWN;
01603 return -1;
01604 }
01605
01606 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01607 return -1;
01608
01609 queue_count = this->enqueue_deadline_i (new_item);
01610
01611 if (queue_count == -1)
01612 return -1;
01613
01614 this->notify ();
01615 }
01616 return queue_count;
01617 }
01618
01619 template <ACE_SYNCH_DECL> int
01620 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01621 ACE_Time_Value *timeout)
01622 {
01623 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01624 return this->enqueue_prio (new_item, timeout);
01625 }
01626
01627
01628
01629
01630 template <ACE_SYNCH_DECL> int
01631 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01632 ACE_Time_Value *timeout)
01633 {
01634 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01635 int queue_count = 0;
01636 {
01637 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01638
01639 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01640 {
01641 errno = ESHUTDOWN;
01642 return -1;
01643 }
01644
01645 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01646 return -1;
01647
01648 queue_count = this->enqueue_tail_i (new_item);
01649
01650 if (queue_count == -1)
01651 return -1;
01652
01653 this->notify ();
01654 }
01655 return queue_count;
01656 }
01657
01658
01659
01660
01661
01662 template <ACE_SYNCH_DECL> int
01663 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01664 ACE_Time_Value *timeout)
01665 {
01666 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01667 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01668
01669 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01670 {
01671 errno = ESHUTDOWN;
01672 return -1;
01673 }
01674
01675 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01676 return -1;
01677
01678 return this->dequeue_head_i (first_item);
01679 }
01680
01681
01682
01683
01684
01685 template <ACE_SYNCH_DECL> int
01686 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01687 ACE_Time_Value *timeout)
01688 {
01689 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01690 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01691
01692 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01693 {
01694 errno = ESHUTDOWN;
01695 return -1;
01696 }
01697
01698 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01699 return -1;
01700
01701 return this->dequeue_prio_i (dequeued);
01702 }
01703
01704
01705
01706
01707
01708 template <ACE_SYNCH_DECL> int
01709 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01710 ACE_Time_Value *timeout)
01711 {
01712 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01713 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01714
01715 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01716 {
01717 errno = ESHUTDOWN;
01718 return -1;
01719 }
01720
01721 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01722 return -1;
01723
01724 return this->dequeue_tail_i (dequeued);
01725 }
01726
01727
01728
01729
01730
01731 template <ACE_SYNCH_DECL> int
01732 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01733 ACE_Time_Value *timeout)
01734 {
01735 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01736 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01737
01738 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01739 {
01740 errno = ESHUTDOWN;
01741 return -1;
01742 }
01743
01744 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01745 return -1;
01746
01747 return this->dequeue_deadline_i (dequeued);
01748 }
01749
01750 template <ACE_SYNCH_DECL> int
01751 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01752 {
01753 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01754
01755
01756 if (this->notification_strategy_ == 0)
01757 return 0;
01758 else
01759 return this->notification_strategy_->notify ();
01760 }
01761
01762
01763
01764 template <ACE_SYNCH_DECL>
01765 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01766 size_t hwm,
01767 size_t lwm,
01768 ACE_Notification_Strategy *ns)
01769 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01770 pending_head_ (0),
01771 pending_tail_ (0),
01772 late_head_ (0),
01773 late_tail_ (0),
01774 beyond_late_head_ (0),
01775 beyond_late_tail_ (0),
01776 message_strategy_ (message_strategy)
01777 {
01778
01779
01780
01781 }
01782
01783
01784
01785 template <ACE_SYNCH_DECL>
01786 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01787 {
01788 delete &this->message_strategy_;
01789 }
01790
01791 template <ACE_SYNCH_DECL> int
01792 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01793 ACE_Message_Block *&list_tail,
01794 u_int status_flags)
01795 {
01796
01797 list_head = 0;
01798 list_tail = 0;
01799
01800
01801 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01802
01803
01804 int result = this->refresh_queue (current_time);
01805 if (result < 0)
01806 return result;
01807
01808 if (ACE_BIT_ENABLED (status_flags,
01809 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01810 && this->pending_head_
01811 && this->pending_tail_)
01812 {
01813
01814 if (this->pending_head_->prev ())
01815 {
01816 this->tail_ = this->pending_head_->prev ();
01817 this->pending_head_->prev ()->next (0);
01818 }
01819 else
01820 {
01821
01822 this->head_ = 0;
01823 this->tail_ = 0;
01824 }
01825
01826
01827 list_head = this->pending_head_;
01828 list_tail = this->pending_tail_;
01829
01830
01831 this->pending_head_->prev (0);
01832 this->pending_head_ = 0;
01833 this->pending_tail_ = 0;
01834 }
01835
01836 if (ACE_BIT_ENABLED (status_flags,
01837 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01838 && this->late_head_
01839 && this->late_tail_)
01840 {
01841
01842
01843 if (this->late_tail_->next ())
01844 this->late_tail_->next ()->prev (this->late_head_->prev ());
01845 else
01846 this->tail_ = this->late_head_->prev ();
01847
01848 if (this->late_head_->prev ())
01849 this->late_head_->prev ()->next (this->late_tail_->next ());
01850 else
01851 this->head_ = this->late_tail_->next ();
01852
01853
01854 this->late_head_->prev (list_tail);
01855 if (list_tail)
01856 list_tail->next (this->late_head_);
01857 else
01858 list_head = this->late_head_;
01859
01860 list_tail = this->late_tail_;
01861
01862 this->late_tail_->next (0);
01863 this->late_head_ = 0;
01864 this->late_tail_ = 0;
01865 }
01866
01867 if (ACE_BIT_ENABLED (status_flags,
01868 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01869 && this->beyond_late_head_
01870 && this->beyond_late_tail_)
01871 {
01872
01873 if (this->beyond_late_tail_->next ())
01874 {
01875 this->head_ = this->beyond_late_tail_->next ();
01876 this->beyond_late_tail_->next ()->prev (0);
01877 }
01878 else
01879 {
01880
01881 this->head_ = 0;
01882 this->tail_ = 0;
01883 }
01884
01885
01886
01887 if (list_tail)
01888 {
01889 this->beyond_late_head_->prev (list_tail);
01890 list_tail->next (this->beyond_late_head_);
01891 }
01892 else
01893 list_head = this->beyond_late_head_;
01894
01895 list_tail = this->beyond_late_tail_;
01896
01897 this->beyond_late_tail_->next (0);
01898 this->beyond_late_head_ = 0;
01899 this->beyond_late_tail_ = 0;
01900 }
01901
01902
01903 ACE_Message_Block *temp1;
01904
01905 for (temp1 = list_head;
01906 temp1 != 0;
01907 temp1 = temp1->next ())
01908 {
01909 --this->cur_count_;
01910
01911 size_t mb_bytes = 0;
01912 size_t mb_length = 0;
01913 temp1->total_size_and_length (mb_bytes,
01914 mb_length);
01915
01916 this->cur_bytes_ -= mb_bytes;
01917 this->cur_length_ -= mb_length;
01918 }
01919
01920 return result;
01921 }
01922
01923
01924
01925
01926
01927
01928
01929
01930
01931 template <ACE_SYNCH_DECL> int
01932 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01933 ACE_Time_Value *timeout)
01934 {
01935 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01936
01937 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01938
01939 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01940 {
01941 errno = ESHUTDOWN;
01942 return -1;
01943 }
01944
01945 int result;
01946
01947
01948 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01949
01950
01951 result = this->refresh_queue (current_time);
01952 if (result < 0)
01953 return result;
01954
01955
01956 result = this->wait_not_empty_cond (ace_mon, timeout);
01957 if (result == -1)
01958 return result;
01959
01960
01961
01962
01963 result = this->dequeue_head_i (first_item);
01964
01965 return result;
01966 }
01967
01968
01969
01970
01971 template <ACE_SYNCH_DECL> void
01972 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
01973 {
01974 #if defined (ACE_HAS_DUMP)
01975 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01976 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01977
01978 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01979 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01980
01981 ACE_DEBUG ((LM_DEBUG,
01982 ACE_LIB_TEXT ("pending_head_ = %u\n")
01983 ACE_LIB_TEXT ("pending_tail_ = %u\n")
01984 ACE_LIB_TEXT ("late_head_ = %u\n")
01985 ACE_LIB_TEXT ("late_tail_ = %u\n")
01986 ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01987 ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01988 this->pending_head_,
01989 this->pending_tail_,
01990 this->late_head_,
01991 this->late_tail_,
01992 this->beyond_late_head_,
01993 this->beyond_late_tail_));
01994
01995 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01996 message_strategy_.dump ();
01997
01998 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01999 #endif
02000 }
02001
02002
02003 template <ACE_SYNCH_DECL> int
02004 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02005 {
02006 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02007
02008 if (new_item == 0)
02009 return -1;
02010
02011 int result = 0;
02012
02013
02014 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02015
02016
02017
02018 result = this->refresh_queue (current_time);
02019 if (result < 0)
02020 return result;
02021
02022
02023 switch (message_strategy_.priority_status (*new_item,
02024 current_time))
02025 {
02026 case ACE_Dynamic_Message_Strategy::PENDING:
02027 if (this->pending_tail_ == 0)
02028 {
02029
02030
02031
02032 pending_head_ = new_item;
02033 pending_tail_ = pending_head_;
02034 return this->enqueue_tail_i (new_item);
02035 }
02036 else
02037 {
02038
02039
02040 result = sublist_enqueue_i (new_item,
02041 current_time,
02042 this->pending_head_,
02043 this->pending_tail_,
02044 ACE_Dynamic_Message_Strategy::PENDING);
02045 }
02046 break;
02047
02048 case ACE_Dynamic_Message_Strategy::LATE:
02049 if (this->late_tail_ == 0)
02050 {
02051 late_head_ = new_item;
02052 late_tail_ = late_head_;
02053
02054 if (this->pending_head_ == 0)
02055
02056
02057
02058 return this->enqueue_tail_i (new_item);
02059 else if (this->beyond_late_tail_ == 0)
02060
02061
02062 return this->enqueue_head_i (new_item);
02063 else
02064 {
02065
02066
02067
02068 this->beyond_late_tail_->next (new_item);
02069 new_item->prev (this->beyond_late_tail_);
02070 this->pending_head_->prev (new_item);
02071 new_item->next (this->pending_head_);
02072 }
02073 }
02074 else
02075 {
02076
02077
02078 result = sublist_enqueue_i (new_item,
02079 current_time,
02080 this->late_head_,
02081 this->late_tail_,
02082 ACE_Dynamic_Message_Strategy::LATE);
02083 }
02084 break;
02085
02086 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02087 if (this->beyond_late_tail_ == 0)
02088 {
02089
02090
02091
02092 beyond_late_head_ = new_item;
02093 beyond_late_tail_ = beyond_late_head_;
02094 return this->enqueue_head_i (new_item);
02095 }
02096 else
02097 {
02098
02099
02100
02101 if (this->beyond_late_tail_->next ())
02102 this->beyond_late_tail_->next ()->prev (new_item);
02103 else
02104 this->tail_ = new_item;
02105
02106 new_item->next (this->beyond_late_tail_->next ());
02107 this->beyond_late_tail_->next (new_item);
02108 new_item->prev (this->beyond_late_tail_);
02109 this->beyond_late_tail_ = new_item;
02110 }
02111
02112 break;
02113
02114
02115 default:
02116 result = -1;
02117 break;
02118 }
02119
02120 if (result < 0)
02121 return result;
02122
02123 size_t mb_bytes = 0;
02124 size_t mb_length = 0;
02125 new_item->total_size_and_length (mb_bytes,
02126 mb_length);
02127 this->cur_bytes_ += mb_bytes;
02128 this->cur_length_ += mb_length;
02129 ++this->cur_count_;
02130
02131 if (this->signal_dequeue_waiters () == -1)
02132 return -1;
02133 else
02134 return this->cur_count_;
02135 }
02136
02137
02138
02139
02140
02141
02142
02143 template <ACE_SYNCH_DECL> int
02144 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02145 const ACE_Time_Value ¤t_time,
02146 ACE_Message_Block *&sublist_head,
02147 ACE_Message_Block *&sublist_tail,
02148 ACE_Dynamic_Message_Strategy::Priority_Status status)
02149 {
02150 int result = 0;
02151 ACE_Message_Block *current_item = 0;
02152
02153
02154
02155 for (current_item = sublist_tail;
02156 current_item;
02157 current_item = current_item->prev ())
02158 {
02159 if (message_strategy_.priority_status (*current_item, current_time) == status)
02160 {
02161 if (current_item->msg_priority () >= new_item->msg_priority ())
02162 break;
02163 }
02164 else
02165 {
02166 sublist_head = new_item;
02167 break;
02168 }
02169 }
02170
02171 if (current_item == 0)
02172 {
02173
02174
02175 new_item->prev (0);
02176 new_item->next (this->head_);
02177 if (this->head_ != 0)
02178 this->head_->prev (new_item);
02179 else
02180 {
02181 this->tail_ = new_item;
02182 sublist_tail = new_item;
02183 }
02184 this->head_ = new_item;
02185 sublist_head = new_item;
02186 }
02187 else
02188 {
02189
02190 new_item->next (current_item->next ());
02191 new_item->prev (current_item);
02192
02193 if (current_item->next ())
02194 current_item->next ()->prev (new_item);
02195 else
02196 this->tail_ = new_item;
02197
02198 current_item->next (new_item);
02199
02200
02201
02202 if (current_item == sublist_tail)
02203 sublist_tail = new_item;
02204 }
02205
02206 return result;
02207 }
02208
02209
02210
02211
02212 template <ACE_SYNCH_DECL> int
02213 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02214 {
02215 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02216
02217 int result = 0;
02218 int last_in_subqueue = 0;
02219
02220
02221 if (this->pending_head_)
02222 {
02223 first_item = this->pending_head_;
02224
02225 if (0 == this->pending_head_->prev ())
02226 this->head_ = this->pending_head_->next ();
02227 else
02228 this->pending_head_->prev ()->next (this->pending_head_->next ());
02229
02230 if (0 == this->pending_head_->next ())
02231 {
02232 this->tail_ = this->pending_head_->prev ();
02233 this->pending_head_ = 0;
02234 this->pending_tail_ = 0;
02235 }
02236 else
02237 {
02238 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02239 this->pending_head_ = this->pending_head_->next ();
02240 }
02241
02242 first_item->prev (0);
02243 first_item->next (0);
02244 }
02245
02246
02247 else if (this->late_head_)
02248 {
02249 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02250
02251 first_item = this->late_head_;
02252
02253 if (0 == this->late_head_->prev ())
02254 this->head_ = this->late_head_->next ();
02255 else
02256 this->late_head_->prev ()->next (this->late_head_->next ());
02257
02258 if (0 == this->late_head_->next ())
02259 this->tail_ = this->late_head_->prev ();
02260 else
02261 {
02262 this->late_head_->next ()->prev (this->late_head_->prev ());
02263 this->late_head_ = this->late_head_->next ();
02264 }
02265
02266 if (last_in_subqueue)
02267 {
02268 this->late_head_ = 0;
02269 this->late_tail_ = 0;
02270 }
02271
02272 first_item->prev (0);
02273 first_item->next (0);
02274 }
02275
02276 else if (this->beyond_late_head_)
02277 {
02278 last_in_subqueue =
02279 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02280
02281 first_item = this->beyond_late_head_;
02282 this->head_ = this->beyond_late_head_->next ();
02283
02284 if (0 == this->beyond_late_head_->next ())
02285 this->tail_ = this->beyond_late_head_->prev ();
02286 else
02287 {
02288 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02289 this->beyond_late_head_ = this->beyond_late_head_->next ();
02290 }
02291
02292 if (last_in_subqueue)
02293 {
02294 this->beyond_late_head_ = 0;
02295 this->beyond_late_tail_ = 0;
02296 }
02297
02298 first_item->prev (0);
02299 first_item->next (0);
02300 }
02301 else
02302 {
02303
02304 first_item = 0;
02305 result = -1;
02306 }
02307
02308 if (result < 0)
02309 return result;
02310
02311 size_t mb_bytes = 0;
02312 size_t mb_length = 0;
02313 first_item->total_size_and_length (mb_bytes,
02314 mb_length);
02315
02316 this->cur_bytes_ -= mb_bytes;
02317 this->cur_length_ -= mb_length;
02318 --this->cur_count_;
02319
02320
02321
02322 if (this->cur_bytes_ <= this->low_water_mark_
02323 && this->signal_enqueue_waiters () == -1)
02324 return -1;
02325 else
02326 return this->cur_count_;
02327 }
02328
02329
02330
02331
02332
02333
02334
02335 template <ACE_SYNCH_DECL> int
02336 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time)
02337 {
02338 int result;
02339
02340 result = refresh_pending_queue (current_time);
02341
02342 if (result != -1)
02343 result = refresh_late_queue (current_time);
02344
02345 return result;
02346 }
02347
02348
02349
02350
02351 template <ACE_SYNCH_DECL> int
02352 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time)
02353 {
02354 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02355
02356
02357 if (this->pending_head_)
02358 {
02359 current_status = message_strategy_.priority_status (*this->pending_head_,
02360 current_time);
02361 switch (current_status)
02362 {
02363 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02364
02365
02366 this->beyond_late_head_ = this->head_;
02367
02368
02369
02370 this->late_head_ = 0;
02371 this->late_tail_ = 0;
02372
02373
02374 do
02375 {
02376 this->pending_head_ = this->pending_head_->next ();
02377
02378 if (this->pending_head_)
02379 current_status = message_strategy_.priority_status (*this->pending_head_,
02380 current_time);
02381 else
02382 break;
02383
02384 }
02385 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02386
02387 if (this->pending_head_)
02388 {
02389
02390 this->beyond_late_tail_ = this->pending_head_->prev ();
02391
02392 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02393
02394 break;
02395 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02396 {
02397
02398 ACE_ERROR_RETURN ((LM_ERROR,
02399 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02400 (int) current_status),
02401 -1);
02402 }
02403
02404 }
02405 else
02406 {
02407
02408
02409 this->beyond_late_tail_ = this->tail_;
02410 this->pending_head_ = 0;
02411 this->pending_tail_ = 0;
02412 break;
02413 }
02414
02415 case ACE_Dynamic_Message_Strategy::LATE:
02416
02417
02418
02419 if (this->late_head_ == 0)
02420 this->late_head_ = this->pending_head_;
02421
02422
02423 do
02424 {
02425 this->pending_head_ = this->pending_head_->next ();
02426
02427 if (this->pending_head_)
02428 current_status = message_strategy_.priority_status (*this->pending_head_,
02429 current_time);
02430 else
02431 break;
02432
02433 }
02434 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02435
02436 if (this->pending_head_)
02437 {
02438 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02439
02440 ACE_ERROR_RETURN((LM_ERROR,
02441 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02442 (int) current_status),
02443 -1);
02444
02445
02446 this->late_tail_ = this->pending_head_->prev ();
02447 }
02448 else
02449 {
02450
02451 this->late_tail_ = this->tail_;
02452 this->pending_head_ = 0;
02453 this->pending_tail_ = 0;
02454 }
02455
02456 break;
02457 case ACE_Dynamic_Message_Strategy::PENDING:
02458
02459 break;
02460 default:
02461
02462 ACE_ERROR_RETURN((LM_ERROR,
02463 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02464 (int) current_status),
02465 -1);
02466 }
02467 }
02468 return 0;
02469 }
02470
02471
02472
02473
02474 template <ACE_SYNCH_DECL> int
02475 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time)
02476 {
02477 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02478
02479 if (this->late_head_)
02480 {
02481 current_status = message_strategy_.priority_status (*this->late_head_,
02482 current_time);
02483 switch (current_status)
02484 {
02485 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02486
02487
02488
02489 this->beyond_late_head_ = this->head_;
02490
02491
02492 do
02493 {
02494 this->late_head_ = this->late_head_->next ();
02495
02496 if (this->late_head_)
02497 current_status = message_strategy_.priority_status (*this->late_head_,
02498 current_time);
02499 else
02500 break;
02501
02502 }
02503 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02504
02505 if (this->late_head_)
02506 {
02507
02508 this->beyond_late_tail_ = this->late_head_->prev ();
02509
02510 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02511 {
02512
02513 this->late_head_ = 0;
02514 this->late_tail_ = 0;
02515 }
02516 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02517
02518 ACE_ERROR_RETURN ((LM_ERROR,
02519 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02520 (int) current_status),
02521 -1);
02522 }
02523 else
02524 {
02525
02526 this->beyond_late_tail_ = this->tail_;
02527 this->late_head_ = 0;
02528 this->late_tail_ = 0;
02529 }
02530
02531 break;
02532
02533 case ACE_Dynamic_Message_Strategy::LATE:
02534
02535 break;
02536
02537 case ACE_Dynamic_Message_Strategy::PENDING:
02538
02539 ACE_ERROR_RETURN ((LM_ERROR,
02540 ACE_LIB_TEXT ("Unexpected message priority status ")
02541 ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02542 (int) current_status),
02543 -1);
02544 default:
02545
02546 ACE_ERROR_RETURN ((LM_ERROR,
02547 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02548 (int) current_status),
02549 -1);
02550 }
02551 }
02552
02553 return 0;
02554 }
02555
02556
02557
02558
02559 template <ACE_SYNCH_DECL> int
02560 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02561 ACE_Time_Value *timeout)
02562 {
02563 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02564 timeout);
02565 }
02566
02567
02568
02569
02570 template <ACE_SYNCH_DECL> int
02571 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02572 ACE_Time_Value *timeout)
02573 {
02574 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02575 return this->enqueue_prio (new_item, timeout);
02576 }
02577
02578
02579
02580
02581
02582
02583 template <ACE_SYNCH_DECL> int
02584 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02585 ACE_Time_Value *timeout)
02586 {
02587 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02588 return this->enqueue_prio (new_item, timeout);
02589 }
02590
02591
02592
02593
02594
02595
02596 template <ACE_SYNCH_DECL>
02597 ACE_Message_Queue<ACE_SYNCH_USE> *
02598 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02599 size_t lwm,
02600 ACE_Notification_Strategy *ns)
02601 {
02602 ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02603
02604 ACE_NEW_RETURN (tmp,
02605 ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02606 0);
02607 return tmp;
02608 }
02609
02610
02611
02612 template <ACE_SYNCH_DECL>
02613 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02614 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02615 size_t lwm,
02616 ACE_Notification_Strategy *ns,
02617 u_long static_bit_field_mask,
02618 u_long static_bit_field_shift,
02619 u_long dynamic_priority_max,
02620 u_long dynamic_priority_offset)
02621 {
02622 ACE_Deadline_Message_Strategy *adms = 0;
02623
02624 ACE_NEW_RETURN (adms,
02625 ACE_Deadline_Message_Strategy (static_bit_field_mask,
02626 static_bit_field_shift,
02627 dynamic_priority_max,
02628 dynamic_priority_offset),
02629 0);
02630
02631 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02632 ACE_NEW_RETURN (tmp,
02633 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02634 0);
02635 return tmp;
02636 }
02637
02638
02639
02640
02641 template <ACE_SYNCH_DECL>
02642 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02643 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02644 size_t lwm,
02645 ACE_Notification_Strategy *ns,
02646 u_long static_bit_field_mask,
02647 u_long static_bit_field_shift,
02648 u_long dynamic_priority_max,
02649 u_long dynamic_priority_offset)
02650 {
02651 ACE_Laxity_Message_Strategy *alms = 0;
02652
02653 ACE_NEW_RETURN (alms,
02654 ACE_Laxity_Message_Strategy (static_bit_field_mask,
02655 static_bit_field_shift,
02656 dynamic_priority_max,
02657 dynamic_priority_offset),
02658 0);
02659
02660 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02661 ACE_NEW_RETURN (tmp,
02662 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02663 0);
02664 return tmp;
02665 }
02666
02667
02668
02669
#if defined (ACE_VXWORKS)

// Factory method: allocate an ACE_Message_Queue_Vx constructed with
// (max_messages, max_message_length, ns).
//
// @return the new queue, or 0 if ACE_NEW_RETURN could not allocate it.
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Vx *
ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
                                                                   size_t max_message_length,
                                                                   ACE_Notification_Strategy *ns)
{
  ACE_Message_Queue_Vx *tmp = 0;

  ACE_NEW_RETURN (tmp,
                  ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
                  0);
  return tmp;
}

#endif /* ACE_VXWORKS */

// NOTE(review): this section used to sit inside the ACE_VXWORKS
// conditional above, so it was only compiled when both VxWorks and
// Win32 were defined.  It is now an independent conditional; confirm
// the declaration in the header is guarded by the same condition.
#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)

// Factory method: allocate an ACE_Message_Queue_NT constructed with
// <max_threads>.
//
// @return the new queue, or 0 if ACE_NEW_RETURN could not allocate it.
template <ACE_SYNCH_DECL>
ACE_Message_Queue_NT *
ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
{
  ACE_Message_Queue_NT *tmp = 0;

  ACE_NEW_RETURN (tmp,
                  ACE_Message_Queue_NT (max_threads),
                  0);
  return tmp;
}

#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
02703
02704 ACE_END_VERSIONED_NAMESPACE_DECL
02705
02706 #endif