Stream.cpp

Go to the documentation of this file.
00001 // Stream.cpp
00002 // Stream.cpp,v 4.37 2005/10/28 16:14:56 ossama Exp
00003 
00004 #ifndef ACE_STREAM_CPP
00005 #define ACE_STREAM_CPP
00006 
00007 //#include "ace/Module.h"
00008 #include "ace/Stream.h"
00009 
00010 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00011 # pragma once
00012 #endif /* ACE_LACKS_PRAGMA_ONCE */
00013 
00014 #include "ace/Stream_Modules.h"
00015 #include "ace/OS_NS_string.h"
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Stream.inl"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Stream)
00024 
00025 // Give some idea of what the heck is going on in a stream!
00026 
00027 template <ACE_SYNCH_DECL> void
00028 ACE_Stream<ACE_SYNCH_USE>::dump (void) const
00029 {
00030 #if defined (ACE_HAS_DUMP)
00031   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00032   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- module links --------\n")));
00033 
00034   for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00035        ;
00036        mp = mp->next ())
00037     {
00038       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("module name = %s\n"), mp->name ()));
00039       if (mp == this->stream_tail_)
00040         break;
00041     }
00042 
00043   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- writer links --------\n")));
00044 
00045   ACE_Task<ACE_SYNCH_USE> *tp;
00046 
00047   for (tp = this->stream_head_->writer ();
00048        ;
00049        tp = tp->next ())
00050     {
00051       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("writer queue name = %s\n"), tp->name ()));
00052       tp->dump ();
00053       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00054       if (tp == this->stream_tail_->writer ()
00055           || (this->linked_us_
00056               && tp == this->linked_us_->stream_head_->reader ()))
00057         break;
00058     }
00059 
00060   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- reader links --------\n")));
00061   for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00062     {
00063       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("reader queue name = %s\n"), tp->name ()));
00064       tp->dump ();
00065       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00066       if (tp == this->stream_head_->reader ()
00067           || (this->linked_us_
00068               && tp == this->linked_us_->stream_head_->writer ()))
00069         break;
00070     }
00071 #endif /* ACE_HAS_DUMP */
00072 }
00073 
00074 template <ACE_SYNCH_DECL> int
00075 ACE_Stream<ACE_SYNCH_USE>::push (ACE_Module<ACE_SYNCH_USE> *new_top)
00076 {
00077   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00078   if (this->push_module  (new_top,
00079                           this->stream_head_->next (),
00080                           this->stream_head_) == -1)
00081     return -1;
00082   else
00083     return 0;
00084 }
00085 
00086 template <ACE_SYNCH_DECL> int
00087 ACE_Stream<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
00088 {
00089   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00090   return this->stream_head_->writer ()->put (mb, tv);
00091 }
00092 
00093 template <ACE_SYNCH_DECL> int
00094 ACE_Stream<ACE_SYNCH_USE>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
00095 {
00096   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00097   return this->stream_head_->reader ()->getq (mb, tv);
00098 }
00099 
00100 // Return the "top" ACE_Module in a ACE_Stream, skipping over the
00101 // stream_head.
00102 
00103 template <ACE_SYNCH_DECL> int
00104 ACE_Stream<ACE_SYNCH_USE>::top (ACE_Module<ACE_SYNCH_USE> *&m)
00105 {
00106   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00107   if (this->stream_head_->next () == this->stream_tail_)
00108     return -1;
00109   else
00110     {
00111       m = this->stream_head_->next ();
00112       return 0;
00113     }
00114 }
00115 
00116 template <ACE_SYNCH_DECL> int
00117 ACE_Stream<ACE_SYNCH_USE>::insert (const ACE_TCHAR *prev_name,
00118                                    ACE_Module<ACE_SYNCH_USE> *mod)
00119 {
00120   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00121 
00122   for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00123        prev_mod != 0;
00124        prev_mod = prev_mod->next ())
00125     if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00126       {
00127         ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00128 
00129         // We can't insert a module below <stream_tail_>.
00130         if (next_mod == 0)
00131           return -1;
00132 
00133         mod->link (next_mod);
00134         prev_mod->link (mod);
00135 
00136         if (mod->reader ()->open (mod->arg ()) == -1)
00137           return -1;
00138 
00139         if (mod->writer ()->open (mod->arg ()) == -1)
00140           return -1;
00141 
00142         return 0;
00143       }
00144 
00145   return -1;
00146 }
00147 
00148 template <ACE_SYNCH_DECL> int
00149 ACE_Stream<ACE_SYNCH_USE>::replace (const ACE_TCHAR *replace_name,
00150                                     ACE_Module<ACE_SYNCH_USE> *mod,
00151                                     int flags)
00152 {
00153   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00154   ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00155 
00156   for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00157        rep_mod != 0;
00158        rep_mod = rep_mod->next ())
00159     if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00160       {
00161         ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00162 
00163         if (next_mod)
00164           mod->link (next_mod);
00165         else // In case the <next_mod> is <stream_tail_>.
00166           {
00167             mod->writer ()->next (0);
00168             mod->next (0);
00169             this->stream_tail_ = mod;
00170           }
00171 
00172         if (prev_mod)
00173           prev_mod->link (mod);
00174         else // In case the <rep_mod> is <stream_head_>.
00175           {
00176             mod->reader ()->next (0);
00177             this->stream_head_ = mod;
00178           }
00179 
00180         if (mod->reader ()->open (mod->arg ()) == -1)
00181           return -1;
00182 
00183         if (mod->writer ()->open (mod->arg ()) == -1)
00184           return -1;
00185 
00186         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00187           {
00188             rep_mod->close (flags);
00189             delete rep_mod;
00190           }
00191 
00192         return 0;
00193       }
00194     else
00195       prev_mod = rep_mod;
00196 
00197   return -1;
00198 }
00199 
00200 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the
00201 // stream_head.
00202 
00203 template <ACE_SYNCH_DECL> int
00204 ACE_Stream<ACE_SYNCH_USE>::pop (int flags)
00205 {
00206   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00207   if (this->stream_head_->next () == this->stream_tail_)
00208     return -1;
00209   else
00210     {
00211       // Skip over the ACE_Stream head.
00212       ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00213       ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00214 
00215       this->stream_head_->next (new_top);
00216 
00217       // Close the top ACE_Module.
00218 
00219       top_mod->close (flags);
00220 
00221       // Don't delete the Module unless the flags request this.
00222       if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00223         delete top_mod;
00224 
00225       this->stream_head_->writer ()->next (new_top->writer ());
00226       new_top->reader ()->next (this->stream_head_->reader ());
00227       return 0;
00228     }
00229 }
00230 
00231 // Remove a named ACE_Module from an arbitrary place in the
00232 // ACE_Stream.
00233 
00234 template <ACE_SYNCH_DECL> int
00235 ACE_Stream<ACE_SYNCH_USE>::remove (const ACE_TCHAR *name,
00236                                    int flags)
00237 {
00238   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00239   ACE_Module<ACE_SYNCH_USE> *prev = 0;
00240 
00241   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00242        mod != 0;
00243        mod = mod->next ())
00244     if (ACE_OS::strcmp (mod->name (), name) == 0)
00245       {
00246         if (prev == 0) // Deleting ACE_Stream Head
00247           this->stream_head_->link (mod->next ());
00248         else
00249           prev->link (mod->next ());
00250 
00251         // Don't delete the Module unless the flags request this.
00252         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00253           {
00254             // Close down the module and release the memory.
00255             mod->close (flags);
00256             delete mod;
00257           }
00258 
00259         return 0;
00260       }
00261     else
00262       prev = mod;
00263 
00264   return -1;
00265 }
00266 
00267 template <ACE_SYNCH_DECL> ACE_Module<ACE_SYNCH_USE> *
00268 ACE_Stream<ACE_SYNCH_USE>::find (const ACE_TCHAR *name)
00269 {
00270   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00271   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00272        mod != 0;
00273        mod = mod->next ())
00274     if (ACE_OS::strcmp (mod->name (), name) == 0)
00275         return mod;
00276 
00277   return 0;
00278 }
00279 
00280 // Actually push a module onto the stack...
00281 
00282 template <ACE_SYNCH_DECL> int
00283 ACE_Stream<ACE_SYNCH_USE>::push_module (ACE_Module<ACE_SYNCH_USE> *new_top,
00284                                         ACE_Module<ACE_SYNCH_USE> *current_top,
00285                                         ACE_Module<ACE_SYNCH_USE> *head)
00286 {
00287   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00288   ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00289   ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00290   ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00291   ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00292 
00293   if (current_top)
00294     {
00295       ct_reader = current_top->reader ();
00296       ct_writer = current_top->writer ();
00297       ct_reader->next (nt_reader);
00298     }
00299 
00300   nt_writer->next (ct_writer);
00301 
00302   if (head)
00303     {
00304       if (head != new_top)
00305         head->link (new_top);
00306     }
00307   else
00308     nt_reader->next (0);
00309 
00310   new_top->next (current_top);
00311 
00312   if (nt_reader->open (new_top->arg ()) == -1)
00313     return -1;
00314 
00315   if (nt_writer->open (new_top->arg ()) == -1)
00316     return -1;
00317   return 0;
00318 }
00319 
00320 template <ACE_SYNCH_DECL> int
00321 ACE_Stream<ACE_SYNCH_USE>::open (void *a,
00322                                  ACE_Module<ACE_SYNCH_USE> *head,
00323                                  ACE_Module<ACE_SYNCH_USE> *tail)
00324 {
00325   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00326   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00327 
00328   ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00329   ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00330 
00331   if (head == 0)
00332     {
00333       ACE_NEW_RETURN (h1,
00334                       ACE_Stream_Head<ACE_SYNCH_USE>,
00335                       -1);
00336       ACE_NEW_RETURN (h2,
00337                       ACE_Stream_Head<ACE_SYNCH_USE>,
00338                       -1);
00339       ACE_NEW_RETURN (head,
00340                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Head"),
00341                                                  h1, h2,
00342                                                  a,
00343                                                  M_DELETE),
00344                       -1);
00345     }
00346 
00347   if (tail == 0)
00348     {
00349       ACE_NEW_RETURN (t1,
00350                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00351                       -1);
00352       ACE_NEW_RETURN (t2,
00353                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00354                       -1);
00355       ACE_NEW_RETURN (tail,
00356                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Tail"),
00357                                                  t1, t2,
00358                                                  a,
00359                                                  M_DELETE),
00360                       -1);
00361     }
00362 
00363   // Make sure *all* the allocation succeeded!
00364   if (head == 0 && (h1 == 0 || h2 == 0)
00365       || tail == 0 && (t1 == 0 || t2 == 0))
00366     {
00367       delete h1;
00368       delete h2;
00369       delete t1;
00370       delete t2;
00371       delete head;
00372       delete tail;
00373       errno = ENOMEM;
00374       return -1;
00375     }
00376 
00377   this->stream_head_ = head;
00378   this->stream_tail_ = tail;
00379 
00380   if (this->push_module (this->stream_tail_) == -1)
00381     return -1;
00382   else if (this->push_module (this->stream_head_,
00383                               this->stream_tail_,
00384                               this->stream_head_) == -1)
00385     return -1;
00386 
00387   return 0;
00388 }
00389 
00390 template <ACE_SYNCH_DECL> int
00391 ACE_Stream<ACE_SYNCH_USE>::close (int flags)
00392 {
00393   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00394   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00395 
00396   if (this->stream_head_ != 0
00397       && this->stream_tail_ != 0)
00398     {
00399       // Don't bother checking return value here.
00400       this->unlink_i ();
00401 
00402       int result = 0;
00403 
00404       // Remove and cleanup all the intermediate modules.
00405 
00406       while (this->stream_head_->next () != this->stream_tail_)
00407         if (this->pop (flags) == -1)
00408           result = -1;
00409 
00410       // Clean up the head and tail of the stream.
00411       if (this->stream_head_->close (flags) == -1)
00412         result = -1;
00413       if (this->stream_tail_->close (flags) == -1)
00414         result = -1;
00415 
00416       // Cleanup the memory.
00417       delete this->stream_head_;
00418       delete this->stream_tail_;
00419 
00420       this->stream_head_ = 0;
00421       this->stream_tail_ = 0;
00422 
00423       // Tell all threads waiting on the close that we are done.
00424       this->final_close_.broadcast ();
00425       return result;
00426     }
00427   return 0;
00428 }
00429 
00430 template <ACE_SYNCH_DECL> int
00431 ACE_Stream<ACE_SYNCH_USE>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
00432                                     void *a)
00433 {
00434   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00435   ACE_IO_Cntl_Msg ioc (cmd);
00436 
00437   ACE_Message_Block *db;
00438 
00439   // Try to create a data block that contains the user-supplied data.
00440   ACE_NEW_RETURN (db,
00441                   ACE_Message_Block (sizeof (int),
00442                                      ACE_Message_Block::MB_IOCTL,
00443                                      0,
00444                                      (char *) a),
00445                   -1);
00446   // Try to create a control block <cb> that contains the control
00447   // field and a pointer to the data block <db> in <cb>'s continuation
00448   // field.
00449   ACE_Message_Block *cb = 0;
00450 
00451   ACE_NEW_RETURN (cb,
00452                   ACE_Message_Block (sizeof ioc,
00453                                      ACE_Message_Block::MB_IOCTL,
00454                                      db,
00455                                      (char *) &ioc),
00456                   -1);
00457   // @@ Michael: The old semantic assumed that cb returns == 0
00458   //             if no memory was available. We will now return immediately
00459   //             without release (errno is set to ENOMEM by the macro).
00460 
00461   // If we can't allocate <cb> then we need to delete db and return
00462   // -1.
00463   if (cb == 0)
00464     {
00465       db->release ();
00466       errno = ENOMEM;
00467       return -1;
00468     }
00469 
00470   int result;
00471 
00472   if (this->stream_head_->writer ()->put (cb) == -1)
00473     result = -1;
00474   else if (this->stream_head_->reader ()->getq (cb) == -1)
00475     result = -1;
00476   else
00477     result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00478 
00479   // This will also release db if it's reference count == 0.
00480   cb->release ();
00481 
00482   return result;
00483 }
00484 
00485 // Link two streams together at their bottom-most Modules (i.e., the
00486 // one just above the Stream tail).  Note that all of this is premised
00487 // on the fact that the Stream head and Stream tail are non-NULL...
00488 // This must be called with locks held.
00489 
00490 template <ACE_SYNCH_DECL> int
00491 ACE_Stream<ACE_SYNCH_USE>::link_i (ACE_Stream<ACE_SYNCH_USE> &us)
00492 {
00493   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00494   this->linked_us_ = &us;
00495   // Make sure the other side is also linked to us!
00496   us.linked_us_ = this;
00497 
00498   ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00499 
00500   if (my_tail == 0)
00501     return -1;
00502 
00503   // Locate the module just above our Stream tail.
00504   while (my_tail->next () != this->stream_tail_)
00505     my_tail = my_tail->next ();
00506 
00507   ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00508 
00509   if (other_tail == 0)
00510     return -1;
00511 
00512   // Locate the module just above the other Stream's tail.
00513   while (other_tail->next () != us.stream_tail_)
00514     other_tail = other_tail->next ();
00515 
00516   // Reattach the pointers so that the two streams are linked!
00517   my_tail->writer ()->next (other_tail->reader ());
00518   other_tail->writer ()->next (my_tail->reader ());
00519   return 0;
00520 }
00521 
00522 template <ACE_SYNCH_DECL> int
00523 ACE_Stream<ACE_SYNCH_USE>::link (ACE_Stream<ACE_SYNCH_USE> &us)
00524 {
00525   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00526 
00527   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00528 
00529   return this->link_i (us);
00530 }
00531 
00532 // Must be called with locks held...
00533 
00534 template <ACE_SYNCH_DECL> int
00535 ACE_Stream<ACE_SYNCH_USE>::unlink_i (void)
00536 {
00537   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00538 
00539   // Only try to unlink if we are in fact still linked!
00540 
00541   if (this->linked_us_ != 0)
00542     {
00543       ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00544 
00545       // Only relink if we still exist!
00546       if (my_tail)
00547         {
00548           // Find the module that's just before our stream tail.
00549           while (my_tail->next () != this->stream_tail_)
00550             my_tail = my_tail->next ();
00551 
00552           // Restore the writer's next() link to our tail.
00553           my_tail->writer ()->next (this->stream_tail_->writer ());
00554         }
00555 
00556       ACE_Module<ACE_SYNCH_USE> *other_tail =
00557         this->linked_us_->stream_head_;
00558 
00559       // Only fiddle with the other side if it in fact still remains.
00560       if (other_tail != 0)
00561         {
00562           while (other_tail->next () != this->linked_us_->stream_tail_)
00563             other_tail = other_tail->next ();
00564 
00565           other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00566 
00567         }
00568 
00569       // Make sure the other side is also aware that it's been unlinked!
00570       this->linked_us_->linked_us_ = 0;
00571 
00572       this->linked_us_ = 0;
00573       return 0;
00574     }
00575   else
00576     return -1;
00577 }
00578 
00579 template <ACE_SYNCH_DECL> int
00580 ACE_Stream<ACE_SYNCH_USE>::unlink (void)
00581 {
00582   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00583   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00584   return this->unlink_i ();
00585 }
00586 
00587 template <ACE_SYNCH_DECL>
00588 ACE_Stream<ACE_SYNCH_USE>::ACE_Stream (void * a,
00589                                        ACE_Module<ACE_SYNCH_USE> *head,
00590                                        ACE_Module<ACE_SYNCH_USE> *tail)
00591   : linked_us_ (0),
00592     final_close_ (this->lock_)
00593 {
00594   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00595   if (this->open (a, head, tail) == -1)
00596     ACE_ERROR ((LM_ERROR,
00597                 ACE_LIB_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00598                head->name (), tail->name ()));
00599 }
00600 
00601 template <ACE_SYNCH_DECL>
00602 ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream (void)
00603 {
00604   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00605 
00606   if (this->stream_head_ != 0)
00607     this->close ();
00608 }
00609 
00610 template <ACE_SYNCH_DECL>
00611 ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr)
00612   : next_ (sr.stream_head_)
00613 {
00614   ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator");
00615 }
00616 
00617 ACE_END_VERSIONED_NAMESPACE_DECL
00618 
00619 #endif /* ACE_STREAM_CPP */

Generated on Thu Nov 9 09:42:05 2006 for ACE by doxygen 1.3.6