00001
00002
00003
00004
00005 #include "ace/Time_Value.h"
00006 #include "ace/OS_NS_unistd.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #include "Acknowledge.h"
00011
00012
00013
00014
00015
00016
00017
00018 namespace ACE_RMCast
00019 {
00020 Acknowledge::
00021 Acknowledge (Parameters const& params)
00022 : params_ (params),
00023 hold_ (params.addr_map_size ()),
00024 cond_ (mutex_),
00025 nrtm_timer_ (params_.nrtm_timeout ()),
00026 stop_ (false)
00027 {
00028 }
00029
00030 void Acknowledge::
00031 in_start (In_Element* in)
00032 {
00033 Element::in_start (in);
00034 }
00035
00036 void Acknowledge::
00037 out_start (Out_Element* out)
00038 {
00039 Element::out_start (out);
00040
00041 tracker_mgr_.spawn (track_thunk, this);
00042 }
00043
00044 void Acknowledge::
00045 out_stop ()
00046 {
00047 {
00048 Lock l (mutex_);
00049 stop_ = true;
00050 cond_.signal ();
00051 }
00052
00053 tracker_mgr_.wait ();
00054
00055 Element::out_stop ();
00056 }
00057
00058 void Acknowledge::
00059 collapse (Queue& q)
00060 {
00061
00062
00063
00064
00065
00066 u64 sn (q.sn () + 1);
00067
00068 for (;; ++sn)
00069 {
00070 Queue::ENTRY* e;
00071
00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073
00074 Message_ptr m (e->int_id_.msg ());
00075 q.unbind (sn);
00076
00077 in_->recv (m);
00078 }
00079
00080 q.sn (sn - 1);
00081 }
00082
00083 void Acknowledge::
00084 track ()
00085 {
00086 while (true)
00087 {
00088 Messages msgs;
00089
00090 {
00091 Lock l (mutex_);
00092
00093 if (stop_)
00094 break;
00095
00096 if (hold_.current_size () != 0)
00097 {
00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099 i != e;
00100 ++i)
00101 {
00102 Queue& q = (*i).int_id_;
00103
00104 if (q.current_size () == 0) continue;
00105
00106 track_queue ((*i).ext_id_, q, msgs);
00107 }
00108 }
00109
00110 if (--nrtm_timer_ == 0)
00111 {
00112 nrtm_timer_ = params_.nrtm_timeout ();
00113
00114
00115
00116 unsigned short max_payload_size (
00117 params_.max_packet_size () - max_service_size);
00118
00119 u32 max_elem (NRTM::max_count (max_payload_size));
00120
00121 Profile_ptr nrtm (create_nrtm (max_elem));
00122
00123 if (nrtm.get ())
00124 {
00125 Message_ptr m (new Message);
00126 m->add (nrtm);
00127 msgs.push_back (m);
00128
00129 }
00130 }
00131 }
00132
00133
00134
00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136 {
00137 Message_ptr* ppm;
00138 i.next (ppm);
00139 send (*ppm);
00140 }
00141
00142
00143
00144 {
00145 ACE_Time_Value time (ACE_OS::gettimeofday ());
00146 time += params_.tick ();
00147
00148 Lock l (mutex_);
00149
00150 while (!stop_)
00151 {
00152 if (cond_.wait (&time) == -1)
00153 {
00154 if (errno != ETIME)
00155 ACE_OS::abort ();
00156 else
00157 break;
00158 }
00159 }
00160
00161 if (stop_)
00162 break;
00163 }
00164 }
00165 }
00166
00167 void Acknowledge::
00168 track_queue (Address const& addr, Queue& q, Messages& msgs)
00169 {
00170 unsigned short max_payload_size (
00171 params_.max_packet_size () - max_service_size);
00172
00173 u32 max_elem (NAK::max_count (max_payload_size));
00174 u32 count (0);
00175
00176 Queue::iterator i (q.begin ()), e (q.end ());
00177
00178
00179
00180 while (i != e)
00181 {
00182 NAK_ptr nak (new NAK (addr));
00183
00184
00185
00186 for (; i != e && nak->count () < max_elem; ++i)
00187 {
00188 u64 sn ((*i).ext_id_);
00189 Descr& d = (*i).int_id_;
00190
00191 if (d.lost ())
00192 {
00193 d.timer (d.timer () - 1);
00194
00195 if (d.timer () == 0)
00196 {
00197
00198
00199 d.nak_count (d.nak_count () + 1);
00200 d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00201
00202 nak->add (sn);
00203
00204 ++count;
00205
00206
00207
00208 }
00209 }
00210 }
00211
00212
00213
00214 if (nak->count ())
00215 {
00216
00217
00218
00219 Message_ptr m (new Message);
00220
00221 m->add (Profile_ptr (nak.release ()));
00222
00223 msgs.push_back (m);
00224 }
00225 }
00226
00227
00228
00229 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00230 {
00231 if (q.find (sn) == -1)
00232 {
00233 q.bind (sn, Descr (1));
00234 }
00235 }
00236 }
00237
00238 void Acknowledge::
00239 recv (Message_ptr m)
00240 {
00241
00242
00243
00244
00245
00246 Lock l (mutex_);
00247
00248 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00249 {
00250 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00251 {
00252 u64 sn (nrtm->find ((*i).ext_id_));
00253
00254 if (sn != 0)
00255 {
00256 Queue& q = (*i).int_id_;
00257
00258 u64 old (q.max_sn ());
00259
00260 if (old < sn)
00261 {
00262
00263
00264 q.bind (sn, Descr (1));
00265 }
00266 }
00267 }
00268 }
00269
00270 if (m->find (Data::id) || m->find (NoData::id))
00271 {
00272 Address from (
00273 static_cast<From const*> (m->find (From::id))->address ());
00274
00275 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00276
00277 Map::ENTRY* e;
00278
00279 if (hold_.find (from, e) == -1)
00280 {
00281
00282
00283 hold_.bind (from, Queue (sn));
00284 in_->recv (m);
00285 }
00286 else
00287 {
00288 Queue& q = e->int_id_;
00289
00290 if (sn <= q.sn ())
00291 {
00292
00293
00294
00295
00296 }
00297 else if (sn == q.sn () + 1)
00298 {
00299
00300
00301
00302 q.rebind (sn, Descr (m));
00303 collapse (q);
00304 }
00305 else
00306 {
00307
00308
00309 q.rebind (sn, Descr (m));
00310 }
00311 }
00312 }
00313 else
00314 {
00315 l.release ();
00316
00317
00318
00319 in_->recv (m);
00320 }
00321 }
00322
00323 void Acknowledge::
00324 send (Message_ptr m)
00325 {
00326 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00327 {
00328 size_t max_payload_size (
00329 params_.max_packet_size () - max_service_size);
00330
00331 if (max_payload_size > data->size ())
00332 {
00333 u32 max_size (max_payload_size - data->size ());
00334 u32 max_elem (NRTM::max_count (max_size));
00335
00336 if (max_elem > 0)
00337 {
00338 Lock l (mutex_);
00339
00340 Profile_ptr nrtm (create_nrtm (max_elem));
00341
00342 if (nrtm.get ())
00343 m->add (nrtm);
00344 }
00345 }
00346
00347 nrtm_timer_ = params_.nrtm_timeout ();
00348 }
00349
00350 out_->send (m);
00351 }
00352
00353 Profile_ptr Acknowledge::
00354 create_nrtm (u32 max_elem)
00355 {
00356
00357
00358 NRTM_ptr nrtm (new NRTM ());
00359
00360
00361
00362 {
00363 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00364 {
00365 Address addr ((*i).ext_id_);
00366 Queue& q = (*i).int_id_;
00367
00368
00369
00370 nrtm->insert (addr, q.sn ());
00371
00372 if (--max_elem == 0)
00373 break;
00374 }
00375 }
00376
00377 if (nrtm->empty ())
00378 return Profile_ptr (0);
00379 else
00380 return Profile_ptr (nrtm.release ());
00381 }
00382
00383 ACE_THR_FUNC_RETURN Acknowledge::
00384 track_thunk (void* obj)
00385 {
00386 reinterpret_cast<Acknowledge*> (obj)->track ();
00387 return 0;
00388 }
00389 }