#include <Acknowledge.h>
Inheritance diagram for ACE_RMCast::Acknowledge:
Public Member Functions | |
Acknowledge (Parameters const ¶ms) | |
virtual void | in_start (In_Element *in) |
virtual void | out_start (Out_Element *out) |
virtual void | out_stop () |
virtual void | recv (Message_ptr m) |
virtual void | send (Message_ptr m) |
Private Types | |
typedef ACE_Hash_Map_Manager_Ex< Address, Queue, AddressHasher, ACE_Equal_To< Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
void | collapse (Queue &q) |
void | track () |
void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Map | hold_ |
Mutex | mutex_ |
Condition | cond_ |
unsigned long | nrtm_timer_ |
bool | stop_ |
ACE_Thread_Manager | tracker_mgr_ |
|
Definition at line 216 of file Acknowledge.h. |
|
Definition at line 21 of file Acknowledge.cpp.
|
|
Definition at line 59 of file Acknowledge.cpp. References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Message_ptr, ACE_RMCast::In_Element::recv(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::u64, and ACE_RMCast::Acknowledge::Queue::unbind(). Referenced by recv().
00060 { 00061 // I would normally use iterators in the logic below but ACE_Map_Manager 00062 // iterates over entries in no particular order so it is pretty much 00063 // unusable here. Instead we will do slow and cumbersome find's. 00064 // 00065 00066 u64 sn (q.sn () + 1); 00067 00068 for (;; ++sn) 00069 { 00070 Queue::ENTRY* e; 00071 00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break; 00073 00074 Message_ptr m (e->int_id_.msg ()); 00075 q.unbind (sn); 00076 00077 in_->recv (m); 00078 } 00079 00080 q.sn (sn - 1); 00081 } |
|
Definition at line 354 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), hold_, ACE_RMCast::NRTM_ptr, ACE_RMCast::Profile_ptr, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u32. Referenced by send(), and track().
00355 { 00356 // Prepare NRTM. 00357 // 00358 NRTM_ptr nrtm (new NRTM ()); 00359 00360 // Gather the information. 00361 // 00362 { 00363 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) 00364 { 00365 Address addr ((*i).ext_id_); 00366 Queue& q = (*i).int_id_; 00367 00368 //@@ Should look for the highest known number. 00369 // 00370 nrtm->insert (addr, q.sn ()); 00371 00372 if (--max_elem == 0) 00373 break; 00374 } 00375 } 00376 00377 if (nrtm->empty ()) 00378 return Profile_ptr (0); 00379 else 00380 return Profile_ptr (nrtm.release ()); 00381 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 31 of file Acknowledge.cpp.
00032 { 00033 Element::in_start (in); 00034 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 37 of file Acknowledge.cpp. References ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.
00038 { 00039 Element::out_start (out); 00040 00041 tracker_mgr_.spawn (track_thunk, this); 00042 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 45 of file Acknowledge.cpp. References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), tracker_mgr_, and ACE_Thread_Manager::wait().
00046 { 00047 { 00048 Lock l (mutex_); 00049 stop_ = true; 00050 cond_.signal (); 00051 } 00052 00053 tracker_mgr_.wait (); 00054 00055 Element::out_stop (); 00056 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 239 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Acknowledge::Queue::bind(), collapse(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::NRTM::find(), hold_, ACE_RMCast::Lock, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Acknowledge::Queue::rebind(), ACE_RMCast::In_Element::recv(), ACE_Guard< ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u64.
00240 { 00241 // Handle NRTM. There could be some nasty interaction with code 00242 // that handles data below (like missing message and NAK). This 00243 // is why I hold the lock at the beginning (which may be not very 00244 // efficient). 00245 // 00246 Lock l (mutex_); 00247 00248 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) 00249 { 00250 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) 00251 { 00252 u64 sn (nrtm->find ((*i).ext_id_)); 00253 00254 if (sn != 0) 00255 { 00256 Queue& q = (*i).int_id_; 00257 00258 u64 old (q.max_sn ()); 00259 00260 if (old < sn) 00261 { 00262 // Mark as lost. 00263 // 00264 q.bind (sn, Descr (1)); 00265 } 00266 } 00267 } 00268 } 00269 00270 if (m->find (Data::id) || m->find (NoData::id)) 00271 { 00272 Address from ( 00273 static_cast<From const*> (m->find (From::id))->address ()); 00274 00275 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); 00276 00277 Map::ENTRY* e; 00278 00279 if (hold_.find (from, e) == -1) 00280 { 00281 // First message from this source. 00282 // 00283 hold_.bind (from, Queue (sn)); 00284 in_->recv (m); 00285 } 00286 else 00287 { 00288 Queue& q = e->int_id_; 00289 00290 if (sn <= q.sn ()) 00291 { 00292 // Duplicate. 00293 // 00294 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn 00295 // << endl; 00296 } 00297 else if (sn == q.sn () + 1) 00298 { 00299 // Next message. 00300 // 00301 00302 q.rebind (sn, Descr (m)); 00303 collapse (q); 00304 } 00305 else 00306 { 00307 // Some messages are missing. Insert this one into the queue. 00308 // 00309 q.rebind (sn, Descr (m)); 00310 } 00311 } 00312 } 00313 else 00314 { 00315 l.release (); 00316 00317 // Just forward it up. 00318 // 00319 in_->recv (m); 00320 } 00321 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 324 of file Acknowledge.cpp. References create_nrtm(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_RMCast::Data::size(), and ACE_RMCast::u32. Referenced by track().
00325 { 00326 if (Data const* data = static_cast<Data const*> (m->find (Data::id))) 00327 { 00328 size_t max_payload_size ( 00329 params_.max_packet_size () - max_service_size); 00330 00331 if (max_payload_size > data->size ()) 00332 { 00333 u32 max_size (max_payload_size - data->size ()); 00334 u32 max_elem (NRTM::max_count (max_size)); 00335 00336 if (max_elem > 0) 00337 { 00338 Lock l (mutex_); 00339 00340 Profile_ptr nrtm (create_nrtm (max_elem)); 00341 00342 if (nrtm.get ()) 00343 m->add (nrtm); 00344 } 00345 } 00346 00347 nrtm_timer_ = params_.nrtm_timeout (); // Reset timer. 00348 } 00349 00350 out_->send (m); 00351 } |
|
Definition at line 84 of file Acknowledge.cpp. References ACE_OS::abort(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), create_nrtm(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ETIME, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), hold_, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), send(), ACE_RMCast::Parameters::tick(), track_queue(), ACE_RMCast::u32, and ACE_Condition< MUTEX >::wait().
00085 { 00086 while (true) 00087 { 00088 Messages msgs; 00089 00090 { 00091 Lock l (mutex_); 00092 00093 if (stop_) 00094 break; 00095 00096 if (hold_.current_size () != 0) 00097 { 00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); 00099 i != e; 00100 ++i) 00101 { 00102 Queue& q = (*i).int_id_; 00103 00104 if (q.current_size () == 0) continue; 00105 00106 track_queue ((*i).ext_id_, q, msgs); 00107 } 00108 } 00109 00110 if (--nrtm_timer_ == 0) 00111 { 00112 nrtm_timer_ = params_.nrtm_timeout (); 00113 00114 // Send NRTM. 00115 // 00116 unsigned short max_payload_size ( 00117 params_.max_packet_size () - max_service_size); 00118 00119 u32 max_elem (NRTM::max_count (max_payload_size)); 00120 00121 Profile_ptr nrtm (create_nrtm (max_elem)); 00122 00123 if (nrtm.get ()) 00124 { 00125 Message_ptr m (new Message); 00126 m->add (nrtm); 00127 msgs.push_back (m); 00128 00129 } 00130 } 00131 } 00132 00133 // Send stuff off. 00134 // 00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ()) 00136 { 00137 Message_ptr* ppm; 00138 i.next (ppm); 00139 send (*ppm); 00140 } 00141 00142 // Go to sleep but watch for "manual cancellation" request. 00143 // 00144 { 00145 ACE_Time_Value time (ACE_OS::gettimeofday ()); 00146 time += params_.tick (); 00147 00148 Lock l (mutex_); 00149 00150 while (!stop_) 00151 { 00152 if (cond_.wait (&time) == -1) 00153 { 00154 if (errno != ETIME) 00155 ACE_OS::abort (); 00156 else 00157 break; 00158 } 00159 } 00160 00161 if (stop_) 00162 break; 00163 } 00164 } 00165 } |
|
Definition at line 168 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::begin(), ACE_RMCast::Acknowledge::Queue::bind(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::count(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::end(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Acknowledge::Descr::lost(), ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Acknowledge::Descr::nak_count(), ACE_RMCast::NAK_ptr, ACE_RMCast::Parameters::nak_timeout(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::Acknowledge::Descr::timer(), ACE_RMCast::u32, and ACE_RMCast::u64. Referenced by track().
00169 { 00170 unsigned short max_payload_size ( 00171 params_.max_packet_size () - max_service_size); 00172 00173 u32 max_elem (NAK::max_count (max_payload_size)); 00174 u32 count (0); 00175 00176 Queue::iterator i (q.begin ()), e (q.end ()); 00177 00178 // Track existing losses. 00179 // 00180 while (i != e) 00181 { 00182 NAK_ptr nak (new NAK (addr)); 00183 00184 // Inner loop that fills NAK profile with up to max_elem elements. 00185 // 00186 for (; i != e && nak->count () < max_elem; ++i) 00187 { 00188 u64 sn ((*i).ext_id_); 00189 Descr& d = (*i).int_id_; 00190 00191 if (d.lost ()) 00192 { 00193 d.timer (d.timer () - 1); 00194 00195 if (d.timer () == 0) 00196 { 00197 //@@ Need exp fallback. 00198 // 00199 d.nak_count (d.nak_count () + 1); 00200 d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); 00201 00202 nak->add (sn); 00203 00204 ++count; 00205 00206 // cerr << 6 << "NAK # " << d.nak_count () << ": " 00207 // << addr << " " << sn << endl; 00208 } 00209 } 00210 } 00211 00212 // Send this NAK. 00213 // 00214 if (nak->count ()) 00215 { 00216 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" 00217 // << endl; 00218 00219 Message_ptr m (new Message); 00220 00221 m->add (Profile_ptr (nak.release ())); 00222 00223 msgs.push_back (m); 00224 } 00225 } 00226 00227 // Detect and record new losses. 00228 // 00229 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) 00230 { 00231 if (q.find (sn) == -1) 00232 { 00233 q.bind (sn, Descr (1)); 00234 } 00235 } 00236 } |
|
Definition at line 384 of file Acknowledge.cpp. Referenced by out_start().
00385 {
00386 reinterpret_cast<Acknowledge*> (obj)->track ();
00387 return 0;
00388 }
|
|
Definition at line 239 of file Acknowledge.h. |
|
Definition at line 237 of file Acknowledge.h. Referenced by create_nrtm(), recv(), and track(). |
|
Definition at line 238 of file Acknowledge.h. |
|
Definition at line 241 of file Acknowledge.h. |
|
Definition at line 235 of file Acknowledge.h. Referenced by send(), track(), and track_queue(). |
|
Definition at line 243 of file Acknowledge.h. |
|
Definition at line 244 of file Acknowledge.h. Referenced by out_start(), and out_stop(). |