#include <Acknowledge.h>
Inheritance diagram for ACE_RMCast::Acknowledge:
Public Member Functions | |
Acknowledge (Parameters const ¶ms) | |
virtual void | in_start (In_Element *in) |
virtual void | out_start (Out_Element *out) |
virtual void | out_stop () |
virtual void | recv (Message_ptr m) |
virtual void | send (Message_ptr m) |
Private Types | |
typedef ACE_Hash_Map_Manager_Ex< Address, Queue, AddressHasher, ACE_Equal_To< Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
void | collapse (Queue &q) |
void | track () |
void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Map | hold_ |
Mutex | mutex_ |
Condition | cond_ |
unsigned long | nrtm_timer_ |
bool | stop_ |
ACE_Thread_Manager | tracker_mgr_ |
|
Definition at line 216 of file Acknowledge.h. |
|
Definition at line 21 of file Acknowledge.cpp.
|
|
Definition at line 59 of file Acknowledge.cpp. References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Message_ptr, ACE_RMCast::In_Element::recv(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::u64, and ACE_RMCast::Acknowledge::Queue::unbind(). Referenced by recv().
00060 { 00061 // I would normally use iterators in the logic below but ACE_Map_Manager 00062 // iterates over entries in no particular order so it is pretty much 00063 // unusable here. Instead we will do slow and cumbersome find's. 00064 // 00065 00066 u64 sn (q.sn () + 1); 00067 00068 for (;; ++sn) 00069 { 00070 Queue::ENTRY* e; 00071 00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break; 00073 00074 Message_ptr m (e->int_id_.msg ()); 00075 q.unbind (sn); 00076 00077 in_->recv (m); 00078 } 00079 00080 q.sn (sn - 1); 00081 } |
|
Definition at line 358 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), hold_, ACE_RMCast::Profile_ptr, ACE_Auto_Basic_Ptr< X >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u32. Referenced by send(), and track().
00359 { 00360 // Prepare NRTM. 00361 // 00362 auto_ptr<NRTM> nrtm (new NRTM ()); 00363 00364 // Gather the information. 00365 // 00366 { 00367 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) 00368 { 00369 Address addr ((*i).ext_id_); 00370 Queue& q = (*i).int_id_; 00371 00372 //@@ Should look for the highest known number. 00373 // 00374 nrtm->insert (addr, q.sn ()); 00375 00376 if (--max_elem == 0) 00377 break; 00378 } 00379 } 00380 00381 if (nrtm->empty ()) 00382 return Profile_ptr (0); 00383 else 00384 return Profile_ptr (nrtm.release ()); 00385 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 31 of file Acknowledge.cpp.
00032 { 00033 Element::in_start (in); 00034 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 37 of file Acknowledge.cpp. References ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.
00038 { 00039 Element::out_start (out); 00040 00041 tracker_mgr_.spawn (track_thunk, this); 00042 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 45 of file Acknowledge.cpp. References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), tracker_mgr_, and ACE_Thread_Manager::wait().
00046 { 00047 { 00048 Lock l (mutex_); 00049 stop_ = true; 00050 cond_.signal (); 00051 } 00052 00053 tracker_mgr_.wait (); 00054 00055 Element::out_stop (); 00056 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 244 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Acknowledge::Queue::bind(), collapse(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::NRTM::find(), hold_, ACE_RMCast::Lock, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Acknowledge::Queue::rebind(), ACE_RMCast::In_Element::recv(), ACE_Guard< ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u64.
00245 { 00246 // Handle NRTM. There could be some nasty interaction with code 00247 // that handles data below (like missing message and NAK). This 00248 // is why I hold the lock at the beginning (which may be not very 00249 // efficient). 00250 // 00251 Lock l (mutex_); 00252 00253 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) 00254 { 00255 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) 00256 { 00257 u64 sn (nrtm->find ((*i).ext_id_)); 00258 00259 if (sn != 0) 00260 { 00261 Queue& q = (*i).int_id_; 00262 00263 u64 old (q.max_sn ()); 00264 00265 if (old < sn) 00266 { 00267 // Mark as lost. 00268 // 00269 q.bind (sn, Descr (1)); 00270 } 00271 } 00272 } 00273 } 00274 00275 if (m->find (Data::id) || m->find (NoData::id)) 00276 { 00277 Address from ( 00278 static_cast<From const*> (m->find (From::id))->address ()); 00279 00280 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); 00281 00282 Map::ENTRY* e; 00283 00284 if (hold_.find (from, e) == -1) 00285 { 00286 // First message from this source. 00287 // 00288 hold_.bind (from, Queue (sn)); 00289 in_->recv (m); 00290 } 00291 else 00292 { 00293 Queue& q = e->int_id_; 00294 00295 if (sn <= q.sn ()) 00296 { 00297 // Duplicate. 00298 // 00299 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn 00300 // << endl; 00301 } 00302 else if (sn == q.sn () + 1) 00303 { 00304 // Next message. 00305 // 00306 00307 q.rebind (sn, Descr (m)); 00308 collapse (q); 00309 } 00310 else 00311 { 00312 // Some messages are missing. Insert this one into the queue. 00313 // 00314 q.rebind (sn, Descr (m)); 00315 } 00316 } 00317 } 00318 else 00319 { 00320 l.release (); 00321 00322 // Just forward it up. 00323 // 00324 in_->recv (m); 00325 } 00326 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 328 of file Acknowledge.cpp. References create_nrtm(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_RMCast::Data::size(), and ACE_RMCast::u32. Referenced by track().
00329 { 00330 if (Data const* data = static_cast<Data const*> (m->find (Data::id))) 00331 { 00332 size_t max_payload_size ( 00333 params_.max_packet_size () - max_service_size); 00334 00335 if (max_payload_size > data->size ()) 00336 { 00337 u32 max_size (max_payload_size - data->size ()); 00338 u32 max_elem (NRTM::max_count (max_size)); 00339 00340 if (max_elem > 0) 00341 { 00342 Lock l (mutex_); 00343 00344 Profile_ptr nrtm (create_nrtm (max_elem)); 00345 00346 if (nrtm.get ()) 00347 m->add (nrtm); 00348 } 00349 } 00350 00351 nrtm_timer_ = params_.nrtm_timeout (); // Reset timer. 00352 } 00353 00354 out_->send (m); 00355 } |
|
Definition at line 84 of file Acknowledge.cpp. References ACE_OS::abort(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), create_nrtm(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ETIME, hold_, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, ACE_Strong_Bound_Ptr< X, ACE_LOCK >::null(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), send(), ACE_RMCast::Parameters::tick(), track_queue(), ACE_RMCast::u32, and ACE_Condition< MUTEX >::wait().
00085 { 00086 while (true) 00087 { 00088 Messages msgs; 00089 00090 { 00091 Lock l (mutex_); 00092 00093 if (stop_) 00094 break; 00095 00096 if (hold_.current_size () != 0) 00097 { 00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); 00099 i != e; 00100 ++i) 00101 { 00102 Queue& q = (*i).int_id_; 00103 00104 if (q.current_size () == 0) continue; 00105 00106 track_queue ((*i).ext_id_, q, msgs); 00107 } 00108 } 00109 00110 if (--nrtm_timer_ == 0) 00111 { 00112 nrtm_timer_ = params_.nrtm_timeout (); 00113 00114 // Send NRTM. 00115 // 00116 unsigned short max_payload_size ( 00117 params_.max_packet_size () - max_service_size); 00118 00119 u32 max_elem (NRTM::max_count (max_payload_size)); 00120 00121 Profile_ptr nrtm (create_nrtm (max_elem)); 00122 00123 if (!nrtm.null ()) 00124 { 00125 Message_ptr m (new Message); 00126 m->add (nrtm); 00127 msgs.push_back (m); 00128 00129 } 00130 } 00131 } 00132 00133 // Send stuff off. 00134 // 00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ()) 00136 { 00137 Message_ptr* ppm; 00138 i.next (ppm); 00139 00140 //FUZZ: disable check_for_lack_ACE_OS 00141 send (*ppm); 00142 //FUZZ: enable check_for_lack_ACE_OS 00143 } 00144 00145 // Go to sleep but watch for "manual cancellation" request. 00146 // 00147 { 00148 //FUZZ: disable check_for_lack_ACE_OS 00149 ACE_Time_Value time (ACE_OS::gettimeofday ()); 00150 //FUZZ: enable check_for_lack_ACE_OS 00151 00152 time += params_.tick (); 00153 00154 Lock l (mutex_); 00155 00156 while (!stop_) 00157 { 00158 if (cond_.wait (&time) == -1) 00159 { 00160 if (errno != ETIME) 00161 ACE_OS::abort (); 00162 else 00163 break; 00164 } 00165 } 00166 00167 if (stop_) 00168 break; 00169 } 00170 } 00171 } |
|
Definition at line 174 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::begin(), ACE_RMCast::Acknowledge::Queue::bind(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::end(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Acknowledge::Descr::lost(), ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Acknowledge::Descr::nak_count(), ACE_RMCast::Parameters::nak_timeout(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Auto_Basic_Ptr< X >::release(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::Acknowledge::Descr::timer(), ACE_RMCast::u32, and ACE_RMCast::u64. Referenced by track().
00175 { 00176 unsigned short max_payload_size ( 00177 params_.max_packet_size () - max_service_size); 00178 00179 u32 max_elem (NAK::max_count (max_payload_size)); 00180 u32 count (0); 00181 00182 Queue::iterator i (q.begin ()), e (q.end ()); 00183 00184 // Track existing losses. 00185 // 00186 while (i != e) 00187 { 00188 auto_ptr<NAK> nak (new NAK (addr)); 00189 00190 // Inner loop that fills NAK profile with up to max_elem elements. 00191 // 00192 for (; i != e && nak->count () < max_elem; ++i) 00193 { 00194 u64 sn ((*i).ext_id_); 00195 Descr& d = (*i).int_id_; 00196 00197 if (d.lost ()) 00198 { 00199 d.timer (d.timer () - 1); 00200 00201 if (d.timer () == 0) 00202 { 00203 //@@ Need exp fallback. 00204 // 00205 d.nak_count (d.nak_count () + 1); 00206 d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); 00207 00208 nak->add (sn); 00209 00210 ++count; 00211 00212 // cerr << 6 << "NAK # " << d.nak_count () << ": " 00213 // << addr << " " << sn << endl; 00214 } 00215 } 00216 } 00217 00218 // Send this NAK. 00219 // 00220 if (nak->count ()) 00221 { 00222 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" 00223 // << endl; 00224 00225 Message_ptr m (new Message); 00226 00227 m->add (Profile_ptr (nak.release ())); 00228 00229 msgs.push_back (m); 00230 } 00231 } 00232 00233 // Detect and record new losses. 00234 // 00235 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) 00236 { 00237 if (q.find (sn) == -1) 00238 { 00239 q.bind (sn, Descr (1)); 00240 } 00241 } 00242 } |
|
Definition at line 388 of file Acknowledge.cpp. Referenced by out_start().
00389 {
00390 reinterpret_cast<Acknowledge*> (obj)->track ();
00391 return 0;
00392 }
|
|
Definition at line 239 of file Acknowledge.h. |
|
Definition at line 237 of file Acknowledge.h. Referenced by create_nrtm(), recv(), and track(). |
|
Definition at line 238 of file Acknowledge.h. |
|
Definition at line 241 of file Acknowledge.h. |
|
Definition at line 235 of file Acknowledge.h. Referenced by send(), track(), and track_queue(). |
|
Definition at line 243 of file Acknowledge.h. |
|
Definition at line 244 of file Acknowledge.h. Referenced by out_start(), and out_stop(). |