ACE_RMCast::Retransmit Class Reference

#include <Retransmit.h>

Inheritance diagram for ACE_RMCast::Retransmit:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Retransmit:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Retransmit (Parameters const &params)
virtual void out_start (Out_Element *out)
virtual void out_stop ()
virtual void send (Message_ptr m)
virtual void recv (Message_ptr m)

Private Types

typedef ACE_Hash_Map_Manager<
u64, Descr, ACE_Null_Mutex
Queue

Private Member Functions

void track ()

Static Private Member Functions

ACE_THR_FUNC_RETURN track_thunk (void *obj)

Private Attributes

Parameters const & params_
Queue queue_
Mutex mutex_
Condition cond_
bool stop_
ACE_Thread_Manager tracker_mgr_

Member Typedef Documentation

typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> ACE_RMCast::Retransmit::Queue [private]
 

Definition at line 76 of file Retransmit.h.


Constructor & Destructor Documentation

ACE_RMCast::Retransmit::Retransmit Parameters const &  params  ) 
 

Definition at line 20 of file Retransmit.cpp.

00021       : params_ (params),
00022         cond_ (mutex_),
00023         stop_ (false)
00024   {
00025   }


Member Function Documentation

void ACE_RMCast::Retransmit::out_start Out_Element out  )  [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 28 of file Retransmit.cpp.

References ACE_Thread_Manager::spawn().

00029   {
00030     Element::out_start (out);
00031 
00032     tracker_mgr_.spawn (track_thunk, this);
00033   }

void ACE_RMCast::Retransmit::out_stop  )  [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 36 of file Retransmit.cpp.

References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), and ACE_Thread_Manager::wait().

00037   {
00038     {
00039       Lock l (mutex_);
00040       stop_ = true;
00041       cond_.signal ();
00042     }
00043 
00044     tracker_mgr_.wait ();
00045 
00046     Element::out_stop ();
00047   }

void ACE_RMCast::Retransmit::recv Message_ptr  m  )  [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 62 of file Retransmit.cpp.

References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), and ACE_RMCast::u64.

00063   {
00064     if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00065     {
00066       Address to (static_cast<To const*> (m->find (To::id))->address ());
00067 
00068       if (nak->address () == to)
00069       {
00070         Lock l (mutex_);
00071 
00072         for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
00073              !j.done ();
00074              j.advance ())
00075         {
00076           u64* psn;
00077           j.next (psn);
00078 
00079           Message_ptr m;
00080 
00081           Queue::ENTRY* pair;
00082 
00083           if (queue_.find (*psn, pair) == 0)
00084           {
00085             //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
00086 
00087             m = pair->int_id_.message ();
00088 
00089             pair->int_id_.reset ();
00090           }
00091           else
00092           {
00093             //cerr << 4 << "message " << *psn << " not available" << endl;
00094 
00095             m = Message_ptr (new Message);
00096             m->add (Profile_ptr (new SN (*psn)));
00097             m->add (Profile_ptr (new NoData));
00098           }
00099 
00100           out_->send (m);
00101         }
00102       }
00103     }
00104 
00105     in_->recv (m);
00106   }

void ACE_RMCast::Retransmit::send Message_ptr  m  )  [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 49 of file Retransmit.cpp.

References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::bind(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, and ACE_RMCast::SN::num().

00050   {
00051     if (m->find (Data::id) != 0)
00052     {
00053       SN const* sn = static_cast<SN const*> (m->find (SN::id));
00054 
00055       Lock l (mutex_);
00056       queue_.bind (sn->num (), Descr (m->clone ()));
00057     }
00058 
00059     out_->send (m);
00060   }

void ACE_RMCast::Retransmit::track  )  [private]
 

Definition at line 116 of file Retransmit.cpp.

References ACE_OS::abort(), ETIME, ACE_RMCast::Lock, ACE_RMCast::Parameters::retention_timeout(), ACE_RMCast::Parameters::tick(), ACE_RMCast::u64, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::unbind(), and ACE_Condition< MUTEX >::wait().

00117   {
00118     while (true)
00119     {
00120       Lock l (mutex_);
00121 
00122       for (Queue::iterator i (queue_); !i.done ();)
00123       {
00124         if ((*i).int_id_.inc () >= params_.retention_timeout ())
00125         {
00126           u64 sn ((*i).ext_id_);
00127           i.advance ();
00128           queue_.unbind (sn);
00129         }
00130         else
00131         {
00132           i.advance ();
00133         }
00134       }
00135 
00136       //FUZZ: disable check_for_lack_ACE_OS
00137       // Go to sleep but watch for "manual cancellation" request.
00138       //
00139       ACE_Time_Value time (ACE_OS::gettimeofday ());
00140       //FUZZ: enable check_for_lack_ACE_OS
00141 
00142       time += params_.tick ();
00143 
00144       while (!stop_)
00145       {
00146         if (cond_.wait (&time) == -1)
00147         {
00148           if (errno != ETIME)
00149             ACE_OS::abort ();
00150           else
00151             break;
00152         }
00153       }
00154 
00155       if (stop_)
00156         break;
00157     }
00158   }

ACE_THR_FUNC_RETURN ACE_RMCast::Retransmit::track_thunk void *  obj  )  [static, private]
 

Definition at line 109 of file Retransmit.cpp.

00110   {
00111     reinterpret_cast<Retransmit*> (obj)->track ();
00112     return 0;
00113   }


Member Data Documentation

Condition ACE_RMCast::Retransmit::cond_ [private]
 

Definition at line 90 of file Retransmit.h.

Mutex ACE_RMCast::Retransmit::mutex_ [private]
 

Definition at line 89 of file Retransmit.h.

Parameters const& ACE_RMCast::Retransmit::params_ [private]
 

Definition at line 86 of file Retransmit.h.

Queue ACE_RMCast::Retransmit::queue_ [private]
 

Definition at line 88 of file Retransmit.h.

bool ACE_RMCast::Retransmit::stop_ [private]
 

Definition at line 92 of file Retransmit.h.

ACE_Thread_Manager ACE_RMCast::Retransmit::tracker_mgr_ [private]
 

Definition at line 93 of file Retransmit.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 13:03:10 2008 for ACE_RMCast by doxygen 1.3.6