ACE_RMCast::Acknowledge Class Reference

#include <Acknowledge.h>

Inheritance diagram for ACE_RMCast::Acknowledge:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Acknowledge:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Acknowledge (Parameters const &params)
virtual void in_start (In_Element *in)
virtual void out_start (Out_Element *out)
virtual void out_stop ()
virtual void recv (Message_ptr m)
virtual void send (Message_ptr m)

Private Types

typedef ACE_Hash_Map_Manager_Ex<
Address, Queue, AddressHasher,
ACE_Equal_To< Address >,
ACE_Null_Mutex >
Map

Private Member Functions

void collapse (Queue &q)
void track ()
void track_queue (Address const &addr, Queue &q, Messages &msgs)
Profile_ptr create_nrtm (u32 max_elem)

Static Private Member Functions

ACE_THR_FUNC_RETURN track_thunk (void *obj)

Private Attributes

Parameters const & params_
Map hold_
Mutex mutex_
Condition cond_
unsigned long nrtm_timer_
bool stop_
ACE_Thread_Manager tracker_mgr_

Member Typedef Documentation

typedef ACE_Hash_Map_Manager_Ex<Address, Queue, AddressHasher, ACE_Equal_To<Address>, ACE_Null_Mutex> ACE_RMCast::Acknowledge::Map [private]
 

Definition at line 216 of file Acknowledge.h.


Constructor & Destructor Documentation

ACE_RMCast::Acknowledge::Acknowledge (Parameters const & params)
 

Definition at line 21 of file Acknowledge.cpp.

00022       : params_ (params),
00023         hold_ (params.addr_map_size ()),
00024         cond_ (mutex_),
00025         nrtm_timer_ (params_.nrtm_timeout ()),
00026         stop_ (false)
00027   {
00028   }


Member Function Documentation

void ACE_RMCast::Acknowledge::collapse (Queue & q) [private]
 

Definition at line 59 of file Acknowledge.cpp.

References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Message_ptr, ACE_RMCast::In_Element::recv(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::u64, and ACE_RMCast::Acknowledge::Queue::unbind().

Referenced by recv().

00060   {
00061     // I would normally use iterators in the logic below but ACE_Map_Manager
00062     // iterates over entries in no particular order so it is pretty much
00063     // unusable here. Instead we will do slow and cumbersome find's.
00064     //
00065 
00066     u64 sn (q.sn () + 1);
00067 
00068     for (;; ++sn)
00069     {
00070       Queue::ENTRY* e;
00071 
00072       if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073 
00074       Message_ptr m (e->int_id_.msg ());
00075       q.unbind (sn);
00076 
00077       in_->recv (m);
00078     }
00079 
00080     q.sn (sn - 1);
00081   }

Profile_ptr ACE_RMCast::Acknowledge::create_nrtm (u32 max_elem) [private]
 

Definition at line 354 of file Acknowledge.cpp.

References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), hold_, ACE_RMCast::NRTM_ptr, ACE_RMCast::Profile_ptr, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u32.

Referenced by send(), and track().

00355   {
00356     // Prepare NRTM.
00357     //
00358     NRTM_ptr nrtm (new NRTM ());
00359 
00360     // Gather the information.
00361     //
00362     {
00363       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00364       {
00365         Address addr ((*i).ext_id_);
00366         Queue& q = (*i).int_id_;
00367 
00368         //@@ Should look for the highest known number.
00369         //
00370         nrtm->insert (addr, q.sn ());
00371 
00372         if (--max_elem == 0)
00373           break;
00374       }
00375     }
00376 
00377     if (nrtm->empty ())
00378       return Profile_ptr (0);
00379     else
00380       return Profile_ptr (nrtm.release ());
00381   }

