00001
00002
00003
00004 #ifndef ACE_STREAM_CPP
00005 #define ACE_STREAM_CPP
00006
00007
00008
00009 #include "ace/Stream.h"
00010
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 # pragma once
00013 #endif
00014
00015 #include "ace/Stream_Modules.h"
00016 #include "ace/OS_NS_string.h"
00017
00018 #if !defined (__ACE_INLINE__)
00019 #include "ace/Stream.inl"
00020 #endif
00021
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 ACE_ALLOC_HOOK_DEFINE(ACE_Stream)
00025
00026
00027
00028 template <ACE_SYNCH_DECL> void
00029 ACE_Stream<ACE_SYNCH_USE>::dump (void) const
00030 {
00031 #if defined (ACE_HAS_DUMP)
00032 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00033 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- module links --------\n")));
00034
00035 for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00036 ;
00037 mp = mp->next ())
00038 {
00039 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("module name = %s\n"), mp->name ()));
00040 if (mp == this->stream_tail_)
00041 break;
00042 }
00043
00044 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- writer links --------\n")));
00045
00046 ACE_Task<ACE_SYNCH_USE> *tp;
00047
00048 for (tp = this->stream_head_->writer ();
00049 ;
00050 tp = tp->next ())
00051 {
00052 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("writer queue name = %s\n"), tp->name ()));
00053 tp->dump ();
00054 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
00055 if (tp == this->stream_tail_->writer ()
00056 || (this->linked_us_
00057 && tp == this->linked_us_->stream_head_->reader ()))
00058 break;
00059 }
00060
00061 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("-------- reader links --------\n")));
00062 for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00063 {
00064 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("reader queue name = %s\n"), tp->name ()));
00065 tp->dump ();
00066 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("-------\n")));
00067 if (tp == this->stream_head_->reader ()
00068 || (this->linked_us_
00069 && tp == this->linked_us_->stream_head_->writer ()))
00070 break;
00071 }
00072 #endif
00073 }
00074
00075 template <ACE_SYNCH_DECL> int
00076 ACE_Stream<ACE_SYNCH_USE>::push (ACE_Module<ACE_SYNCH_USE> *new_top)
00077 {
00078 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00079 if (this->push_module (new_top,
00080 this->stream_head_->next (),
00081 this->stream_head_) == -1)
00082 return -1;
00083 else
00084 return 0;
00085 }
00086
00087 template <ACE_SYNCH_DECL> int
00088 ACE_Stream<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
00089 {
00090 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00091 return this->stream_head_->writer ()->put (mb, tv);
00092 }
00093
00094 template <ACE_SYNCH_DECL> int
00095 ACE_Stream<ACE_SYNCH_USE>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
00096 {
00097 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00098 return this->stream_head_->reader ()->getq (mb, tv);
00099 }
00100
00101
00102
00103
00104 template <ACE_SYNCH_DECL> int
00105 ACE_Stream<ACE_SYNCH_USE>::top (ACE_Module<ACE_SYNCH_USE> *&m)
00106 {
00107 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00108 if (this->stream_head_->next () == this->stream_tail_)
00109 return -1;
00110 else
00111 {
00112 m = this->stream_head_->next ();
00113 return 0;
00114 }
00115 }
00116
00117 template <ACE_SYNCH_DECL> int
00118 ACE_Stream<ACE_SYNCH_USE>::insert (const ACE_TCHAR *prev_name,
00119 ACE_Module<ACE_SYNCH_USE> *mod)
00120 {
00121 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00122
00123 for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00124 prev_mod != 0;
00125 prev_mod = prev_mod->next ())
00126 if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00127 {
00128 ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00129
00130
00131 if (next_mod == 0)
00132 return -1;
00133
00134 mod->link (next_mod);
00135 prev_mod->link (mod);
00136
00137 if (mod->reader ()->open (mod->arg ()) == -1)
00138 return -1;
00139
00140 if (mod->writer ()->open (mod->arg ()) == -1)
00141 return -1;
00142
00143 return 0;
00144 }
00145
00146 return -1;
00147 }
00148
00149 template <ACE_SYNCH_DECL> int
00150 ACE_Stream<ACE_SYNCH_USE>::replace (const ACE_TCHAR *replace_name,
00151 ACE_Module<ACE_SYNCH_USE> *mod,
00152 int flags)
00153 {
00154 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00155 ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00156
00157 for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00158 rep_mod != 0;
00159 rep_mod = rep_mod->next ())
00160 if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00161 {
00162 ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00163
00164 if (next_mod)
00165 mod->link (next_mod);
00166 else
00167 {
00168 mod->writer ()->next (0);
00169 mod->next (0);
00170 this->stream_tail_ = mod;
00171 }
00172
00173 if (prev_mod)
00174 prev_mod->link (mod);
00175 else
00176 {
00177 mod->reader ()->next (0);
00178 this->stream_head_ = mod;
00179 }
00180
00181 if (mod->reader ()->open (mod->arg ()) == -1)
00182 return -1;
00183
00184 if (mod->writer ()->open (mod->arg ()) == -1)
00185 return -1;
00186
00187 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00188 {
00189 rep_mod->close (flags);
00190 delete rep_mod;
00191 }
00192
00193 return 0;
00194 }
00195 else
00196 prev_mod = rep_mod;
00197
00198 return -1;
00199 }
00200
00201
00202
00203
00204 template <ACE_SYNCH_DECL> int
00205 ACE_Stream<ACE_SYNCH_USE>::pop (int flags)
00206 {
00207 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00208 if (this->stream_head_->next () == this->stream_tail_)
00209 return -1;
00210 else
00211 {
00212
00213 ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00214 ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00215
00216 this->stream_head_->next (new_top);
00217
00218
00219
00220 top_mod->close (flags);
00221
00222
00223 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00224 delete top_mod;
00225
00226 this->stream_head_->writer ()->next (new_top->writer ());
00227 new_top->reader ()->next (this->stream_head_->reader ());
00228 return 0;
00229 }
00230 }
00231
00232
00233
00234
00235 template <ACE_SYNCH_DECL> int
00236 ACE_Stream<ACE_SYNCH_USE>::remove (const ACE_TCHAR *name,
00237 int flags)
00238 {
00239 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00240 ACE_Module<ACE_SYNCH_USE> *prev = 0;
00241
00242 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00243 mod != 0;
00244 mod = mod->next ())
00245 {
00246 #ifndef ACE_NLOGGING
00247 if (ACE::debug ())
00248 {
00249 ACE_DEBUG ((LM_DEBUG,
00250 ACE_TEXT ("ACE_Stream::remove comparing existing module :%s: with :%s:\n"),
00251 mod->name (),
00252 name));
00253 }
00254 #endif
00255
00256 if (ACE_OS::strcmp (mod->name (), name) == 0)
00257 {
00258 if (prev == 0)
00259 this->stream_head_->link (mod->next ());
00260 else
00261 prev->link (mod->next ());
00262
00263
00264 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00265 {
00266
00267 mod->close (flags);
00268 delete mod;
00269 }
00270
00271 return 0;
00272 }
00273 else
00274 prev = mod;
00275 }
00276
00277 ACE_DEBUG ((LM_WARNING, ACE_TEXT ("ACE_Stream::remove failed to find module with name %s to remove\n"),name));
00278 return -1;
00279 }
00280
00281 template <ACE_SYNCH_DECL> ACE_Module<ACE_SYNCH_USE> *
00282 ACE_Stream<ACE_SYNCH_USE>::find (const ACE_TCHAR *name)
00283 {
00284 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00285 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00286 mod != 0;
00287 mod = mod->next ())
00288 if (ACE_OS::strcmp (mod->name (), name) == 0)
00289 return mod;
00290
00291 return 0;
00292 }
00293
00294
00295
00296 template <ACE_SYNCH_DECL> int
00297 ACE_Stream<ACE_SYNCH_USE>::push_module (ACE_Module<ACE_SYNCH_USE> *new_top,
00298 ACE_Module<ACE_SYNCH_USE> *current_top,
00299 ACE_Module<ACE_SYNCH_USE> *head)
00300 {
00301 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00302 ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00303 ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00304 ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00305 ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00306
00307 if (current_top)
00308 {
00309 ct_reader = current_top->reader ();
00310 ct_writer = current_top->writer ();
00311 ct_reader->next (nt_reader);
00312 }
00313
00314 nt_writer->next (ct_writer);
00315
00316 if (head)
00317 {
00318 if (head != new_top)
00319 head->link (new_top);
00320 }
00321 else
00322 nt_reader->next (0);
00323
00324 new_top->next (current_top);
00325
00326 if (nt_reader->open (new_top->arg ()) == -1)
00327 return -1;
00328
00329 if (nt_writer->open (new_top->arg ()) == -1)
00330 return -1;
00331 return 0;
00332 }
00333
00334 template <ACE_SYNCH_DECL> int
00335 ACE_Stream<ACE_SYNCH_USE>::open (void *a,
00336 ACE_Module<ACE_SYNCH_USE> *head,
00337 ACE_Module<ACE_SYNCH_USE> *tail)
00338 {
00339 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00340 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00341
00342 ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00343 ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00344
00345 if (head == 0)
00346 {
00347 ACE_NEW_RETURN (h1,
00348 ACE_Stream_Head<ACE_SYNCH_USE>,
00349 -1);
00350 ACE_NEW_RETURN (h2,
00351 ACE_Stream_Head<ACE_SYNCH_USE>,
00352 -1);
00353 ACE_NEW_RETURN (head,
00354 ACE_Module<ACE_SYNCH_USE> (ACE_TEXT ("ACE_Stream_Head"),
00355 h1, h2,
00356 a,
00357 M_DELETE),
00358 -1);
00359 }
00360
00361 if (tail == 0)
00362 {
00363 ACE_NEW_RETURN (t1,
00364 ACE_Stream_Tail<ACE_SYNCH_USE>,
00365 -1);
00366 ACE_NEW_RETURN (t2,
00367 ACE_Stream_Tail<ACE_SYNCH_USE>,
00368 -1);
00369 ACE_NEW_RETURN (tail,
00370 ACE_Module<ACE_SYNCH_USE> (ACE_TEXT ("ACE_Stream_Tail"),
00371 t1, t2,
00372 a,
00373 M_DELETE),
00374 -1);
00375 }
00376
00377
00378 if ((head == 0 && (h1 == 0 || h2 == 0))
00379 || (tail == 0 && (t1 == 0 || t2 == 0)))
00380 {
00381 delete h1;
00382 delete h2;
00383 delete t1;
00384 delete t2;
00385 delete head;
00386 delete tail;
00387 errno = ENOMEM;
00388 return -1;
00389 }
00390
00391 this->stream_head_ = head;
00392 this->stream_tail_ = tail;
00393
00394 if (this->push_module (this->stream_tail_) == -1)
00395 return -1;
00396 else if (this->push_module (this->stream_head_,
00397 this->stream_tail_,
00398 this->stream_head_) == -1)
00399 return -1;
00400
00401 return 0;
00402 }
00403
00404 template <ACE_SYNCH_DECL> int
00405 ACE_Stream<ACE_SYNCH_USE>::close (int flags)
00406 {
00407 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00408 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00409
00410 if (this->stream_head_ != 0
00411 && this->stream_tail_ != 0)
00412 {
00413
00414 this->unlink_i ();
00415
00416 int result = 0;
00417
00418
00419
00420 while (this->stream_head_->next () != this->stream_tail_)
00421 if (this->pop (flags) == -1)
00422 result = -1;
00423
00424
00425 if (this->stream_head_->close (flags) == -1)
00426 result = -1;
00427 if (this->stream_tail_->close (flags) == -1)
00428 result = -1;
00429
00430
00431 delete this->stream_head_;
00432 delete this->stream_tail_;
00433
00434 this->stream_head_ = 0;
00435 this->stream_tail_ = 0;
00436
00437
00438 this->final_close_.broadcast ();
00439 return result;
00440 }
00441 return 0;
00442 }
00443
00444 template <ACE_SYNCH_DECL> int
00445 ACE_Stream<ACE_SYNCH_USE>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
00446 void *a)
00447 {
00448 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00449 ACE_IO_Cntl_Msg ioc (cmd);
00450
00451 ACE_Message_Block *db;
00452
00453
00454 ACE_NEW_RETURN (db,
00455 ACE_Message_Block (sizeof (int),
00456 ACE_Message_Block::MB_IOCTL,
00457 0,
00458 (char *) a),
00459 -1);
00460
00461
00462
00463 ACE_Message_Block *cb = 0;
00464
00465 ACE_NEW_RETURN (cb,
00466 ACE_Message_Block (sizeof ioc,
00467 ACE_Message_Block::MB_IOCTL,
00468 db,
00469 (char *) &ioc),
00470 -1);
00471
00472
00473
00474
00475
00476
00477 if (cb == 0)
00478 {
00479 db->release ();
00480 errno = ENOMEM;
00481 return -1;
00482 }
00483
00484 int result;
00485
00486 if (this->stream_head_->writer ()->put (cb) == -1)
00487 result = -1;
00488 else if (this->stream_head_->reader ()->getq (cb) == -1)
00489 result = -1;
00490 else
00491 result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00492
00493
00494 cb->release ();
00495
00496 return result;
00497 }
00498
00499
00500
00501
00502
00503
00504 template <ACE_SYNCH_DECL> int
00505 ACE_Stream<ACE_SYNCH_USE>::link_i (ACE_Stream<ACE_SYNCH_USE> &us)
00506 {
00507 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00508 this->linked_us_ = &us;
00509
00510 us.linked_us_ = this;
00511
00512 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00513
00514 if (my_tail == 0)
00515 return -1;
00516
00517
00518 while (my_tail->next () != this->stream_tail_)
00519 my_tail = my_tail->next ();
00520
00521 ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00522
00523 if (other_tail == 0)
00524 return -1;
00525
00526
00527 while (other_tail->next () != us.stream_tail_)
00528 other_tail = other_tail->next ();
00529
00530
00531 my_tail->writer ()->next (other_tail->reader ());
00532 other_tail->writer ()->next (my_tail->reader ());
00533 return 0;
00534 }
00535
00536 template <ACE_SYNCH_DECL> int
00537 ACE_Stream<ACE_SYNCH_USE>::link (ACE_Stream<ACE_SYNCH_USE> &us)
00538 {
00539 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00540
00541 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00542
00543 return this->link_i (us);
00544 }
00545
00546
00547
00548 template <ACE_SYNCH_DECL> int
00549 ACE_Stream<ACE_SYNCH_USE>::unlink_i (void)
00550 {
00551 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00552
00553
00554
00555 if (this->linked_us_ != 0)
00556 {
00557 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00558
00559
00560 if (my_tail)
00561 {
00562
00563 while (my_tail->next () != this->stream_tail_)
00564 my_tail = my_tail->next ();
00565
00566
00567 my_tail->writer ()->next (this->stream_tail_->writer ());
00568 }
00569
00570 ACE_Module<ACE_SYNCH_USE> *other_tail =
00571 this->linked_us_->stream_head_;
00572
00573
00574 if (other_tail != 0)
00575 {
00576 while (other_tail->next () != this->linked_us_->stream_tail_)
00577 other_tail = other_tail->next ();
00578
00579 other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00580
00581 }
00582
00583
00584 this->linked_us_->linked_us_ = 0;
00585
00586 this->linked_us_ = 0;
00587 return 0;
00588 }
00589 else
00590 return -1;
00591 }
00592
00593 template <ACE_SYNCH_DECL> int
00594 ACE_Stream<ACE_SYNCH_USE>::unlink (void)
00595 {
00596 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00597 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00598 return this->unlink_i ();
00599 }
00600
00601 template <ACE_SYNCH_DECL>
00602 ACE_Stream<ACE_SYNCH_USE>::ACE_Stream (void * a,
00603 ACE_Module<ACE_SYNCH_USE> *head,
00604 ACE_Module<ACE_SYNCH_USE> *tail)
00605 : linked_us_ (0),
00606 final_close_ (lock_)
00607 {
00608 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00609 if (this->open (a, head, tail) == -1)
00610 ACE_ERROR ((LM_ERROR,
00611 ACE_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00612 head->name (), tail->name ()));
00613 }
00614
00615 template <ACE_SYNCH_DECL>
00616 ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream (void)
00617 {
00618 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00619
00620 if (this->stream_head_ != 0)
00621 this->close ();
00622 }
00623
00624 template <ACE_SYNCH_DECL>
00625 ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr)
00626 : next_ (sr.stream_head_)
00627 {
00628 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator");
00629 }
00630
00631 ACE_END_VERSIONED_NAMESPACE_DECL
00632
00633 #endif