ACE_RMCast::Flow Class Reference

#include <Flow.h>

Inheritance diagram for ACE_RMCast::Flow:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Flow:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Flow (Parameters const &params)
virtual void send (Message_ptr m)
virtual void recv (Message_ptr m)

Private Attributes

Parameters const & params_
Mutex mutex_
ACE_Time_Value nak_time_
ACE_Time_Value sample_start_time_
unsigned long sample_bytes_
double current_tput_
double cap_tput_

Constructor & Destructor Documentation

ACE_RMCast::Flow::Flow (Parameters const & params)
 

Definition at line 21 of file Flow.cpp.

00022       : params_ (params),
00023         nak_time_ (0, 0),
00024         sample_start_time_ (0, 0),
00025         sample_bytes_ (0),
00026         current_tput_ (0.0),
00027         cap_tput_ (0.0)
00028   {
00029   }


Member Function Documentation

void ACE_RMCast::Flow::recv (Message_ptr m) [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 101 of file Flow.cpp.

References ACE_RMCast::Address, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, nak_time_, and ACE_RMCast::In_Element::recv().

00102   {
00103     if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00104     {
00105       Address to (static_cast<To const*> (m->find (To::id))->address ());
00106 
00107       if (nak->address () == to)
00108       {
00109         // This one is for us.
00110         //
00111 
00112         //cerr << "NAK from "
00113         //     << static_cast<From const*> (m->find (From::id))->address ()
00114         //     << " for " << nak->count () << " sns." << endl;
00115 
00116 
00117         ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
00118 
00119         Lock l (mutex_);
00120 
00121         nak_time_ = nak_time;
00122 
00123         if (cap_tput_ == 0.0)
00124           cap_tput_ = current_tput_;
00125 
00126         if (cap_tput_ != 0.0)
00127         {
00128           cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
00129 
00130           // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
00131         }
00132       }
00133     }
00134 
00135     in_->recv (m);
00136   }

void ACE_RMCast::Flow::send (Message_ptr m) [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 32 of file Flow.cpp.

References ACE_Time_Value, cap_tput_, current_tput_, ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_Time_Value::msec(), nak_time_, ACE_Guard< ACE_LOCK >::release(), sample_bytes_, sample_start_time_, ACE_Time_Value::sec(), ACE_RMCast::Data::size(), ACE_OS::sleep(), and ACE_Time_Value::usec().

00033   {
00034     if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00035     {
00036       ACE_Time_Value now_time (ACE_OS::gettimeofday ());
00037 
00038       Lock l (mutex_);
00039       sample_bytes_ += data->size ();
00040 
00041       if (sample_start_time_ == ACE_Time_Value (0, 0))
00042       {
00043         sample_start_time_ = now_time;
00044       }
00045       else
00046       {
00047         ACE_Time_Value delta (now_time - sample_start_time_);
00048 
00049         if (delta > ACE_Time_Value (0, 2000))
00050         {
00051           current_tput_ =
00052             double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
00053 
00054           // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
00055 
00056           sample_bytes_ = 0;
00057           sample_start_time_ = ACE_Time_Value (0, 0);
00058         }
00059       }
00060 
00061       if (cap_tput_ != 0.0
00062           && current_tput_ != 0.0
00063           && current_tput_ > cap_tput_)
00064       {
00065         double dev = (current_tput_ - cap_tput_) / current_tput_;
00066 
00067         // cerr << "deviation: " << dev << endl;
00068 
00069         // Cap decay algorithm.
00070         //
00071         {
00072           ACE_Time_Value delta (now_time - nak_time_);
00073 
00074           unsigned long msec = delta.msec ();
00075 
00076           double x = msec / -16000.0;
00077           double y = 1.0 * exp (x);
00078           cap_tput_ = cap_tput_ / y;
00079 
00080           // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
00081         }
00082 
00083         l.release ();
00084 
00085 
00086         timespec time;
00087         time.tv_sec = 0;
00088         time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
00089 
00090         // Don't bother to sleep if the time is less than 10 usec.
00091         //
00092         if (time.tv_nsec > 10000)
00093           ACE_OS::sleep (ACE_Time_Value (time));
00094       }
00095     }
00096 
00097     out_->send (m);
00098   }


Member Data Documentation

double ACE_RMCast::Flow::cap_tput_ [private]
 

Definition at line 38 of file Flow.h.

Referenced by recv() and send().

double ACE_RMCast::Flow::current_tput_ [private]
 

Definition at line 37 of file Flow.h.

Referenced by recv() and send().

Mutex ACE_RMCast::Flow::mutex_ [private]
 

Definition at line 30 of file Flow.h.

ACE_Time_Value ACE_RMCast::Flow::nak_time_ [private]
 

Definition at line 31 of file Flow.h.

Referenced by recv() and send().

Parameters const& ACE_RMCast::Flow::params_ [private]
 

Definition at line 28 of file Flow.h.

unsigned long ACE_RMCast::Flow::sample_bytes_ [private]
 

Definition at line 36 of file Flow.h.

Referenced by send().

ACE_Time_Value ACE_RMCast::Flow::sample_start_time_ [private]
 

Definition at line 35 of file Flow.h.

Referenced by send().


The documentation for this class was generated from the following files:
Flow.h
Flow.cpp
Generated on Thu Nov 9 11:41:05 2006 for ACE_RMCast by doxygen 1.3.6