Retransmit.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Retransmit.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : $Id: Retransmit.cpp 78774 2007-07-04 06:06:59Z sowayaa $
00004 
00005 #include "ace/Time_Value.h"     // ACE_Time_Value
00006 #include "ace/OS_NS_stdlib.h"   // abort
00007 #include "ace/OS_NS_sys_time.h" // gettimeofday
00008 
00009 #include "Retransmit.h"
00010 
00011 /*
00012 #include <iostream>
00013 using std::cerr;
00014 using std::endl;
00015 */
00016 
00017 namespace ACE_RMCast
00018 {
00019   Retransmit::
00020   Retransmit (Parameters const& params)
00021       : params_ (params),
00022         cond_ (mutex_),
00023         stop_ (false)
00024   {
00025   }
00026 
00027   void Retransmit::
00028   out_start (Out_Element* out)
00029   {
00030     Element::out_start (out);
00031 
00032     tracker_mgr_.spawn (track_thunk, this);
00033   }
00034 
00035   void Retransmit::
00036   out_stop ()
00037   {
00038     {
00039       Lock l (mutex_);
00040       stop_ = true;
00041       cond_.signal ();
00042     }
00043 
00044     tracker_mgr_.wait ();
00045 
00046     Element::out_stop ();
00047   }
00048 
00049   void Retransmit::send (Message_ptr m)
00050   {
00051     if (m->find (Data::id) != 0)
00052     {
00053       SN const* sn = static_cast<SN const*> (m->find (SN::id));
00054 
00055       Lock l (mutex_);
00056       queue_.bind (sn->num (), Descr (m->clone ()));
00057     }
00058 
00059     out_->send (m);
00060   }
00061 
00062   void Retransmit::recv (Message_ptr m)
00063   {
00064     if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00065     {
00066       Address to (static_cast<To const*> (m->find (To::id))->address ());
00067 
00068       if (nak->address () == to)
00069       {
00070         Lock l (mutex_);
00071 
00072         for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
00073              !j.done ();
00074              j.advance ())
00075         {
00076           u64* psn;
00077           j.next (psn);
00078 
00079           Message_ptr m;
00080 
00081           Queue::ENTRY* pair;
00082 
00083           if (queue_.find (*psn, pair) == 0)
00084           {
00085             //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
00086 
00087             m = pair->int_id_.message ();
00088 
00089             pair->int_id_.reset ();
00090           }
00091           else
00092           {
00093             //cerr << 4 << "message " << *psn << " not available" << endl;
00094 
00095             m = Message_ptr (new Message);
00096             m->add (Profile_ptr (new SN (*psn)));
00097             m->add (Profile_ptr (new NoData));
00098           }
00099 
00100           out_->send (m);
00101         }
00102       }
00103     }
00104 
00105     in_->recv (m);
00106   }
00107 
00108   ACE_THR_FUNC_RETURN Retransmit::
00109   track_thunk (void* obj)
00110   {
00111     reinterpret_cast<Retransmit*> (obj)->track ();
00112     return 0;
00113   }
00114 
00115   void Retransmit::
00116   track ()
00117   {
00118     while (true)
00119     {
00120       Lock l (mutex_);
00121 
00122       for (Queue::iterator i (queue_); !i.done ();)
00123       {
00124         if ((*i).int_id_.inc () >= params_.retention_timeout ())
00125         {
00126           u64 sn ((*i).ext_id_);
00127           i.advance ();
00128           queue_.unbind (sn);
00129         }
00130         else
00131         {
00132           i.advance ();
00133         }
00134       }
00135 
00136       //FUZZ: disable check_for_lack_ACE_OS
00137       // Go to sleep but watch for "manual cancellation" request.
00138       //
00139       ACE_Time_Value time (ACE_OS::gettimeofday ());
00140       //FUZZ: enable check_for_lack_ACE_OS
00141 
00142       time += params_.tick ();
00143 
00144       while (!stop_)
00145       {
00146         if (cond_.wait (&time) == -1)
00147         {
00148           if (errno != ETIME)
00149             ACE_OS::abort ();
00150           else
00151             break;
00152         }
00153       }
00154 
00155       if (stop_)
00156         break;
00157     }
00158   }
00159 }

Generated on Sun Jan 27 13:02:56 2008 for ACE_RMCast by doxygen 1.3.6