#include <Retransmit.h>
Inheritance diagram for ACE_RMCast::Retransmit:
Public Member Functions | |
Retransmit (Parameters const ¶ms) | |
virtual void | out_start (Out_Element *out) |
virtual void | out_stop () |
virtual void | send (Message_ptr m) |
virtual void | recv (Message_ptr m) |
Private Types | |
typedef ACE_Hash_Map_Manager< u64, Descr, ACE_Null_Mutex > | Queue |
Private Member Functions | |
void | track () |
Static Private Member Functions | |
static ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Queue | queue_ |
Mutex | mutex_ |
Condition | cond_ |
bool | stop_ |
ACE_Thread_Manager | tracker_mgr_ |
Classes | |
struct | Descr |
Definition at line 18 of file Retransmit.h.
typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> ACE_RMCast::Retransmit::Queue [private] |
Definition at line 76 of file Retransmit.h.
ACE_RMCast::Retransmit::Retransmit | ( | Parameters const & | params | ) |
void ACE_RMCast::Retransmit::out_start | ( | Out_Element * | out | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 28 of file Retransmit.cpp.
References ACE_RMCast::Out_Element::out_start(), ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.
00029 { 00030 Element::out_start (out); 00031 00032 tracker_mgr_.spawn (track_thunk, this); 00033 }
void ACE_RMCast::Retransmit::out_stop | ( | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 36 of file Retransmit.cpp.
References cond_, mutex_, ACE_RMCast::Out_Element::out_stop(), stop_, tracker_mgr_, and ACE_Thread_Manager::wait().
00037 { 00038 { 00039 Lock l (mutex_); 00040 stop_ = true; 00041 cond_.signal (); 00042 } 00043 00044 tracker_mgr_.wait (); 00045 00046 Element::out_stop (); 00047 }
void ACE_RMCast::Retransmit::recv | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::In_Element.
Definition at line 62 of file Retransmit.cpp.
References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::Profile::id(), ACE_RMCast::In_Element::in_, mutex_, ACE_RMCast::Out_Element::out_, queue_, ACE_RMCast::In_Element::recv(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and ACE_RMCast::Out_Element::send().
00063 { 00064 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) 00065 { 00066 Address to (static_cast<To const*> (m->find (To::id))->address ()); 00067 00068 if (nak->address () == to) 00069 { 00070 Lock l (mutex_); 00071 00072 for (NAK::iterator j (const_cast<NAK*> (nak)->begin ()); 00073 !j.done (); 00074 j.advance ()) 00075 { 00076 u64* psn; 00077 j.next (psn); 00078 00079 Message_ptr m; 00080 00081 Queue::ENTRY* pair; 00082 00083 if (queue_.find (*psn, pair) == 0) 00084 { 00085 //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl; 00086 00087 m = pair->int_id_.message (); 00088 00089 pair->int_id_.reset (); 00090 } 00091 else 00092 { 00093 //cerr << 4 << "message " << *psn << " not available" << endl; 00094 00095 m = Message_ptr (new Message); 00096 m->add (Profile_ptr (new SN (*psn))); 00097 m->add (Profile_ptr (new NoData)); 00098 } 00099 00100 out_->send (m); 00101 } 00102 } 00103 } 00104 00105 in_->recv (m); 00106 }
void ACE_RMCast::Retransmit::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 49 of file Retransmit.cpp.
References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Profile::id(), mutex_, ACE_RMCast::Out_Element::out_, queue_, and ACE_RMCast::Out_Element::send().
00050 { 00051 if (m->find (Data::id) != 0) 00052 { 00053 SN const* sn = static_cast<SN const*> (m->find (SN::id)); 00054 00055 Lock l (mutex_); 00056 queue_.bind (sn->num (), Descr (m->clone ())); 00057 } 00058 00059 out_->send (m); 00060 }
void ACE_RMCast::Retransmit::track | ( | ) | [private] |
Definition at line 116 of file Retransmit.cpp.
References ACE_OS::abort(), cond_, ACE_Hash_Map_Iterator_Base_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::done(), ACE_OS::gettimeofday(), mutex_, params_, queue_, ACE_RMCast::Parameters::retention_timeout(), stop_, ACE_RMCast::Parameters::tick(), and time().
Referenced by track_thunk().
00117 { 00118 while (true) 00119 { 00120 Lock l (mutex_); 00121 00122 for (Queue::iterator i (queue_); !i.done ();) 00123 { 00124 if ((*i).int_id_.inc () >= params_.retention_timeout ()) 00125 { 00126 u64 sn ((*i).ext_id_); 00127 i.advance (); 00128 queue_.unbind (sn); 00129 } 00130 else 00131 { 00132 i.advance (); 00133 } 00134 } 00135 00136 //FUZZ: disable check_for_lack_ACE_OS 00137 // Go to sleep but watch for "manual cancellation" request. 00138 // 00139 ACE_Time_Value time (ACE_OS::gettimeofday ()); 00140 //FUZZ: enable check_for_lack_ACE_OS 00141 00142 time += params_.tick (); 00143 00144 while (!stop_) 00145 { 00146 if (cond_.wait (&time) == -1) 00147 { 00148 if (errno != ETIME) 00149 ACE_OS::abort (); 00150 else 00151 break; 00152 } 00153 } 00154 00155 if (stop_) 00156 break; 00157 } 00158 }
ACE_THR_FUNC_RETURN ACE_RMCast::Retransmit::track_thunk | ( | void * | obj | ) | [static, private] |
Definition at line 109 of file Retransmit.cpp.
References track().
Referenced by out_start().
00110 { 00111 reinterpret_cast<Retransmit*> (obj)->track (); 00112 return 0; 00113 }
Condition ACE_RMCast::Retransmit::cond_ [private] |
Mutex ACE_RMCast::Retransmit::mutex_ [private] |
Parameters const& ACE_RMCast::Retransmit::params_ [private] |
Queue ACE_RMCast::Retransmit::queue_ [private] |
bool ACE_RMCast::Retransmit::stop_ [private] |