00001
00002
00003
00004
00005 #ifndef ACE_RMCAST_ACKNOWLEDGE_H
00006 #define ACE_RMCAST_ACKNOWLEDGE_H
00007
00008 #include "ace/Hash_Map_Manager.h"
00009 #include "ace/Thread_Manager.h"
00010
00011 #include "Stack.h"
00012 #include "Protocol.h"
00013 #include "Bits.h"
00014 #include "Parameters.h"
00015
00016 #if !defined (ACE_RMCAST_DEFAULT_MAP_SIZE)
00017 #define ACE_RMCAST_DEFAULT_MAP_SIZE 10
00018 #endif
00019
00020 #if !defined (ACE_RMCAST_DEFAULT_QUEUE_SIZE)
00021 #define ACE_RMCAST_DEFAULT_QUEUE_SIZE 10
00022 #endif
00023
00024 namespace ACE_RMCast
00025 {
00026 class Acknowledge : public Element
00027 {
00028 public:
00029 Acknowledge (Parameters const& params);
00030
00031 virtual void
00032 in_start (In_Element* in);
00033
00034 virtual void
00035 out_start (Out_Element* out);
00036
00037 virtual void
00038 out_stop ();
00039
00040 public:
00041 virtual void
00042 recv (Message_ptr m);
00043
00044 virtual void
00045 send (Message_ptr m);
00046
00047
00048
00049
00050 public:
00051 struct Descr
00052 {
00053
00054
00055 Descr ()
00056 : nak_count_ (0), timer_ (1)
00057 {
00058 }
00059
00060 Descr (unsigned long timer)
00061 : nak_count_ (0), timer_ (timer)
00062 {
00063 }
00064
00065 Descr (Message_ptr m)
00066 : m_ (m)
00067 {
00068 }
00069
00070 public:
00071 bool
00072 lost () const
00073 {
00074 return m_.get () == 0;
00075 }
00076
00077 public:
00078 Message_ptr
00079 msg ()
00080 {
00081 return m_;
00082 }
00083
00084 void
00085 msg (Message_ptr m)
00086 {
00087 m_ = m;
00088 }
00089
00090 public:
00091 unsigned long
00092 nak_count () const
00093 {
00094 return nak_count_;
00095 }
00096
00097 void
00098 nak_count (unsigned long v)
00099 {
00100 nak_count_ = v;
00101 }
00102
00103 unsigned long
00104 timer () const
00105 {
00106 return timer_;
00107 }
00108
00109 void
00110 timer (unsigned long v)
00111 {
00112 timer_ = v;
00113 }
00114
00115 private:
00116 Message_ptr m_;
00117
00118 unsigned long nak_count_;
00119 unsigned long timer_;
00120 };
00121
00122 private:
00123 struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
00124 {
00125 typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base;
00126
00127
00128
00129 Queue ()
00130 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (0), max_sn_ (0)
00131 {
00132 }
00133
00134 Queue (u64 sn)
00135 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (sn), max_sn_ (sn)
00136 {
00137 }
00138
00139 Queue (Queue const& q)
00140 : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (q.sn_), max_sn_ (sn_)
00141 {
00142 for (Queue::const_iterator i (q), e (q, 1); i != e; ++i)
00143 {
00144 bind ((*i).ext_id_, (*i).int_id_);
00145 }
00146 }
00147
00148 public:
00149 int
00150 bind (u64 sn, Descr const& d)
00151 {
00152 int r (Base::bind (sn, d));
00153
00154 if (r == 0 && sn > max_sn_) max_sn_ = sn;
00155
00156 return r;
00157 }
00158
00159 int
00160 rebind (u64 sn, Descr const& d)
00161 {
00162 int r (Base::rebind (sn, d));
00163
00164 if (r == 0 && sn > max_sn_) max_sn_ = sn;
00165
00166 return r;
00167 }
00168
00169 int
00170 unbind (u64 sn)
00171 {
00172 int r (Base::unbind (sn));
00173
00174 if (r == 0 && sn == max_sn_)
00175 {
00176 for (--max_sn_; max_sn_ >= sn_; --max_sn_)
00177 {
00178 if (find (max_sn_) == 0) break;
00179 }
00180 }
00181
00182 return r;
00183 }
00184
00185 public:
00186 u64
00187 sn () const
00188 {
00189 return sn_;
00190 }
00191
00192 void
00193 sn (u64 sn)
00194 {
00195 sn_ = sn;
00196 }
00197
00198 u64
00199 max_sn () const
00200 {
00201 if (current_size () == 0) return sn_;
00202
00203 return max_sn_;
00204 }
00205
00206 private:
00207 u64 sn_, max_sn_;
00208 };
00209
00210 typedef
00211 ACE_Hash_Map_Manager_Ex<Address,
00212 Queue,
00213 AddressHasher,
00214 ACE_Equal_To<Address>,
00215 ACE_Null_Mutex>
00216 Map;
00217
00218 private:
00219 void
00220 collapse (Queue& q);
00221
00222 void
00223 track ();
00224
00225 void
00226 track_queue (Address const& addr, Queue& q, Messages& msgs);
00227
00228 Profile_ptr
00229 create_nrtm (u32 max_elem);
00230
00231 static ACE_THR_FUNC_RETURN
00232 track_thunk (void* obj);
00233
00234 private:
00235 Parameters const& params_;
00236
00237 Map hold_;
00238 Mutex mutex_;
00239 Condition cond_;
00240
00241 unsigned long nrtm_timer_;
00242
00243 bool stop_;
00244 ACE_Thread_Manager tracker_mgr_;
00245 };
00246 }
00247
00248 #endif // ACE_RMCAST_ACKNOWLEDGE_H