00001
00002
00003
00004
00005 #include "Flow.h"
00006
00007 #include "ace/OS_NS_unistd.h"
00008 #include "ace/OS_NS_sys_time.h"
00009
00010 #include "ace/os_include/os_math.h"
00011
00012
00013
00014
00015
00016
00017
00018 namespace ACE_RMCast
00019 {
00020 Flow::
00021 Flow (Parameters const& params)
00022 : params_ (params),
00023 nak_time_ (0, 0),
00024 sample_start_time_ (0, 0),
00025 sample_bytes_ (0),
00026 current_tput_ (0.0),
00027 cap_tput_ (0.0)
00028 {
00029 }
00030
00031 void Flow::
00032 send (Message_ptr m)
00033 {
00034 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00035 {
00036 ACE_Time_Value now_time (ACE_OS::gettimeofday ());
00037
00038 Lock l (mutex_);
00039 sample_bytes_ += data->size ();
00040
00041 if (sample_start_time_ == ACE_Time_Value (0, 0))
00042 {
00043 sample_start_time_ = now_time;
00044 }
00045 else
00046 {
00047 ACE_Time_Value delta (now_time - sample_start_time_);
00048
00049 if (delta > ACE_Time_Value (0, 2000))
00050 {
00051 current_tput_ =
00052 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
00053
00054
00055
00056 sample_bytes_ = 0;
00057 sample_start_time_ = ACE_Time_Value (0, 0);
00058 }
00059 }
00060
00061 if (cap_tput_ != 0.0
00062 && current_tput_ != 0.0
00063 && current_tput_ > cap_tput_)
00064 {
00065 double dev = (current_tput_ - cap_tput_) / current_tput_;
00066
00067
00068
00069
00070
00071 {
00072 ACE_Time_Value delta (now_time - nak_time_);
00073
00074 unsigned long msec = delta.msec ();
00075
00076 double x = msec / -16000.0;
00077 double y = 1.0 * exp (x);
00078 cap_tput_ = cap_tput_ / y;
00079
00080
00081 }
00082
00083 l.release ();
00084
00085
00086 timespec time;
00087 time.tv_sec = 0;
00088 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
00089
00090
00091
00092 if (time.tv_nsec > 10000)
00093 ACE_OS::sleep (ACE_Time_Value (time));
00094 }
00095 }
00096
00097 out_->send (m);
00098 }
00099
00100 void Flow::
00101 recv (Message_ptr m)
00102 {
00103 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00104 {
00105 Address to (static_cast<To const*> (m->find (To::id))->address ());
00106
00107 if (nak->address () == to)
00108 {
00109
00110
00111
00112
00113
00114
00115
00116
00117 ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
00118
00119 Lock l (mutex_);
00120
00121 nak_time_ = nak_time;
00122
00123 if (cap_tput_ == 0.0)
00124 cap_tput_ = current_tput_;
00125
00126 if (cap_tput_ != 0.0)
00127 {
00128 cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
00129
00130
00131 }
00132 }
00133 }
00134
00135 in_->recv (m);
00136 }
00137 }
00138