ACE_Stream<> Class Template Reference

This class is the primary abstraction for the ASX framework. It is modeled after System V Streams.

#include <Stream.h>


Public Types

enum  { M_DELETE = 3 }

Public Member Functions

 ACE_Stream (void *arg=0, ACE_Module< ACE_SYNCH_USE > *head=0, ACE_Module< ACE_SYNCH_USE > *tail=0)
virtual int open (void *arg, ACE_Module< ACE_SYNCH_USE > *head=0, ACE_Module< ACE_SYNCH_USE > *tail=0)
virtual int close (int flags=M_DELETE)
 Close down the stream and release all the resources.

virtual ~ACE_Stream (void)
 Close down the stream and release all the resources.

virtual int push (ACE_Module< ACE_SYNCH_USE > *mod)
virtual int pop (int flags=M_DELETE)
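 Remove the module right below the Stream head and close it down. The close() hook methods of the ACE_Tasks in this ACE_Module are invoked to clean up the tasks.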

virtual int top (ACE_Module< ACE_SYNCH_USE > *&mod)
virtual int insert (const ACE_TCHAR *prev_name, ACE_Module< ACE_SYNCH_USE > *mod)
 Insert a new module mod below the named module prev_name.

virtual int replace (const ACE_TCHAR *replace_name, ACE_Module< ACE_SYNCH_USE > *mod, int flags=M_DELETE)
 Replace the named module replace_name with a new module mod.

virtual int remove (const ACE_TCHAR *mod, int flags=M_DELETE)
virtual ACE_Module< ACE_SYNCH_USE > * head (void)
 Return current stream head.

virtual ACE_Module< ACE_SYNCH_USE > * tail (void)
 Return current stream tail.

virtual ACE_Module< ACE_SYNCH_USE > * find (const ACE_TCHAR *mod)
 Find a particular ACE_Module.

virtual int link (ACE_Stream< ACE_SYNCH_USE > &)
 Create a pipe between two Streams.

virtual int unlink (void)
 Remove a pipe formed between two Streams.

virtual int put (ACE_Message_Block *mb, ACE_Time_Value *timeout=0)
virtual int get (ACE_Message_Block *&mb, ACE_Time_Value *timeout=0)
virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, void *args)
 Send control message down the stream.

virtual int wait (void)
 Synchronize with the final close of the stream.

virtual void dump (void) const
 Dump the state of an object.


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Private Member Functions

int unlink_i (void)
int link_i (ACE_Stream< ACE_SYNCH_USE > &)
int push_module (ACE_Module< ACE_SYNCH_USE > *, ACE_Module< ACE_SYNCH_USE > *=0, ACE_Module< ACE_SYNCH_USE > *=0)
 Push a new module onto the Stream.


Private Attributes

ACE_Module< ACE_SYNCH_USE > * stream_head_
 Pointer to the head of the stream.

ACE_Module< ACE_SYNCH_USE > * stream_tail_
 Pointer to the tail of the stream.

ACE_Stream< ACE_SYNCH_USE > * linked_us_
 Pointer to an adjoining linked stream.

ACE_SYNCH_MUTEX_T lock_
 Protect the stream against race conditions.

ACE_SYNCH_CONDITION_T final_close_
 Used to tell all threads waiting on the close that we are done.


Friends

class ACE_Stream_Iterator< ACE_SYNCH_USE >


Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Stream<>

This class is the primary abstraction for the ASX framework. It is modeled after System V Streams.

A Stream consists of a stack of ACE_Modules, each of which contains two ACE_Tasks (a reader and a writer). Even though the methods in this class are virtual, this class isn't really intended for subclassing unless you know what you are doing. In particular, the ACE_Stream destructor calls close(), which won't be overridden properly unless you call it in a subclass destructor.
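The subclassing caveat follows from a general C++ rule: a virtual call made inside a destructor resolves to the class currently being destroyed, not to the most derived class. The sketch below illustrates that rule with hypothetical stand-in classes (BaseStream, NaiveStream, CarefulStream and closes_on_destroy are invented for this example and are not part of ACE); it only assumes that the base close() is safe to call twice, as the close() listing on this page suggests.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a stream-like base class; this is not ACE_Stream.
class BaseStream {
public:
  explicit BaseStream(std::vector<std::string> &log) : log_(log) {}

  // Inside a destructor a virtual call resolves to the class being
  // destroyed, so this always runs BaseStream::close().
  virtual ~BaseStream() { close(); }

  // Idempotent: only the first call records anything.
  virtual int close() {
    if (!closed_) {
      closed_ = true;
      log_.push_back("BaseStream::close");
    }
    return 0;
  }

protected:
  std::vector<std::string> &log_;

private:
  bool closed_ = false;
};

// Overrides close() but relies on the base destructor: the override is skipped.
class NaiveStream : public BaseStream {
public:
  using BaseStream::BaseStream;
  int close() override {
    log_.push_back("NaiveStream::close");
    return BaseStream::close();
  }
};

// Calls close() from its own destructor, so the override runs first.
class CarefulStream : public BaseStream {
public:
  using BaseStream::BaseStream;
  ~CarefulStream() override { close(); }
  int close() override {
    log_.push_back("CarefulStream::close");
    return BaseStream::close();
  }
};

// Destroys one stream of type T and returns the close() calls it recorded.
template <typename T>
std::vector<std::string> closes_on_destroy() {
  std::vector<std::string> log;
  { T stream(log); }
  return log;
}
```

Destroying a NaiveStream records only the base close(), while a CarefulStream records its own override first. That is the pattern the warning asks subclass authors to follow.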

Definition at line 49 of file Stream.h.


Member Enumeration Documentation

template<ACE_SYNCH_DECL >
anonymous enum
 

Enumeration values:
M_DELETE  Indicates that close() deletes the Tasks. Don't change this value without updating the same enum in class ACE_Module.

Definition at line 54 of file Stream.h.

00055   {
00056     /// Indicates that <close> deletes the Tasks.  Don't change this
00057     /// value without updating the same enum in class ACE_Module...
00058     M_DELETE = 3
00059   };


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Stream<>::ACE_Stream (void *arg=0, ACE_Module< ACE_SYNCH_USE > *head=0, ACE_Module< ACE_SYNCH_USE > *tail=0)

Create a Stream consisting of head and tail as the Stream head and Stream tail, respectively. If these are 0 then the ACE_Stream_Head and ACE_Stream_Tail are used, respectively. arg is the value passed in to the open() methods of the tasks.

Definition at line 588 of file Stream.cpp.

References ACE_ERROR, ACE_LIB_TEXT, ACE_TRACE, LM_ERROR, ACE_Module<>::name(), and ACE_Stream<>::open().

00591   : linked_us_ (0),
00592     final_close_ (this->lock_)
00593 {
00594   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00595   if (this->open (a, head, tail) == -1)
00596     ACE_ERROR ((LM_ERROR,
00597                 ACE_LIB_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00598                head->name (), tail->name ()));
00599 }

template<ACE_SYNCH_DECL >
ACE_Stream<>::~ACE_Stream (void) [virtual]

Close down the stream and release all the resources.

Definition at line 602 of file Stream.cpp.

References ACE_TRACE, ACE_Stream<>::close(), and ACE_Stream<>::stream_head_.

00603 {
00604   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00605 
00606   if (this->stream_head_ != 0)
00607     this->close ();
00608 }


Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Stream<>::close (int flags=M_DELETE) [virtual]

Close down the stream and release all the resources.

Definition at line 391 of file Stream.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Stream<>::final_close_, ACE_Stream<>::pop(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Stream<>::unlink_i().

Referenced by ACE_UPIPE_Stream::close(), ACE_Stream_Type::fini(), and ACE_Stream<>::~ACE_Stream().

00392 {
00393   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00394   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00395 
00396   if (this->stream_head_ != 0
00397       && this->stream_tail_ != 0)
00398     {
00399       // Don't bother checking return value here.
00400       this->unlink_i ();
00401 
00402       int result = 0;
00403 
00404       // Remove and cleanup all the intermediate modules.
00405 
00406       while (this->stream_head_->next () != this->stream_tail_)
00407         if (this->pop (flags) == -1)
00408           result = -1;
00409 
00410       // Clean up the head and tail of the stream.
00411       if (this->stream_head_->close (flags) == -1)
00412         result = -1;
00413       if (this->stream_tail_->close (flags) == -1)
00414         result = -1;
00415 
00416       // Cleanup the memory.
00417       delete this->stream_head_;
00418       delete this->stream_tail_;
00419 
00420       this->stream_head_ = 0;
00421       this->stream_tail_ = 0;
00422 
00423       // Tell all threads waiting on the close that we are done.
00424       this->final_close_.broadcast ();
00425       return result;
00426     }
00427   return 0;
00428 }
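The control flow of the listing above (pop every intermediate module, then close head and tail, remember any failure, and treat a second call as a no-op) can be sketched in isolation. This is a simplified model, not ACE code: Module, Stream, close_with_failing_module and close_twice are hypothetical names, locking and the final broadcast are omitted, and, unlike the pop() listing on this page, the sketch's pop() reports the module's close status so that the aggregation is observable.

```cpp
#include <cassert>
#include <string>

// Hypothetical module: close() succeeds or fails according to close_ok.
struct Module {
  std::string name;
  bool close_ok;
  Module *next;
  int close() const { return close_ok ? 0 : -1; }
};

// Hypothetical stream holding a head and a tail sentinel module.
struct Stream {
  Module *head = nullptr;
  Module *tail = nullptr;

  Stream() {
    tail = new Module{"tail", true, nullptr};
    head = new Module{"head", true, tail};
  }
  Stream(const Stream &) = delete;
  Stream &operator=(const Stream &) = delete;
  ~Stream() { close(); }

  // Links a heap-allocated module right below the head; the stream owns it.
  void push(Module *m) {
    m->next = head->next;
    head->next = m;
  }

  // Removes the module right below the head and returns its close status.
  int pop() {
    if (head == nullptr || head->next == tail) return -1;
    Module *top = head->next;
    head->next = top->next;
    int r = top->close();
    delete top;
    return r;
  }

  // Closes everything once; later calls find nothing to do and return 0.
  int close() {
    if (head == nullptr || tail == nullptr) return 0;
    int result = 0;
    while (head->next != tail)
      if (pop() == -1) result = -1;      // keep going, remember the failure
    if (head->close() == -1) result = -1;
    if (tail->close() == -1) result = -1;
    delete head;
    delete tail;
    head = tail = nullptr;
    return result;
  }
};

// One failing module is enough to make the overall close() report -1.
int close_with_failing_module() {
  Stream s;
  s.push(new Module{"ok", true, nullptr});
  s.push(new Module{"bad", false, nullptr});
  return s.close();
}

// A second close() on an already closed stream is a harmless no-op.
int close_twice() {
  Stream s;
  s.close();
  return s.close();
}
```

Because close() nulls both pointers, the destructor's own call to close() is safe after an explicit close, which mirrors the stream_head_ check in the destructor listing.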

template<ACE_SYNCH_DECL >
int ACE_Stream<>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, void *args) [virtual]

Send control message down the stream.

Definition at line 431 of file Stream.cpp.

References ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds, ACE_NEW_RETURN, ACE_TRACE, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), and ACE_Stream<>::stream_head_.

00433 {
00434   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00435   ACE_IO_Cntl_Msg ioc (cmd);
00436 
00437   ACE_Message_Block *db;
00438 
00439   // Try to create a data block that contains the user-supplied data.
00440   ACE_NEW_RETURN (db,
00441                   ACE_Message_Block (sizeof (int),
00442                                      ACE_Message_Block::MB_IOCTL,
00443                                      0,
00444                                      (char *) a),
00445                   -1);
00446   // Try to create a control block <cb> that contains the control
00447   // field and a pointer to the data block <db> in <cb>'s continuation
00448   // field.
00449   ACE_Message_Block *cb = 0;
00450 
00451   ACE_NEW_RETURN (cb,
00452                   ACE_Message_Block (sizeof ioc,
00453                                      ACE_Message_Block::MB_IOCTL,
00454                                      db,
00455                                      (char *) &ioc),
00456                   -1);
00457   // @@ Michael: The old semantic assumed that cb returns == 0
00458   //             if no memory was available. We will now return immediately
00459   //             without release (errno is set to ENOMEM by the macro).
00460 
00461   // If we can't allocate <cb> then we need to delete db and return
00462   // -1.
00463   if (cb == 0)
00464     {
00465       db->release ();
00466       errno = ENOMEM;
00467       return -1;
00468     }
00469 
00470   int result;
00471 
00472   if (this->stream_head_->writer ()->put (cb) == -1)
00473     result = -1;
00474   else if (this->stream_head_->reader ()->getq (cb) == -1)
00475     result = -1;
00476   else
00477     result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00478 
00479   // This will also release db if it's reference count == 0.
00480   cb->release ();
00481 
00482   return result;
00483 }

template<ACE_SYNCH_DECL >
void ACE_Stream<>::dump (void) const [virtual]

Dump the state of an object.

Definition at line 28 of file Stream.cpp.

References ACE_DEBUG, ACE_LIB_TEXT, ACE_TRACE, ACE_Task<>::dump(), ACE_Stream<>::linked_us_, LM_DEBUG, ACE_Task<>::name(), ACE_Module<>::name(), ACE_Task<>::next(), ACE_Module<>::next(), ACE_Module<>::reader(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Module<>::writer().

00029 {
00030 #if defined (ACE_HAS_DUMP)
00031   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00032   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- module links --------\n")));
00033 
00034   for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00035        ;
00036        mp = mp->next ())
00037     {
00038       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("module name = %s\n"), mp->name ()));
00039       if (mp == this->stream_tail_)
00040         break;
00041     }
00042 
00043   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- writer links --------\n")));
00044 
00045   ACE_Task<ACE_SYNCH_USE> *tp;
00046 
00047   for (tp = this->stream_head_->writer ();
00048        ;
00049        tp = tp->next ())
00050     {
00051       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("writer queue name = %s\n"), tp->name ()));
00052       tp->dump ();
00053       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00054       if (tp == this->stream_tail_->writer ()
00055           || (this->linked_us_
00056               && tp == this->linked_us_->stream_head_->reader ()))
00057         break;
00058     }
00059 
00060   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- reader links --------\n")));
00061   for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00062     {
00063       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("reader queue name = %s\n"), tp->name ()));
00064       tp->dump ();
00065       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00066       if (tp == this->stream_head_->reader ()
00067           || (this->linked_us_
00068               && tp == this->linked_us_->stream_head_->writer ()))
00069         break;
00070     }
00071 #endif /* ACE_HAS_DUMP */
00072 }

template<ACE_SYNCH_DECL >
ACE_Module< ACE_SYNCH_USE > * ACE_Stream<>::find (const ACE_TCHAR *mod) [virtual]

Find a particular ACE_Module.

Definition at line 268 of file Stream.cpp.

References ACE_TCHAR, ACE_TRACE, ACE_Module<>::name(), ACE_Module<>::next(), ACE_OS::strcmp(), and ACE_Stream<>::stream_head_.

00269 {
00270   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00271   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00272        mod != 0;
00273        mod = mod->next ())
00274     if (ACE_OS::strcmp (mod->name (), name) == 0)
00275         return mod;
00276 
00277   return 0;
00278 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::get (ACE_Message_Block *&mb, ACE_Time_Value *timeout=0) [virtual]

Read the message mb that is stored in the stream head. Wait until timeout, which is an absolute time, for the operation to complete (or block forever if timeout == 0).

Definition at line 94 of file Stream.cpp.

References ACE_TRACE, and ACE_Stream<>::stream_head_.

Referenced by ACE_UPIPE_Stream::recv().

00095 {
00096   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00097   return this->stream_head_->reader ()->getq (mb, tv);
00098 }
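Since the timeout passed to get() and put() is described as an absolute time rather than a duration, a caller thinking in relative waits has to convert first. The following sketch shows that conversion using std::chrono instead of ACE_Time_Value, purely so that it stays self-contained; TimePoint, absolute_deadline, expired and timed_out are hypothetical helper names.

```cpp
#include <cassert>
#include <chrono>

// Millisecond-resolution wall-clock time point used by this sketch.
using TimePoint = std::chrono::time_point<std::chrono::system_clock,
                                          std::chrono::milliseconds>;

// Converts "wait this long" into "give up at this moment".
TimePoint absolute_deadline(TimePoint now, std::chrono::milliseconds wait) {
  return now + wait;
}

// An absolute deadline has passed once the clock reaches it.
bool expired(TimePoint deadline, TimePoint now) {
  return now >= deadline;
}

// Reports whether a wait of `wait` has run out after `elapsed` has gone by,
// measured from an arbitrary fixed starting point.
bool timed_out(std::chrono::milliseconds wait,
               std::chrono::milliseconds elapsed) {
  const TimePoint start{};
  const TimePoint deadline = absolute_deadline(start, wait);
  return expired(deadline, start + elapsed);
}
```

With ACE itself the same idea would be expressed by adding the relative wait to the current time before passing the pointer to get() or put(); passing a bare duration would be read as a moment long in the past.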

template<ACE_SYNCH_DECL >
ACE_INLINE ACE_Module< ACE_SYNCH_USE > * ACE_Stream<>::head (void) [virtual]

Return current stream head.

Definition at line 8 of file Stream.inl.

References ACE_TRACE, and ACE_Stream<>::stream_head_.

00009 {
00010   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::head");
00011   return this->stream_head_;
00012 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::insert (const ACE_TCHAR *prev_name, ACE_Module< ACE_SYNCH_USE > *mod) [virtual]

Insert a new module mod below the named module prev_name.

Definition at line 117 of file Stream.cpp.

References ACE_TCHAR, ACE_TRACE, ACE_Module<>::arg(), ACE_Module<>::link(), ACE_Module<>::name(), ACE_Module<>::next(), ACE_Task_Base::open(), ACE_Module<>::reader(), ACE_OS::strcmp(), ACE_Stream<>::stream_head_, and ACE_Module<>::writer().

00119 {
00120   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00121 
00122   for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00123        prev_mod != 0;
00124        prev_mod = prev_mod->next ())
00125     if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00126       {
00127         ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00128 
00129         // We can't insert a module below <stream_tail_>.
00130         if (next_mod == 0)
00131           return -1;
00132 
00133         mod->link (next_mod);
00134         prev_mod->link (mod);
00135 
00136         if (mod->reader ()->open (mod->arg ()) == -1)
00137           return -1;
00138 
00139         if (mod->writer ()->open (mod->arg ()) == -1)
00140           return -1;
00141 
00142         return 0;
00143       }
00144 
00145   return -1;
00146 }
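The lookup-and-splice logic of the listing above can be modelled with a plain vector of module names ordered from head to tail. This is an illustrative sketch only: ModuleNames, insert_below and try_insert are hypothetical, and the open() calls on the new module's tasks are left out. It keeps the two failure cases of the listing: an unknown name, and an attempt to insert below the tail.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Module names ordered head-first: front() is the head, back() is the tail.
using ModuleNames = std::vector<std::string>;

// Inserts `mod` directly below the module called `prev_name`.
// Returns 0 on success, -1 if the name is unknown or names the tail.
int insert_below(ModuleNames &stream, const std::string &prev_name,
                 const std::string &mod) {
  for (std::size_t i = 0; i < stream.size(); ++i) {
    if (stream[i] == prev_name) {
      if (i + 1 == stream.size())
        return -1;  // nothing may be placed below the tail
      stream.insert(stream.begin() + static_cast<std::ptrdiff_t>(i + 1), mod);
      return 0;
    }
  }
  return -1;  // no module with that name
}

// Tries to insert "B" below `prev_name` in the stream head, A, tail.
int try_insert(const std::string &prev_name) {
  ModuleNames s{"head", "A", "tail"};
  return insert_below(s, prev_name, "B");
}

// Returns the stream after inserting "B" below "A": head, A, B, tail.
ModuleNames demo_insert() {
  ModuleNames s{"head", "A", "tail"};
  insert_below(s, "A", "B");
  return s;
}
```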

template<ACE_SYNCH_DECL >
int ACE_Stream<>::link (ACE_Stream< ACE_SYNCH_USE > &) [virtual]

Create a pipe between two Streams.

Definition at line 523 of file Stream.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, and ACE_Stream<>::link_i().

Referenced by ACE_UPIPE_Acceptor::accept().

00524 {
00525   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00526 
00527   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00528 
00529   return this->link_i (us);
00530 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::link_i (ACE_Stream< ACE_SYNCH_USE > &) [private]

Actually perform the linking of two Streams (must be called with locks held).

Definition at line 491 of file Stream.cpp.

References ACE_TRACE, ACE_Stream<>::linked_us_, ACE_Task< ACE_SYNCH_USE >::next(), ACE_Module<>::next(), ACE_Module<>::reader(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Module<>::writer().

Referenced by ACE_Stream<>::link().

00492 {
00493   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00494   this->linked_us_ = &us;
00495   // Make sure the other side is also linked to us!
00496   us.linked_us_ = this;
00497 
00498   ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00499 
00500   if (my_tail == 0)
00501     return -1;
00502 
00503   // Locate the module just above our Stream tail.
00504   while (my_tail->next () != this->stream_tail_)
00505     my_tail = my_tail->next ();
00506 
00507   ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00508 
00509   if (other_tail == 0)
00510     return -1;
00511 
00512   // Locate the module just above the other Stream's tail.
00513   while (other_tail->next () != us.stream_tail_)
00514     other_tail = other_tail->next ();
00515 
00516   // Reattach the pointers so that the two streams are linked!
00517   my_tail->writer ()->next (other_tail->reader ());
00518   other_tail->writer ()->next (my_tail->reader ());
00519   return 0;
00520 }
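The last two assignments in the listing are what turn two streams into a pipe: each side's lowest writer is pointed at the other side's lowest reader. A reduced model of that cross-wiring might look like the sketch below. Task, Endpoint, link_endpoints, unlink_endpoints and the two demo functions are hypothetical. In this sketch a null next pointer simply means that the message stops at the current task, whereas the unlink_i() listing on this page restores the link to the stream's own tail writer.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for a reader or writer task.
struct Task {
  Task *next = nullptr;
  std::deque<std::string> queue;

  // Forwards to the next task, or queues the message at the end of the chain.
  void put(const std::string &msg) {
    if (next)
      next->put(msg);
    else
      queue.push_back(msg);
  }
};

// Stands in for the module just above a stream's tail.
struct Endpoint {
  Task writer;
  Task reader;
};

// Cross-wires the two sides: what one writes, the other reads.
void link_endpoints(Endpoint &a, Endpoint &b) {
  a.writer.next = &b.reader;
  b.writer.next = &a.reader;
}

// Undoes the cross-wiring so that each side is self-contained again.
void unlink_endpoints(Endpoint &a, Endpoint &b) {
  a.writer.next = nullptr;
  b.writer.next = nullptr;
}

// Writes on side a and returns what arrives in side b's reader queue.
std::string send_across() {
  Endpoint a, b;
  link_endpoints(a, b);
  a.writer.put("ping");
  return b.reader.queue.empty() ? std::string() : b.reader.queue.front();
}

// After unlinking, a message written on side a no longer reaches side b.
bool stays_local_after_unlink() {
  Endpoint a, b;
  link_endpoints(a, b);
  unlink_endpoints(a, b);
  a.writer.put("ping");
  return b.reader.queue.empty() && a.writer.queue.size() == 1;
}
```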

template<ACE_SYNCH_DECL >
int ACE_Stream<>::open (void *arg, ACE_Module< ACE_SYNCH_USE > *head=0, ACE_Module< ACE_SYNCH_USE > *tail=0) [virtual]

Create a Stream consisting of head and tail as the Stream head and Stream tail, respectively. If these are 0 then the ACE_Stream_Head and ACE_Stream_Tail are used, respectively. arg is the value passed in to the open() methods of the tasks.

Definition at line 321 of file Stream.cpp.

References ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_NEW_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Stream<>::push_module(), ACE_Stream<>::stream_head_, and ACE_Stream<>::stream_tail_.

Referenced by ACE_Stream<>::ACE_Stream().

00324 {
00325   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00326   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00327 
00328   ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00329   ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00330 
00331   if (head == 0)
00332     {
00333       ACE_NEW_RETURN (h1,
00334                       ACE_Stream_Head<ACE_SYNCH_USE>,
00335                       -1);
00336       ACE_NEW_RETURN (h2,
00337                       ACE_Stream_Head<ACE_SYNCH_USE>,
00338                       -1);
00339       ACE_NEW_RETURN (head,
00340                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Head"),
00341                                                  h1, h2,
00342                                                  a,
00343                                                  M_DELETE),
00344                       -1);
00345     }
00346 
00347   if (tail == 0)
00348     {
00349       ACE_NEW_RETURN (t1,
00350                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00351                       -1);
00352       ACE_NEW_RETURN (t2,
00353                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00354                       -1);
00355       ACE_NEW_RETURN (tail,
00356                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Tail"),
00357                                                  t1, t2,
00358                                                  a,
00359                                                  M_DELETE),
00360                       -1);
00361     }
00362 
00363   // Make sure *all* the allocation succeeded!
00364   if (head == 0 && (h1 == 0 || h2 == 0)
00365       || tail == 0 && (t1 == 0 || t2 == 0))
00366     {
00367       delete h1;
00368       delete h2;
00369       delete t1;
00370       delete t2;
00371       delete head;
00372       delete tail;
00373       errno = ENOMEM;
00374       return -1;
00375     }
00376 
00377   this->stream_head_ = head;
00378   this->stream_tail_ = tail;
00379 
00380   if (this->push_module (this->stream_tail_) == -1)
00381     return -1;
00382   else if (this->push_module (this->stream_head_,
00383                               this->stream_tail_,
00384                               this->stream_head_) == -1)
00385     return -1;
00386 
00387   return 0;
00388 }

template<ACE_SYNCH_DECL >
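int ACE_Stream<>::pop (int flags=M_DELETE) [virtual]

Remove the module right below the Stream head and close it down. The close() hook methods of the ACE_Tasks in this ACE_Module are invoked to clean up the tasks.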

Definition at line 204 of file Stream.cpp.

References ACE_TRACE, ACE_Module<>::close(), ACE_Task< ACE_SYNCH_USE >::next(), ACE_Module<>::next(), ACE_Module<>::reader(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Module<>::writer().

Referenced by ACE_Stream<>::close().

00205 {
00206   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00207   if (this->stream_head_->next () == this->stream_tail_)
00208     return -1;
00209   else
00210     {
00211       // Skip over the ACE_Stream head.
00212       ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00213       ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00214 
00215       this->stream_head_->next (new_top);
00216 
00217       // Close the top ACE_Module.
00218 
00219       top_mod->close (flags);
00220 
00221       // Don't delete the Module unless the flags request this.
00222       if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00223         delete top_mod;
00224 
00225       this->stream_head_->writer ()->next (new_top->writer ());
00226       new_top->reader ()->next (this->stream_head_->reader ());
00227       return 0;
00228     }
00229 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::push (ACE_Module< ACE_SYNCH_USE > *mod) [virtual]

Add a new module right below the Stream head. The open() hook methods of the ACE_Tasks in this ACE_Module are invoked to initialize the tasks.

Definition at line 75 of file Stream.cpp.

References ACE_TRACE, ACE_Module<>::next(), ACE_Stream<>::push_module(), and ACE_Stream<>::stream_head_.

Referenced by ACE_Stream_Type::push().

00076 {
00077   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00078   if (this->push_module  (new_top,
00079                           this->stream_head_->next (),
00080                           this->stream_head_) == -1)
00081     return -1;
00082   else
00083     return 0;
00084 }
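Because push() always adds right below the head and pop() always removes from the same position, the modules between head and tail behave as a LIFO stack. The sketch below models just that ordering, together with the convention of returning -1 when no module sits between head and tail. ModuleStack and the demo functions are hypothetical and track names only; task linking and the open()/close() hooks are not modelled.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of the modules between a stream's head and tail.
class ModuleStack {
public:
  // The new module becomes the one right below the head.
  void push(const std::string &name) {
    modules_.insert(modules_.begin(), name);
  }

  // Removes the module right below the head; -1 if there is none.
  int pop() {
    if (modules_.empty()) return -1;
    modules_.erase(modules_.begin());
    return 0;
  }

  // Reports the module right below the head; -1 if there is none.
  int top(std::string &name) const {
    if (modules_.empty()) return -1;
    name = modules_.front();
    return 0;
  }

private:
  std::vector<std::string> modules_;  // index 0 sits right below the head
};

// Push A, push B, pop: B leaves first, so A is on top again.
std::string top_after_push_push_pop() {
  ModuleStack s;
  s.push("A");
  s.push("B");
  s.pop();
  std::string name;
  return s.top(name) == 0 ? name : std::string();
}

// Popping a stream that holds only head and tail fails.
int pop_on_empty() {
  ModuleStack s;
  return s.pop();
}
```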

template<ACE_SYNCH_DECL >
int ACE_Stream<>::push_module (ACE_Module< ACE_SYNCH_USE > *, ACE_Module< ACE_SYNCH_USE > *=0, ACE_Module< ACE_SYNCH_USE > *=0) [private]

Push a new module onto the Stream.

Definition at line 283 of file Stream.cpp.

References ACE_TRACE, ACE_Module<>::arg(), ACE_Module<>::link(), ACE_Module<>::next(), ACE_Task<>::next(), ACE_Task_Base::open(), ACE_Module<>::reader(), and ACE_Module<>::writer().

Referenced by ACE_Stream<>::open(), and ACE_Stream<>::push().

00286 {
00287   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00288   ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00289   ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00290   ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00291   ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00292 
00293   if (current_top)
00294     {
00295       ct_reader = current_top->reader ();
00296       ct_writer = current_top->writer ();
00297       ct_reader->next (nt_reader);
00298     }
00299 
00300   nt_writer->next (ct_writer);
00301 
00302   if (head)
00303     {
00304       if (head != new_top)
00305         head->link (new_top);
00306     }
00307   else
00308     nt_reader->next (0);
00309 
00310   new_top->next (current_top);
00311 
00312   if (nt_reader->open (new_top->arg ()) == -1)
00313     return -1;
00314 
00315   if (nt_writer->open (new_top->arg ()) == -1)
00316     return -1;
00317   return 0;
00318 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::put (ACE_Message_Block *mb, ACE_Time_Value *timeout=0) [virtual]

Send the message mb down the stream, starting at the Module below the Stream head. Wait until timeout, which is an absolute time, for the operation to complete (or block forever if timeout == 0).

Definition at line 87 of file Stream.cpp.

References ACE_TRACE, and ACE_Stream<>::stream_head_.

Referenced by ACE_UPIPE_Stream::send().

00088 {
00089   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00090   return this->stream_head_->writer ()->put (mb, tv);
00091 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::remove (const ACE_TCHAR *mod, int flags=M_DELETE) [virtual]

Remove the named module mod from the stream. This bypasses the strict LIFO ordering of push() and pop().

Definition at line 235 of file Stream.cpp.

References ACE_TCHAR, ACE_TRACE, ACE_Module<>::close(), ACE_Module<>::link(), ACE_Module<>::name(), ACE_Module<>::next(), ACE_OS::strcmp(), and ACE_Stream<>::stream_head_.

Referenced by ACE_Stream_Type::fini(), and ACE_Stream_Type::remove().

00237 {
00238   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00239   ACE_Module<ACE_SYNCH_USE> *prev = 0;
00240 
00241   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00242        mod != 0;
00243        mod = mod->next ())
00244     if (ACE_OS::strcmp (mod->name (), name) == 0)
00245       {
00246         if (prev == 0) // Deleting ACE_Stream Head
00247           this->stream_head_->link (mod->next ());
00248         else
00249           prev->link (mod->next ());
00250 
00251         // Don't delete the Module unless the flags request this.
00252         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00253           {
00254             // Close down the module and release the memory.
00255             mod->close (flags);
00256             delete mod;
00257           }
00258 
00259         return 0;
00260       }
00261     else
00262       prev = mod;
00263 
00264   return -1;
00265 }
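Removal by name differs from pop() in that the target can sit anywhere in the stack. A minimal sketch of that behaviour, again using a vector of names ordered from head to tail, follows. ModuleNames, remove_named and the demo functions are hypothetical, and the flags handling of the listing (closing and deleting the module unless M_DELETE_NONE is given) is not modelled.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Module names ordered head-first: front() is the head, back() is the tail.
using ModuleNames = std::vector<std::string>;

// Unlinks the first module called `name`, wherever it sits in the stack.
// Returns 0 on success and -1 if no module has that name.
int remove_named(ModuleNames &stream, const std::string &name) {
  auto it = std::find(stream.begin(), stream.end(), name);
  if (it == stream.end()) return -1;
  stream.erase(it);
  return 0;
}

// Removes a module from the middle, which pop() alone could not do.
ModuleNames demo_remove() {
  ModuleNames s{"head", "top", "middle", "bottom", "tail"};
  remove_named(s, "middle");
  return s;  // head, top, bottom, tail
}

// Asking for an unknown name leaves the stream untouched and returns -1.
int remove_missing() {
  ModuleNames s{"head", "top", "tail"};
  return remove_named(s, "missing");
}
```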

template<ACE_SYNCH_DECL >
int ACE_Stream<>::replace (const ACE_TCHAR *replace_name, ACE_Module< ACE_SYNCH_USE > *mod, int flags=M_DELETE) [virtual]

Replace the named module replace_name with a new module mod.

Definition at line 149 of file Stream.cpp.

References ACE_TCHAR, ACE_TRACE, ACE_Module<>::arg(), ACE_Module<>::close(), ACE_Module<>::link(), ACE_Module<>::name(), ACE_Task< ACE_SYNCH_USE >::next(), ACE_Module<>::next(), ACE_Task_Base::open(), ACE_Module<>::reader(), ACE_OS::strcmp(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Module<>::writer().

00152 {
00153   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00154   ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00155 
00156   for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00157        rep_mod != 0;
00158        rep_mod = rep_mod->next ())
00159     if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00160       {
00161         ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00162 
00163         if (next_mod)
00164           mod->link (next_mod);
00165         else // In case the <next_mod> is <stream_tail_>.
00166           {
00167             mod->writer ()->next (0);
00168             mod->next (0);
00169             this->stream_tail_ = mod;
00170           }
00171 
00172         if (prev_mod)
00173           prev_mod->link (mod);
00174         else // In case the <rep_mod> is <stream_head_>.
00175           {
00176             mod->reader ()->next (0);
00177             this->stream_head_ = mod;
00178           }
00179 
00180         if (mod->reader ()->open (mod->arg ()) == -1)
00181           return -1;
00182 
00183         if (mod->writer ()->open (mod->arg ()) == -1)
00184           return -1;
00185 
00186         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00187           {
00188             rep_mod->close (flags);
00189             delete rep_mod;
00190           }
00191 
00192         return 0;
00193       }
00194     else
00195       prev_mod = rep_mod;
00196 
00197   return -1;
00198 }

template<ACE_SYNCH_DECL >
ACE_INLINE ACE_Module< ACE_SYNCH_USE > * ACE_Stream<>::tail (void) [virtual]

Return current stream tail.

Definition at line 15 of file Stream.inl.

References ACE_TRACE, and ACE_Stream<>::stream_tail_.

00016 {
00017   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::tail");
00018   return this->stream_tail_;
00019 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::top (ACE_Module< ACE_SYNCH_USE > *&mod) [virtual]

Return the top module on the stream (right below the stream head).

Definition at line 104 of file Stream.cpp.

References ACE_TRACE, ACE_Stream<>::stream_head_, and ACE_Stream<>::stream_tail_.

00105 {
00106   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00107   if (this->stream_head_->next () == this->stream_tail_)
00108     return -1;
00109   else
00110     {
00111       m = this->stream_head_->next ();
00112       return 0;
00113     }
00114 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::unlink (void) [virtual]

Remove a pipe formed between two Streams.

Definition at line 580 of file Stream.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, and ACE_Stream<>::unlink_i().

00581 {
00582   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00583   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00584   return this->unlink_i ();
00585 }

template<ACE_SYNCH_DECL >
int ACE_Stream<>::unlink_i (void) [private]

Actually perform the unlinking of two Streams (must be called with locks held).

Definition at line 535 of file Stream.cpp.

References ACE_TRACE, ACE_Stream<>::linked_us_, ACE_Task< ACE_SYNCH_USE >::next(), ACE_Module<>::next(), ACE_Stream<>::stream_head_, ACE_Stream<>::stream_tail_, and ACE_Module<>::writer().

Referenced by ACE_Stream<>::close(), and ACE_Stream<>::unlink().

00536 {
00537   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00538 
00539   // Only try to unlink if we are in fact still linked!
00540 
00541   if (this->linked_us_ != 0)
00542     {
00543       ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00544 
00545       // Only relink if we still exist!
00546       if (my_tail)
00547         {
00548           // Find the module that's just before our stream tail.
00549           while (my_tail->next () != this->stream_tail_)
00550             my_tail = my_tail->next ();
00551 
00552           // Restore the writer's next() link to our tail.
00553           my_tail->writer ()->next (this->stream_tail_->writer ());
00554         }
00555 
00556       ACE_Module<ACE_SYNCH_USE> *other_tail =
00557         this->linked_us_->stream_head_;
00558 
00559       // Only fiddle with the other side if it in fact still remains.
00560       if (other_tail != 0)
00561         {
00562           while (other_tail->next () != this->linked_us_->stream_tail_)
00563             other_tail = other_tail->next ();
00564 
00565           other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00566 
00567         }
00568 
00569       // Make sure the other side is also aware that it's been unlinked!
00570       this->linked_us_->linked_us_ = 0;
00571 
00572       this->linked_us_ = 0;
00573       return 0;
00574     }
00575   else
00576     return -1;
00577 }

template<ACE_SYNCH_DECL >
ACE_INLINE int ACE_Stream<>::wait (void) [virtual]

Synchronize with the final close of the stream.

Definition at line 22 of file Stream.inl.

References ACE_TRACE, and ACE_Stream<>::final_close_.

00023 {
00024   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::wait");
00025   return this->final_close_.wait ();
00026 }


Friends And Related Function Documentation

template<ACE_SYNCH_DECL >
friend class ACE_Stream_Iterator< ACE_SYNCH_USE > [friend]
 

Definition at line 52 of file Stream.h.


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Stream<>::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Definition at line 162 of file Stream.h.

template<ACE_SYNCH_DECL >
ACE_SYNCH_CONDITION_T ACE_Stream<>::final_close_ [private]
 

Used to tell all threads waiting on the close that we are done.

Definition at line 192 of file Stream.h.

Referenced by ACE_Stream<>::close(), and ACE_Stream<>::wait().

template<ACE_SYNCH_DECL >
ACE_Stream<ACE_SYNCH_USE>* ACE_Stream<>::linked_us_ [private]
 

Pointer to an adjoining linked stream.

Definition at line 185 of file Stream.h.

Referenced by ACE_Stream<>::dump(), ACE_Stream<>::link_i(), and ACE_Stream<>::unlink_i().

template<ACE_SYNCH_DECL >
ACE_SYNCH_MUTEX_T ACE_Stream<>::lock_ [private]
 

Protect the stream against race conditions.

Definition at line 189 of file Stream.h.

template<ACE_SYNCH_DECL >
ACE_Module<ACE_SYNCH_USE>* ACE_Stream<>::stream_head_ [private]
 

Pointer to the head of the stream.

Definition at line 179 of file Stream.h.

Referenced by ACE_Stream<>::close(), ACE_Stream<>::control(), ACE_Stream<>::dump(), ACE_Stream<>::find(), ACE_Stream<>::get(), ACE_Stream<>::head(), ACE_Stream<>::insert(), ACE_Stream<>::link_i(), ACE_Stream<>::open(), ACE_Stream<>::pop(), ACE_Stream<>::push(), ACE_Stream<>::put(), ACE_Stream<>::remove(), ACE_Stream<>::replace(), ACE_Stream<>::top(), ACE_Stream<>::unlink_i(), and ACE_Stream<>::~ACE_Stream().

template<ACE_SYNCH_DECL >
ACE_Module<ACE_SYNCH_USE>* ACE_Stream<>::stream_tail_ [private]
 

Pointer to the tail of the stream.

Definition at line 182 of file Stream.h.

Referenced by ACE_Stream<>::close(), ACE_Stream<>::dump(), ACE_Stream<>::link_i(), ACE_Stream<>::open(), ACE_Stream<>::pop(), ACE_Stream<>::replace(), ACE_Stream<>::tail(), ACE_Stream<>::top(), and ACE_Stream<>::unlink_i().


The documentation for this class was generated from the following files:
Stream.h
Stream.inl
Stream.cpp
Generated on Thu Nov 9 11:30:16 2006 for ACE by doxygen 1.3.6