00001
00002
00003 #ifndef ACE_MESSAGE_QUEUE_T_CPP
00004 #define ACE_MESSAGE_QUEUE_T_CPP
00005
00006
00007
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00013 # pragma once
00014 #endif
00015
00016 #include "ace/Notification_Strategy.h"
00017 #include "ace/Truncate.h"
00018
00019 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00020 #include "ace/OS_NS_stdio.h"
00021 #include "ace/Monitor_Size.h"
00022 #endif
00023
00024 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00027 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00028 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00029 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N)
00030
00031 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00032 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00033 {
00034 #if defined (ACE_HAS_DUMP)
00035 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00036
00037 this->queue_.dump ();
00038 #endif
00039 }
00040
00041 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00042 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00043 {
00044 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00045
00046 this->queue_.message_bytes (new_value);
00047 }
00048
00049 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00050 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00051 {
00052 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00053
00054 this->queue_.message_length (new_value);
00055 }
00056
00057 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00058 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00059 size_t lwm,
00060 ACE_Notification_Strategy *ns)
00061 {
00062 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00063
00064 if (this->queue_.open (hwm, lwm, ns) == -1)
00065 ACE_ERROR ((LM_ERROR,
00066 ACE_TEXT ("ACE_Message_Queue_Ex")));
00067 }
00068
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00071 {
00072 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00073 }
00074
00075 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00076 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00077 size_t lwm,
00078 ACE_Notification_Strategy *ns)
00079 {
00080 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00081
00082 return this->queue_.open (hwm, lwm, ns);
00083 }
00084
00085
00086
00087 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00088 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00089 {
00090 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00091
00092 return this->queue_.close ();
00093 }
00094
00095 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00096 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00097 {
00098 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00099
00100 return this->queue_.flush ();
00101 }
00102
00103 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00104 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00105 {
00106 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00107
00108 return this->queue_.flush_i ();
00109 }
00110
00111
00112
00113 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00114 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00115 ACE_Time_Value *timeout)
00116 {
00117 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00118
00119 ACE_Message_Block *mb = 0;
00120
00121 int const cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00122
00123 if (cur_count != -1)
00124 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00125
00126 return cur_count;
00127 }
00128
00129 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00130 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00131 ACE_Time_Value *timeout)
00132 {
00133 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00134
00135 ACE_Message_Block *mb = 0;
00136
00137 ACE_NEW_RETURN (mb,
00138 ACE_Message_Block ((char *) new_item,
00139 sizeof (*new_item),
00140 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00141 -1);
00142
00143 int const result = this->queue_.enqueue_head (mb, timeout);
00144 if (result == -1)
00145
00146 mb->release ();
00147 return result;
00148 }
00149
00150
00151
00152
00153
00154 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00155 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00156 ACE_Time_Value *timeout)
00157 {
00158 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue");
00159
00160 return this->enqueue_prio (new_item, timeout);
00161 }
00162
00163 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00164 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00165 ACE_Time_Value *timeout,
00166 unsigned long priority)
00167 {
00168 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00169
00170 ACE_Message_Block *mb = 0;
00171
00172 ACE_NEW_RETURN (mb,
00173 ACE_Message_Block ((char *) new_item,
00174 sizeof (*new_item),
00175 priority),
00176 -1);
00177
00178 int const result = this->queue_.enqueue_prio (mb, timeout);
00179 if (result == -1)
00180
00181 mb->release ();
00182
00183 return result;
00184 }
00185
00186 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00187 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00188 ACE_Time_Value *timeout)
00189 {
00190 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00191
00192 ACE_Message_Block *mb = 0;
00193
00194 ACE_NEW_RETURN (mb,
00195 ACE_Message_Block ((char *) new_item,
00196 sizeof (*new_item),
00197 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00198 -1);
00199
00200 int const result = this->queue_.enqueue_deadline (mb, timeout);
00201 if (result == -1)
00202
00203 mb->release ();
00204
00205 return result;
00206 }
00207
00208
00209
00210
00211 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00212 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00213 ACE_Time_Value *timeout)
00214 {
00215 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00216
00217 ACE_Message_Block *mb = 0;
00218
00219 ACE_NEW_RETURN (mb,
00220 ACE_Message_Block ((char *) new_item,
00221 sizeof (*new_item),
00222 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00223 -1);
00224
00225 int const result = this->queue_.enqueue_tail (mb, timeout);
00226 if (result == -1)
00227
00228 mb->release ();
00229 return result;
00230 }
00231
00232
00233
00234
00235
00236 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00237 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00238 ACE_Time_Value *timeout)
00239 {
00240 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00241
00242 ACE_Message_Block *mb = 0;
00243
00244 int const cur_count = this->queue_.dequeue_head (mb, timeout);
00245
00246
00247 if (cur_count != -1)
00248 {
00249 first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00250
00251 mb->release ();
00252 }
00253
00254 return cur_count;
00255 }
00256
00257
00258
00259
00260
00261 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00262 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00263 ACE_Time_Value *timeout)
00264 {
00265 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00266
00267 ACE_Message_Block *mb = 0;
00268
00269 int const cur_count = this->queue_.dequeue_prio (mb, timeout);
00270
00271
00272 if (cur_count != -1)
00273 {
00274 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00275
00276 mb->release ();
00277 }
00278
00279 return cur_count;
00280 }
00281
00282
00283
00284
00285
00286 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00287 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00288 ACE_Time_Value *timeout)
00289 {
00290 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00291
00292 ACE_Message_Block *mb = 0;
00293
00294 int const cur_count = this->queue_.dequeue_tail (mb, timeout);
00295
00296
00297 if (cur_count != -1)
00298 {
00299 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00300
00301 mb->release ();
00302 }
00303
00304 return cur_count;
00305 }
00306
00307
00308
00309
00310
00311 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00312 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00313 ACE_Time_Value *timeout)
00314 {
00315 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00316
00317 ACE_Message_Block *mb = 0;
00318
00319 int const cur_count = this->queue_.dequeue_deadline (mb, timeout);
00320
00321
00322 if (cur_count != -1)
00323 {
00324 dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ());
00325
00326 mb->release ();
00327 }
00328
00329 return cur_count;
00330 }
00331
00332 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00333 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00334 {
00335 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00336
00337 return this->queue_.notify ();
00338 }
00339
00340
00341 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00342 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N
00343 (size_t high_water_mark,
00344 size_t low_water_mark,
00345 ACE_Notification_Strategy *ns):
00346 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark,
00347 low_water_mark,
00348 ns)
00349 {
00350 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N");
00351 }
00352
00353 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00354 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void)
00355 {
00356 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N");
00357 }
00358
00359 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00360 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head
00361 (ACE_MESSAGE_TYPE *new_item,
00362 ACE_Time_Value *timeout)
00363 {
00364 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00365
00366
00367
00368 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00369 if (0 == mb)
00370 {
00371 return -1;
00372 }
00373
00374 int result = this->queue_.enqueue_head (mb, timeout);
00375 if (-1 == result)
00376 {
00377
00378 mb->release ();
00379 }
00380 return result;
00381 }
00382
00383 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00384 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail
00385 (ACE_MESSAGE_TYPE *new_item,
00386 ACE_Time_Value *timeout)
00387 {
00388 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00389
00390
00391
00392 ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item);
00393 if (0 == mb)
00394 {
00395 return -1;
00396 }
00397
00398 int result = this->queue_.enqueue_tail (mb, timeout);
00399 if (-1 == result)
00400 {
00401
00402 mb->release ();
00403 }
00404 return result;
00405 }
00406
00407 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block *
00408 ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i
00409 (ACE_MESSAGE_TYPE *new_item)
00410 {
00411 ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i");
00412
00413
00414 ACE_Message_Block *mb_head = 0;
00415
00416 ACE_NEW_RETURN (mb_head,
00417 ACE_Message_Block ((char *) new_item,
00418 sizeof (*new_item),
00419 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00420 0);
00421
00422
00423 ACE_Message_Block *mb_tail = mb_head;
00424
00425
00426 for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
00427 {
00428 ACE_Message_Block *mb_temp = 0;
00429 ACE_NEW_NORETURN (mb_temp,
00430 ACE_Message_Block ((char *) pobj,
00431 sizeof (*pobj),
00432 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY));
00433 if (mb_temp == 0)
00434 {
00435 mb_head->release ();
00436 mb_head = 0;
00437 break;
00438 }
00439
00440 mb_tail->next (mb_temp);
00441 mb_tail = mb_temp;
00442 }
00443
00444 return mb_head;
00445 }
00446
00447 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
00448
00449 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00450 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item,
00451 ACE_Time_Value *timeout)
00452 {
00453 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue");
00454
00455 return this->dequeue_head (first_item, timeout);
00456 }
00457
00458 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy *
00459 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void)
00460 {
00461 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00462
00463 return this->queue_.notification_strategy ();
00464 }
00465
00466 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00467 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00468 {
00469 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy");
00470
00471 this->queue_.notification_strategy (s);
00472 }
00473
00474
00475
00476 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00477 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void)
00478 {
00479 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty");
00480
00481 return this->queue_.is_empty ();
00482 }
00483
00484
00485
00486 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> bool
00487 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void)
00488 {
00489 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full");
00490
00491 return this->queue_.is_full ();
00492 }
00493
00494 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00495 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void)
00496 {
00497 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00498
00499 return this->queue_.high_water_mark ();
00500 }
00501
00502 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00503 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00504 {
00505 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark");
00506
00507 this->queue_.high_water_mark (hwm);
00508 }
00509
00510 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00511 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void)
00512 {
00513 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00514
00515 return this->queue_.low_water_mark ();
00516 }
00517
00518 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00519 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00520 {
00521 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark");
00522
00523 this->queue_.low_water_mark (lwm);
00524 }
00525
00526 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00527 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void)
00528 {
00529 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00530
00531 return this->queue_.message_bytes ();
00532 }
00533
00534 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00535 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void)
00536 {
00537 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00538
00539 return this->queue_.message_length ();
00540 }
00541
00542 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t
00543 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
00544 {
00545 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count");
00546
00547 return this->queue_.message_count ();
00548 }
00549
00550 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00551 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
00552 {
00553 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
00554
00555 return this->queue_.deactivate ();
00556 }
00557
00558 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00559 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
00560 {
00561 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
00562
00563 return this->queue_.activate ();
00564 }
00565
00566 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00567 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
00568 {
00569 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
00570
00571 return this->queue_.pulse ();
00572 }
00573
00574 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00575 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void)
00576 {
00577 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated");
00578
00579 return this->queue_.deactivated ();
00580 }
00581
00582 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00583 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void)
00584 {
00585 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state");
00586
00587 return this->queue_.state ();
00588 }
00589
00590 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00591 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void)
00592 {
00593 return this->queue_.lock ();
00594 }
00595
00596 template <ACE_SYNCH_DECL>
00597 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00598 : queue_ (q),
00599 curr_ (q.head_)
00600 {
00601 }
00602
00603 template <ACE_SYNCH_DECL> int
00604 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00605 {
00606 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00607
00608 if (this->curr_ != 0)
00609 {
00610 entry = this->curr_;
00611 return 1;
00612 }
00613
00614 return 0;
00615 }
00616
00617 template <ACE_SYNCH_DECL> int
00618 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00619 {
00620 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00621
00622 return this->curr_ == 0;
00623 }
00624
00625 template <ACE_SYNCH_DECL> int
00626 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00627 {
00628 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00629
00630 if (this->curr_)
00631 this->curr_ = this->curr_->next ();
00632 return this->curr_ != 0;
00633 }
00634
00635 template <ACE_SYNCH_DECL> void
00636 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00637 {
00638 #if defined (ACE_HAS_DUMP)
00639 #endif
00640 }
00641
00642 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00643
00644 template <ACE_SYNCH_DECL>
00645 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00646 : queue_ (q),
00647 curr_ (queue_.tail_)
00648 {
00649 }
00650
00651 template <ACE_SYNCH_DECL> int
00652 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00653 {
00654 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00655
00656 if (this->curr_ != 0)
00657 {
00658 entry = this->curr_;
00659 return 1;
00660 }
00661
00662 return 0;
00663 }
00664
00665 template <ACE_SYNCH_DECL> int
00666 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00667 {
00668 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00669
00670 return this->curr_ == 0;
00671 }
00672
00673 template <ACE_SYNCH_DECL> int
00674 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00675 {
00676 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00677
00678 if (this->curr_)
00679 this->curr_ = this->curr_->prev ();
00680 return this->curr_ != 0;
00681 }
00682
00683 template <ACE_SYNCH_DECL> void
00684 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00685 {
00686 #if defined (ACE_HAS_DUMP)
00687 #endif
00688 }
00689
00690 template <ACE_SYNCH_DECL> int
00691 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
00692 ACE_Time_Value *timeout)
00693 {
00694 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue");
00695 return this->dequeue_head (first_item, timeout);
00696 }
00697
00698 template <ACE_SYNCH_DECL> ACE_Notification_Strategy *
00699 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
00700 {
00701 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00702
00703 return this->notification_strategy_;
00704 }
00705
00706 template <ACE_SYNCH_DECL> void
00707 ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
00708 {
00709 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
00710
00711 this->notification_strategy_ = s;
00712 }
00713
00714
00715
00716 template <ACE_SYNCH_DECL> bool
00717 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
00718 {
00719 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
00720 return this->tail_ == 0;
00721 }
00722
00723
00724
00725 template <ACE_SYNCH_DECL> bool
00726 ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
00727 {
00728 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
00729 return this->cur_bytes_ >= this->high_water_mark_;
00730 }
00731
00732
00733
00734 template <ACE_SYNCH_DECL> bool
00735 ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
00736 {
00737 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
00738 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00739
00740 return this->is_empty_i ();
00741 }
00742
00743
00744
00745 template <ACE_SYNCH_DECL> bool
00746 ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
00747 {
00748 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
00749 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, false);
00750
00751 return this->is_full_i ();
00752 }
00753
00754 template <ACE_SYNCH_DECL> size_t
00755 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
00756 {
00757 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00758 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00759
00760 return this->high_water_mark_;
00761 }
00762
00763 template <ACE_SYNCH_DECL> void
00764 ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
00765 {
00766 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
00767 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00768
00769 this->high_water_mark_ = hwm;
00770 }
00771
00772 template <ACE_SYNCH_DECL> size_t
00773 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
00774 {
00775 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00776 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00777
00778 return this->low_water_mark_;
00779 }
00780
00781 template <ACE_SYNCH_DECL> void
00782 ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
00783 {
00784 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
00785 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00786
00787 this->low_water_mark_ = lwm;
00788 }
00789
00790 template <ACE_SYNCH_DECL> size_t
00791 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
00792 {
00793 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00794 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00795
00796 return this->cur_bytes_;
00797 }
00798
00799 template <ACE_SYNCH_DECL> size_t
00800 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void)
00801 {
00802 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00803 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00804
00805 return this->cur_length_;
00806 }
00807
00808 template <ACE_SYNCH_DECL> size_t
00809 ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
00810 {
00811 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
00812 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
00813
00814 return this->cur_count_;
00815 }
00816
00817 template <ACE_SYNCH_DECL> int
00818 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
00819 {
00820 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
00821 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00822
00823 return this->deactivate_i (0);
00824 }
00825
00826 template <ACE_SYNCH_DECL> int
00827 ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
00828 {
00829 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
00830 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00831
00832 return this->activate_i ();
00833 }
00834
00835 template <ACE_SYNCH_DECL> int
00836 ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
00837 {
00838 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
00839 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00840
00841 return this->deactivate_i (1);
00842 }
00843
00844 template <ACE_SYNCH_DECL> int
00845 ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
00846 {
00847 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
00848
00849 return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
00850 }
00851
00852 template <ACE_SYNCH_DECL> int
00853 ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
00854 {
00855 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
00856
00857 return this->state_;
00858 }
00859
00860 template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T &
00861 ACE_Message_Queue<ACE_SYNCH_USE>::lock (void)
00862 {
00863 return this->lock_;
00864 }
00865
00866 template <ACE_SYNCH_DECL> void
00867 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00868 {
00869 #if defined (ACE_HAS_DUMP)
00870 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00871 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00872 switch (this->state_)
00873 {
00874 case ACE_Message_Queue_Base::ACTIVATED:
00875 ACE_DEBUG ((LM_DEBUG,
00876 ACE_TEXT ("state = ACTIVATED\n")));
00877 break;
00878 case ACE_Message_Queue_Base::DEACTIVATED:
00879 ACE_DEBUG ((LM_DEBUG,
00880 ACE_TEXT ("state = DEACTIVATED\n")));
00881 break;
00882 case ACE_Message_Queue_Base::PULSED:
00883 ACE_DEBUG ((LM_DEBUG,
00884 ACE_TEXT ("state = PULSED\n")));
00885 break;
00886 }
00887 ACE_DEBUG ((LM_DEBUG,
00888 ACE_TEXT ("low_water_mark = %d\n")
00889 ACE_TEXT ("high_water_mark = %d\n")
00890 ACE_TEXT ("cur_bytes = %d\n")
00891 ACE_TEXT ("cur_length = %d\n")
00892 ACE_TEXT ("cur_count = %d\n")
00893 ACE_TEXT ("head_ = %u\n")
00894 ACE_TEXT ("tail_ = %u\n"),
00895 this->low_water_mark_,
00896 this->high_water_mark_,
00897 this->cur_bytes_,
00898 this->cur_length_,
00899 this->cur_count_,
00900 this->head_,
00901 this->tail_));
00902 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_full_cond: \n")));
00903 not_full_cond_.dump ();
00904 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("not_empty_cond: \n")));
00905 not_empty_cond_.dump ();
00906 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00907 #endif
00908 }
00909
00910 template <ACE_SYNCH_DECL> void
00911 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00912 {
00913 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00914 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00915
00916 this->cur_bytes_ = new_value;
00917 }
00918
00919 template <ACE_SYNCH_DECL> void
00920 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00921 {
00922 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00923 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00924
00925 this->cur_length_ = new_value;
00926 }
00927
00928 template <ACE_SYNCH_DECL>
00929 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00930 size_t lwm,
00931 ACE_Notification_Strategy *ns)
00932 : not_empty_cond_ (lock_)
00933 , not_full_cond_ (lock_)
00934 {
00935 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00936
00937 if (this->open (hwm, lwm, ns) == -1)
00938 ACE_ERROR ((LM_ERROR,
00939 ACE_TEXT ("open")));
00940
00941 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00942 ACE_NEW (this->monitor_,
00943 ACE::Monitor_Control::Size_Monitor);
00944
00945
00946 const int nibbles = 2 * sizeof (ptrdiff_t);
00947 char buf[nibbles + 1];
00948 ACE_OS::sprintf (buf, "%p", this);
00949 buf[nibbles] = '\0';
00950 ACE_CString name_str ("Message_Queue_");
00951 name_str += buf;
00952 this->monitor_->name (name_str.c_str ());
00953 this->monitor_->add_to_registry ();
00954 #endif
00955 }
00956
00957 template <ACE_SYNCH_DECL>
00958 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00959 {
00960 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00961 if (this->head_ != 0 && this->close () == -1)
00962 ACE_ERROR ((LM_ERROR,
00963 ACE_TEXT ("close")));
00964
00965 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
00966 this->monitor_->remove_from_registry ();
00967 this->monitor_->remove_ref ();
00968 #endif
00969 }
00970
00971 template <ACE_SYNCH_DECL> int
00972 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00973 {
00974 int number_flushed = 0;
00975
00976
00977
00978 for (this->tail_ = 0; this->head_ != 0; )
00979 {
00980 ++number_flushed;
00981
00982 size_t mb_bytes = 0;
00983 size_t mb_length = 0;
00984 this->head_->total_size_and_length (mb_bytes,
00985 mb_length);
00986
00987 this->cur_bytes_ -= mb_bytes;
00988 this->cur_length_ -= mb_length;
00989 --this->cur_count_;
00990
00991 ACE_Message_Block *temp = this->head_;
00992 this->head_ = this->head_->next ();
00993
00994
00995
00996 temp->release ();
00997 }
00998
00999 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01000
01001 if (number_flushed > 0)
01002 {
01003 this->monitor_->receive (this->cur_length_);
01004 }
01005 #endif
01006
01007 return number_flushed;
01008 }
01009
01010
01011
01012
01013
01014 template <ACE_SYNCH_DECL> int
01015 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
01016 size_t lwm,
01017 ACE_Notification_Strategy *ns)
01018 {
01019 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
01020 this->high_water_mark_ = hwm;
01021 this->low_water_mark_ = lwm;
01022 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01023 this->cur_bytes_ = 0;
01024 this->cur_length_ = 0;
01025 this->cur_count_ = 0;
01026 this->tail_ = 0;
01027 this->head_ = 0;
01028 this->notification_strategy_ = ns;
01029 return 0;
01030 }
01031
01032
01033
01034
01035 template <ACE_SYNCH_DECL> int
01036 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
01037 {
01038 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
01039 int const previous_state = this->state_;
01040
01041 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
01042 {
01043
01044 this->not_empty_cond_.broadcast ();
01045 this->not_full_cond_.broadcast ();
01046
01047 if (pulse)
01048 this->state_ = ACE_Message_Queue_Base::PULSED;
01049 else
01050 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
01051 }
01052 return previous_state;
01053 }
01054
01055 template <ACE_SYNCH_DECL> int
01056 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
01057 {
01058 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
01059 int const previous_state = this->state_;
01060 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
01061 return previous_state;
01062 }
01063
01064 template <ACE_SYNCH_DECL> int
01065 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
01066 {
01067 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
01068 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01069
01070
01071 return this->flush_i ();
01072 }
01073
01074
01075
01076 template <ACE_SYNCH_DECL> int
01077 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
01078 {
01079 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
01080 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01081
01082 int const result = this->deactivate_i ();
01083
01084
01085 this->flush_i ();
01086
01087 return result;
01088 }
01089
01090 template <ACE_SYNCH_DECL> int
01091 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
01092 {
01093 if (this->not_full_cond_.signal () != 0)
01094 return -1;
01095 return 0;
01096 }
01097
01098 template <ACE_SYNCH_DECL> int
01099 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
01100 {
01101
01102 if (this->not_empty_cond_.signal () != 0)
01103 return -1;
01104 return 0;
01105 }
01106
01107
01108
01109
01110 template <ACE_SYNCH_DECL> int
01111 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
01112 {
01113 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
01114
01115 if (new_item == 0)
01116 return -1;
01117
01118
01119
01120
01121
01122
01123 ACE_Message_Block *seq_tail = new_item;
01124 ++this->cur_count_;
01125 new_item->total_size_and_length (this->cur_bytes_,
01126 this->cur_length_);
01127 while (seq_tail->next () != 0)
01128 {
01129 seq_tail->next ()->prev (seq_tail);
01130 seq_tail = seq_tail->next ();
01131 ++this->cur_count_;
01132 seq_tail->total_size_and_length (this->cur_bytes_,
01133 this->cur_length_);
01134 }
01135
01136
01137 if (this->tail_ == 0)
01138 {
01139 this->head_ = new_item;
01140 this->tail_ = seq_tail;
01141
01142 new_item->prev (0);
01143 }
01144
01145 else
01146 {
01147
01148 this->tail_->next (new_item);
01149 new_item->prev (this->tail_);
01150 this->tail_ = seq_tail;
01151 }
01152
01153 if (this->signal_dequeue_waiters () == -1)
01154 return -1;
01155 else
01156 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01157 }
01158
01159
01160
01161 template <ACE_SYNCH_DECL> int
01162 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
01163 {
01164 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
01165
01166 if (new_item == 0)
01167 return -1;
01168
01169
01170
01171
01172
01173
01174 ACE_Message_Block *seq_tail = new_item;
01175 ++this->cur_count_;
01176 new_item->total_size_and_length (this->cur_bytes_,
01177 this->cur_length_);
01178 while (seq_tail->next () != 0)
01179 {
01180 seq_tail->next ()->prev (seq_tail);
01181 seq_tail = seq_tail->next ();
01182 ++this->cur_count_;
01183 seq_tail->total_size_and_length (this->cur_bytes_,
01184 this->cur_length_);
01185 }
01186
01187 new_item->prev (0);
01188 seq_tail->next (this->head_);
01189
01190 if (this->head_ != 0)
01191 this->head_->prev (seq_tail);
01192 else
01193 this->tail_ = seq_tail;
01194
01195 this->head_ = new_item;
01196
01197 if (this->signal_dequeue_waiters () == -1)
01198 return -1;
01199 else
01200 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01201 }
01202
01203
01204
01205
01206 template <ACE_SYNCH_DECL> int
01207 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01208 {
01209 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01210
01211 if (new_item == 0)
01212 return -1;
01213
01214
01215
01216
01217
01218 new_item->next (0);
01219
01220 if (this->head_ == 0)
01221
01222
01223 return this->enqueue_head_i (new_item);
01224 else
01225 {
01226 ACE_Message_Block *temp = 0;
01227
01228
01229
01230
01231
01232 for (temp = this->tail_;
01233 temp != 0;
01234 temp = temp->prev ())
01235 if (temp->msg_priority () >= new_item->msg_priority ())
01236
01237
01238 break;
01239
01240 if (temp == 0)
01241
01242
01243
01244 return this->enqueue_head_i (new_item);
01245 else if (temp->next () == 0)
01246
01247
01248
01249 return this->enqueue_tail_i (new_item);
01250 else
01251 {
01252
01253
01254
01255
01256 new_item->prev (temp);
01257 new_item->next (temp->next ());
01258 temp->next ()->prev (new_item);
01259 temp->next (new_item);
01260 }
01261 }
01262
01263
01264 new_item->total_size_and_length (this->cur_bytes_,
01265 this->cur_length_);
01266 ++this->cur_count_;
01267
01268 if (this->signal_dequeue_waiters () == -1)
01269 return -1;
01270 else
01271 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01272 }
01273
01274
01275
01276
01277 template <ACE_SYNCH_DECL> int
01278 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
01279 {
01280 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01281 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
01282
01283 if (new_item == 0)
01284 return -1;
01285
01286
01287
01288
01289
01290 new_item->next (0);
01291
01292 if (this->head_ == 0)
01293
01294
01295 return this->enqueue_head_i (new_item);
01296 else
01297 {
01298 ACE_Message_Block *temp = 0;
01299
01300
01301
01302
01303
01304 for (temp = this->head_;
01305 temp != 0;
01306 temp = temp->next ())
01307 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
01308
01309
01310 break;
01311
01312 if (temp == 0 || temp->next () == 0)
01313
01314
01315
01316 return this->enqueue_tail_i (new_item);
01317 else
01318 {
01319
01320
01321
01322
01323 new_item->prev (temp);
01324 new_item->next (temp->next ());
01325 temp->next ()->prev (new_item);
01326 temp->next (new_item);
01327 }
01328 }
01329
01330
01331 new_item->total_size_and_length (this->cur_bytes_,
01332 this->cur_length_);
01333 ++this->cur_count_;
01334
01335 if (this->signal_dequeue_waiters () == -1)
01336 return -1;
01337 else
01338 return this->cur_count_;
01339 #else
01340 return this->enqueue_tail_i (new_item);
01341 #endif
01342 }
01343
01344
01345
01346
01347
01348 template <ACE_SYNCH_DECL> int
01349 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01350 {
01351 if (this->head_ ==0)
01352 ACE_ERROR_RETURN ((LM_ERROR,
01353 ACE_TEXT ("Attempting to dequeue from empty queue")),
01354 -1);
01355 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01356 first_item = this->head_;
01357 this->head_ = this->head_->next ();
01358
01359 if (this->head_ == 0)
01360 this->tail_ = 0;
01361 else
01362
01363 this->head_->prev (0);
01364
01365 size_t mb_bytes = 0;
01366 size_t mb_length = 0;
01367 first_item->total_size_and_length (mb_bytes,
01368 mb_length);
01369
01370 this->cur_bytes_ -= mb_bytes;
01371 this->cur_length_ -= mb_length;
01372 --this->cur_count_;
01373
01374 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01375 this->head_ = this->tail_ = 0;
01376
01377
01378 first_item->prev (0);
01379 first_item->next (0);
01380
01381 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01382 this->monitor_->receive (this->cur_length_);
01383 #endif
01384
01385
01386
01387 if (this->cur_bytes_ <= this->low_water_mark_
01388 && this->signal_enqueue_waiters () == -1)
01389 return -1;
01390 else
01391 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01392 }
01393
01394
01395
01396
01397
01398
01399 template <ACE_SYNCH_DECL> int
01400 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
01401 {
01402 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
01403
01404 if (this->head_ == 0)
01405 return -1;
01406
01407
01408
01409 ACE_Message_Block *chosen = 0;
01410 u_long priority = ULONG_MAX;
01411
01412 for (ACE_Message_Block *temp = this->tail_;
01413 temp != 0;
01414 temp = temp->prev ())
01415 {
01416
01417
01418 if (temp->msg_priority () <= priority)
01419 {
01420 priority = temp->msg_priority ();
01421 chosen = temp;
01422 }
01423 }
01424
01425
01426
01427 if (chosen == 0)
01428 chosen = this->head_;
01429
01430
01431
01432 if (chosen->prev () == 0)
01433 this->head_ = chosen->next ();
01434 else
01435 chosen->prev ()->next (chosen->next ());
01436
01437 if (chosen->next () == 0)
01438 this->tail_ = chosen->prev ();
01439 else
01440 chosen->next ()->prev (chosen->prev ());
01441
01442
01443 dequeued = chosen;
01444
01445 size_t mb_bytes = 0;
01446 size_t mb_length = 0;
01447 dequeued->total_size_and_length (mb_bytes,
01448 mb_length);
01449
01450 this->cur_bytes_ -= mb_bytes;
01451 this->cur_length_ -= mb_length;
01452 --this->cur_count_;
01453
01454 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01455 this->head_ = this->tail_ = 0;
01456
01457
01458 dequeued->prev (0);
01459 dequeued->next (0);
01460
01461
01462
01463 if (this->cur_bytes_ <= this->low_water_mark_
01464 && this->signal_enqueue_waiters () == -1)
01465 return -1;
01466 else
01467 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01468 }
01469
01470
01471
01472
01473
01474 template <ACE_SYNCH_DECL> int
01475 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
01476 {
01477 if (this->head_ == 0)
01478 ACE_ERROR_RETURN ((LM_ERROR,
01479 ACE_TEXT ("Attempting to dequeue from empty queue")),
01480 -1);
01481 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
01482 dequeued = this->tail_;
01483 if (this->tail_->prev () == 0)
01484 {
01485 this->head_ = 0;
01486 this->tail_ = 0;
01487 }
01488 else
01489 {
01490 this->tail_->prev ()->next (0);
01491 this->tail_ = this->tail_->prev ();
01492 }
01493
01494 size_t mb_bytes = 0;
01495 size_t mb_length = 0;
01496 dequeued->total_size_and_length (mb_bytes,
01497 mb_length);
01498
01499 this->cur_bytes_ -= mb_bytes;
01500 this->cur_length_ -= mb_length;
01501 --this->cur_count_;
01502
01503 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01504 this->head_ = this->tail_ = 0;
01505
01506
01507 dequeued->prev (0);
01508 dequeued->next (0);
01509
01510
01511
01512 if (this->cur_bytes_ <= this->low_water_mark_
01513 && this->signal_enqueue_waiters () == -1)
01514 return -1;
01515 else
01516 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01517 }
01518
01519
01520
01521
01522
01523 template <ACE_SYNCH_DECL> int
01524 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01525 {
01526 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01527 if (this->head_ == 0)
01528 ACE_ERROR_RETURN ((LM_ERROR,
01529 ACE_TEXT ("Attempting to dequeue from empty queue")),
01530 -1);
01531 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01532
01533
01534 ACE_Message_Block* chosen = 0;
01535 ACE_Time_Value deadline = ACE_Time_Value::max_time;
01536 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01537 if (temp->msg_deadline_time () < deadline)
01538 {
01539 deadline = temp->msg_deadline_time ();
01540 chosen = temp;
01541 }
01542
01543
01544
01545 if (chosen == 0)
01546 chosen = this->head_;
01547
01548
01549
01550 if (chosen->prev () == 0)
01551 this->head_ = chosen->next ();
01552 else
01553 chosen->prev ()->next (chosen->next ());
01554
01555 if (chosen->next () == 0)
01556 this->tail_ = chosen->prev ();
01557 else
01558 chosen->next ()->prev (chosen->prev ());
01559
01560
01561 dequeued = chosen;
01562
01563 size_t mb_bytes = 0;
01564 size_t mb_length = 0;
01565 dequeued->total_size_and_length (mb_bytes,
01566 mb_length);
01567
01568 this->cur_bytes_ -= mb_bytes;
01569 this->cur_length_ -= mb_length;
01570 --this->cur_count_;
01571
01572 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01573 this->head_ = this->tail_ = 0;
01574
01575
01576 dequeued->prev (0);
01577 dequeued->next (0);
01578
01579
01580
01581 if (this->cur_bytes_ <= this->low_water_mark_
01582 && this->signal_enqueue_waiters () == -1)
01583 return -1;
01584 else
01585 return this->cur_count_;
01586 #else
01587 return this->dequeue_head_i (dequeued);
01588 #endif
01589 }
01590
01591
01592
01593 template <ACE_SYNCH_DECL> int
01594 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01595 ACE_Time_Value *timeout)
01596 {
01597 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01598 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01599
01600 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01601 {
01602 errno = ESHUTDOWN;
01603 return -1;
01604 }
01605
01606
01607
01608 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01609 return -1;
01610
01611 first_item = this->head_;
01612 return ACE_Utils::truncate_cast<int> (this->cur_count_);
01613 }
01614
01615 template <ACE_SYNCH_DECL> int
01616 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &,
01617 ACE_Time_Value *timeout)
01618 {
01619 int result = 0;
01620
01621
01622
01623 while (this->is_full_i ())
01624 {
01625 if (this->not_full_cond_.wait (timeout) == -1)
01626 {
01627 if (errno == ETIME)
01628 errno = EWOULDBLOCK;
01629 result = -1;
01630 break;
01631 }
01632 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01633 {
01634 errno = ESHUTDOWN;
01635 result = -1;
01636 break;
01637 }
01638 }
01639 return result;
01640 }
01641
01642 template <ACE_SYNCH_DECL> int
01643 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01644 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01645 {
01646 int result = 0;
01647
01648
01649
01650 while (this->is_empty_i ())
01651 {
01652 if (this->not_empty_cond_.wait (timeout) == -1)
01653 {
01654 if (errno == ETIME)
01655 errno = EWOULDBLOCK;
01656 result = -1;
01657 break;
01658 }
01659 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01660 {
01661 errno = ESHUTDOWN;
01662 result = -1;
01663 break;
01664 }
01665 }
01666 return result;
01667 }
01668
01669
01670
01671
01672 template <ACE_SYNCH_DECL> int
01673 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01674 ACE_Time_Value *timeout)
01675 {
01676 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01677 int queue_count = 0;
01678 {
01679 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01680
01681 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01682 {
01683 errno = ESHUTDOWN;
01684 return -1;
01685 }
01686
01687 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01688 return -1;
01689
01690 queue_count = this->enqueue_head_i (new_item);
01691
01692 if (queue_count == -1)
01693 return -1;
01694
01695 this->notify ();
01696 }
01697 return queue_count;
01698 }
01699
01700
01701
01702
01703
01704 template <ACE_SYNCH_DECL> int
01705 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01706 ACE_Time_Value *timeout)
01707 {
01708 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01709 int queue_count = 0;
01710 {
01711 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01712
01713 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01714 {
01715 errno = ESHUTDOWN;
01716 return -1;
01717 }
01718
01719 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01720 return -1;
01721
01722 queue_count = this->enqueue_i (new_item);
01723
01724 if (queue_count == -1)
01725 return -1;
01726
01727 this->notify ();
01728 }
01729 return queue_count;
01730 }
01731
01732
01733
01734
01735
01736 template <ACE_SYNCH_DECL> int
01737 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01738 ACE_Time_Value *timeout)
01739 {
01740 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01741 int queue_count = 0;
01742 {
01743 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01744
01745 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01746 {
01747 errno = ESHUTDOWN;
01748 return -1;
01749 }
01750
01751 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01752 return -1;
01753
01754 queue_count = this->enqueue_deadline_i (new_item);
01755
01756 if (queue_count == -1)
01757 return -1;
01758
01759 this->notify ();
01760 }
01761 return queue_count;
01762 }
01763
01764 template <ACE_SYNCH_DECL> int
01765 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01766 ACE_Time_Value *timeout)
01767 {
01768 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01769 return this->enqueue_prio (new_item, timeout);
01770 }
01771
01772
01773
01774
01775 template <ACE_SYNCH_DECL> int
01776 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01777 ACE_Time_Value *timeout)
01778 {
01779 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01780 int queue_count = 0;
01781 {
01782 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01783
01784 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01785 {
01786 errno = ESHUTDOWN;
01787 return -1;
01788 }
01789
01790 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01791 return -1;
01792
01793 queue_count = this->enqueue_tail_i (new_item);
01794
01795 if (queue_count == -1)
01796 return -1;
01797
01798 this->notify ();
01799 }
01800 return queue_count;
01801 }
01802
01803
01804
01805
01806
01807 template <ACE_SYNCH_DECL> int
01808 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01809 ACE_Time_Value *timeout)
01810 {
01811 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01812 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01813
01814 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01815 {
01816 errno = ESHUTDOWN;
01817 return -1;
01818 }
01819
01820 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01821 return -1;
01822
01823 return this->dequeue_head_i (first_item);
01824 }
01825
01826
01827
01828
01829
01830 template <ACE_SYNCH_DECL> int
01831 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01832 ACE_Time_Value *timeout)
01833 {
01834 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01835 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01836
01837 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01838 {
01839 errno = ESHUTDOWN;
01840 return -1;
01841 }
01842
01843 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01844 return -1;
01845
01846 return this->dequeue_prio_i (dequeued);
01847 }
01848
01849
01850
01851
01852
01853 template <ACE_SYNCH_DECL> int
01854 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01855 ACE_Time_Value *timeout)
01856 {
01857 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01858 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01859
01860 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01861 {
01862 errno = ESHUTDOWN;
01863 return -1;
01864 }
01865
01866 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01867 return -1;
01868
01869 return this->dequeue_tail_i (dequeued);
01870 }
01871
01872
01873
01874
01875
01876 template <ACE_SYNCH_DECL> int
01877 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01878 ACE_Time_Value *timeout)
01879 {
01880 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01881 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01882
01883 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01884 {
01885 errno = ESHUTDOWN;
01886 return -1;
01887 }
01888
01889 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01890 return -1;
01891
01892 return this->dequeue_deadline_i (dequeued);
01893 }
01894
01895 template <ACE_SYNCH_DECL> int
01896 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01897 {
01898 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01899
01900 #if defined (ACE_HAS_MONITOR_POINTS) && (ACE_HAS_MONITOR_POINTS == 1)
01901 this->monitor_->receive (this->cur_length_);
01902 #endif
01903
01904
01905 if (this->notification_strategy_ == 0)
01906 return 0;
01907 else
01908 return this->notification_strategy_->notify ();
01909 }
01910
01911
01912 template <ACE_SYNCH_DECL>
01913 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01914 size_t hwm,
01915 size_t lwm,
01916 ACE_Notification_Strategy *ns)
01917 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01918 pending_head_ (0),
01919 pending_tail_ (0),
01920 late_head_ (0),
01921 late_tail_ (0),
01922 beyond_late_head_ (0),
01923 beyond_late_tail_ (0),
01924 message_strategy_ (message_strategy)
01925 {
01926
01927
01928
01929 }
01930
01931
01932
01933 template <ACE_SYNCH_DECL>
01934 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01935 {
01936 delete &this->message_strategy_;
01937 }
01938
01939 template <ACE_SYNCH_DECL> int
01940 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01941 ACE_Message_Block *&list_tail,
01942 u_int status_flags)
01943 {
01944
01945 list_head = 0;
01946 list_tail = 0;
01947
01948
01949 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01950
01951
01952 int result = this->refresh_queue (current_time);
01953 if (result < 0)
01954 return result;
01955
01956 if (ACE_BIT_ENABLED (status_flags,
01957 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01958 && this->pending_head_
01959 && this->pending_tail_)
01960 {
01961
01962 if (this->pending_head_->prev ())
01963 {
01964 this->tail_ = this->pending_head_->prev ();
01965 this->pending_head_->prev ()->next (0);
01966 }
01967 else
01968 {
01969
01970 this->head_ = 0;
01971 this->tail_ = 0;
01972 }
01973
01974
01975 list_head = this->pending_head_;
01976 list_tail = this->pending_tail_;
01977
01978
01979 this->pending_head_->prev (0);
01980 this->pending_head_ = 0;
01981 this->pending_tail_ = 0;
01982 }
01983
01984 if (ACE_BIT_ENABLED (status_flags,
01985 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01986 && this->late_head_
01987 && this->late_tail_)
01988 {
01989
01990
01991 if (this->late_tail_->next ())
01992 this->late_tail_->next ()->prev (this->late_head_->prev ());
01993 else
01994 this->tail_ = this->late_head_->prev ();
01995
01996 if (this->late_head_->prev ())
01997 this->late_head_->prev ()->next (this->late_tail_->next ());
01998 else
01999 this->head_ = this->late_tail_->next ();
02000
02001
02002 this->late_head_->prev (list_tail);
02003 if (list_tail)
02004 list_tail->next (this->late_head_);
02005 else
02006 list_head = this->late_head_;
02007
02008 list_tail = this->late_tail_;
02009
02010 this->late_tail_->next (0);
02011 this->late_head_ = 0;
02012 this->late_tail_ = 0;
02013 }
02014
02015 if (ACE_BIT_ENABLED (status_flags,
02016 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
02017 && this->beyond_late_head_
02018 && this->beyond_late_tail_)
02019 {
02020
02021 if (this->beyond_late_tail_->next ())
02022 {
02023 this->head_ = this->beyond_late_tail_->next ();
02024 this->beyond_late_tail_->next ()->prev (0);
02025 }
02026 else
02027 {
02028
02029 this->head_ = 0;
02030 this->tail_ = 0;
02031 }
02032
02033
02034
02035 if (list_tail)
02036 {
02037 this->beyond_late_head_->prev (list_tail);
02038 list_tail->next (this->beyond_late_head_);
02039 }
02040 else
02041 list_head = this->beyond_late_head_;
02042
02043 list_tail = this->beyond_late_tail_;
02044
02045 this->beyond_late_tail_->next (0);
02046 this->beyond_late_head_ = 0;
02047 this->beyond_late_tail_ = 0;
02048 }
02049
02050
02051 ACE_Message_Block *temp1;
02052
02053 for (temp1 = list_head;
02054 temp1 != 0;
02055 temp1 = temp1->next ())
02056 {
02057 --this->cur_count_;
02058
02059 size_t mb_bytes = 0;
02060 size_t mb_length = 0;
02061 temp1->total_size_and_length (mb_bytes,
02062 mb_length);
02063
02064 this->cur_bytes_ -= mb_bytes;
02065 this->cur_length_ -= mb_length;
02066 }
02067
02068 return result;
02069 }
02070
02071
02072
02073
02074
02075
02076
02077
02078
02079 template <ACE_SYNCH_DECL> int
02080 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
02081 ACE_Time_Value *timeout)
02082 {
02083 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
02084
02085 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
02086
02087 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
02088 {
02089 errno = ESHUTDOWN;
02090 return -1;
02091 }
02092
02093 int result;
02094
02095
02096 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02097
02098
02099 result = this->refresh_queue (current_time);
02100 if (result < 0)
02101 return result;
02102
02103
02104 result = this->wait_not_empty_cond (ace_mon, timeout);
02105 if (result == -1)
02106 return result;
02107
02108
02109
02110
02111 result = this->dequeue_head_i (first_item);
02112
02113 return result;
02114 }
02115
02116
02117
02118
02119 template <ACE_SYNCH_DECL> void
02120 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
02121 {
02122 #if defined (ACE_HAS_DUMP)
02123 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
02124 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02125
02126 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
02127 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
02128
02129 ACE_DEBUG ((LM_DEBUG,
02130 ACE_TEXT ("pending_head_ = %u\n")
02131 ACE_TEXT ("pending_tail_ = %u\n")
02132 ACE_TEXT ("late_head_ = %u\n")
02133 ACE_TEXT ("late_tail_ = %u\n")
02134 ACE_TEXT ("beyond_late_head_ = %u\n")
02135 ACE_TEXT ("beyond_late_tail_ = %u\n"),
02136 this->pending_head_,
02137 this->pending_tail_,
02138 this->late_head_,
02139 this->late_tail_,
02140 this->beyond_late_head_,
02141 this->beyond_late_tail_));
02142
02143 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("message_strategy_ : \n")));
02144 message_strategy_.dump ();
02145
02146 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02147 #endif
02148 }
02149
02150
02151 template <ACE_SYNCH_DECL> int
02152 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
02153 {
02154 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
02155
02156 if (new_item == 0)
02157 return -1;
02158
02159 int result = 0;
02160
02161
02162 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
02163
02164
02165
02166 result = this->refresh_queue (current_time);
02167 if (result < 0)
02168 return result;
02169
02170
02171 switch (message_strategy_.priority_status (*new_item,
02172 current_time))
02173 {
02174 case ACE_Dynamic_Message_Strategy::PENDING:
02175 if (this->pending_tail_ == 0)
02176 {
02177
02178
02179
02180 pending_head_ = new_item;
02181 pending_tail_ = pending_head_;
02182 return this->enqueue_tail_i (new_item);
02183 }
02184 else
02185 {
02186
02187
02188 result = sublist_enqueue_i (new_item,
02189 current_time,
02190 this->pending_head_,
02191 this->pending_tail_,
02192 ACE_Dynamic_Message_Strategy::PENDING);
02193 }
02194 break;
02195
02196 case ACE_Dynamic_Message_Strategy::LATE:
02197 if (this->late_tail_ == 0)
02198 {
02199 late_head_ = new_item;
02200 late_tail_ = late_head_;
02201
02202 if (this->pending_head_ == 0)
02203
02204
02205
02206 return this->enqueue_tail_i (new_item);
02207 else if (this->beyond_late_tail_ == 0)
02208
02209
02210 return this->enqueue_head_i (new_item);
02211 else
02212 {
02213
02214
02215
02216 this->beyond_late_tail_->next (new_item);
02217 new_item->prev (this->beyond_late_tail_);
02218 this->pending_head_->prev (new_item);
02219 new_item->next (this->pending_head_);
02220 }
02221 }
02222 else
02223 {
02224
02225
02226 result = sublist_enqueue_i (new_item,
02227 current_time,
02228 this->late_head_,
02229 this->late_tail_,
02230 ACE_Dynamic_Message_Strategy::LATE);
02231 }
02232 break;
02233
02234 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02235 if (this->beyond_late_tail_ == 0)
02236 {
02237
02238
02239
02240 beyond_late_head_ = new_item;
02241 beyond_late_tail_ = beyond_late_head_;
02242 return this->enqueue_head_i (new_item);
02243 }
02244 else
02245 {
02246
02247
02248
02249 if (this->beyond_late_tail_->next ())
02250 this->beyond_late_tail_->next ()->prev (new_item);
02251 else
02252 this->tail_ = new_item;
02253
02254 new_item->next (this->beyond_late_tail_->next ());
02255 this->beyond_late_tail_->next (new_item);
02256 new_item->prev (this->beyond_late_tail_);
02257 this->beyond_late_tail_ = new_item;
02258 }
02259
02260 break;
02261
02262
02263 default:
02264 result = -1;
02265 break;
02266 }
02267
02268 if (result < 0)
02269 return result;
02270
02271 size_t mb_bytes = 0;
02272 size_t mb_length = 0;
02273 new_item->total_size_and_length (mb_bytes,
02274 mb_length);
02275 this->cur_bytes_ += mb_bytes;
02276 this->cur_length_ += mb_length;
02277 ++this->cur_count_;
02278
02279 if (this->signal_dequeue_waiters () == -1)
02280 return -1;
02281 else
02282 return this->cur_count_;
02283 }
02284
02285
02286
02287
02288
02289
02290
02291 template <ACE_SYNCH_DECL> int
02292 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
02293 const ACE_Time_Value ¤t_time,
02294 ACE_Message_Block *&sublist_head,
02295 ACE_Message_Block *&sublist_tail,
02296 ACE_Dynamic_Message_Strategy::Priority_Status status)
02297 {
02298 int result = 0;
02299 ACE_Message_Block *current_item = 0;
02300
02301
02302
02303 for (current_item = sublist_tail;
02304 current_item;
02305 current_item = current_item->prev ())
02306 {
02307 if (message_strategy_.priority_status (*current_item, current_time) == status)
02308 {
02309 if (current_item->msg_priority () >= new_item->msg_priority ())
02310 break;
02311 }
02312 else
02313 {
02314 sublist_head = new_item;
02315 break;
02316 }
02317 }
02318
02319 if (current_item == 0)
02320 {
02321
02322
02323 new_item->prev (0);
02324 new_item->next (this->head_);
02325 if (this->head_ != 0)
02326 this->head_->prev (new_item);
02327 else
02328 {
02329 this->tail_ = new_item;
02330 sublist_tail = new_item;
02331 }
02332 this->head_ = new_item;
02333 sublist_head = new_item;
02334 }
02335 else
02336 {
02337
02338 new_item->next (current_item->next ());
02339 new_item->prev (current_item);
02340
02341 if (current_item->next ())
02342 current_item->next ()->prev (new_item);
02343 else
02344 this->tail_ = new_item;
02345
02346 current_item->next (new_item);
02347
02348
02349
02350 if (current_item == sublist_tail)
02351 sublist_tail = new_item;
02352 }
02353
02354 return result;
02355 }
02356
02357
02358
02359
02360 template <ACE_SYNCH_DECL> int
02361 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
02362 {
02363 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
02364
02365 int result = 0;
02366 int last_in_subqueue = 0;
02367
02368
02369 if (this->pending_head_)
02370 {
02371 first_item = this->pending_head_;
02372
02373 if (0 == this->pending_head_->prev ())
02374 this->head_ = this->pending_head_->next ();
02375 else
02376 this->pending_head_->prev ()->next (this->pending_head_->next ());
02377
02378 if (0 == this->pending_head_->next ())
02379 {
02380 this->tail_ = this->pending_head_->prev ();
02381 this->pending_head_ = 0;
02382 this->pending_tail_ = 0;
02383 }
02384 else
02385 {
02386 this->pending_head_->next ()->prev (this->pending_head_->prev ());
02387 this->pending_head_ = this->pending_head_->next ();
02388 }
02389
02390 first_item->prev (0);
02391 first_item->next (0);
02392 }
02393
02394
02395 else if (this->late_head_)
02396 {
02397 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
02398
02399 first_item = this->late_head_;
02400
02401 if (0 == this->late_head_->prev ())
02402 this->head_ = this->late_head_->next ();
02403 else
02404 this->late_head_->prev ()->next (this->late_head_->next ());
02405
02406 if (0 == this->late_head_->next ())
02407 this->tail_ = this->late_head_->prev ();
02408 else
02409 {
02410 this->late_head_->next ()->prev (this->late_head_->prev ());
02411 this->late_head_ = this->late_head_->next ();
02412 }
02413
02414 if (last_in_subqueue)
02415 {
02416 this->late_head_ = 0;
02417 this->late_tail_ = 0;
02418 }
02419
02420 first_item->prev (0);
02421 first_item->next (0);
02422 }
02423
02424 else if (this->beyond_late_head_)
02425 {
02426 last_in_subqueue =
02427 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
02428
02429 first_item = this->beyond_late_head_;
02430 this->head_ = this->beyond_late_head_->next ();
02431
02432 if (0 == this->beyond_late_head_->next ())
02433 this->tail_ = this->beyond_late_head_->prev ();
02434 else
02435 {
02436 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
02437 this->beyond_late_head_ = this->beyond_late_head_->next ();
02438 }
02439
02440 if (last_in_subqueue)
02441 {
02442 this->beyond_late_head_ = 0;
02443 this->beyond_late_tail_ = 0;
02444 }
02445
02446 first_item->prev (0);
02447 first_item->next (0);
02448 }
02449 else
02450 {
02451
02452 first_item = 0;
02453 result = -1;
02454 }
02455
02456 if (result < 0)
02457 return result;
02458
02459 size_t mb_bytes = 0;
02460 size_t mb_length = 0;
02461 first_item->total_size_and_length (mb_bytes,
02462 mb_length);
02463
02464 this->cur_bytes_ -= mb_bytes;
02465 this->cur_length_ -= mb_length;
02466 --this->cur_count_;
02467
02468
02469
02470 if (this->cur_bytes_ <= this->low_water_mark_
02471 && this->signal_enqueue_waiters () == -1)
02472 return -1;
02473 else
02474 return this->cur_count_;
02475 }
02476
02477
02478
02479
02480
02481
02482
02483 template <ACE_SYNCH_DECL> int
02484 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time)
02485 {
02486 int result;
02487
02488 result = refresh_pending_queue (current_time);
02489
02490 if (result != -1)
02491 result = refresh_late_queue (current_time);
02492
02493 return result;
02494 }
02495
02496
02497
02498
02499 template <ACE_SYNCH_DECL> int
02500 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time)
02501 {
02502 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02503
02504
02505 if (this->pending_head_)
02506 {
02507 current_status = message_strategy_.priority_status (*this->pending_head_,
02508 current_time);
02509 switch (current_status)
02510 {
02511 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02512
02513
02514 this->beyond_late_head_ = this->head_;
02515
02516
02517
02518 this->late_head_ = 0;
02519 this->late_tail_ = 0;
02520
02521
02522 do
02523 {
02524 this->pending_head_ = this->pending_head_->next ();
02525
02526 if (this->pending_head_)
02527 current_status = message_strategy_.priority_status (*this->pending_head_,
02528 current_time);
02529 else
02530 break;
02531
02532 }
02533 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02534
02535 if (this->pending_head_)
02536 {
02537
02538 this->beyond_late_tail_ = this->pending_head_->prev ();
02539
02540 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02541
02542 break;
02543 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02544 {
02545
02546 ACE_ERROR_RETURN ((LM_ERROR,
02547 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02548 (int) current_status),
02549 -1);
02550 }
02551
02552 }
02553 else
02554 {
02555
02556
02557 this->beyond_late_tail_ = this->tail_;
02558 this->pending_head_ = 0;
02559 this->pending_tail_ = 0;
02560 break;
02561 }
02562
02563 case ACE_Dynamic_Message_Strategy::LATE:
02564
02565
02566
02567 if (this->late_head_ == 0)
02568 this->late_head_ = this->pending_head_;
02569
02570
02571 do
02572 {
02573 this->pending_head_ = this->pending_head_->next ();
02574
02575 if (this->pending_head_)
02576 current_status = message_strategy_.priority_status (*this->pending_head_,
02577 current_time);
02578 else
02579 break;
02580
02581 }
02582 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02583
02584 if (this->pending_head_)
02585 {
02586 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02587
02588 ACE_ERROR_RETURN((LM_ERROR,
02589 ACE_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02590 (int) current_status),
02591 -1);
02592
02593
02594 this->late_tail_ = this->pending_head_->prev ();
02595 }
02596 else
02597 {
02598
02599 this->late_tail_ = this->tail_;
02600 this->pending_head_ = 0;
02601 this->pending_tail_ = 0;
02602 }
02603
02604 break;
02605 case ACE_Dynamic_Message_Strategy::PENDING:
02606
02607 break;
02608 default:
02609
02610 ACE_ERROR_RETURN((LM_ERROR,
02611 ACE_TEXT ("Unknown message priority status [%d]"),
02612 (int) current_status),
02613 -1);
02614 }
02615 }
02616 return 0;
02617 }
02618
02619
02620
02621
02622 template <ACE_SYNCH_DECL> int
02623 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time)
02624 {
02625 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02626
02627 if (this->late_head_)
02628 {
02629 current_status = message_strategy_.priority_status (*this->late_head_,
02630 current_time);
02631 switch (current_status)
02632 {
02633 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02634
02635
02636
02637 this->beyond_late_head_ = this->head_;
02638
02639
02640 do
02641 {
02642 this->late_head_ = this->late_head_->next ();
02643
02644 if (this->late_head_)
02645 current_status = message_strategy_.priority_status (*this->late_head_,
02646 current_time);
02647 else
02648 break;
02649
02650 }
02651 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02652
02653 if (this->late_head_)
02654 {
02655
02656 this->beyond_late_tail_ = this->late_head_->prev ();
02657
02658 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02659 {
02660
02661 this->late_head_ = 0;
02662 this->late_tail_ = 0;
02663 }
02664 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02665
02666 ACE_ERROR_RETURN ((LM_ERROR,
02667 ACE_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02668 (int) current_status),
02669 -1);
02670 }
02671 else
02672 {
02673
02674 this->beyond_late_tail_ = this->tail_;
02675 this->late_head_ = 0;
02676 this->late_tail_ = 0;
02677 }
02678
02679 break;
02680
02681 case ACE_Dynamic_Message_Strategy::LATE:
02682
02683 break;
02684
02685 case ACE_Dynamic_Message_Strategy::PENDING:
02686
02687 ACE_ERROR_RETURN ((LM_ERROR,
02688 ACE_TEXT ("Unexpected message priority status ")
02689 ACE_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02690 (int) current_status),
02691 -1);
02692 default:
02693
02694 ACE_ERROR_RETURN ((LM_ERROR,
02695 ACE_TEXT ("Unknown message priority status [%d]"),
02696 (int) current_status),
02697 -1);
02698 }
02699 }
02700
02701 return 0;
02702 }
02703
02704
02705
02706
02707 template <ACE_SYNCH_DECL> int
02708 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02709 ACE_Time_Value *timeout)
02710 {
02711 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02712 timeout);
02713 }
02714
02715
02716
02717
02718 template <ACE_SYNCH_DECL> int
02719 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02720 ACE_Time_Value *timeout)
02721 {
02722 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02723 return this->enqueue_prio (new_item, timeout);
02724 }
02725
02726
02727
02728
02729
02730
02731 template <ACE_SYNCH_DECL> int
02732 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02733 ACE_Time_Value *timeout)
02734 {
02735 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02736 return this->enqueue_prio (new_item, timeout);
02737 }
02738
02739
02740
02741
02742
02743
02744 template <ACE_SYNCH_DECL>
02745 ACE_Message_Queue<ACE_SYNCH_USE> *
02746 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02747 size_t lwm,
02748 ACE_Notification_Strategy *ns)
02749 {
02750 ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02751
02752 ACE_NEW_RETURN (tmp,
02753 ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02754 0);
02755 return tmp;
02756 }
02757
02758
02759
02760 template <ACE_SYNCH_DECL>
02761 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02762 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02763 size_t lwm,
02764 ACE_Notification_Strategy *ns,
02765 u_long static_bit_field_mask,
02766 u_long static_bit_field_shift,
02767 u_long dynamic_priority_max,
02768 u_long dynamic_priority_offset)
02769 {
02770 ACE_Deadline_Message_Strategy *adms = 0;
02771
02772 ACE_NEW_RETURN (adms,
02773 ACE_Deadline_Message_Strategy (static_bit_field_mask,
02774 static_bit_field_shift,
02775 dynamic_priority_max,
02776 dynamic_priority_offset),
02777 0);
02778
02779 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02780 ACE_NEW_RETURN (tmp,
02781 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02782 0);
02783 return tmp;
02784 }
02785
02786
02787
02788
02789 template <ACE_SYNCH_DECL>
02790 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02791 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02792 size_t lwm,
02793 ACE_Notification_Strategy *ns,
02794 u_long static_bit_field_mask,
02795 u_long static_bit_field_shift,
02796 u_long dynamic_priority_max,
02797 u_long dynamic_priority_offset)
02798 {
02799 ACE_Laxity_Message_Strategy *alms = 0;
02800
02801 ACE_NEW_RETURN (alms,
02802 ACE_Laxity_Message_Strategy (static_bit_field_mask,
02803 static_bit_field_shift,
02804 dynamic_priority_max,
02805 dynamic_priority_offset),
02806 0);
02807
02808 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0;
02809 ACE_NEW_RETURN (tmp,
02810 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02811 0);
02812 return tmp;
02813 }
02814
02815
02816
02817
02818 #if defined (ACE_VXWORKS)
02819
02820 template <ACE_SYNCH_DECL>
02821 ACE_Message_Queue_Vx *
02822 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02823 size_t max_message_length,
02824 ACE_Notification_Strategy *ns)
02825 {
02826 ACE_Message_Queue_Vx *tmp = 0;
02827
02828 ACE_NEW_RETURN (tmp,
02829 ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02830 0);
02831 return tmp;
02832 }
02833
02834
02835 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
02836
02837 template <ACE_SYNCH_DECL>
02838 ACE_Message_Queue_NT *
02839 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02840 {
02841 ACE_Message_Queue_NT *tmp = 0;
02842
02843 ACE_NEW_RETURN (tmp,
02844 ACE_Message_Queue_NT (max_threads);
02845 0);
02846 return tmp;
02847 }
02848
02849 #endif
02850 #endif
02851
02852 ACE_END_VERSIONED_NAMESPACE_DECL
02853
02854 #endif