00001
00002
00003
00004 #ifndef ACE_STREAM_CPP
00005 #define ACE_STREAM_CPP
00006
00007
00008 #include "ace/Stream.h"
00009
00010 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00011 # pragma once
00012 #endif
00013
00014 #include "ace/Stream_Modules.h"
00015 #include "ace/OS_NS_string.h"
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Stream.inl"
00019 #endif
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Stream)
00024
00025
00026
00027 template <ACE_SYNCH_DECL> void
00028 ACE_Stream<ACE_SYNCH_USE>::dump (void) const
00029 {
00030 #if defined (ACE_HAS_DUMP)
00031 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00032 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- module links --------\n")));
00033
00034 for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00035 ;
00036 mp = mp->next ())
00037 {
00038 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("module name = %s\n"), mp->name ()));
00039 if (mp == this->stream_tail_)
00040 break;
00041 }
00042
00043 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- writer links --------\n")));
00044
00045 ACE_Task<ACE_SYNCH_USE> *tp;
00046
00047 for (tp = this->stream_head_->writer ();
00048 ;
00049 tp = tp->next ())
00050 {
00051 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("writer queue name = %s\n"), tp->name ()));
00052 tp->dump ();
00053 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------\n")));
00054 if (tp == this->stream_tail_->writer ()
00055 || (this->linked_us_
00056 && tp == this->linked_us_->stream_head_->reader ()))
00057 break;
00058 }
00059
00060 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- reader links --------\n")));
00061 for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00062 {
00063 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("reader queue name = %s\n"), tp->name ()));
00064 tp->dump ();
00065 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------\n")));
00066 if (tp == this->stream_head_->reader ()
00067 || (this->linked_us_
00068 && tp == this->linked_us_->stream_head_->writer ()))
00069 break;
00070 }
00071 #endif
00072 }
00073
00074 template <ACE_SYNCH_DECL> int
00075 ACE_Stream<ACE_SYNCH_USE>::push (ACE_Module<ACE_SYNCH_USE> *new_top)
00076 {
00077 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00078 if (this->push_module (new_top,
00079 this->stream_head_->next (),
00080 this->stream_head_) == -1)
00081 return -1;
00082 else
00083 return 0;
00084 }
00085
00086 template <ACE_SYNCH_DECL> int
00087 ACE_Stream<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
00088 {
00089 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00090 return this->stream_head_->writer ()->put (mb, tv);
00091 }
00092
00093 template <ACE_SYNCH_DECL> int
00094 ACE_Stream<ACE_SYNCH_USE>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
00095 {
00096 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00097 return this->stream_head_->reader ()->getq (mb, tv);
00098 }
00099
00100
00101
00102
00103 template <ACE_SYNCH_DECL> int
00104 ACE_Stream<ACE_SYNCH_USE>::top (ACE_Module<ACE_SYNCH_USE> *&m)
00105 {
00106 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00107 if (this->stream_head_->next () == this->stream_tail_)
00108 return -1;
00109 else
00110 {
00111 m = this->stream_head_->next ();
00112 return 0;
00113 }
00114 }
00115
00116 template <ACE_SYNCH_DECL> int
00117 ACE_Stream<ACE_SYNCH_USE>::insert (const ACE_TCHAR *prev_name,
00118 ACE_Module<ACE_SYNCH_USE> *mod)
00119 {
00120 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00121
00122 for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00123 prev_mod != 0;
00124 prev_mod = prev_mod->next ())
00125 if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00126 {
00127 ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00128
00129
00130 if (next_mod == 0)
00131 return -1;
00132
00133 mod->link (next_mod);
00134 prev_mod->link (mod);
00135
00136 if (mod->reader ()->open (mod->arg ()) == -1)
00137 return -1;
00138
00139 if (mod->writer ()->open (mod->arg ()) == -1)
00140 return -1;
00141
00142 return 0;
00143 }
00144
00145 return -1;
00146 }
00147
00148 template <ACE_SYNCH_DECL> int
00149 ACE_Stream<ACE_SYNCH_USE>::replace (const ACE_TCHAR *replace_name,
00150 ACE_Module<ACE_SYNCH_USE> *mod,
00151 int flags)
00152 {
00153 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00154 ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00155
00156 for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00157 rep_mod != 0;
00158 rep_mod = rep_mod->next ())
00159 if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00160 {
00161 ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00162
00163 if (next_mod)
00164 mod->link (next_mod);
00165 else
00166 {
00167 mod->writer ()->next (0);
00168 mod->next (0);
00169 this->stream_tail_ = mod;
00170 }
00171
00172 if (prev_mod)
00173 prev_mod->link (mod);
00174 else
00175 {
00176 mod->reader ()->next (0);
00177 this->stream_head_ = mod;
00178 }
00179
00180 if (mod->reader ()->open (mod->arg ()) == -1)
00181 return -1;
00182
00183 if (mod->writer ()->open (mod->arg ()) == -1)
00184 return -1;
00185
00186 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00187 {
00188 rep_mod->close (flags);
00189 delete rep_mod;
00190 }
00191
00192 return 0;
00193 }
00194 else
00195 prev_mod = rep_mod;
00196
00197 return -1;
00198 }
00199
00200
00201
00202
00203 template <ACE_SYNCH_DECL> int
00204 ACE_Stream<ACE_SYNCH_USE>::pop (int flags)
00205 {
00206 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00207 if (this->stream_head_->next () == this->stream_tail_)
00208 return -1;
00209 else
00210 {
00211
00212 ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00213 ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00214
00215 this->stream_head_->next (new_top);
00216
00217
00218
00219 top_mod->close (flags);
00220
00221
00222 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00223 delete top_mod;
00224
00225 this->stream_head_->writer ()->next (new_top->writer ());
00226 new_top->reader ()->next (this->stream_head_->reader ());
00227 return 0;
00228 }
00229 }
00230
00231
00232
00233
00234 template <ACE_SYNCH_DECL> int
00235 ACE_Stream<ACE_SYNCH_USE>::remove (const ACE_TCHAR *name,
00236 int flags)
00237 {
00238 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00239 ACE_Module<ACE_SYNCH_USE> *prev = 0;
00240
00241 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00242 mod != 0;
00243 mod = mod->next ())
00244 if (ACE_OS::strcmp (mod->name (), name) == 0)
00245 {
00246 if (prev == 0)
00247 this->stream_head_->link (mod->next ());
00248 else
00249 prev->link (mod->next ());
00250
00251
00252 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00253 {
00254
00255 mod->close (flags);
00256 delete mod;
00257 }
00258
00259 return 0;
00260 }
00261 else
00262 prev = mod;
00263
00264 return -1;
00265 }
00266
00267 template <ACE_SYNCH_DECL> ACE_Module<ACE_SYNCH_USE> *
00268 ACE_Stream<ACE_SYNCH_USE>::find (const ACE_TCHAR *name)
00269 {
00270 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00271 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00272 mod != 0;
00273 mod = mod->next ())
00274 if (ACE_OS::strcmp (mod->name (), name) == 0)
00275 return mod;
00276
00277 return 0;
00278 }
00279
00280
00281
00282 template <ACE_SYNCH_DECL> int
00283 ACE_Stream<ACE_SYNCH_USE>::push_module (ACE_Module<ACE_SYNCH_USE> *new_top,
00284 ACE_Module<ACE_SYNCH_USE> *current_top,
00285 ACE_Module<ACE_SYNCH_USE> *head)
00286 {
00287 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00288 ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00289 ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00290 ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00291 ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00292
00293 if (current_top)
00294 {
00295 ct_reader = current_top->reader ();
00296 ct_writer = current_top->writer ();
00297 ct_reader->next (nt_reader);
00298 }
00299
00300 nt_writer->next (ct_writer);
00301
00302 if (head)
00303 {
00304 if (head != new_top)
00305 head->link (new_top);
00306 }
00307 else
00308 nt_reader->next (0);
00309
00310 new_top->next (current_top);
00311
00312 if (nt_reader->open (new_top->arg ()) == -1)
00313 return -1;
00314
00315 if (nt_writer->open (new_top->arg ()) == -1)
00316 return -1;
00317 return 0;
00318 }
00319
00320 template <ACE_SYNCH_DECL> int
00321 ACE_Stream<ACE_SYNCH_USE>::open (void *a,
00322 ACE_Module<ACE_SYNCH_USE> *head,
00323 ACE_Module<ACE_SYNCH_USE> *tail)
00324 {
00325 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00326 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00327
00328 ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00329 ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00330
00331 if (head == 0)
00332 {
00333 ACE_NEW_RETURN (h1,
00334 ACE_Stream_Head<ACE_SYNCH_USE>,
00335 -1);
00336 ACE_NEW_RETURN (h2,
00337 ACE_Stream_Head<ACE_SYNCH_USE>,
00338 -1);
00339 ACE_NEW_RETURN (head,
00340 ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Head"),
00341 h1, h2,
00342 a,
00343 M_DELETE),
00344 -1);
00345 }
00346
00347 if (tail == 0)
00348 {
00349 ACE_NEW_RETURN (t1,
00350 ACE_Stream_Tail<ACE_SYNCH_USE>,
00351 -1);
00352 ACE_NEW_RETURN (t2,
00353 ACE_Stream_Tail<ACE_SYNCH_USE>,
00354 -1);
00355 ACE_NEW_RETURN (tail,
00356 ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Tail"),
00357 t1, t2,
00358 a,
00359 M_DELETE),
00360 -1);
00361 }
00362
00363
00364 if (head == 0 && (h1 == 0 || h2 == 0)
00365 || tail == 0 && (t1 == 0 || t2 == 0))
00366 {
00367 delete h1;
00368 delete h2;
00369 delete t1;
00370 delete t2;
00371 delete head;
00372 delete tail;
00373 errno = ENOMEM;
00374 return -1;
00375 }
00376
00377 this->stream_head_ = head;
00378 this->stream_tail_ = tail;
00379
00380 if (this->push_module (this->stream_tail_) == -1)
00381 return -1;
00382 else if (this->push_module (this->stream_head_,
00383 this->stream_tail_,
00384 this->stream_head_) == -1)
00385 return -1;
00386
00387 return 0;
00388 }
00389
00390 template <ACE_SYNCH_DECL> int
00391 ACE_Stream<ACE_SYNCH_USE>::close (int flags)
00392 {
00393 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00394 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00395
00396 if (this->stream_head_ != 0
00397 && this->stream_tail_ != 0)
00398 {
00399
00400 this->unlink_i ();
00401
00402 int result = 0;
00403
00404
00405
00406 while (this->stream_head_->next () != this->stream_tail_)
00407 if (this->pop (flags) == -1)
00408 result = -1;
00409
00410
00411 if (this->stream_head_->close (flags) == -1)
00412 result = -1;
00413 if (this->stream_tail_->close (flags) == -1)
00414 result = -1;
00415
00416
00417 delete this->stream_head_;
00418 delete this->stream_tail_;
00419
00420 this->stream_head_ = 0;
00421 this->stream_tail_ = 0;
00422
00423
00424 this->final_close_.broadcast ();
00425 return result;
00426 }
00427 return 0;
00428 }
00429
00430 template <ACE_SYNCH_DECL> int
00431 ACE_Stream<ACE_SYNCH_USE>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
00432 void *a)
00433 {
00434 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00435 ACE_IO_Cntl_Msg ioc (cmd);
00436
00437 ACE_Message_Block *db;
00438
00439
00440 ACE_NEW_RETURN (db,
00441 ACE_Message_Block (sizeof (int),
00442 ACE_Message_Block::MB_IOCTL,
00443 0,
00444 (char *) a),
00445 -1);
00446
00447
00448
00449 ACE_Message_Block *cb = 0;
00450
00451 ACE_NEW_RETURN (cb,
00452 ACE_Message_Block (sizeof ioc,
00453 ACE_Message_Block::MB_IOCTL,
00454 db,
00455 (char *) &ioc),
00456 -1);
00457
00458
00459
00460
00461
00462
00463 if (cb == 0)
00464 {
00465 db->release ();
00466 errno = ENOMEM;
00467 return -1;
00468 }
00469
00470 int result;
00471
00472 if (this->stream_head_->writer ()->put (cb) == -1)
00473 result = -1;
00474 else if (this->stream_head_->reader ()->getq (cb) == -1)
00475 result = -1;
00476 else
00477 result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00478
00479
00480 cb->release ();
00481
00482 return result;
00483 }
00484
00485
00486
00487
00488
00489
00490 template <ACE_SYNCH_DECL> int
00491 ACE_Stream<ACE_SYNCH_USE>::link_i (ACE_Stream<ACE_SYNCH_USE> &us)
00492 {
00493 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00494 this->linked_us_ = &us;
00495
00496 us.linked_us_ = this;
00497
00498 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00499
00500 if (my_tail == 0)
00501 return -1;
00502
00503
00504 while (my_tail->next () != this->stream_tail_)
00505 my_tail = my_tail->next ();
00506
00507 ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00508
00509 if (other_tail == 0)
00510 return -1;
00511
00512
00513 while (other_tail->next () != us.stream_tail_)
00514 other_tail = other_tail->next ();
00515
00516
00517 my_tail->writer ()->next (other_tail->reader ());
00518 other_tail->writer ()->next (my_tail->reader ());
00519 return 0;
00520 }
00521
00522 template <ACE_SYNCH_DECL> int
00523 ACE_Stream<ACE_SYNCH_USE>::link (ACE_Stream<ACE_SYNCH_USE> &us)
00524 {
00525 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00526
00527 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00528
00529 return this->link_i (us);
00530 }
00531
00532
00533
00534 template <ACE_SYNCH_DECL> int
00535 ACE_Stream<ACE_SYNCH_USE>::unlink_i (void)
00536 {
00537 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00538
00539
00540
00541 if (this->linked_us_ != 0)
00542 {
00543 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00544
00545
00546 if (my_tail)
00547 {
00548
00549 while (my_tail->next () != this->stream_tail_)
00550 my_tail = my_tail->next ();
00551
00552
00553 my_tail->writer ()->next (this->stream_tail_->writer ());
00554 }
00555
00556 ACE_Module<ACE_SYNCH_USE> *other_tail =
00557 this->linked_us_->stream_head_;
00558
00559
00560 if (other_tail != 0)
00561 {
00562 while (other_tail->next () != this->linked_us_->stream_tail_)
00563 other_tail = other_tail->next ();
00564
00565 other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00566
00567 }
00568
00569
00570 this->linked_us_->linked_us_ = 0;
00571
00572 this->linked_us_ = 0;
00573 return 0;
00574 }
00575 else
00576 return -1;
00577 }
00578
00579 template <ACE_SYNCH_DECL> int
00580 ACE_Stream<ACE_SYNCH_USE>::unlink (void)
00581 {
00582 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00583 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00584 return this->unlink_i ();
00585 }
00586
00587 template <ACE_SYNCH_DECL>
00588 ACE_Stream<ACE_SYNCH_USE>::ACE_Stream (void * a,
00589 ACE_Module<ACE_SYNCH_USE> *head,
00590 ACE_Module<ACE_SYNCH_USE> *tail)
00591 : linked_us_ (0),
00592 final_close_ (this->lock_)
00593 {
00594 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00595 if (this->open (a, head, tail) == -1)
00596 ACE_ERROR ((LM_ERROR,
00597 ACE_LIB_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00598 head->name (), tail->name ()));
00599 }
00600
00601 template <ACE_SYNCH_DECL>
00602 ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream (void)
00603 {
00604 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00605
00606 if (this->stream_head_ != 0)
00607 this->close ();
00608 }
00609
00610 template <ACE_SYNCH_DECL>
00611 ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr)
00612 : next_ (sr.stream_head_)
00613 {
00614 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator");
00615 }
00616
00617 ACE_END_VERSIONED_NAMESPACE_DECL
00618
00619 #endif