#include <Flow.h>
Inheritance diagram for ACE_RMCast::Flow:
Public Member Functions | |
Flow (Parameters const &params) | |
virtual void | send (Message_ptr m) |
virtual void | recv (Message_ptr m) |
Private Attributes | |
Parameters const & | params_ |
Mutex | mutex_ |
ACE_Time_Value | nak_time_ |
ACE_Time_Value | sample_start_time_ |
unsigned long | sample_bytes_ |
double | current_tput_ |
double | cap_tput_ |
|
Definition at line 21 of file Flow.cpp.
00022 : params_ (params), 00023 nak_time_ (0, 0), 00024 sample_start_time_ (0, 0), 00025 sample_bytes_ (0), 00026 current_tput_ (0.0), 00027 cap_tput_ (0.0) 00028 { 00029 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 99 of file Flow.cpp. References ACE_RMCast::Address, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, nak_time_, and ACE_RMCast::In_Element::recv().
00100 { 00101 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) 00102 { 00103 Address to (static_cast<To const*> (m->find (To::id))->address ()); 00104 00105 if (nak->address () == to) 00106 { 00107 // This one is for us. 00108 // 00109 00110 //cerr << "NAK from " 00111 // << static_cast<From const*> (m->find (From::id))->address () 00112 // << " for " << nak->count () << " sns." << endl; 00113 00114 00115 ACE_Time_Value nak_time (ACE_OS::gettimeofday ()); 00116 00117 Lock l (mutex_); 00118 00119 nak_time_ = nak_time; 00120 00121 if (cap_tput_ == 0.0) 00122 cap_tput_ = current_tput_; 00123 00124 if (cap_tput_ != 0.0) 00125 { 00126 cap_tput_ = cap_tput_ - cap_tput_ / 6.0; 00127 00128 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl; 00129 } 00130 } 00131 } 00132 00133 in_->recv (m); 00134 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 31 of file Flow.cpp. References ACE_Time_Value, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_Time_Value::msec(), nak_time_, ACE_Guard< ACE_LOCK >::release(), sample_bytes_, sample_start_time_, ACE_Time_Value::sec(), ACE_RMCast::Data::size(), ACE_OS::sleep(), and ACE_Time_Value::usec().
00032 { 00033 if (Data const* data = static_cast<Data const*> (m->find (Data::id))) 00034 { 00035 ACE_Time_Value now_time (ACE_OS::gettimeofday ()); 00036 00037 Lock l (mutex_); 00038 sample_bytes_ += data->size (); 00039 00040 if (sample_start_time_ == ACE_Time_Value (0, 0)) 00041 { 00042 sample_start_time_ = now_time; 00043 } 00044 else 00045 { 00046 ACE_Time_Value delta (now_time - sample_start_time_); 00047 00048 if (delta > ACE_Time_Value (0, 2000)) 00049 { 00050 current_tput_ = 00051 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ()); 00052 00053 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl; 00054 00055 sample_bytes_ = 0; 00056 sample_start_time_ = ACE_Time_Value (0, 0); 00057 } 00058 } 00059 00060 if (cap_tput_ != 0.0 00061 && current_tput_ != 0.0 00062 && current_tput_ > cap_tput_) 00063 { 00064 double dev = (current_tput_ - cap_tput_) / current_tput_; 00065 00066 // cerr << "deviation: " << dev << endl; 00067 00068 // Cap decay algorithm. 00069 // 00070 { 00071 ACE_Time_Value delta (now_time - nak_time_); 00072 00073 unsigned long msec = delta.msec (); 00074 00075 double x = msec / -16000.0; 00076 double y = 1.0 * exp (x); 00077 cap_tput_ = cap_tput_ / y; 00078 00079 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl; 00080 } 00081 00082 l.release (); 00083 00084 00085 timespec time; 00086 time.tv_sec = 0; 00087 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0); 00088 00089 // Don't bother to sleep if the time is less than 10 usec. 00090 // 00091 if (time.tv_nsec > 10000) 00092 ACE_OS::sleep (ACE_Time_Value (time)); 00093 } 00094 } 00095 00096 out_->send (m); 00097 } |
|
|
|
|
|
|
|
|
|
|
|
Definition at line 36 of file Flow.h. Referenced by send(). |
|
Definition at line 35 of file Flow.h. Referenced by send(). |