00001
00002
00003
00004
00005 #include "ace/Time_Value.h"
00006 #include "ace/OS_NS_unistd.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #include "Acknowledge.h"
00011
00012
00013
00014
00015
00016
00017
00018 namespace ACE_RMCast
00019 {
00020 Acknowledge::
00021 Acknowledge (Parameters const& params)
00022 : params_ (params),
00023 hold_ (params.addr_map_size ()),
00024 cond_ (mutex_),
00025 nrtm_timer_ (params_.nrtm_timeout ()),
00026 stop_ (false)
00027 {
00028 }
00029
00030 void Acknowledge::
00031 in_start (In_Element* in)
00032 {
00033 Element::in_start (in);
00034 }
00035
00036 void Acknowledge::
00037 out_start (Out_Element* out)
00038 {
00039 Element::out_start (out);
00040
00041 tracker_mgr_.spawn (track_thunk, this);
00042 }
00043
00044 void Acknowledge::
00045 out_stop ()
00046 {
00047 {
00048 Lock l (mutex_);
00049 stop_ = true;
00050 cond_.signal ();
00051 }
00052
00053 tracker_mgr_.wait ();
00054
00055 Element::out_stop ();
00056 }
00057
00058 void Acknowledge::
00059 collapse (Queue& q)
00060 {
00061
00062
00063
00064
00065
00066 u64 sn (q.sn () + 1);
00067
00068 for (;; ++sn)
00069 {
00070 Queue::ENTRY* e;
00071
00072 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
00073
00074 Message_ptr m (e->int_id_.msg ());
00075 q.unbind (sn);
00076
00077 in_->recv (m);
00078 }
00079
00080 q.sn (sn - 1);
00081 }
00082
00083 void Acknowledge::
00084 track ()
00085 {
00086 while (true)
00087 {
00088 Messages msgs;
00089
00090 {
00091 Lock l (mutex_);
00092
00093 if (stop_)
00094 break;
00095
00096 if (hold_.current_size () != 0)
00097 {
00098 for (Map::iterator i (hold_.begin ()), e (hold_.end ());
00099 i != e;
00100 ++i)
00101 {
00102 Queue& q = (*i).int_id_;
00103
00104 if (q.current_size () == 0) continue;
00105
00106 track_queue ((*i).ext_id_, q, msgs);
00107 }
00108 }
00109
00110 if (--nrtm_timer_ == 0)
00111 {
00112 nrtm_timer_ = params_.nrtm_timeout ();
00113
00114
00115
00116 unsigned short max_payload_size (
00117 params_.max_packet_size () - max_service_size);
00118
00119 u32 max_elem (NRTM::max_count (max_payload_size));
00120
00121 Profile_ptr nrtm (create_nrtm (max_elem));
00122
00123 if (!nrtm.null ())
00124 {
00125 Message_ptr m (new Message);
00126 m->add (nrtm);
00127 msgs.push_back (m);
00128
00129 }
00130 }
00131 }
00132
00133
00134
00135 for (Messages::Iterator i (msgs); !i.done (); i.advance ())
00136 {
00137 Message_ptr* ppm;
00138 i.next (ppm);
00139
00140
00141 send (*ppm);
00142
00143 }
00144
00145
00146
00147 {
00148
00149 ACE_Time_Value time (ACE_OS::gettimeofday ());
00150
00151
00152 time += params_.tick ();
00153
00154 Lock l (mutex_);
00155
00156 while (!stop_)
00157 {
00158 if (cond_.wait (&time) == -1)
00159 {
00160 if (errno != ETIME)
00161 ACE_OS::abort ();
00162 else
00163 break;
00164 }
00165 }
00166
00167 if (stop_)
00168 break;
00169 }
00170 }
00171 }
00172
00173 void Acknowledge::
00174 track_queue (Address const& addr, Queue& q, Messages& msgs)
00175 {
00176 unsigned short max_payload_size (
00177 params_.max_packet_size () - max_service_size);
00178
00179 u32 max_elem (NAK::max_count (max_payload_size));
00180 u32 count (0);
00181
00182 Queue::iterator i (q.begin ()), e (q.end ());
00183
00184
00185
00186 while (i != e)
00187 {
00188 auto_ptr<NAK> nak (new NAK (addr));
00189
00190
00191
00192 for (; i != e && nak->count () < max_elem; ++i)
00193 {
00194 u64 sn ((*i).ext_id_);
00195 Descr& d = (*i).int_id_;
00196
00197 if (d.lost ())
00198 {
00199 d.timer (d.timer () - 1);
00200
00201 if (d.timer () == 0)
00202 {
00203
00204
00205 d.nak_count (d.nak_count () + 1);
00206 d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
00207
00208 nak->add (sn);
00209
00210 ++count;
00211
00212
00213
00214 }
00215 }
00216 }
00217
00218
00219
00220 if (nak->count ())
00221 {
00222
00223
00224
00225 Message_ptr m (new Message);
00226
00227 m->add (Profile_ptr (nak.release ()));
00228
00229 msgs.push_back (m);
00230 }
00231 }
00232
00233
00234
00235 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
00236 {
00237 if (q.find (sn) == -1)
00238 {
00239 q.bind (sn, Descr (1));
00240 }
00241 }
00242 }
00243
00244 void Acknowledge::recv (Message_ptr m)
00245 {
00246
00247
00248
00249
00250
00251 Lock l (mutex_);
00252
00253 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
00254 {
00255 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00256 {
00257 u64 sn (nrtm->find ((*i).ext_id_));
00258
00259 if (sn != 0)
00260 {
00261 Queue& q = (*i).int_id_;
00262
00263 u64 old (q.max_sn ());
00264
00265 if (old < sn)
00266 {
00267
00268
00269 q.bind (sn, Descr (1));
00270 }
00271 }
00272 }
00273 }
00274
00275 if (m->find (Data::id) || m->find (NoData::id))
00276 {
00277 Address from (
00278 static_cast<From const*> (m->find (From::id))->address ());
00279
00280 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
00281
00282 Map::ENTRY* e;
00283
00284 if (hold_.find (from, e) == -1)
00285 {
00286
00287
00288 hold_.bind (from, Queue (sn));
00289 in_->recv (m);
00290 }
00291 else
00292 {
00293 Queue& q = e->int_id_;
00294
00295 if (sn <= q.sn ())
00296 {
00297
00298
00299
00300
00301 }
00302 else if (sn == q.sn () + 1)
00303 {
00304
00305
00306
00307 q.rebind (sn, Descr (m));
00308 collapse (q);
00309 }
00310 else
00311 {
00312
00313
00314 q.rebind (sn, Descr (m));
00315 }
00316 }
00317 }
00318 else
00319 {
00320 l.release ();
00321
00322
00323
00324 in_->recv (m);
00325 }
00326 }
00327
00328 void Acknowledge::send (Message_ptr m)
00329 {
00330 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00331 {
00332 size_t max_payload_size (
00333 params_.max_packet_size () - max_service_size);
00334
00335 if (max_payload_size > data->size ())
00336 {
00337 u32 max_size (max_payload_size - data->size ());
00338 u32 max_elem (NRTM::max_count (max_size));
00339
00340 if (max_elem > 0)
00341 {
00342 Lock l (mutex_);
00343
00344 Profile_ptr nrtm (create_nrtm (max_elem));
00345
00346 if (nrtm.get ())
00347 m->add (nrtm);
00348 }
00349 }
00350
00351 nrtm_timer_ = params_.nrtm_timeout ();
00352 }
00353
00354 out_->send (m);
00355 }
00356
00357 Profile_ptr Acknowledge::
00358 create_nrtm (u32 max_elem)
00359 {
00360
00361
00362 auto_ptr<NRTM> nrtm (new NRTM ());
00363
00364
00365
00366 {
00367 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
00368 {
00369 Address addr ((*i).ext_id_);
00370 Queue& q = (*i).int_id_;
00371
00372
00373
00374 nrtm->insert (addr, q.sn ());
00375
00376 if (--max_elem == 0)
00377 break;
00378 }
00379 }
00380
00381 if (nrtm->empty ())
00382 return Profile_ptr (0);
00383 else
00384 return Profile_ptr (nrtm.release ());
00385 }
00386
00387 ACE_THR_FUNC_RETURN Acknowledge::
00388 track_thunk (void* obj)
00389 {
00390 reinterpret_cast<Acknowledge*> (obj)->track ();
00391 return 0;
00392 }
00393 }