00001
00002
00003
00004
00005 #include "ace/Time_Value.h"
00006 #include "ace/OS_NS_stdlib.h"
00007 #include "ace/OS_NS_sys_time.h"
00008
00009 #include "Retransmit.h"
00010
00011
00012
00013
00014
00015
00016
00017 namespace ACE_RMCast
00018 {
00019 Retransmit::
00020 Retransmit (Parameters const& params)
00021 : params_ (params),
00022 cond_ (mutex_),
00023 stop_ (false)
00024 {
00025 }
00026
00027 void Retransmit::
00028 out_start (Out_Element* out)
00029 {
00030 Element::out_start (out);
00031
00032 tracker_mgr_.spawn (track_thunk, this);
00033 }
00034
00035 void Retransmit::
00036 out_stop ()
00037 {
00038 {
00039 Lock l (mutex_);
00040 stop_ = true;
00041 cond_.signal ();
00042 }
00043
00044 tracker_mgr_.wait ();
00045
00046 Element::out_stop ();
00047 }
00048
00049 void Retransmit::
00050 send (Message_ptr m)
00051 {
00052 if (m->find (Data::id) != 0)
00053 {
00054 SN const* sn = static_cast<SN const*> (m->find (SN::id));
00055
00056 Lock l (mutex_);
00057 queue_.bind (sn->num (), Descr (m->clone ()));
00058 }
00059
00060 out_->send (m);
00061 }
00062
00063 void Retransmit::
00064 recv (Message_ptr m)
00065 {
00066 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00067 {
00068 Address to (static_cast<To const*> (m->find (To::id))->address ());
00069
00070 if (nak->address () == to)
00071 {
00072 Lock l (mutex_);
00073
00074 for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
00075 !j.done ();
00076 j.advance ())
00077 {
00078 u64* psn;
00079 j.next (psn);
00080
00081 Message_ptr m;
00082
00083 Queue::ENTRY* pair;
00084
00085 if (queue_.find (*psn, pair) == 0)
00086 {
00087
00088
00089 m = pair->int_id_.message ();
00090
00091 pair->int_id_.reset ();
00092 }
00093 else
00094 {
00095
00096
00097 m = Message_ptr (new Message);
00098 m->add (Profile_ptr (new SN (*psn)));
00099 m->add (Profile_ptr (new NoData));
00100 }
00101
00102 out_->send (m);
00103 }
00104 }
00105 }
00106
00107 in_->recv (m);
00108 }
00109
00110 ACE_THR_FUNC_RETURN Retransmit::
00111 track_thunk (void* obj)
00112 {
00113 reinterpret_cast<Retransmit*> (obj)->track ();
00114 return 0;
00115 }
00116
00117 void Retransmit::
00118 track ()
00119 {
00120 while (true)
00121 {
00122 Lock l (mutex_);
00123
00124 for (Queue::iterator i (queue_); !i.done ();)
00125 {
00126 if ((*i).int_id_.inc () >= params_.retention_timeout ())
00127 {
00128 u64 sn ((*i).ext_id_);
00129 i.advance ();
00130 queue_.unbind (sn);
00131 }
00132 else
00133 {
00134 i.advance ();
00135 }
00136 }
00137
00138
00139
00140 ACE_Time_Value time (ACE_OS::gettimeofday ());
00141 time += params_.tick ();
00142
00143 while (!stop_)
00144 {
00145 if (cond_.wait (&time) == -1)
00146 {
00147 if (errno != ETIME)
00148 ACE_OS::abort ();
00149 else
00150 break;
00151 }
00152 }
00153
00154 if (stop_)
00155 break;
00156 }
00157 }
00158 }