#include <Acknowledge.h>
Inheritance diagram for ACE_RMCast::Acknowledge:


Public Member Functions | |
| Acknowledge (Parameters const &params) | |
| virtual void | in_start (In_Element *in) |
| virtual void | out_start (Out_Element *out) |
| virtual void | out_stop () |
| virtual void | recv (Message_ptr m) |
| virtual void | send (Message_ptr m) |
Private Types | |
| typedef ACE_Hash_Map_Manager_Ex< Address, Queue, AddressHasher, ACE_Equal_To< Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
| void | collapse (Queue &q) |
| void | track () |
| void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
| Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
| ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
| Parameters const & | params_ |
| Map | hold_ |
| Mutex | mutex_ |
| Condition | cond_ |
| unsigned long | nrtm_timer_ |
| bool | stop_ |
| ACE_Thread_Manager | tracker_mgr_ |
|
|
Definition at line 216 of file Acknowledge.h. |
|
|
Definition at line 21 of file Acknowledge.cpp.
|
|
|
Definition at line 59 of file Acknowledge.cpp. References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Message_ptr, ACE_RMCast::In_Element::recv(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::u64, and ACE_RMCast::Acknowledge::Queue::unbind(). Referenced by recv().
// Acknowledge::collapse (Queue &q) -- the signature line is not part of this
// listing. Delivers to in_->recv () the run of consecutively numbered
// messages held in q, starting at q.sn () + 1, removing each one from q as
// it goes. The run ends at the first sequence number that is either absent
// from q or present but marked lost. Finally q.sn () is set to the last
// number that was delivered.
00060 {
00061 // I would normally use iterators in the logic below but ACE_Map_Manager
00062 // iterates over entries in no particular order so it is pretty much
00063 // unusable here. Instead we will do slow and cumbersome find's.
00064 //
00065
00066 u64 sn (q.sn () + 1);
00067
00068 for (;; ++sn)
00069 {
00070 Queue::ENTRY* e;
00071
// e is only read when find () succeeded (left operand of || is false).
00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073
// Take the message out of the entry before the entry is unbound.
00074 Message_ptr m (e->int_id_.msg ());
00075 q.unbind (sn);
00076
00077 in_->recv (m);
00078 }
00079
// sn is the first number that was NOT delivered, hence sn - 1. If nothing
// was delivered this writes back the value q.sn () already had.
00080 q.sn (sn - 1);
00081 }
|
|
|
Definition at line 358 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), hold_, ACE_RMCast::Profile_ptr, ACE_Auto_Basic_Ptr< X >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u32. Referenced by send(), and track().
// Acknowledge::create_nrtm (u32 max_elem) -- the signature line is not part
// of this listing. Builds an NRTM profile holding one (address, q.sn ())
// pair per queue in hold_, stopping once max_elem pairs have been inserted.
// Returns a null Profile_ptr when hold_ yielded no entries, otherwise a
// Profile_ptr that takes over the NRTM object.
// Both visible callers (send () and track ()) hold mutex_ around the call.
00359 {
00360 // Prepare NRTM.
00361 //
00362 auto_ptr<NRTM> nrtm (new NRTM ());
00363
00364 // Gather the information.
00365 //
00366 {
00367 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00368 {
00369 Address addr ((*i).ext_id_);
00370 Queue& q = (*i).int_id_;
00371
00372 //@@ Should look for the highest known number.
00373 //
00374 nrtm->insert (addr, q.sn ());
00375
// NOTE(review): the cap is applied with a pre-decrement, so a max_elem of 0
// on entry would never compare equal to 0 here (presumably u32 is unsigned
// and wraps). send () checks max_elem > 0 before calling; track () does
// not -- confirm NRTM::max_count cannot return 0 there.
00376 if (--max_elem == 0)
00377 break;
00378 }
00379 }
00380
00381 if (nrtm->empty ())
00382 return Profile_ptr (0);
00383 else
// release () hands ownership from the auto_ptr to the returned Profile_ptr.
00384 return Profile_ptr (nrtm.release ());
00385 }
|
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 31 of file Acknowledge.cpp.
// Acknowledge::in_start (In_Element *in) -- the signature line is not part
// of this listing. Adds no behaviour of its own: the call is forwarded
// unchanged to Element::in_start ().
00032 {
00033 Element::in_start (in);
00034 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 37 of file Acknowledge.cpp. References ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.
// Acknowledge::out_start (Out_Element *out) -- the signature line is not
// part of this listing. Forwards to Element::out_start () and then starts
// the tracker thread.
00038 {
00039 Element::out_start (out);
00040
// track_thunk () receives `this` as its void* argument and runs track ().
// The return value of spawn () is not checked.
00041 tracker_mgr_.spawn (track_thunk, this);
00042 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 45 of file Acknowledge.cpp. References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), tracker_mgr_, and ACE_Thread_Manager::wait().
// Acknowledge::out_stop () -- the signature line is not part of this
// listing. Asks the tracker thread to stop, waits for it, and then forwards
// to Element::out_stop ().
00046 {
00047 {
// stop_ is set and cond_ signalled while holding mutex_; track () reads
// stop_ and waits on cond_ under the same mutex.
00048 Lock l (mutex_);
00049 stop_ = true;
00050 cond_.signal ();
00051 }
00052
// The lock scope above has already closed here; track () needs mutex_ to
// observe stop_ and exit.
00053 tracker_mgr_.wait ();
00054
00055 Element::out_stop ();
00056 }
|
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 244 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Acknowledge::Queue::bind(), collapse(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::NRTM::find(), hold_, ACE_RMCast::Lock, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Acknowledge::Queue::rebind(), ACE_RMCast::In_Element::recv(), ACE_Guard< ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u64.
// Acknowledge::recv (Message_ptr m) -- the signature line is not part of
// this listing. Handles an incoming message in two steps:
//   1. If m carries an NRTM profile, every queue in hold_ whose address is
//      listed with a number above q.max_sn () gets that number bound as
//      Descr (1) ("Mark as lost" below).
//   2. If m carries Data or NoData, it is delivered, dropped as a duplicate,
//      or stored in the sender's queue depending on its sequence number.
//      Any other message is forwarded to in_->recv () after the lock has
//      been released.
00245 {
00246 // Handle NRTM. There could be some nasty interaction with code
00247 // that handles data below (like missing message and NAK). This
00248 // is why I hold the lock at the beginning (which may be not very
00249 // efficient).
00250 //
00251 Lock l (mutex_);
00252
00253 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00254 {
00255 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00256 {
00257 u64 sn (nrtm->find ((*i).ext_id_));
00258
// A result of 0 is skipped -- presumably NRTM::find () returns 0 when the
// address is not listed; confirm against NRTM.
00259 if (sn != 0)
00260 {
00261 Queue& q = (*i).int_id_;
00262
00263 u64 old (q.max_sn ());
00264
00265 if (old < sn)
00266 {
00267 // Mark as lost.
00268 //
00269 q.bind (sn, Descr (1));
00270 }
00271 }
00272 }
00273 }
00274
00275 if (m->find (Data::id) || m->find (NoData::id))
00276 {
// NOTE(review): the results of m->find (From::id) and m->find (SN::id) are
// dereferenced without a null check -- this assumes every Data/NoData
// message carries both profiles; confirm.
00277 Address from (
00278 static_cast<From const*> (m->find (From::id))->address ());
00279
00280 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00281
00282 Map::ENTRY* e;
00283
00284 if (hold_.find (from, e) == -1)
00285 {
00286 // First message from this source.
00287 //
// The new queue starts at this sn, and the message is delivered right away.
// Note that in_->recv () is invoked here with the lock l still held.
00288 hold_.bind (from, Queue (sn));
00289 in_->recv (m);
00290 }
00291 else
00292 {
00293 Queue& q = e->int_id_;
00294
00295 if (sn <= q.sn ())
00296 {
00297 // Duplicate.
00298 //
00299 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
00300 // << endl;
00301 }
00302 else if (sn == q.sn () + 1)
00303 {
00304 // Next message.
00305 //
00306
// Store the message, then let collapse () deliver it together with any
// consecutive messages already queued behind it (also under the lock).
00307 q.rebind (sn, Descr (m));
00308 collapse (q);
00309 }
00310 else
00311 {
00312 // Some messages are missing. Insert this one into the queue.
00313 //
00314 q.rebind (sn, Descr (m));
00315 }
00316 }
00317 }
00318 else
00319 {
// Only this branch drops the lock before calling up the stack.
00320 l.release ();
00321
00322 // Just forward it up.
00323 //
00324 in_->recv (m);
00325 }
00326 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 328 of file Acknowledge.cpp. References create_nrtm(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_RMCast::Data::size(), and ACE_RMCast::u32. Referenced by track().
// Acknowledge::send (Message_ptr m) -- the signature line is not part of
// this listing. For a message carrying a Data profile, attaches an NRTM
// profile when the packet has room for at least one NRTM element, and
// resets nrtm_timer_. Every message, Data or not, is then passed on to
// out_->send ().
00329 {
00330 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00331 {
00332 size_t max_payload_size (
00333 params_.max_packet_size () - max_service_size);
00334
// Only when the data leaves spare room is an NRTM considered at all.
00335 if (max_payload_size > data->size ())
00336 {
00337 u32 max_size (max_payload_size - data->size ());
00338 u32 max_elem (NRTM::max_count (max_size));
00339
00340 if (max_elem > 0)
00341 {
// create_nrtm () walks hold_, so it is called under mutex_.
00342 Lock l (mutex_);
00343
00344 Profile_ptr nrtm (create_nrtm (max_elem));
00345
// A null result (no entries gathered) leaves the message unchanged.
00346 if (nrtm.get ())
00347 m->add (nrtm);
00348 }
00349 }
00350
// The timer is reset for every Data message, whether or not an NRTM was
// actually attached above.
// NOTE(review): this write happens outside the Lock scope, while track ()
// decrements nrtm_timer_ with mutex_ held -- confirm this is intended.
00351 nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
00352 }
00353
00354 out_->send (m);
00355 }
|
|
|
Definition at line 84 of file Acknowledge.cpp. References ACE_OS::abort(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), create_nrtm(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ETIME, hold_, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, ACE_Strong_Bound_Ptr< X, ACE_LOCK >::null(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), send(), ACE_RMCast::Parameters::tick(), track_queue(), ACE_RMCast::u32, and ACE_Condition< MUTEX >::wait().
// Acknowledge::track () -- the signature line is not part of this listing.
// Body of the tracker thread (entered through track_thunk ()). Each
// iteration: (1) under mutex_, collects NAK messages for every non-empty
// queue via track_queue () and, when nrtm_timer_ reaches 0, a stand-alone
// NRTM message; (2) with the lock released, passes the collected messages
// to send (); (3) waits on cond_ until the current time plus
// params_.tick () or until stop_ is set. The loop exits when stop_ is seen.
00085 {
00086 while (true)
00087 {
00088 Messages msgs;
00089
00090 {
00091 Lock l (mutex_);
00092
00093 if (stop_)
00094 break;
00095
00096 if (hold_.current_size () != 0)
00097 {
00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099 i != e;
00100 ++i)
00101 {
00102 Queue& q = (*i).int_id_;
00103
00104 if (q.current_size () == 0) continue;
00105
00106 track_queue ((*i).ext_id_, q, msgs);
00107 }
00108 }
00109
// NOTE(review): nrtm_timer_ is an unsigned long; if it were already 0 here
// the pre-decrement would wrap instead of matching -- confirm it is always
// initialised/reset to a value >= 1.
00110 if (--nrtm_timer_ == 0)
00111 {
00112 nrtm_timer_ = params_.nrtm_timeout ();
00113
00114 // Send NRTM.
00115 //
// NOTE(review): send () holds the same quantity in a size_t; here it is
// narrowed to unsigned short -- confirm the packet size always fits.
00116 unsigned short max_payload_size (
00117 params_.max_packet_size () - max_service_size);
00118
00119 u32 max_elem (NRTM::max_count (max_payload_size));
00120
00121 Profile_ptr nrtm (create_nrtm (max_elem));
00122
00123 if (!nrtm.null ())
00124 {
00125 Message_ptr m (new Message);
00126 m->add (nrtm);
00127 msgs.push_back (m);
00128
00129 }
00130 }
00131 }
00132
00133 // Send stuff off.
00134 //
// The Lock scope above has closed; send () takes mutex_ itself when it
// piggybacks an NRTM onto a Data message.
00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136 {
00137 Message_ptr* ppm;
00138 i.next (ppm);
00139
00140 //FUZZ: disable check_for_lack_ACE_OS
00141 send (*ppm);
00142 //FUZZ: enable check_for_lack_ACE_OS
00143 }
00144
00145 // Go to sleep but watch for "manual cancellation" request.
00146 //
00147 {
00148 //FUZZ: disable check_for_lack_ACE_OS
00149 ACE_Time_Value time (ACE_OS::gettimeofday ());
00150 //FUZZ: enable check_for_lack_ACE_OS
00151
// time is computed once, before the wait loop, so repeated waits below
// reuse the same value.
00152 time += params_.tick ();
00153
00154 Lock l (mutex_);
00155
00156 while (!stop_)
00157 {
// A -1 return with errno == ETIME is the normal timeout and ends the sleep;
// any other -1 is treated as fatal. A return other than -1 re-checks stop_
// and waits again if it is still false.
00158 if (cond_.wait (&time) == -1)
00159 {
00160 if (errno != ETIME)
00161 ACE_OS::abort ();
00162 else
00163 break;
00164 }
00165 }
00166
00167 if (stop_)
00168 break;
00169 }
00170 }
00171 }
|
|
||||||||||||||||
|
Definition at line 174 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::begin(), ACE_RMCast::Acknowledge::Queue::bind(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::end(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Acknowledge::Descr::lost(), ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Acknowledge::Descr::nak_count(), ACE_RMCast::Parameters::nak_timeout(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Auto_Basic_Ptr< X >::release(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::Acknowledge::Descr::timer(), ACE_RMCast::u32, and ACE_RMCast::u64. Referenced by track().
// Acknowledge::track_queue (Address const &addr, Queue &q, Messages &msgs)
// -- the signature line is not part of this listing. Two passes over q:
//   1. For every entry marked lost, count its timer down by one; when the
//      timer reaches 0 the sequence number is added to a NAK for addr and
//      the timer is re-armed. Each NAK holds at most max_elem numbers; every
//      non-empty NAK is wrapped in a Message and appended to msgs.
//   2. Every sequence number strictly between q.sn () and q.max_sn () that
//      has no entry in q is bound as Descr (1), recording it as a new loss.
// The only visible caller, track (), holds mutex_ around the call.
00175 {
00176 unsigned short max_payload_size (
00177 params_.max_packet_size () - max_service_size);
00178
00179 u32 max_elem (NAK::max_count (max_payload_size));
// count is incremented below but never read in this body.
00180 u32 count (0);
00181
00182 Queue::iterator i (q.begin ()), e (q.end ());
00183
00184 // Track existing losses.
00185 //
// NOTE(review): if max_elem were 0 the inner loop would never advance i and
// this outer loop would not terminate -- confirm NAK::max_count is always
// positive for valid parameters.
00186 while (i != e)
00187 {
00188 auto_ptr<NAK> nak (new NAK (addr));
00189
00190 // Inner loop that fills NAK profile with up to max_elem elements.
00191 //
00192 for (; i != e && nak->count () < max_elem; ++i)
00193 {
00194 u64 sn ((*i).ext_id_);
00195 Descr& d = (*i).int_id_;
00196
00197 if (d.lost ())
00198 {
00199 d.timer (d.timer () - 1);
00200
00201 if (d.timer () == 0)
00202 {
00203 //@@ Need exp fallback.
00204 //
// The re-armed timeout grows linearly with the number of NAKs already sent
// for this sequence number: (nak_count + 1) * nak_timeout.
00205 d.nak_count (d.nak_count () + 1);
00206 d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00207
00208 nak->add (sn);
00209
00210 ++count;
00211
00212 // cerr << 6 << "NAK # " << d.nak_count () << ": "
00213 // << addr << " " << sn << endl;
00214 }
00215 }
00216 }
00217
00218 // Send this NAK.
00219 //
// An empty NAK is never released and is destroyed by the auto_ptr.
00220 if (nak->count ())
00221 {
00222 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
00223 // << endl;
00224
00225 Message_ptr m (new Message);
00226
00227 m->add (Profile_ptr (nak.release ()));
00228
00229 msgs.push_back (m);
00230 }
00231 }
00232
00233 // Detect and record new losses.
00234 //
// The upper bound is exclusive: q.max_sn () itself is not probed.
00235 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00236 {
00237 if (q.find (sn) == -1)
00238 {
00239 q.bind (sn, Descr (1));
00240 }
00241 }
00242 }
|
|
|
Definition at line 388 of file Acknowledge.cpp. Referenced by out_start().
// Acknowledge::track_thunk (void *obj) -- the signature line is not part of
// this listing. Static thread entry point handed to tracker_mgr_.spawn () in
// out_start (): obj is the Acknowledge instance passed there as `this`. Runs
// track () on it and returns 0 once track () has returned.
00389 {
00390 reinterpret_cast<Acknowledge*> (obj)->track ();
00391 return 0;
00392 }
|
|
|
Definition at line 239 of file Acknowledge.h. |
|
|
Definition at line 237 of file Acknowledge.h. Referenced by create_nrtm(), recv(), and track(). |
|
|
Definition at line 238 of file Acknowledge.h. |
|
|
Definition at line 241 of file Acknowledge.h. |
|
|
Definition at line 235 of file Acknowledge.h. Referenced by send(), track(), and track_queue(). |
|
|
Definition at line 243 of file Acknowledge.h. |
|
|
Definition at line 244 of file Acknowledge.h. Referenced by out_start(), and out_stop(). |
1.3.6