#include <Flow.h>
Inheritance diagram for ACE_RMCast::Flow:


Public Member Functions | |
| Flow (Parameters const &params) | |
| virtual void | send (Message_ptr m) |
| virtual void | recv (Message_ptr m) |
Private Attributes | |
| Parameters const & | params_ |
| Mutex | mutex_ |
| ACE_Time_Value | nak_time_ |
| ACE_Time_Value | sample_start_time_ |
| unsigned long | sample_bytes_ |
| double | current_tput_ |
| double | cap_tput_ |
|
|
Definition at line 21 of file Flow.cpp.
00022 : params_ (params),
00023 nak_time_ (0, 0),
00024 sample_start_time_ (0, 0),
00025 sample_bytes_ (0),
00026 current_tput_ (0.0),
00027 cap_tput_ (0.0)
00028 {
00029 }
|
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 101 of file Flow.cpp. References ACE_RMCast::Address, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, nak_time_, and ACE_RMCast::In_Element::recv().
00102 {
00103 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00104 {
00105 Address to (static_cast<To const*> (m->find (To::id))->address ());
00106
00107 if (nak->address () == to)
00108 {
00109 // This one is for us.
00110 //
00111
00112 //cerr << "NAK from "
00113 // << static_cast<From const*> (m->find (From::id))->address ()
00114 // << " for " << nak->count () << " sns." << endl;
00115
00116
00117 ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
00118
00119 Lock l (mutex_);
00120
00121 nak_time_ = nak_time;
00122
00123 if (cap_tput_ == 0.0)
00124 cap_tput_ = current_tput_;
00125
00126 if (cap_tput_ != 0.0)
00127 {
00128 cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
00129
00130 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
00131 }
00132 }
00133 }
00134
00135 in_->recv (m);
00136 }
|
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 32 of file Flow.cpp. References ACE_Time_Value, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_Time_Value::msec(), nak_time_, ACE_Guard< ACE_LOCK >::release(), sample_bytes_, sample_start_time_, ACE_Time_Value::sec(), ACE_RMCast::Data::size(), ACE_OS::sleep(), and ACE_Time_Value::usec().
00033 {
00034 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00035 {
00036 ACE_Time_Value now_time (ACE_OS::gettimeofday ());
00037
00038 Lock l (mutex_);
00039 sample_bytes_ += data->size ();
00040
00041 if (sample_start_time_ == ACE_Time_Value (0, 0))
00042 {
00043 sample_start_time_ = now_time;
00044 }
00045 else
00046 {
00047 ACE_Time_Value delta (now_time - sample_start_time_);
00048
00049 if (delta > ACE_Time_Value (0, 2000))
00050 {
00051 current_tput_ =
00052 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
00053
00054 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
00055
00056 sample_bytes_ = 0;
00057 sample_start_time_ = ACE_Time_Value (0, 0);
00058 }
00059 }
00060
00061 if (cap_tput_ != 0.0
00062 && current_tput_ != 0.0
00063 && current_tput_ > cap_tput_)
00064 {
00065 double dev = (current_tput_ - cap_tput_) / current_tput_;
00066
00067 // cerr << "deviation: " << dev << endl;
00068
00069 // Cap decay algorithm.
00070 //
00071 {
00072 ACE_Time_Value delta (now_time - nak_time_);
00073
00074 unsigned long msec = delta.msec ();
00075
00076 double x = msec / -16000.0;
00077 double y = 1.0 * exp (x);
00078 cap_tput_ = cap_tput_ / y;
00079
00080 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
00081 }
00082
00083 l.release ();
00084
00085
00086 timespec time;
00087 time.tv_sec = 0;
00088 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
00089
00090 // Don't bother to sleep if the time is less than 10 usec.
00091 //
00092 if (time.tv_nsec > 10000)
00093 ACE_OS::sleep (ACE_Time_Value (time));
00094 }
00095 }
00096
00097 out_->send (m);
00098 }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Definition at line 36 of file Flow.h. Referenced by send(). |
|
|
Definition at line 35 of file Flow.h. Referenced by send(). |
1.3.6