void ACE_RMCast::Acknowledge::in_start (In_Element * in) [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 31 of file Acknowledge.cpp.

00032   {
00033     Element::in_start (in);
00034   }

void ACE_RMCast::Acknowledge::out_start (Out_Element * out) [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 37 of file Acknowledge.cpp.

References ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.

00038   {
00039     Element::out_start (out);
00040 
00041     tracker_mgr_.spawn (track_thunk, this);
00042   }

void ACE_RMCast::Acknowledge::out_stop () [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 45 of file Acknowledge.cpp.

References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), tracker_mgr_, and ACE_Thread_Manager::wait().

00046   {
00047     {
00048       Lock l (mutex_);
00049       stop_ = true;
00050       cond_.signal ();
00051     }
00052 
00053     tracker_mgr_.wait ();
00054 
00055     Element::out_stop ();
00056   }

void ACE_RMCast::Acknowledge::recv (Message_ptr m) [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 239 of file Acknowledge.cpp.

References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Acknowledge::Queue::bind(), collapse(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::NRTM::find(), hold_, ACE_RMCast::Lock, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Acknowledge::Queue::rebind(), ACE_RMCast::In_Element::recv(), ACE_Guard< ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u64.

00240   {
00241     // Handle NRTM. There could be some nasty interaction with code
00242     // that handles data below (like missing message and NAK). This
00243     // is why I hold the lock at the beginning (which may be not very
00244     // efficient).
00245     //
00246     Lock l (mutex_);
00247 
00248     if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00249     {
00250       for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00251       {
00252         u64 sn (nrtm->find ((*i).ext_id_));
00253 
00254         if (sn != 0)
00255         {
00256           Queue& q = (*i).int_id_;
00257 
00258           u64 old (q.max_sn ());
00259 
00260           if (old < sn)
00261           {
00262             // Mark as lost.
00263             //
00264             q.bind (sn, Descr (1));
00265           }
00266         }
00267       }
00268     }
00269 
00270     if (m->find (Data::id) || m->find (NoData::id))
00271     {
00272       Address from (
00273         static_cast<From const*> (m->find (From::id))->address ());
00274 
00275       u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00276 
00277       Map::ENTRY* e;
00278 
00279       if (hold_.find (from, e) == -1)
00280       {
00281         // First message from this source.
00282         //
00283         hold_.bind (from, Queue (sn));
00284         in_->recv (m);
00285       }
00286       else
00287       {
00288         Queue& q = e->int_id_;
00289 
00290         if (sn <= q.sn ())
00291         {
00292           // Duplicate.
00293           //
00294           //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
00295           //     << endl;
00296         }
00297         else if (sn == q.sn () + 1)
00298         {
00299           // Next message.
00300           //
00301 
00302           q.rebind (sn, Descr (m));
00303           collapse (q);
00304         }
00305         else
00306         {
00307           // Some messages are missing. Insert this one into the queue.
00308           //
00309           q.rebind (sn, Descr (m));
00310         }
00311       }
00312     }
00313     else
00314     {
00315       l.release ();
00316 
00317       // Just forward it up.
00318       //
00319       in_->recv (m);
00320     }
00321   }

void ACE_RMCast::Acknowledge::send (Message_ptr m) [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 324 of file Acknowledge.cpp.

References create_nrtm(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_RMCast::Data::size(), and ACE_RMCast::u32.

Referenced by track().

00325   {
00326     if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00327     {
00328       size_t max_payload_size (
00329         params_.max_packet_size () - max_service_size);
00330 
00331       if (max_payload_size > data->size ())
00332       {
00333         u32 max_size (max_payload_size - data->size ());
00334         u32 max_elem (NRTM::max_count (max_size));
00335 
00336         if (max_elem > 0)
00337         {
00338           Lock l (mutex_);
00339 
00340           Profile_ptr nrtm (create_nrtm (max_elem));
00341 
00342           if (nrtm.get ())
00343             m->add (nrtm);
00344         }
00345       }
00346 
00347       nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
00348     }
00349 
00350     out_->send (m);
00351   }

void ACE_RMCast::Acknowledge::track () [private]
 

Definition at line 84 of file Acknowledge.cpp.

References ACE_OS::abort(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), create_nrtm(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ETIME, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), hold_, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), send(), ACE_RMCast::Parameters::tick(), track_queue(), ACE_RMCast::u32, and ACE_Condition< MUTEX >::wait().

00085   {
00086     while (true)
00087     {
00088       Messages msgs;
00089 
00090       {
00091         Lock l (mutex_);
00092 
00093         if (stop_)
00094           break;
00095 
00096         if (hold_.current_size () != 0)
00097         {
00098           for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099                i != e;
00100                ++i)
00101           {
00102             Queue& q = (*i).int_id_;
00103 
00104             if (q.current_size () == 0) continue;
00105 
00106             track_queue ((*i).ext_id_, q, msgs);
00107           }
00108         }
00109 
00110         if (--nrtm_timer_ == 0)
00111         {
00112           nrtm_timer_ = params_.nrtm_timeout ();
00113 
00114           // Send NRTM.
00115           //
00116           unsigned short max_payload_size (
00117             params_.max_packet_size () - max_service_size);
00118 
00119           u32 max_elem (NRTM::max_count (max_payload_size));
00120 
00121           Profile_ptr nrtm (create_nrtm (max_elem));
00122 
00123           if (nrtm.get ())
00124           {
00125             Message_ptr m (new Message);
00126             m->add (nrtm);
00127             msgs.push_back (m);
00128 
00129           }
00130         }
00131       }
00132 
00133       // Send stuff off.
00134       //
00135       for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136       {
00137         Message_ptr* ppm;
00138         i.next (ppm);
00139         send (*ppm);
00140       }
00141 
00142       // Go to sleep but watch for "manual cancellation" request.
00143       //
00144       {
00145         ACE_Time_Value time (ACE_OS::gettimeofday ());
00146         time += params_.tick ();
00147 
00148         Lock l (mutex_);
00149 
00150         while (!stop_)
00151         {
00152           if (cond_.wait (&time) == -1)
00153           {
00154             if (errno != ETIME)
00155               ACE_OS::abort ();
00156             else
00157               break;
00158           }
00159         }
00160 
00161         if (stop_)
00162           break;
00163       }
00164     }
00165   }

void ACE_RMCast::Acknowledge::track_queue (Address const & addr, Queue & q, Messages & msgs) [private]
 

Definition at line 168 of file Acknowledge.cpp.

References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::begin(), ACE_RMCast::Acknowledge::Queue::bind(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::count(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::end(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Acknowledge::Descr::lost(), ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Acknowledge::Descr::nak_count(), ACE_RMCast::NAK_ptr, ACE_RMCast::Parameters::nak_timeout(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::Acknowledge::Descr::timer(), ACE_RMCast::u32, and ACE_RMCast::u64.

Referenced by track().

00169   {
00170     unsigned short max_payload_size (
00171       params_.max_packet_size () - max_service_size);
00172 
00173     u32 max_elem (NAK::max_count (max_payload_size));
00174     u32 count (0);
00175 
00176     Queue::iterator i (q.begin ()), e (q.end ());
00177 
00178     // Track existing losses.
00179     //
00180     while (i != e)
00181     {
00182       NAK_ptr nak (new NAK (addr));
00183 
00184       // Inner loop that fills NAK profile with up to max_elem elements.
00185       //
00186       for (; i != e && nak->count () < max_elem; ++i)
00187       {
00188         u64 sn ((*i).ext_id_);
00189         Descr& d = (*i).int_id_;
00190 
00191         if (d.lost ())
00192         {
00193           d.timer (d.timer () - 1);
00194 
00195           if (d.timer () == 0)
00196           {
00197             //@@ Need exp fallback.
00198             //
00199             d.nak_count (d.nak_count () + 1);
00200             d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00201 
00202             nak->add (sn);
00203 
00204             ++count;
00205 
00206             // cerr << 6 << "NAK # " << d.nak_count () << ": "
00207             // << addr << " " << sn << endl;
00208           }
00209         }
00210       }
00211 
00212       // Send this NAK.
00213       //
00214       if (nak->count ())
00215       {
00216         // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
00217         //     << endl;
00218 
00219         Message_ptr m (new Message);
00220 
00221         m->add (Profile_ptr (nak.release ()));
00222 
00223         msgs.push_back (m);
00224       }
00225     }
00226 
00227     // Detect and record new losses.
00228     //
00229     for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00230     {
00231       if (q.find (sn) == -1)
00232       {
00233         q.bind (sn, Descr (1));
00234       }
00235     }
00236   }

ACE_THR_FUNC_RETURN ACE_RMCast::Acknowledge::track_thunk (void * obj) [static, private]
 

Definition at line 384 of file Acknowledge.cpp.

Referenced by out_start().

00385   {
00386     reinterpret_cast<Acknowledge*> (obj)->track ();
00387     return 0;
00388   }


Member Data Documentation

Condition ACE_RMCast::Acknowledge::cond_ [private]
 

Definition at line 239 of file Acknowledge.h.

Map ACE_RMCast::Acknowledge::hold_ [private]
 

Definition at line 237 of file Acknowledge.h.

Referenced by create_nrtm(), recv(), and track().

Mutex ACE_RMCast::Acknowledge::mutex_ [private]
 

Definition at line 238 of file Acknowledge.h.

unsigned long ACE_RMCast::Acknowledge::nrtm_timer_ [private]
 

Definition at line 241 of file Acknowledge.h.

Referenced by send(), and track().

Parameters const& ACE_RMCast::Acknowledge::params_ [private]
 

Definition at line 235 of file Acknowledge.h.

Referenced by send(), track(), and track_queue().

bool ACE_RMCast::Acknowledge::stop_ [private]
 

Definition at line 243 of file Acknowledge.h.

ACE_Thread_Manager ACE_RMCast::Acknowledge::tracker_mgr_ [private]
 

Definition at line 244 of file Acknowledge.h.

Referenced by out_start(), and out_stop().


The documentation for this class was generated from the following files:
 * Acknowledge.h
 * Acknowledge.cpp
Generated on Thu Nov 9 11:41:01 2006 for ACE_RMCast by doxygen 1.3.6