Flow.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Flow.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : $Id: Flow.cpp 78774 2007-07-04 06:06:59Z sowayaa $
00004 
00005 #include "Flow.h"
00006 
00007 #include "ace/OS_NS_unistd.h"   // sleep
00008 #include "ace/OS_NS_sys_time.h" // gettimeofday
00009 
00010 #include "ace/os_include/os_math.h" // exp
00011 
00012 /*
00013 #include <iostream>
00014 using std::cerr;
00015 using std::endl;
00016 */
00017 
00018 namespace ACE_RMCast
00019 {
00020   Flow::
00021   Flow (Parameters const& params)
00022       : params_ (params),
00023         nak_time_ (0, 0),
00024         sample_start_time_ (0, 0),
00025         sample_bytes_ (0),
00026         current_tput_ (0.0),
00027         cap_tput_ (0.0)
00028   {
00029   }
00030 
00031   void Flow::send (Message_ptr m)
00032   {
00033     if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
00034     {
00035       ACE_Time_Value now_time (ACE_OS::gettimeofday ());
00036 
00037       Lock l (mutex_);
00038       sample_bytes_ += data->size ();
00039 
00040       if (sample_start_time_ == ACE_Time_Value (0, 0))
00041       {
00042         sample_start_time_ = now_time;
00043       }
00044       else
00045       {
00046         ACE_Time_Value delta (now_time - sample_start_time_);
00047 
00048         if (delta > ACE_Time_Value (0, 2000))
00049         {
00050           current_tput_ =
00051             double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
00052 
00053           // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
00054 
00055           sample_bytes_ = 0;
00056           sample_start_time_ = ACE_Time_Value (0, 0);
00057         }
00058       }
00059 
00060       if (cap_tput_ != 0.0
00061           && current_tput_ != 0.0
00062           && current_tput_ > cap_tput_)
00063       {
00064         double dev = (current_tput_ - cap_tput_) / current_tput_;
00065 
00066         // cerr << "deviation: " << dev << endl;
00067 
00068         // Cap decay algorithm.
00069         //
00070         {
00071           ACE_Time_Value delta (now_time - nak_time_);
00072 
00073           unsigned long msec = delta.msec ();
00074 
00075           double x = msec / -16000.0;
00076           double y = 1.0 * exp (x);
00077           cap_tput_ = cap_tput_ / y;
00078 
00079           // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
00080         }
00081 
00082         l.release ();
00083 
00084 
00085         timespec time;
00086         time.tv_sec = 0;
00087         time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
00088 
00089         // Don't bother to sleep if the time is less than 10 usec.
00090         //
00091         if (time.tv_nsec > 10000)
00092           ACE_OS::sleep (ACE_Time_Value (time));
00093       }
00094     }
00095 
00096     out_->send (m);
00097   }
00098 
00099   void Flow::recv (Message_ptr m)
00100   {
00101     if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
00102     {
00103       Address to (static_cast<To const*> (m->find (To::id))->address ());
00104 
00105       if (nak->address () == to)
00106       {
00107         // This one is for us.
00108         //
00109 
00110         //cerr << "NAK from "
00111         //     << static_cast<From const*> (m->find (From::id))->address ()
00112         //     << " for " << nak->count () << " sns." << endl;
00113 
00114 
00115         ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
00116 
00117         Lock l (mutex_);
00118 
00119         nak_time_ = nak_time;
00120 
00121         if (cap_tput_ == 0.0)
00122           cap_tput_ = current_tput_;
00123 
00124         if (cap_tput_ != 0.0)
00125         {
00126           cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
00127 
00128           // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
00129         }
00130       }
00131     }
00132 
00133     in_->recv (m);
00134   }
00135 }
00136 

Generated on Sun Jan 27 13:02:56 2008 for ACE_RMCast by doxygen 1.3.6