#include <Acknowledge.h>
Inheritance diagram for ACE_RMCast::Acknowledge:


Public Member Functions | |
| Acknowledge (Parameters const &params) | |
| virtual void | in_start (In_Element *in) |
| virtual void | out_start (Out_Element *out) |
| virtual void | out_stop () |
| virtual void | recv (Message_ptr m) |
| virtual void | send (Message_ptr m) |
Private Types | |
| typedef ACE_Hash_Map_Manager_Ex< Address, Queue, AddressHasher, ACE_Equal_To< Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
| void | collapse (Queue &q) |
| void | track () |
| void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
| Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
| ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
| Parameters const & | params_ |
| Map | hold_ |
| Mutex | mutex_ |
| Condition | cond_ |
| unsigned long | nrtm_timer_ |
| bool | stop_ |
| ACE_Thread_Manager | tracker_mgr_ |
|
|
Definition at line 216 of file Acknowledge.h. |
|
|
Definition at line 21 of file Acknowledge.cpp.
|
|
|
Definition at line 59 of file Acknowledge.cpp. References ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Message_ptr, ACE_RMCast::In_Element::recv(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::u64, and ACE_RMCast::Acknowledge::Queue::unbind(). Referenced by recv().
// void collapse (Queue &q)   -- signature as listed in the member table.
//
// Delivers the run of consecutive messages held in q. Starting at
// q.sn () + 1, each sequence number is looked up in q; an entry that is
// present and not marked lost is removed from q and its message is handed
// to in_->recv (). The scan stops at the first number that is missing or
// lost, and q.sn () is then set to the last number delivered (it keeps its
// old value when nothing was delivered).
//
// NOTE(review): the only caller shown, recv (), still holds mutex_ when it
// calls this, so every in_->recv () upcall here runs under that lock.
00060 {
00061 // I would normally use iterators in the logic below but ACE_Map_Manager
00062 // iterates over entries in no particular order so it is pretty much
00063 // unusable here. Instead we will do slow and cumbersome find's.
00064 //
00065
00066 u64 sn (q.sn () + 1);
00067
00068 for (;; ++sn)
00069 {
00070 Queue::ENTRY* e;
00071
00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073
00074 Message_ptr m (e->int_id_.msg ());
00075 q.unbind (sn);
00076
00077 in_->recv (m);
00078 }
00079
00080 q.sn (sn - 1);
00081 }
|
|
|
Definition at line 354 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), hold_, ACE_RMCast::NRTM_ptr, ACE_RMCast::Profile_ptr, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u32. Referenced by send(), and track().
// Profile_ptr create_nrtm (u32 max_elem)   -- signature as listed in the
// member table.
//
// Builds an NRTM profile containing one (address, q.sn ()) pair per entry
// of hold_, stopping once max_elem entries have been inserted. Returns a
// null Profile_ptr when nothing was inserted, otherwise a Profile_ptr that
// takes over the NRTM object.
//
// This function takes no lock itself; both callers shown (send () and
// track ()) hold mutex_ around the call.
//
// NOTE(review): max_elem is decremented before it is compared with 0, so a
// caller passing 0 would (assuming u32 is unsigned) wrap around and the
// limit would effectively disappear. send () checks max_elem > 0 first;
// track () does not -- confirm NRTM::max_count cannot return 0 there.
00355 {
00356 // Prepare NRTM.
00357 //
00358 NRTM_ptr nrtm (new NRTM ());
00359
00360 // Gather the information.
00361 //
00362 {
00363 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00364 {
00365 Address addr ((*i).ext_id_);
00366 Queue& q = (*i).int_id_;
00367
00368 //@@ Should look for the highest known number.
00369 //
00370 nrtm->insert (addr, q.sn ());
00371
00372 if (--max_elem == 0)
00373 break;
00374 }
00375 }
00376
00377 if (nrtm->empty ())
00378 return Profile_ptr (0);
00379 else
00380 return Profile_ptr (nrtm.release ());
00381 }
|
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 31 of file Acknowledge.cpp.
// virtual void in_start (In_Element *in)   -- signature as listed in the
// member table.
//
// Forwards the call unchanged to Element::in_start (); no work specific to
// Acknowledge is done here.
00032 {
00033 Element::in_start (in);
00034 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 37 of file Acknowledge.cpp. References ACE_Thread_Manager::spawn(), track_thunk(), and tracker_mgr_.
// virtual void out_start (Out_Element *out)   -- signature as listed in the
// member table.
//
// Calls Element::out_start () and then asks tracker_mgr_ to spawn a thread
// that runs track_thunk with this object as its argument (track_thunk in
// turn calls track ()).
//
// NOTE(review): the return value of spawn () is ignored, so a failure to
// start the tracker thread would go unnoticed here.
00038 {
00039 Element::out_start (out);
00040
00041 tracker_mgr_.spawn (track_thunk, this);
00042 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 45 of file Acknowledge.cpp. References ACE_RMCast::Lock, ACE_Condition< MUTEX >::signal(), tracker_mgr_, and ACE_Thread_Manager::wait().
// virtual void out_stop ()   -- signature as listed in the member table.
//
// Requests shutdown of the tracker thread: sets stop_ and signals cond_
// while holding mutex_, then -- after the lock's scope has ended -- waits on
// tracker_mgr_ and finally calls Element::out_stop (). The lock is scoped
// to the inner block because track () itself acquires mutex_ before it
// tests stop_.
00046 {
00047 {
00048 Lock l (mutex_);
00049 stop_ = true;
00050 cond_.signal ();
00051 }
00052
00053 tracker_mgr_.wait ();
00054
00055 Element::out_stop ();
00056 }
|
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 239 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), ACE_RMCast::Acknowledge::Queue::bind(), collapse(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), ACE_RMCast::NRTM::find(), hold_, ACE_RMCast::Lock, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Acknowledge::Queue::rebind(), ACE_RMCast::In_Element::recv(), ACE_Guard< ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), and ACE_RMCast::u64.
// virtual void recv (Message_ptr m)   -- signature as listed in the member
// table.
//
// Handles an incoming message while holding mutex_:
//
//  1. If m carries an NRTM profile, every source in hold_ is looked up in
//     it. When the number found is non-zero and greater than the queue's
//     max_sn (), a Descr (1) is bound at that number ("mark as lost").
//     (Presumably NRTM::find returns 0 for an unknown address -- confirm.)
//
//  2. If m carries a Data or NoData profile, the sender address (From) and
//     sequence number (SN) are read from it:
//       - unknown sender: a new Queue (sn) is bound in hold_ and m is
//         passed to in_->recv ();
//       - sn <= q.sn (): treated as a duplicate and dropped silently;
//       - sn == q.sn () + 1: stored with rebind () and collapse (q) is
//         called to deliver it together with any consecutive successors;
//       - otherwise: stored with rebind () until the gap is filled.
//
//  3. Any other message: the lock is released and m is passed to
//     in_->recv ().
//
// NOTE(review): the results of m->find (From::id) and m->find (SN::id) are
// dereferenced without a null check -- confirm that every Data/NoData
// message is guaranteed to carry both profiles.
// NOTE(review): only branch 3 releases the lock; the in_->recv () call in
// the "first message" branch and those inside collapse () run with mutex_
// held.
00240 {
00241 // Handle NRTM. There could be some nasty interaction with code
00242 // that handles data below (like missing message and NAK). This
00243 // is why I hold the lock at the beginning (which may be not very
00244 // efficient).
00245 //
00246 Lock l (mutex_);
00247
00248 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00249 {
00250 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00251 {
00252 u64 sn (nrtm->find ((*i).ext_id_));
00253
00254 if (sn != 0)
00255 {
00256 Queue& q = (*i).int_id_;
00257
00258 u64 old (q.max_sn ());
00259
00260 if (old < sn)
00261 {
00262 // Mark as lost.
00263 //
00264 q.bind (sn, Descr (1));
00265 }
00266 }
00267 }
00268 }
00269
00270 if (m->find (Data::id) || m->find (NoData::id))
00271 {
00272 Address from (
00273 static_cast<From const*> (m->find (From::id))->address ());
00274
00275 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00276
00277 Map::ENTRY* e;
00278
00279 if (hold_.find (from, e) == -1)
00280 {
00281 // First message from this source.
00282 //
00283 hold_.bind (from, Queue (sn));
00284 in_->recv (m);
00285 }
00286 else
00287 {
00288 Queue& q = e->int_id_;
00289
00290 if (sn <= q.sn ())
00291 {
00292 // Duplicate.
00293 //
00294 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
00295 // << endl;
00296 }
00297 else if (sn == q.sn () + 1)
00298 {
00299 // Next message.
00300 //
00301
00302 q.rebind (sn, Descr (m));
00303 collapse (q);
00304 }
00305 else
00306 {
00307 // Some messages are missing. Insert this one into the queue.
00308 //
00309 q.rebind (sn, Descr (m));
00310 }
00311 }
00312 }
00313 else
00314 {
00315 l.release ();
00316
00317 // Just forward it up.
00318 //
00319 in_->recv (m);
00320 }
00321 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 324 of file Acknowledge.cpp. References create_nrtm(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_RMCast::Data::size(), and ACE_RMCast::u32. Referenced by track().
// virtual void send (Message_ptr m)   -- signature as listed in the member
// table.
//
// Passes m on to out_->send (). Before doing so, if m carries a Data
// profile, it tries to attach an NRTM profile to the same message: the
// room left is max_packet_size () - max_service_size - data->size (), and
// when NRTM::max_count () of that room is positive, create_nrtm () is
// called under mutex_ and a non-null result is added to m. For any message
// with a Data profile nrtm_timer_ is then reset to nrtm_timeout (), whether
// or not an NRTM was attached.
//
// NOTE(review): nrtm_timer_ is assigned here after the Lock's scope has
// ended (or without any lock when nothing is attached), while track ()
// decrements the same member under mutex_ -- confirm whether this
// unsynchronized write is intended.
// NOTE(review): max_payload_size is computed by subtraction without first
// checking that max_packet_size () >= max_service_size.
00325 {
00326 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00327 {
00328 size_t max_payload_size (
00329 params_.max_packet_size () - max_service_size);
00330
00331 if (max_payload_size > data->size ())
00332 {
00333 u32 max_size (max_payload_size - data->size ());
00334 u32 max_elem (NRTM::max_count (max_size));
00335
00336 if (max_elem > 0)
00337 {
00338 Lock l (mutex_);
00339
00340 Profile_ptr nrtm (create_nrtm (max_elem));
00341
00342 if (nrtm.get ())
00343 m->add (nrtm);
00344 }
00345 }
00346
00347 nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
00348 }
00349
00350 out_->send (m);
00351 }
|
|
|
Definition at line 84 of file Acknowledge.cpp. References ACE_OS::abort(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), create_nrtm(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::current_size(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), ETIME, ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), hold_, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Parameters::nrtm_timeout(), nrtm_timer_, params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), send(), ACE_RMCast::Parameters::tick(), track_queue(), ACE_RMCast::u32, and ACE_Condition< MUTEX >::wait().
// void track ()   -- signature as listed in the member table.
//
// Body of the tracker thread (entered through track_thunk ()). Each
// iteration:
//
//  1. Under mutex_: leaves the loop if stop_ is set; calls track_queue ()
//     for every non-empty queue in hold_, collecting outgoing messages in
//     msgs; decrements nrtm_timer_ and, when it reaches 0, resets it to
//     nrtm_timeout () and appends a message carrying only an NRTM profile
//     (if create_nrtm () returned one).
//  2. With the lock released: passes every collected message to send ().
//  3. Under mutex_ again: waits on cond_ with an absolute deadline of
//     "now + tick ()". A wait that fails with ETIME ends the sleep; any
//     other failure aborts the process; a successful wake-up re-checks
//     stop_ and, if it is not set, waits again for the same deadline.
//     Leaves the loop if stop_ is set.
//
// NOTE(review): max_payload_size is an unsigned short here but a size_t in
// send (); a difference larger than an unsigned short can hold would be
// truncated.
// NOTE(review): max_elem is not checked for 0 before create_nrtm () is
// called (send () does check) -- see the decrement-then-compare there.
// NOTE(review): --nrtm_timer_ on an unsigned long wraps if the timer is
// already 0 on entry (e.g. should nrtm_timeout () ever return 0).
00085 {
00086 while (true)
00087 {
00088 Messages msgs;
00089
00090 {
00091 Lock l (mutex_);
00092
00093 if (stop_)
00094 break;
00095
00096 if (hold_.current_size () != 0)
00097 {
00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099 i != e;
00100 ++i)
00101 {
00102 Queue& q = (*i).int_id_;
00103
00104 if (q.current_size () == 0) continue;
00105
00106 track_queue ((*i).ext_id_, q, msgs);
00107 }
00108 }
00109
00110 if (--nrtm_timer_ == 0)
00111 {
00112 nrtm_timer_ = params_.nrtm_timeout ();
00113
00114 // Send NRTM.
00115 //
00116 unsigned short max_payload_size (
00117 params_.max_packet_size () - max_service_size);
00118
00119 u32 max_elem (NRTM::max_count (max_payload_size));
00120
00121 Profile_ptr nrtm (create_nrtm (max_elem));
00122
00123 if (nrtm.get ())
00124 {
00125 Message_ptr m (new Message);
00126 m->add (nrtm);
00127 msgs.push_back (m);
00128
00129 }
00130 }
00131 }
00132
00133 // Send stuff off.
00134 //
00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136 {
00137 Message_ptr* ppm;
00138 i.next (ppm);
00139 send (*ppm);
00140 }
00141
00142 // Go to sleep but watch for "manual cancellation" request.
00143 //
00144 {
00145 ACE_Time_Value time (ACE_OS::gettimeofday ());
00146 time += params_.tick ();
00147
00148 Lock l (mutex_);
00149
00150 while (!stop_)
00151 {
00152 if (cond_.wait (&time) == -1)
00153 {
00154 if (errno != ETIME)
00155 ACE_OS::abort ();
00156 else
00157 break;
00158 }
00159 }
00160
00161 if (stop_)
00162 break;
00163 }
00164 }
00165 }
|
|
||||||||||||||||
|
Definition at line 168 of file Acknowledge.cpp. References ACE_RMCast::Address, ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::begin(), ACE_RMCast::Acknowledge::Queue::bind(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::count(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::end(), ACE_Hash_Map_Manager_Ex< u64, Descr, ACE_Hash< u64 >, ACE_Equal_To< u64 >, ACE_Null_Mutex >::find(), ACE_RMCast::Acknowledge::Descr::lost(), ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::max_service_size, ACE_RMCast::Acknowledge::Queue::max_sn(), ACE_RMCast::Message_ptr, ACE_RMCast::Messages, ACE_RMCast::Acknowledge::Descr::nak_count(), ACE_RMCast::NAK_ptr, ACE_RMCast::Parameters::nak_timeout(), params_, ACE_RMCast::Profile_ptr, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::release(), ACE_RMCast::Acknowledge::Queue::sn(), ACE_RMCast::Acknowledge::Descr::timer(), ACE_RMCast::u32, and ACE_RMCast::u64. Referenced by track().
// void track_queue (Address const &addr, Queue &q, Messages &msgs)
//   -- signature as listed in the member table.
//
// Called by track () (which holds mutex_) for one source's queue.
//
// Pass 1 -- existing losses: walks the entries of q. For each descriptor
// marked lost, its timer is decremented; when the timer reaches 0 the
// descriptor's nak_count is incremented, the timer is re-armed to
// (nak_count + 1) * nak_timeout (), and the sequence number is added to the
// current NAK profile. A NAK profile is filled with at most max_elem
// numbers (max_elem = NAK::max_count of max_packet_size () -
// max_service_size); each non-empty profile is wrapped in a new Message and
// appended to msgs.
//
// Pass 2 -- new losses: for every number strictly between q.sn () and
// q.max_sn () that is not found in q, a Descr (1) is bound so that pass 1
// picks it up on a later call.
//
// NOTE(review): the local `count` is incremented but never read.
// NOTE(review): if NAK::max_count () ever returned 0, the inner for loop
// would never advance i and the outer while (i != e) could not terminate --
// confirm that cannot happen.
// NOTE(review): max_payload_size is an unsigned short; see the same
// computation in track ().
00169 {
00170 unsigned short max_payload_size (
00171 params_.max_packet_size () - max_service_size);
00172
00173 u32 max_elem (NAK::max_count (max_payload_size));
00174 u32 count (0);
00175
00176 Queue::iterator i (q.begin ()), e (q.end ());
00177
00178 // Track existing losses.
00179 //
00180 while (i != e)
00181 {
00182 NAK_ptr nak (new NAK (addr));
00183
00184 // Inner loop that fills NAK profile with up to max_elem elements.
00185 //
00186 for (; i != e && nak->count () < max_elem; ++i)
00187 {
00188 u64 sn ((*i).ext_id_);
00189 Descr& d = (*i).int_id_;
00190
00191 if (d.lost ())
00192 {
00193 d.timer (d.timer () - 1);
00194
00195 if (d.timer () == 0)
00196 {
00197 //@@ Need exp fallback.
00198 //
00199 d.nak_count (d.nak_count () + 1);
00200 d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00201
00202 nak->add (sn);
00203
00204 ++count;
00205
00206 // cerr << 6 << "NAK # " << d.nak_count () << ": "
00207 // << addr << " " << sn << endl;
00208 }
00209 }
00210 }
00211
00212 // Send this NAK.
00213 //
00214 if (nak->count ())
00215 {
00216 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
00217 // << endl;
00218
00219 Message_ptr m (new Message);
00220
00221 m->add (Profile_ptr (nak.release ()));
00222
00223 msgs.push_back (m);
00224 }
00225 }
00226
00227 // Detect and record new losses.
00228 //
00229 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00230 {
00231 if (q.find (sn) == -1)
00232 {
00233 q.bind (sn, Descr (1));
00234 }
00235 }
00236 }
|
|
|
Definition at line 384 of file Acknowledge.cpp. Referenced by out_start().
// static ACE_THR_FUNC_RETURN track_thunk (void *obj)   -- signature as
// listed in the member table.
//
// Thread entry point handed to tracker_mgr_.spawn () by out_start (), which
// passes `this` as obj. Casts obj back to Acknowledge* and runs track ();
// returns 0 once track () has returned.
00385 {
00386 reinterpret_cast<Acknowledge*> (obj)->track ();
00387 return 0;
00388 }
|
|
|
Definition at line 239 of file Acknowledge.h. |
|
|
Definition at line 237 of file Acknowledge.h. Referenced by create_nrtm(), recv(), and track(). |
|
|
Definition at line 238 of file Acknowledge.h. |
|
|
Definition at line 241 of file Acknowledge.h. |
|
|
Definition at line 235 of file Acknowledge.h. Referenced by send(), track(), and track_queue(). |
|
|
Definition at line 243 of file Acknowledge.h. |
|
|
Definition at line 244 of file Acknowledge.h. Referenced by out_start(), and out_stop(). |
1.3.6