#include <Acknowledge.h>
Classes | |
struct | Descr |
struct | Queue |
Public Member Functions | |
Acknowledge (Parameters const ¶ms) | |
virtual void | in_start (In_Element *in) |
virtual void | out_start (Out_Element *out) |
virtual void | out_stop () |
virtual void | recv (Message_ptr m) |
virtual void | send (Message_ptr m) |
Private Types | |
typedef ACE_Hash_Map_Manager_Ex < Address, Queue, AddressHasher, ACE_Equal_To < Address >, ACE_Null_Mutex > | Map |
Private Member Functions | |
void | collapse (Queue &q) |
void | track () |
void | track_queue (Address const &addr, Queue &q, Messages &msgs) |
Profile_ptr | create_nrtm (u32 max_elem) |
Static Private Member Functions | |
static ACE_THR_FUNC_RETURN | track_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Map | hold_ |
Mutex | mutex_ |
Condition | cond_ |
unsigned long | nrtm_timer_ |
bool | stop_ |
ACE_Thread_Manager | tracker_mgr_ |
Definition at line 26 of file Acknowledge.h.
typedef ACE_Hash_Map_Manager_Ex<Address, Queue, AddressHasher, ACE_Equal_To<Address>, ACE_Null_Mutex> ACE_RMCast::Acknowledge::Map [private] |
Definition at line 216 of file Acknowledge.h.
ACE_RMCast::Acknowledge::Acknowledge | ( | Parameters const & | params | ) |
Definition at line 13 of file Acknowledge.cpp.
{ Acknowledge::
void ACE_RMCast::Acknowledge::collapse | ( | Queue & | q | ) | [private] |
Definition at line 51 of file Acknowledge.cpp.
{ // I would normally use iterators in the logic below but ACE_Map_Manager // iterates over entries in no particular order so it is pretty much // unusable here. Instead we will do slow and cumbersome find's. // u64 sn (q.sn () + 1); for (;; ++sn) { Queue::ENTRY* e = 0; if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
Profile_ptr ACE_RMCast::Acknowledge::create_nrtm | ( | u32 | max_elem | ) | [private] |
Definition at line 350 of file Acknowledge.cpp.
{ // Prepare NRTM. // auto_ptr<NRTM> nrtm (new NRTM ()); // Gather the information. // { for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) { Address addr ((*i).ext_id_); Queue& q = (*i).int_id_; //@@ Should look for the highest known number. // nrtm->insert (addr, q.sn ()); if (--max_elem == 0) break;
void ACE_RMCast::Acknowledge::in_start | ( | In_Element * | in | ) | [virtual] |
Definition at line 23 of file Acknowledge.cpp.
{
void ACE_RMCast::Acknowledge::out_start | ( | Out_Element * | out | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 29 of file Acknowledge.cpp.
{ Element::in_start (in); }
void ACE_RMCast::Acknowledge::out_stop | ( | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 37 of file Acknowledge.cpp.
{ Element::out_start (out); tracker_mgr_.spawn (track_thunk, this); } void Acknowledge:: out_stop () { { Lock l (mutex_);
void ACE_RMCast::Acknowledge::recv | ( | Message_ptr | m | ) | [virtual] |
Definition at line 236 of file Acknowledge.cpp.
{ if (q.find (sn) == -1) { q.bind (sn, Descr (1)); } } } void Acknowledge::recv (Message_ptr m) { // Handle NRTM. There could be some nasty interaction with code // that handles data below (like missing message and NAK). This // is why I hold the lock at the beginning (which may be not very // efficient). // Lock l (mutex_); if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) { for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) { u64 sn (nrtm->find ((*i).ext_id_)); if (sn != 0) { Queue& q = (*i).int_id_; u64 old (q.max_sn ()); if (old < sn) { // Mark as lost. // q.bind (sn, Descr (1)); } } } } if (m->find (Data::id) || m->find (NoData::id)) { Address from ( static_cast<From const*> (m->find (From::id))->address ()); u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); Map::ENTRY* e; if (hold_.find (from, e) == -1) { // First message from this source. // hold_.bind (from, Queue (sn)); in_->recv (m); } else { Queue& q = e->int_id_; if (sn <= q.sn ()) { // Duplicate. // //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn // << endl; } else if (sn == q.sn () + 1) { // Next message. // q.rebind (sn, Descr (m)); collapse (q); } else { // Some messages are missing. Insert this one into the queue. // q.rebind (sn, Descr (m)); } } } else
void ACE_RMCast::Acknowledge::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 320 of file Acknowledge.cpp.
{ if (Data const* data = static_cast<Data const*> (m->find (Data::id))) { size_t max_payload_size ( params_.max_packet_size () - max_service_size); if (max_payload_size > data->size ()) { u32 max_size (max_payload_size - data->size ()); u32 max_elem (NRTM::max_count (max_size)); if (max_elem > 0) { Lock l (mutex_); Profile_ptr nrtm (create_nrtm (max_elem)); if (nrtm.get ()) m->add (nrtm);
void ACE_RMCast::Acknowledge::track | ( | ) | [private] |
Definition at line 76 of file Acknowledge.cpp.
{ while (true) { Messages msgs; { Lock l (mutex_); if (stop_) break; if (hold_.current_size () != 0) { for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) { Queue& q = (*i).int_id_; if (q.current_size () == 0) continue; track_queue ((*i).ext_id_, q, msgs); } } if (--nrtm_timer_ == 0) { nrtm_timer_ = params_.nrtm_timeout (); // Send NRTM. // unsigned short max_payload_size ( params_.max_packet_size () - max_service_size); u32 max_elem (NRTM::max_count (max_payload_size)); Profile_ptr nrtm (create_nrtm (max_elem)); if (!nrtm.null ()) { Message_ptr m (new Message); m->add (nrtm); msgs.push_back (m); } } } // Send stuff off. // for (Messages::Iterator i (msgs); !i.done (); i.advance ()) { Message_ptr* ppm; i.next (ppm); //FUZZ: disable check_for_lack_ACE_OS send (*ppm); //FUZZ: enable check_for_lack_ACE_OS } // Go to sleep but watch for "manual cancellation" request. // { //FUZZ: disable check_for_lack_ACE_OS ACE_Time_Value time (ACE_OS::gettimeofday ()); //FUZZ: enable check_for_lack_ACE_OS time += params_.tick (); Lock l (mutex_); while (!stop_) { if (cond_.wait (&time) == -1) { if (errno != ETIME) ACE_OS::abort (); else break;
void ACE_RMCast::Acknowledge::track_queue | ( | Address const & | addr, | |
Queue & | q, | |||
Messages & | msgs | |||
) | [private] |
Definition at line 166 of file Acknowledge.cpp.
{ unsigned short max_payload_size ( params_.max_packet_size () - max_service_size); u32 max_elem (NAK::max_count (max_payload_size)); u32 count (0); Queue::iterator i (q.begin ()), e (q.end ()); // Track existing losses. // while (i != e) { auto_ptr<NAK> nak (new NAK (addr)); // Inner loop that fills NAK profile with up to max_elem elements. // for (; i != e && nak->count () < max_elem; ++i) { u64 sn ((*i).ext_id_); Descr& d = (*i).int_id_; if (d.lost ()) { d.timer (d.timer () - 1); if (d.timer () == 0) { //@@ Need exp fallback. // d.nak_count (d.nak_count () + 1); d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); nak->add (sn); ++count; // cerr << 6 << "NAK # " << d.nak_count () << ": " // << addr << " " << sn << endl; } } } // Send this NAK. // if (nak->count ()) { // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" // << endl; Message_ptr m (new Message); m->add (Profile_ptr (nak.release ())); msgs.push_back (m); } } // Detect and record new losses. //
ACE_THR_FUNC_RETURN ACE_RMCast::Acknowledge::track_thunk | ( | void * | obj | ) | [static, private] |
Definition at line 380 of file Acknowledge.cpp.
Condition ACE_RMCast::Acknowledge::cond_ [private] |
Definition at line 239 of file Acknowledge.h.
Map ACE_RMCast::Acknowledge::hold_ [private] |
Definition at line 237 of file Acknowledge.h.
Mutex ACE_RMCast::Acknowledge::mutex_ [private] |
Definition at line 238 of file Acknowledge.h.
unsigned long ACE_RMCast::Acknowledge::nrtm_timer_ [private] |
Definition at line 241 of file Acknowledge.h.
Parameters const& ACE_RMCast::Acknowledge::params_ [private] |
Definition at line 235 of file Acknowledge.h.
bool ACE_RMCast::Acknowledge::stop_ [private] |
Definition at line 243 of file Acknowledge.h.
Definition at line 244 of file Acknowledge.h.