#include <Flow.h>
Inheritance diagram for ACE_RMCast::Flow:


Public Member Functions | |
| Flow (Parameters const &params) | |
| virtual void | send (Message_ptr m) |
| virtual void | recv (Message_ptr m) |
Private Attributes | |
| Parameters const & | params_ |
| Mutex | mutex_ |
| ACE_Time_Value | nak_time_ |
| ACE_Time_Value | sample_start_time_ |
| unsigned long | sample_bytes_ |
| double | current_tput_ |
| double | cap_tput_ |
|
|
ACE_RMCast::Flow::Flow (Parameters const &params) — Definition at line 21 of file Flow.cpp.
00022 : params_ (params), 00023 nak_time_ (0, 0), 00024 sample_start_time_ (0, 0), 00025 sample_bytes_ (0), 00026 current_tput_ (0.0), 00027 cap_tput_ (0.0) 00028 { 00029 } |
|
|
virtual void ACE_RMCast::Flow::recv (Message_ptr m) — Reimplemented from ACE_RMCast::In_Element. Definition at line 99 of file Flow.cpp. References ACE_RMCast::Address, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, nak_time_, and ACE_RMCast::In_Element::recv().
00100 {
00101 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00102 {
00103 Address to (static_cast<To const*> (m->find (To::id))->address ());
00104
00105 if (nak->address () == to)
00106 {
00107 // This one is for us.
00108 //
00109
00110 //cerr << "NAK from "
00111 // << static_cast<From const*> (m->find (From::id))->address ()
00112 // << " for " << nak->count () << " sns." << endl;
00113
00114
00115 ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
00116
00117 Lock l (mutex_);
00118
00119 nak_time_ = nak_time;
00120
00121 if (cap_tput_ == 0.0)
00122 cap_tput_ = current_tput_;
00123
00124 if (cap_tput_ != 0.0)
00125 {
00126 cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
00127
00128 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
00129 }
00130 }
00131 }
00132
00133 in_->recv (m);
00134 }
|
|
|
virtual void ACE_RMCast::Flow::send (Message_ptr m) — Reimplemented from ACE_RMCast::Out_Element. Definition at line 31 of file Flow.cpp. References ACE_Time_Value, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_Time_Value::msec(), nak_time_, ACE_Guard< ACE_LOCK >::release(), sample_bytes_, sample_start_time_, ACE_Time_Value::sec(), ACE_RMCast::Data::size(), ACE_OS::sleep(), and ACE_Time_Value::usec().
00032 {
00033 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00034 {
00035 ACE_Time_Value now_time (ACE_OS::gettimeofday ());
00036
00037 Lock l (mutex_);
00038 sample_bytes_ += data->size ();
00039
00040 if (sample_start_time_ == ACE_Time_Value (0, 0))
00041 {
00042 sample_start_time_ = now_time;
00043 }
00044 else
00045 {
00046 ACE_Time_Value delta (now_time - sample_start_time_);
00047
00048 if (delta > ACE_Time_Value (0, 2000))
00049 {
00050 current_tput_ =
00051 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
00052
00053 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
00054
00055 sample_bytes_ = 0;
00056 sample_start_time_ = ACE_Time_Value (0, 0);
00057 }
00058 }
00059
00060 if (cap_tput_ != 0.0
00061 && current_tput_ != 0.0
00062 && current_tput_ > cap_tput_)
00063 {
00064 double dev = (current_tput_ - cap_tput_) / current_tput_;
00065
00066 // cerr << "deviation: " << dev << endl;
00067
00068 // Cap decay algorithm.
00069 //
00070 {
00071 ACE_Time_Value delta (now_time - nak_time_);
00072
00073 unsigned long msec = delta.msec ();
00074
00075 double x = msec / -16000.0;
00076 double y = 1.0 * exp (x);
00077 cap_tput_ = cap_tput_ / y;
00078
00079 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
00080 }
00081
00082 l.release ();
00083
00084
00085 timespec time;
00086 time.tv_sec = 0;
00087 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
00088
00089 // Don't bother to sleep if the time is less than 10 usec.
00090 //
00091 if (time.tv_nsec > 10000)
00092 ACE_OS::sleep (ACE_Time_Value (time));
00093 }
00094 }
00095
00096 out_->send (m);
00097 }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unsigned long ACE_RMCast::Flow::sample_bytes_ [private] — Definition at line 36 of file Flow.h. Referenced by send(). |
|
|
ACE_Time_Value ACE_RMCast::Flow::sample_start_time_ [private] — Definition at line 35 of file Flow.h. Referenced by send(). |
1.3.6