#include <Flow.h>


Public Member Functions | |
| Flow (Parameters const ¶ms) | |
| virtual void | send (Message_ptr m) |
| virtual void | recv (Message_ptr m) |
Private Attributes | |
| Parameters const & | params_ |
| Mutex | mutex_ |
| ACE_Time_Value | nak_time_ |
| ACE_Time_Value | sample_start_time_ |
| unsigned long | sample_bytes_ |
| double | current_tput_ |
| double | cap_tput_ |
Definition at line 15 of file Flow.h.
| ACE_RMCast::Flow::Flow | ( | Parameters const & | params | ) |
| void ACE_RMCast::Flow::recv | ( | Message_ptr | m | ) | [virtual] |
Definition at line 86 of file Flow.cpp.
{
if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
{
Address to (static_cast<To const*> (m->find (To::id))->address ());
if (nak->address () == to)
{
// This one is for us.
//
//cerr << "NAK from "
// << static_cast<From const*> (m->find (From::id))->address ()
// << " for " << nak->count () << " sns." << endl;
ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
Lock l (mutex_);
nak_time_ = nak_time;
if (cap_tput_ == 0.0)
| void ACE_RMCast::Flow::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 18 of file Flow.cpp.
{
Flow::
Flow (Parameters const& params)
: params_ (params),
nak_time_ (0, 0),
sample_start_time_ (0, 0),
sample_bytes_ (0),
current_tput_ (0.0),
cap_tput_ (0.0)
{
}
void Flow::send (Message_ptr m)
{
if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
ACE_Time_Value now_time (ACE_OS::gettimeofday ());
Lock l (mutex_);
sample_bytes_ += data->size ();
if (sample_start_time_ == ACE_Time_Value (0, 0))
{
sample_start_time_ = now_time;
}
else
{
ACE_Time_Value delta (now_time - sample_start_time_);
if (delta > ACE_Time_Value (0, 2000))
{
current_tput_ =
double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
// cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
sample_bytes_ = 0;
sample_start_time_ = ACE_Time_Value (0, 0);
}
}
if (cap_tput_ != 0.0
&& current_tput_ != 0.0
&& current_tput_ > cap_tput_)
{
double dev = (current_tput_ - cap_tput_) / current_tput_;
// cerr << "deviation: " << dev << endl;
// Cap decay algorithm.
//
{
ACE_Time_Value delta (now_time - nak_time_);
unsigned long msec = delta.msec ();
double x = msec / -16000.0;
double y = 1.0 * exp (x);
cap_tput_ = cap_tput_ / y;
// cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
}
l.release ();
double ACE_RMCast::Flow::cap_tput_ [private] |
double ACE_RMCast::Flow::current_tput_ [private] |
Mutex ACE_RMCast::Flow::mutex_ [private] |
ACE_Time_Value ACE_RMCast::Flow::nak_time_ [private] |
Parameters const& ACE_RMCast::Flow::params_ [private] |
unsigned long ACE_RMCast::Flow::sample_bytes_ [private] |
1.7.0