#include <Acknowledge.h>


Classes | |
| struct | Descr |
| struct | Queue |
Public Member Functions | |
| Acknowledge (Parameters const &params) | |
| virtual void | in_start (In_Element *in) |
| virtual void | out_start (Out_Element *out) |
| virtual void | out_stop () |
| virtual void | recv (Message_ptr m) |
| virtual void | send (Message_ptr m) |
Private Types | |
| typedef ACE_Hash_Map_Manager_Ex < Address, Queue, AddressHasher, ACE_Equal_To < Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
| void | collapse (Queue &q) |
| void | track () |
| void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
| Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
| static ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
| Parameters const & | params_ |
| Map | hold_ |
| Mutex | mutex_ |
| Condition | cond_ |
| unsigned long | nrtm_timer_ |
| bool | stop_ |
| ACE_Thread_Manager | tracker_mgr_ |
Definition at line 26 of file Acknowledge.h.
typedef ACE_Hash_Map_Manager_Ex<Address, Queue, AddressHasher, ACE_Equal_To<Address>, ACE_Null_Mutex> ACE_RMCast::Acknowledge::Map [private] |
Definition at line 216 of file Acknowledge.h.
| ACE_RMCast::Acknowledge::Acknowledge | ( | Parameters const & | params | ) |
Definition at line 13 of file Acknowledge.cpp.
{
Acknowledge::
| void ACE_RMCast::Acknowledge::collapse | ( | Queue & | q | ) | [private] |
Definition at line 51 of file Acknowledge.cpp.
{
// I would normally use iterators in the logic below but ACE_Map_Manager
// iterates over entries in no particular order so it is pretty much
// unusable here. Instead we will do slow and cumbersome find's.
//
u64 sn (q.sn () + 1);
for (;; ++sn)
{
Queue::ENTRY* e = 0;
if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
| Profile_ptr ACE_RMCast::Acknowledge::create_nrtm | ( | u32 | max_elem | ) | [private] |
Definition at line 350 of file Acknowledge.cpp.
{
// Prepare NRTM.
//
auto_ptr<NRTM> nrtm (new NRTM ());
// Gather the information.
//
{
for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
{
Address addr ((*i).ext_id_);
Queue& q = (*i).int_id_;
//@@ Should look for the highest known number.
//
nrtm->insert (addr, q.sn ());
if (--max_elem == 0)
break;
| void ACE_RMCast::Acknowledge::in_start | ( | In_Element * | in | ) | [virtual] |
Definition at line 23 of file Acknowledge.cpp.
{
| void ACE_RMCast::Acknowledge::out_start | ( | Out_Element * | out | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 29 of file Acknowledge.cpp.
{
Element::in_start (in);
}
| void ACE_RMCast::Acknowledge::out_stop | ( | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 37 of file Acknowledge.cpp.
{
Element::out_start (out);
tracker_mgr_.spawn (track_thunk, this);
}
void Acknowledge::
out_stop ()
{
{
Lock l (mutex_);
| void ACE_RMCast::Acknowledge::recv | ( | Message_ptr | m | ) | [virtual] |
Definition at line 236 of file Acknowledge.cpp.
{
if (q.find (sn) == -1)
{
q.bind (sn, Descr (1));
}
}
}
void Acknowledge::recv (Message_ptr m)
{
// Handle NRTM. There could be some nasty interaction with code
// that handles data below (like missing message and NAK). This
// is why I hold the lock at the beginning (which may be not very
// efficient).
//
Lock l (mutex_);
if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
{
for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
{
u64 sn (nrtm->find ((*i).ext_id_));
if (sn != 0)
{
Queue& q = (*i).int_id_;
u64 old (q.max_sn ());
if (old < sn)
{
// Mark as lost.
//
q.bind (sn, Descr (1));
}
}
}
}
if (m->find (Data::id) || m->find (NoData::id))
{
Address from (
static_cast<From const*> (m->find (From::id))->address ());
u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
Map::ENTRY* e;
if (hold_.find (from, e) == -1)
{
// First message from this source.
//
hold_.bind (from, Queue (sn));
in_->recv (m);
}
else
{
Queue& q = e->int_id_;
if (sn <= q.sn ())
{
// Duplicate.
//
//cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
// << endl;
}
else if (sn == q.sn () + 1)
{
// Next message.
//
q.rebind (sn, Descr (m));
collapse (q);
}
else
{
// Some messages are missing. Insert this one into the queue.
//
q.rebind (sn, Descr (m));
}
}
}
else
| void ACE_RMCast::Acknowledge::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 320 of file Acknowledge.cpp.
{
if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
size_t max_payload_size (
params_.max_packet_size () - max_service_size);
if (max_payload_size > data->size ())
{
u32 max_size (max_payload_size - data->size ());
u32 max_elem (NRTM::max_count (max_size));
if (max_elem > 0)
{
Lock l (mutex_);
Profile_ptr nrtm (create_nrtm (max_elem));
if (nrtm.get ())
m->add (nrtm);
| void ACE_RMCast::Acknowledge::track | ( | ) | [private] |
Definition at line 76 of file Acknowledge.cpp.
{
while (true)
{
Messages msgs;
{
Lock l (mutex_);
if (stop_)
break;
if (hold_.current_size () != 0)
{
for (Map::iterator i (hold_.begin ()), e (hold_.end ());
i != e;
++i)
{
Queue& q = (*i).int_id_;
if (q.current_size () == 0) continue;
track_queue ((*i).ext_id_, q, msgs);
}
}
if (--nrtm_timer_ == 0)
{
nrtm_timer_ = params_.nrtm_timeout ();
// Send NRTM.
//
unsigned short max_payload_size (
params_.max_packet_size () - max_service_size);
u32 max_elem (NRTM::max_count (max_payload_size));
Profile_ptr nrtm (create_nrtm (max_elem));
if (!nrtm.null ())
{
Message_ptr m (new Message);
m->add (nrtm);
msgs.push_back (m);
}
}
}
// Send stuff off.
//
for (Messages::Iterator i (msgs); !i.done (); i.advance ())
{
Message_ptr* ppm;
i.next (ppm);
//FUZZ: disable check_for_lack_ACE_OS
send (*ppm);
//FUZZ: enable check_for_lack_ACE_OS
}
// Go to sleep but watch for "manual cancellation" request.
//
{
//FUZZ: disable check_for_lack_ACE_OS
ACE_Time_Value time (ACE_OS::gettimeofday ());
//FUZZ: enable check_for_lack_ACE_OS
time += params_.tick ();
Lock l (mutex_);
while (!stop_)
{
if (cond_.wait (&time) == -1)
{
if (errno != ETIME)
ACE_OS::abort ();
else
break;
| void ACE_RMCast::Acknowledge::track_queue | ( | Address const & | addr, Queue & | q, Messages & | msgs | ) | [private] |
Definition at line 166 of file Acknowledge.cpp.
{
unsigned short max_payload_size (
params_.max_packet_size () - max_service_size);
u32 max_elem (NAK::max_count (max_payload_size));
u32 count (0);
Queue::iterator i (q.begin ()), e (q.end ());
// Track existing losses.
//
while (i != e)
{
auto_ptr<NAK> nak (new NAK (addr));
// Inner loop that fills NAK profile with up to max_elem elements.
//
for (; i != e && nak->count () < max_elem; ++i)
{
u64 sn ((*i).ext_id_);
Descr& d = (*i).int_id_;
if (d.lost ())
{
d.timer (d.timer () - 1);
if (d.timer () == 0)
{
//@@ Need exp fallback.
//
d.nak_count (d.nak_count () + 1);
d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
nak->add (sn);
++count;
// cerr << 6 << "NAK # " << d.nak_count () << ": "
// << addr << " " << sn << endl;
}
}
}
// Send this NAK.
//
if (nak->count ())
{
// cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
// << endl;
Message_ptr m (new Message);
m->add (Profile_ptr (nak.release ()));
msgs.push_back (m);
}
}
// Detect and record new losses.
//
| ACE_THR_FUNC_RETURN ACE_RMCast::Acknowledge::track_thunk | ( | void * | obj | ) | [static, private] |
Definition at line 380 of file Acknowledge.cpp.
Condition ACE_RMCast::Acknowledge::cond_ [private] |
Definition at line 239 of file Acknowledge.h.
Map ACE_RMCast::Acknowledge::hold_ [private] |
Definition at line 237 of file Acknowledge.h.
Mutex ACE_RMCast::Acknowledge::mutex_ [private] |
Definition at line 238 of file Acknowledge.h.
unsigned long ACE_RMCast::Acknowledge::nrtm_timer_ [private] |
Definition at line 241 of file Acknowledge.h.
Parameters const& ACE_RMCast::Acknowledge::params_ [private] |
Definition at line 235 of file Acknowledge.h.
bool ACE_RMCast::Acknowledge::stop_ [private] |
Definition at line 243 of file Acknowledge.h.
ACE_Thread_Manager ACE_RMCast::Acknowledge::tracker_mgr_ [private] |
Definition at line 244 of file Acknowledge.h.
1.7.0