Acknowledge.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Acknowledge.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : Acknowledge.cpp,v 1.17 2006/03/01 23:48:42 shuston Exp
00004 
00005 #include "ace/Time_Value.h"     // ACE_Time_Value
00006 #include "ace/OS_NS_unistd.h"
00007 #include "ace/OS_NS_stdlib.h"   // abort
00008 #include "ace/OS_NS_sys_time.h" // gettimeofday
00009 
00010 #include "Acknowledge.h"
00011 
00012 /*
00013 #include <iostream>
00014 using std::cerr;
00015 using std::endl;
00016 */
00017 
00018 namespace ACE_RMCast
00019 {
00020   Acknowledge::
00021   Acknowledge (Parameters const& params)
00022       : params_ (params),
00023         hold_ (params.addr_map_size ()),
00024         cond_ (mutex_),
00025         nrtm_timer_ (params_.nrtm_timeout ()),
00026         stop_ (false)
00027   {
00028   }
00029 
00030   void Acknowledge::
00031   in_start (In_Element* in)
00032   {
00033     Element::in_start (in);
00034   }
00035 
00036   void Acknowledge::
00037   out_start (Out_Element* out)
00038   {
00039     Element::out_start (out);
00040 
00041     tracker_mgr_.spawn (track_thunk, this);
00042   }
00043 
00044   void Acknowledge::
00045   out_stop ()
00046   {
00047     {
00048       Lock l (mutex_);
00049       stop_ = true;
00050       cond_.signal ();
00051     }
00052 
00053     tracker_mgr_.wait ();
00054 
00055     Element::out_stop ();
00056   }
00057 
00058   void Acknowledge::
00059   collapse (Queue& q)
00060   {
00061     // I would normally use iterators in the logic below but ACE_Map_Manager
00062     // iterates over entries in no particular order so it is pretty much
00063     // unusable here. Instead we will do slow and cumbersome find's.
00064     //
00065 
00066     u64 sn (q.sn () + 1);
00067 
00068     for (;; ++sn)
00069     {
00070       Queue::ENTRY* e;
00071 
00072       if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073 
00074       Message_ptr m (e->int_id_.msg ());
00075       q.unbind (sn);
00076 
00077       in_->recv (m);
00078     }
00079 
00080     q.sn (sn - 1);
00081   }
00082 
00083   void Acknowledge::
00084   track ()
00085   {
00086     while (true)
00087     {
00088       Messages msgs;
00089 
00090       {
00091         Lock l (mutex_);
00092 
00093         if (stop_)
00094           break;
00095 
00096         if (hold_.current_size () != 0)
00097         {
00098           for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099                i != e;
00100                ++i)
00101           {
00102             Queue& q = (*i).int_id_;
00103 
00104             if (q.current_size () == 0) continue;
00105 
00106             track_queue ((*i).ext_id_, q, msgs);
00107           }
00108         }
00109 
00110         if (--nrtm_timer_ == 0)
00111         {
00112           nrtm_timer_ = params_.nrtm_timeout ();
00113 
00114           // Send NRTM.
00115           //
00116           unsigned short max_payload_size (
00117             params_.max_packet_size () - max_service_size);
00118 
00119           u32 max_elem (NRTM::max_count (max_payload_size));
00120 
00121           Profile_ptr nrtm (create_nrtm (max_elem));
00122 
00123           if (nrtm.get ())
00124           {
00125             Message_ptr m (new Message);
00126             m->add (nrtm);
00127             msgs.push_back (m);
00128 
00129           }
00130         }
00131       }
00132 
00133       // Send stuff off.
00134       //
00135       for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136       {
00137         Message_ptr* ppm;
00138         i.next (ppm);
00139         send (*ppm);
00140       }
00141 
00142       // Go to sleep but watch for "manual cancellation" request.
00143       //
00144       {
00145         ACE_Time_Value time (ACE_OS::gettimeofday ());
00146         time += params_.tick ();
00147 
00148         Lock l (mutex_);
00149 
00150         while (!stop_)
00151         {
00152           if (cond_.wait (&time) == -1)
00153           {
00154             if (errno != ETIME)
00155               ACE_OS::abort ();
00156             else
00157               break;
00158           }
00159         }
00160 
00161         if (stop_)
00162           break;
00163       }
00164     }
00165   }
00166 
00167   void Acknowledge::
00168   track_queue (Address const& addr, Queue& q, Messages& msgs)
00169   {
00170     unsigned short max_payload_size (
00171       params_.max_packet_size () - max_service_size);
00172 
00173     u32 max_elem (NAK::max_count (max_payload_size));
00174     u32 count (0);
00175 
00176     Queue::iterator i (q.begin ()), e (q.end ());
00177 
00178     // Track existing losses.
00179     //
00180     while (i != e)
00181     {
00182       NAK_ptr nak (new NAK (addr));
00183 
00184       // Inner loop that fills NAK profile with up to max_elem elements.
00185       //
00186       for (; i != e && nak->count () < max_elem; ++i)
00187       {
00188         u64 sn ((*i).ext_id_);
00189         Descr& d = (*i).int_id_;
00190 
00191         if (d.lost ())
00192         {
00193           d.timer (d.timer () - 1);
00194 
00195           if (d.timer () == 0)
00196           {
00197             //@@ Need exp fallback.
00198             //
00199             d.nak_count (d.nak_count () + 1);
00200             d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00201 
00202             nak->add (sn);
00203 
00204             ++count;
00205 
00206             // cerr << 6 << "NAK # " << d.nak_count () << ": "
00207             // << addr << " " << sn << endl;
00208           }
00209         }
00210       }
00211 
00212       // Send this NAK.
00213       //
00214       if (nak->count ())
00215       {
00216         // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
00217         //     << endl;
00218 
00219         Message_ptr m (new Message);
00220 
00221         m->add (Profile_ptr (nak.release ()));
00222 
00223         msgs.push_back (m);
00224       }
00225     }
00226 
00227     // Detect and record new losses.
00228     //
00229     for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00230     {
00231       if (q.find (sn) == -1)
00232       {
00233         q.bind (sn, Descr (1));
00234       }
00235     }
00236   }
00237 
00238   void Acknowledge::
00239   recv (Message_ptr m)
00240   {
00241     // Handle NRTM. There could be some nasty interaction with code
00242     // that handles data below (like missing message and NAK). This
00243     // is why I hold the lock at the beginning (which may be not very
00244     // efficient).
00245     //
00246     Lock l (mutex_);
00247 
00248     if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00249     {
00250       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00251       {
00252         u64 sn (nrtm->find ((*i).ext_id_));
00253 
00254         if (sn != 0)
00255         {
00256           Queue& q = (*i).int_id_;
00257 
00258           u64 old (q.max_sn ());
00259 
00260           if (old < sn)
00261           {
00262             // Mark as lost.
00263             //
00264             q.bind (sn, Descr (1));
00265           }
00266         }
00267       }
00268     }
00269 
00270     if (m->find (Data::id) || m->find (NoData::id))
00271     {
00272       Address from (
00273         static_cast<From const*> (m->find (From::id))->address ());
00274 
00275       u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00276 
00277       Map::ENTRY* e;
00278 
00279       if (hold_.find (from, e) == -1)
00280       {
00281         // First message from this source.
00282         //
00283         hold_.bind (from, Queue (sn));
00284         in_->recv (m);
00285       }
00286       else
00287       {
00288         Queue& q = e->int_id_;
00289 
00290         if (sn <= q.sn ())
00291         {
00292           // Duplicate.
00293           //
00294           //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
00295           //     << endl;
00296         }
00297         else if (sn == q.sn () + 1)
00298         {
00299           // Next message.
00300           //
00301 
00302           q.rebind (sn, Descr (m));
00303           collapse (q);
00304         }
00305         else
00306         {
00307           // Some messages are missing. Insert this one into the queue.
00308           //
00309           q.rebind (sn, Descr (m));
00310         }
00311       }
00312     }
00313     else
00314     {
00315       l.release ();
00316 
00317       // Just forward it up.
00318       //
00319       in_->recv (m);
00320     }
00321   }
00322 
00323   void Acknowledge::
00324   send (Message_ptr m)
00325   {
00326     if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00327     {
00328       size_t max_payload_size (
00329         params_.max_packet_size () - max_service_size);
00330 
00331       if (max_payload_size > data->size ())
00332       {
00333         u32 max_size (max_payload_size - data->size ());
00334         u32 max_elem (NRTM::max_count (max_size));
00335 
00336         if (max_elem > 0)
00337         {
00338           Lock l (mutex_);
00339 
00340           Profile_ptr nrtm (create_nrtm (max_elem));
00341 
00342           if (nrtm.get ())
00343             m->add (nrtm);
00344         }
00345       }
00346 
00347       nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
00348     }
00349 
00350     out_->send (m);
00351   }
00352 
00353   Profile_ptr Acknowledge::
00354   create_nrtm (u32 max_elem)
00355   {
00356     // Prepare NRTM.
00357     //
00358     NRTM_ptr nrtm (new NRTM ());
00359 
00360     // Gather the information.
00361     //
00362     {
00363       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00364       {
00365         Address addr ((*i).ext_id_);
00366         Queue& q = (*i).int_id_;
00367 
00368         //@@ Should look for the highest known number.
00369         //
00370         nrtm->insert (addr, q.sn ());
00371 
00372         if (--max_elem == 0)
00373           break;
00374       }
00375     }
00376 
00377     if (nrtm->empty ())
00378       return Profile_ptr (0);
00379     else
00380       return Profile_ptr (nrtm.release ());
00381   }
00382 
00383   ACE_THR_FUNC_RETURN Acknowledge::
00384   track_thunk (void* obj)
00385   {
00386     reinterpret_cast<Acknowledge*> (obj)->track ();
00387     return 0;
00388   }
00389 }

Generated on Thu Nov 9 11:40:40 2006 for ACE_RMCast by doxygen 1.3.6