00001 // -*- C++ -*- 00002 00003 //========================================================================== 00004 /** 00005 * @file Stream.h 00006 * 00007 * $Id: Stream.h 80826 2008-03-04 14:51:23Z wotte $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@uci.edu> 00010 */ 00011 //========================================================================== 00012 00013 #ifndef ACE_STREAM_H 00014 #define ACE_STREAM_H 00015 00016 #include /**/ "ace/pre.h" 00017 00018 #include /**/ "ace/config-all.h" 00019 00020 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00021 # pragma once 00022 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00023 00024 #include "ace/IO_Cntl_Msg.h" 00025 #include "ace/Message_Block.h" 00026 #include "ace/Module.h" 00027 00028 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00029 00030 // Forward decls. 00031 template<ACE_SYNCH_DECL> class ACE_Stream_Iterator; 00032 class ACE_Time_Value; 00033 00034 /** 00035 * @class ACE_Stream 00036 * 00037 * @brief This class is the primary abstraction for the ASX framework. 00038 * It is moduled after System V Stream. 00039 * 00040 * A Stream consists of a stack of <ACE_Modules>, each of which 00041 * contains two <ACE_Tasks>. Even though the methods in this 00042 * class are virtual, this class isn't really intended for 00043 * subclassing unless you know what you are doing. In 00044 * particular, the ACE_Stream destructor calls <close>, which 00045 * won't be overridden properly unless you call it in a subclass 00046 * destructor. 00047 */ 00048 template <ACE_SYNCH_DECL> 00049 class ACE_Stream 00050 { 00051 public: 00052 friend class ACE_Stream_Iterator<ACE_SYNCH_USE>; 00053 00054 enum 00055 { 00056 /// Indicates that <close> deletes the Tasks. Don't change this 00057 /// value without updating the same enum in class ACE_Module... 00058 M_DELETE = 3 00059 }; 00060 00061 // = Initializatation and termination methods. 00062 /** 00063 * Create a Stream consisting of <head> and <tail> as the Stream 00064 * head and Stream tail, respectively. If these are 0 then the 00065 * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively. 00066 * <arg> is the value past in to the <open> methods of the tasks. 00067 */ 00068 ACE_Stream (void *arg = 0, 00069 ACE_Module<ACE_SYNCH_USE> *head = 0, 00070 ACE_Module<ACE_SYNCH_USE> *tail = 0); 00071 00072 /** 00073 * Create a Stream consisting of <head> and <tail> as the Stream 00074 * head and Stream tail, respectively. If these are 0 then the 00075 * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively. 00076 * <arg> is the value past in to the <open> methods of the tasks. 00077 */ 00078 virtual int open (void *arg, 00079 ACE_Module<ACE_SYNCH_USE> *head = 0, 00080 ACE_Module<ACE_SYNCH_USE> *tail = 0); 00081 00082 /// Close down the stream and release all the resources. 00083 virtual int close (int flags = M_DELETE); 00084 00085 /// Close down the stream and release all the resources. 00086 virtual ~ACE_Stream (void); 00087 00088 // = ACE_Stream plumbing operations 00089 00090 /// Add a new module <mod> right below the Stream head. The 00091 /// <open()> hook methods of the <ACE_Tasks> in this ACE_Module 00092 /// are invoked to initialize the tasks. 00093 virtual int push (ACE_Module<ACE_SYNCH_USE> *mod); 00094 00095 /// Remove the <mod> right below the Stream head and close it down. 00096 // The <close()> hook methods of the <ACE_Tasks> in this ACE_Module 00097 /// are invoked to cleanup the tasks. 00098 virtual int pop (int flags = M_DELETE); 00099 00100 /// Return the top module on the stream (right below the stream 00101 /// head). 00102 virtual int top (ACE_Module<ACE_SYNCH_USE> *&mod); 00103 00104 /// Insert a new module <mod> below the named module <prev_name>. 00105 virtual int insert (const ACE_TCHAR *prev_name, 00106 ACE_Module<ACE_SYNCH_USE> *mod); 00107 00108 /// Replace the named module <replace_name> with a new module <mod>. 00109 virtual int replace (const ACE_TCHAR *replace_name, 00110 ACE_Module<ACE_SYNCH_USE> *mod, 00111 int flags = M_DELETE); 00112 00113 /// Remove the named module <mod> from the stream. This bypasses the 00114 /// strict LIFO ordering of <push> and <pop>. 00115 virtual int remove (const ACE_TCHAR *mod, 00116 int flags = M_DELETE); 00117 00118 /// Return current stream head. 00119 virtual ACE_Module<ACE_SYNCH_USE> *head (void); 00120 00121 /// Return current stream tail. 00122 virtual ACE_Module<ACE_SYNCH_USE> *tail (void); 00123 00124 /// Find a particular ACE_Module. 00125 virtual ACE_Module<ACE_SYNCH_USE> *find (const ACE_TCHAR *mod); 00126 00127 /// Create a pipe between two Streams. 00128 virtual int link (ACE_Stream<ACE_SYNCH_USE> &); 00129 00130 /// Remove a pipe formed between two Streams. 00131 virtual int unlink (void); 00132 00133 // = Blocking data transfer operations 00134 /** 00135 * Send the message @a mb down the stream, starting at the Module 00136 * below the Stream head. Wait for upto @a timeout amount of 00137 * absolute time for the operation to complete (or block forever if 00138 * @a timeout == 0). 00139 */ 00140 virtual int put (ACE_Message_Block *mb, 00141 ACE_Time_Value *timeout = 0); 00142 00143 /** 00144 * Read the message @a mb that is stored in the stream head. 00145 * Wait for upto @a timeout amount of absolute time for the operation 00146 * to complete (or block forever if @a timeout == 0). 00147 */ 00148 virtual int get (ACE_Message_Block *&mb, 00149 ACE_Time_Value *timeout = 0); 00150 00151 /// Send control message down the stream. 00152 virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, 00153 void *args); 00154 00155 /// Synchronize with the final close of the stream. 00156 virtual int wait (void); 00157 00158 /// Dump the state of an object. 00159 virtual void dump (void) const; 00160 00161 /// Declare the dynamic allocation hooks. 00162 ACE_ALLOC_HOOK_DECLARE; 00163 00164 private: 00165 /// Actually perform the unlinking of two Streams (must be called 00166 /// with locks held). 00167 int unlink_i (void); 00168 00169 /// Actually perform the linking of two Streams (must be called with 00170 /// locks held). 00171 int link_i (ACE_Stream<ACE_SYNCH_USE> &); 00172 00173 /// Must a new module onto the Stream. 00174 int push_module (ACE_Module<ACE_SYNCH_USE> *, 00175 ACE_Module<ACE_SYNCH_USE> * = 0, 00176 ACE_Module<ACE_SYNCH_USE> * = 0); 00177 00178 /// Pointer to the head of the stream. 00179 ACE_Module<ACE_SYNCH_USE> *stream_head_; 00180 00181 /// Pointer to the tail of the stream. 00182 ACE_Module<ACE_SYNCH_USE> *stream_tail_; 00183 00184 /// Pointer to an adjoining linked stream. 00185 ACE_Stream<ACE_SYNCH_USE> *linked_us_; 00186 00187 // = Synchronization objects used for thread-safe streams. 00188 /// Protect the stream against race conditions. 00189 ACE_SYNCH_MUTEX_T lock_; 00190 00191 /// Use to tell all threads waiting on the close that we are done. 00192 ACE_SYNCH_CONDITION_T final_close_; 00193 }; 00194 00195 /** 00196 * @class ACE_Stream_Iterator 00197 * 00198 * @brief Iterate through an ACE_Stream. 00199 */ 00200 template <ACE_SYNCH_DECL> 00201 class ACE_Stream_Iterator 00202 { 00203 public: 00204 // = Initialization method. 00205 ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr); 00206 00207 // = Iteration methods. 00208 00209 /// Pass back the <next_item> that hasn't been seen in the set. 00210 /// Returns 0 when all items have been seen, else 1. 00211 int next (const ACE_Module<ACE_SYNCH_USE> *&next_item); 00212 00213 /// Returns 1 when all items have been seen, else 0. 00214 int done (void) const; 00215 00216 /// Move forward by one element in the set. Returns 0 when all the 00217 /// items in the set have been seen, else 1. 00218 int advance (void); 00219 00220 private: 00221 /// Next <Module> that we haven't yet seen. 00222 ACE_Module<ACE_SYNCH_USE> *next_; 00223 }; 00224 00225 ACE_END_VERSIONED_NAMESPACE_DECL 00226 00227 #if defined (__ACE_INLINE__) 00228 #include "ace/Stream.inl" 00229 #endif /* __ACE_INLINE__ */ 00230 00231 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 00232 #include "ace/Stream.cpp" 00233 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 00234 00235 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 00236 #pragma implementation ("Stream.cpp") 00237 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 00238 00239 #include /**/ "ace/post.h" 00240 00241 #endif /* ACE_STREAM_H */