Acknowledge.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Acknowledge.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : $Id: Acknowledge.cpp 78774 2007-07-04 06:06:59Z sowayaa $
00004 
00005 #include "ace/Time_Value.h"     // ACE_Time_Value
00006 #include "ace/OS_NS_unistd.h"
00007 #include "ace/OS_NS_stdlib.h"   // abort
00008 #include "ace/OS_NS_sys_time.h" // gettimeofday
00009 
00010 #include "Acknowledge.h"
00011 
00012 /*
00013 #include <iostream>
00014 using std::cerr;
00015 using std::endl;
00016 */
00017 
00018 namespace ACE_RMCast
00019 {
00020   Acknowledge::
00021   Acknowledge (Parameters const& params)
00022       : params_ (params),
00023         hold_ (params.addr_map_size ()),
00024         cond_ (mutex_),
00025         nrtm_timer_ (params_.nrtm_timeout ()),
00026         stop_ (false)
00027   {
00028   }
00029 
  // Start-up hook for the inbound direction. This element does no work of
  // its own here: it simply passes the In_Element on to the base Element.
  //
  void Acknowledge::
  in_start (In_Element* in)
  {
    Element::in_start (in);
  }
00035 
00036   void Acknowledge::
00037   out_start (Out_Element* out)
00038   {
00039     Element::out_start (out);
00040 
00041     tracker_mgr_.spawn (track_thunk, this);
00042   }
00043 
00044   void Acknowledge::
00045   out_stop ()
00046   {
00047     {
00048       Lock l (mutex_);
00049       stop_ = true;
00050       cond_.signal ();
00051     }
00052 
00053     tracker_mgr_.wait ();
00054 
00055     Element::out_stop ();
00056   }
00057 
00058   void Acknowledge::
00059   collapse (Queue& q)
00060   {
00061     // I would normally use iterators in the logic below but ACE_Map_Manager
00062     // iterates over entries in no particular order so it is pretty much
00063     // unusable here. Instead we will do slow and cumbersome find's.
00064     //
00065 
00066     u64 sn (q.sn () + 1);
00067 
00068     for (;; ++sn)
00069     {
00070       Queue::ENTRY* e;
00071 
00072       if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073 
00074       Message_ptr m (e->int_id_.msg ());
00075       q.unbind (sn);
00076 
00077       in_->recv (m);
00078     }
00079 
00080     q.sn (sn - 1);
00081   }
00082 
00083   void Acknowledge::
00084   track ()
00085   {
00086     while (true)
00087     {
00088       Messages msgs;
00089 
00090       {
00091         Lock l (mutex_);
00092 
00093         if (stop_)
00094           break;
00095 
00096         if (hold_.current_size () != 0)
00097         {
00098           for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099                i != e;
00100                ++i)
00101           {
00102             Queue& q = (*i).int_id_;
00103 
00104             if (q.current_size () == 0) continue;
00105 
00106             track_queue ((*i).ext_id_, q, msgs);
00107           }
00108         }
00109 
00110         if (--nrtm_timer_ == 0)
00111         {
00112           nrtm_timer_ = params_.nrtm_timeout ();
00113 
00114           // Send NRTM.
00115           //
00116           unsigned short max_payload_size (
00117             params_.max_packet_size () - max_service_size);
00118 
00119           u32 max_elem (NRTM::max_count (max_payload_size));
00120 
00121           Profile_ptr nrtm (create_nrtm (max_elem));
00122 
00123           if (!nrtm.null ())
00124           {
00125             Message_ptr m (new Message);
00126             m->add (nrtm);
00127             msgs.push_back (m);
00128 
00129           }
00130         }
00131       }
00132 
00133       // Send stuff off.
00134       //
00135       for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136       {
00137         Message_ptr* ppm;
00138         i.next (ppm);
00139 
00140         //FUZZ: disable check_for_lack_ACE_OS
00141         send (*ppm);
00142         //FUZZ: enable check_for_lack_ACE_OS
00143       }
00144 
00145       // Go to sleep but watch for "manual cancellation" request.
00146       //
00147       {
00148         //FUZZ: disable check_for_lack_ACE_OS
00149         ACE_Time_Value time (ACE_OS::gettimeofday ());
00150         //FUZZ: enable check_for_lack_ACE_OS
00151 
00152         time += params_.tick ();
00153 
00154         Lock l (mutex_);
00155 
00156         while (!stop_)
00157         {
00158           if (cond_.wait (&time) == -1)
00159           {
00160             if (errno != ETIME)
00161               ACE_OS::abort ();
00162             else
00163               break;
00164           }
00165         }
00166 
00167         if (stop_)
00168           break;
00169       }
00170     }
00171   }
00172 
00173   void Acknowledge::
00174   track_queue (Address const& addr, Queue& q, Messages& msgs)
00175   {
00176     unsigned short max_payload_size (
00177       params_.max_packet_size () - max_service_size);
00178 
00179     u32 max_elem (NAK::max_count (max_payload_size));
00180     u32 count (0);
00181 
00182     Queue::iterator i (q.begin ()), e (q.end ());
00183 
00184     // Track existing losses.
00185     //
00186     while (i != e)
00187     {
00188       auto_ptr<NAK> nak (new NAK (addr));
00189 
00190       // Inner loop that fills NAK profile with up to max_elem elements.
00191       //
00192       for (; i != e && nak->count () < max_elem; ++i)
00193       {
00194         u64 sn ((*i).ext_id_);
00195         Descr& d = (*i).int_id_;
00196 
00197         if (d.lost ())
00198         {
00199           d.timer (d.timer () - 1);
00200 
00201           if (d.timer () == 0)
00202           {
00203             //@@ Need exp fallback.
00204             //
00205             d.nak_count (d.nak_count () + 1);
00206             d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00207 
00208             nak->add (sn);
00209 
00210             ++count;
00211 
00212             // cerr << 6 << "NAK # " << d.nak_count () << ": "
00213             // << addr << " " << sn << endl;
00214           }
00215         }
00216       }
00217 
00218       // Send this NAK.
00219       //
00220       if (nak->count ())
00221       {
00222         // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
00223         //     << endl;
00224 
00225         Message_ptr m (new Message);
00226 
00227         m->add (Profile_ptr (nak.release ()));
00228 
00229         msgs.push_back (m);
00230       }
00231     }
00232 
00233     // Detect and record new losses.
00234     //
00235     for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00236     {
00237       if (q.find (sn) == -1)
00238       {
00239         q.bind (sn, Descr (1));
00240       }
00241     }
00242   }
00243 
00244   void Acknowledge::recv (Message_ptr m)
00245   {
00246     // Handle NRTM. There could be some nasty interaction with code
00247     // that handles data below (like missing message and NAK). This
00248     // is why I hold the lock at the beginning (which may be not very
00249     // efficient).
00250     //
00251     Lock l (mutex_);
00252 
00253     if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00254     {
00255       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00256       {
00257         u64 sn (nrtm->find ((*i).ext_id_));
00258 
00259         if (sn != 0)
00260         {
00261           Queue& q = (*i).int_id_;
00262 
00263           u64 old (q.max_sn ());
00264 
00265           if (old < sn)
00266           {
00267             // Mark as lost.
00268             //
00269             q.bind (sn, Descr (1));
00270           }
00271         }
00272       }
00273     }
00274 
00275     if (m->find (Data::id) || m->find (NoData::id))
00276     {
00277       Address from (
00278         static_cast<From const*> (m->find (From::id))->address ());
00279 
00280       u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00281 
00282       Map::ENTRY* e;
00283 
00284       if (hold_.find (from, e) == -1)
00285       {
00286         // First message from this source.
00287         //
00288         hold_.bind (from, Queue (sn));
00289         in_->recv (m);
00290       }
00291       else
00292       {
00293         Queue& q = e->int_id_;
00294 
00295         if (sn <= q.sn ())
00296         {
00297           // Duplicate.
00298           //
00299           //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
00300           //     << endl;
00301         }
00302         else if (sn == q.sn () + 1)
00303         {
00304           // Next message.
00305           //
00306 
00307           q.rebind (sn, Descr (m));
00308           collapse (q);
00309         }
00310         else
00311         {
00312           // Some messages are missing. Insert this one into the queue.
00313           //
00314           q.rebind (sn, Descr (m));
00315         }
00316       }
00317     }
00318     else
00319     {
00320       l.release ();
00321 
00322       // Just forward it up.
00323       //
00324       in_->recv (m);
00325     }
00326   }
00327 
00328   void Acknowledge::send (Message_ptr m)
00329   {
00330     if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00331     {
00332       size_t max_payload_size (
00333         params_.max_packet_size () - max_service_size);
00334 
00335       if (max_payload_size > data->size ())
00336       {
00337         u32 max_size (max_payload_size - data->size ());
00338         u32 max_elem (NRTM::max_count (max_size));
00339 
00340         if (max_elem > 0)
00341         {
00342           Lock l (mutex_);
00343 
00344           Profile_ptr nrtm (create_nrtm (max_elem));
00345 
00346           if (nrtm.get ())
00347             m->add (nrtm);
00348         }
00349       }
00350 
00351       nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
00352     }
00353 
00354     out_->send (m);
00355   }
00356 
00357   Profile_ptr Acknowledge::
00358   create_nrtm (u32 max_elem)
00359   {
00360     // Prepare NRTM.
00361     //
00362     auto_ptr<NRTM> nrtm (new NRTM ());
00363 
00364     // Gather the information.
00365     //
00366     {
00367       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00368       {
00369         Address addr ((*i).ext_id_);
00370         Queue& q = (*i).int_id_;
00371 
00372         //@@ Should look for the highest known number.
00373         //
00374         nrtm->insert (addr, q.sn ());
00375 
00376         if (--max_elem == 0)
00377           break;
00378       }
00379     }
00380 
00381     if (nrtm->empty ())
00382       return Profile_ptr (0);
00383     else
00384       return Profile_ptr (nrtm.release ());
00385   }
00386 
00387   ACE_THR_FUNC_RETURN Acknowledge::
00388   track_thunk (void* obj)
00389   {
00390     reinterpret_cast<Acknowledge*> (obj)->track ();
00391     return 0;
00392   }
00393 }

Generated on Sun Jan 27 13:02:56 2008 for ACE_RMCast by doxygen 1.3.6