#include <Flow.h>
Inheritance diagram for ACE_RMCast::Flow:
Public Member Functions | |
Flow (Parameters const ¶ms) | |
virtual void | send (Message_ptr m) |
virtual void | recv (Message_ptr m) |
Private Attributes | |
Parameters const & | params_ |
Mutex | mutex_ |
ACE_Time_Value | nak_time_ |
ACE_Time_Value | sample_start_time_ |
unsigned long | sample_bytes_ |
double | current_tput_ |
double | cap_tput_ |
|
Definition at line 21 of file Flow.cpp.
00022 : params_ (params), 00023 nak_time_ (0, 0), 00024 sample_start_time_ (0, 0), 00025 sample_bytes_ (0), 00026 current_tput_ (0.0), 00027 cap_tput_ (0.0) 00028 { 00029 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 101 of file Flow.cpp. References ACE_RMCast::Address, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, nak_time_, and ACE_RMCast::In_Element::recv().
00102 { 00103 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) 00104 { 00105 Address to (static_cast<To const*> (m->find (To::id))->address ()); 00106 00107 if (nak->address () == to) 00108 { 00109 // This one is for us. 00110 // 00111 00112 //cerr << "NAK from " 00113 // << static_cast<From const*> (m->find (From::id))->address () 00114 // << " for " << nak->count () << " sns." << endl; 00115 00116 00117 ACE_Time_Value nak_time (ACE_OS::gettimeofday ()); 00118 00119 Lock l (mutex_); 00120 00121 nak_time_ = nak_time; 00122 00123 if (cap_tput_ == 0.0) 00124 cap_tput_ = current_tput_; 00125 00126 if (cap_tput_ != 0.0) 00127 { 00128 cap_tput_ = cap_tput_ - cap_tput_ / 6.0; 00129 00130 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl; 00131 } 00132 } 00133 } 00134 00135 in_->recv (m); 00136 } |
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 32 of file Flow.cpp. References ACE_Time_Value, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_Time_Value::msec(), nak_time_, ACE_Guard< ACE_LOCK >::release(), sample_bytes_, sample_start_time_, ACE_Time_Value::sec(), ACE_RMCast::Data::size(), ACE_OS::sleep(), and ACE_Time_Value::usec().
00033 { 00034 if (Data const* data = static_cast<Data const*> (m->find (Data::id))) 00035 { 00036 ACE_Time_Value now_time (ACE_OS::gettimeofday ()); 00037 00038 Lock l (mutex_); 00039 sample_bytes_ += data->size (); 00040 00041 if (sample_start_time_ == ACE_Time_Value (0, 0)) 00042 { 00043 sample_start_time_ = now_time; 00044 } 00045 else 00046 { 00047 ACE_Time_Value delta (now_time - sample_start_time_); 00048 00049 if (delta > ACE_Time_Value (0, 2000)) 00050 { 00051 current_tput_ = 00052 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ()); 00053 00054 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl; 00055 00056 sample_bytes_ = 0; 00057 sample_start_time_ = ACE_Time_Value (0, 0); 00058 } 00059 } 00060 00061 if (cap_tput_ != 0.0 00062 && current_tput_ != 0.0 00063 && current_tput_ > cap_tput_) 00064 { 00065 double dev = (current_tput_ - cap_tput_) / current_tput_; 00066 00067 // cerr << "deviation: " << dev << endl; 00068 00069 // Cap decay algorithm. 00070 // 00071 { 00072 ACE_Time_Value delta (now_time - nak_time_); 00073 00074 unsigned long msec = delta.msec (); 00075 00076 double x = msec / -16000.0; 00077 double y = 1.0 * exp (x); 00078 cap_tput_ = cap_tput_ / y; 00079 00080 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl; 00081 } 00082 00083 l.release (); 00084 00085 00086 timespec time; 00087 time.tv_sec = 0; 00088 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0); 00089 00090 // Don't bother to sleep if the time is less than 10 usec. 00091 // 00092 if (time.tv_nsec > 10000) 00093 ACE_OS::sleep (ACE_Time_Value (time)); 00094 } 00095 } 00096 00097 out_->send (m); 00098 } |
|
|
|
|
|
|
|
|
|
|
|
Definition at line 36 of file Flow.h. Referenced by send(). |
|
Definition at line 35 of file Flow.h. Referenced by send(). |