Retransmit.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Retransmit.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : Retransmit.cpp,v 1.10 2006/03/01 23:48:42 shuston Exp
00004 
00005 #include "ace/Time_Value.h"     // ACE_Time_Value
00006 #include "ace/OS_NS_stdlib.h"   // abort
00007 #include "ace/OS_NS_sys_time.h" // gettimeofday
00008 
00009 #include "Retransmit.h"
00010 
00011 /*
00012 #include <iostream>
00013 using std::cerr;
00014 using std::endl;
00015 */
00016 
00017 namespace ACE_RMCast
00018 {
00019   Retransmit::
00020   Retransmit (Parameters const& params)
00021       : params_ (params),
00022         cond_ (mutex_),
00023         stop_ (false)
00024   {
00025   }
00026 
00027   void Retransmit::
00028   out_start (Out_Element* out)
00029   {
00030     Element::out_start (out);
00031 
00032     tracker_mgr_.spawn (track_thunk, this);
00033   }
00034 
00035   void Retransmit::
00036   out_stop ()
00037   {
00038     {
00039       Lock l (mutex_);
00040       stop_ = true;
00041       cond_.signal ();
00042     }
00043 
00044     tracker_mgr_.wait ();
00045 
00046     Element::out_stop ();
00047   }
00048 
00049   void Retransmit::
00050   send (Message_ptr m)
00051   {
00052     if (m->find (Data::id) != 0)
00053     {
00054       SN const* sn = static_cast<SN const*> (m->find (SN::id));
00055 
00056       Lock l (mutex_);
00057       queue_.bind (sn->num (), Descr (m->clone ()));
00058     }
00059 
00060     out_->send (m);
00061   }
00062 
00063   void Retransmit::
00064   recv (Message_ptr m)
00065   {
00066     if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00067     {
00068       Address to (static_cast<To const*> (m->find (To::id))->address ());
00069 
00070       if (nak->address () == to)
00071       {
00072         Lock l (mutex_);
00073 
00074         for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
00075              !j.done ();
00076              j.advance ())
00077         {
00078           u64* psn;
00079           j.next (psn);
00080 
00081           Message_ptr m;
00082 
00083           Queue::ENTRY* pair;
00084 
00085           if (queue_.find (*psn, pair) == 0)
00086           {
00087             //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
00088 
00089             m = pair->int_id_.message ();
00090 
00091             pair->int_id_.reset ();
00092           }
00093           else
00094           {
00095             //cerr << 4 << "message " << *psn << " not available" << endl;
00096 
00097             m = Message_ptr (new Message);
00098             m->add (Profile_ptr (new SN (*psn)));
00099             m->add (Profile_ptr (new NoData));
00100           }
00101 
00102           out_->send (m);
00103         }
00104       }
00105     }
00106 
00107     in_->recv (m);
00108   }
00109 
00110   ACE_THR_FUNC_RETURN Retransmit::
00111   track_thunk (void* obj)
00112   {
00113     reinterpret_cast<Retransmit*> (obj)->track ();
00114     return 0;
00115   }
00116 
00117   void Retransmit::
00118   track ()
00119   {
00120     while (true)
00121     {
00122       Lock l (mutex_);
00123 
00124       for (Queue::iterator i (queue_); !i.done ();)
00125       {
00126         if ((*i).int_id_.inc () >= params_.retention_timeout ())
00127         {
00128           u64 sn ((*i).ext_id_);
00129           i.advance ();
00130           queue_.unbind (sn);
00131         }
00132         else
00133         {
00134           i.advance ();
00135         }
00136       }
00137 
00138       // Go to sleep but watch for "manual cancellation" request.
00139       //
00140       ACE_Time_Value time (ACE_OS::gettimeofday ());
00141       time += params_.tick ();
00142 
00143       while (!stop_)
00144       {
00145         if (cond_.wait (&time) == -1)
00146         {
00147           if (errno != ETIME)
00148             ACE_OS::abort ();
00149           else
00150             break;
00151         }
00152       }
00153 
00154       if (stop_)
00155         break;
00156     }
00157   }
00158 }

Generated on Thu Nov 9 11:40:41 2006 for ACE_RMCast by doxygen 1.3.6