Classes | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes

ACE_RMCast::Acknowledge Class Reference

#include <Acknowledge.h>

Inheritance diagram for ACE_RMCast::Acknowledge:
Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Acknowledge:
Collaboration graph
[legend]

List of all members.

Classes

struct  Descr
struct  Queue

Public Member Functions

 Acknowledge (Parameters const &params)
virtual void in_start (In_Element *in)
virtual void out_start (Out_Element *out)
virtual void out_stop ()
virtual void recv (Message_ptr m)
virtual void send (Message_ptr m)

Private Types

typedef
ACE_Hash_Map_Manager_Ex
< Address, Queue,
AddressHasher, ACE_Equal_To
< Address >, ACE_Null_Mutex
Map

Private Member Functions

void collapse (Queue &q)
void track ()
void track_queue (Address const &addr, Queue &q, Messages &msgs)
Profile_ptr create_nrtm (u32 max_elem)

Static Private Member Functions

static ACE_THR_FUNC_RETURN track_thunk (void *obj)

Private Attributes

Parameters const & params_
Map hold_
Mutex mutex_
Condition cond_
unsigned long nrtm_timer_
bool stop_
ACE_Thread_Manager tracker_mgr_

Detailed Description

Definition at line 26 of file Acknowledge.h.


Member Typedef Documentation

Definition at line 216 of file Acknowledge.h.


Constructor & Destructor Documentation

ACE_RMCast::Acknowledge::Acknowledge ( Parameters const &  params  ) 

Definition at line 13 of file Acknowledge.cpp.


Member Function Documentation

void ACE_RMCast::Acknowledge::collapse ( Queue q  )  [private]

Definition at line 51 of file Acknowledge.cpp.

  {
    // I would normally use iterators in the logic below but ACE_Map_Manager
    // iterates over entries in no particular order so it is pretty much
    // unusable here. Instead we will do slow and cumbersome find's.
    //

    u64 sn (q.sn () + 1);

    for (;; ++sn)
    {
      Queue::ENTRY* e = 0;

      if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;

Profile_ptr ACE_RMCast::Acknowledge::create_nrtm ( u32  max_elem  )  [private]

Definition at line 350 of file Acknowledge.cpp.

  {
    // Prepare NRTM.
    //
    auto_ptr<NRTM> nrtm (new NRTM ());

    // Gather the information.
    //
    {
      for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
      {
        Address addr ((*i).ext_id_);
        Queue& q = (*i).int_id_;

        //@@ Should look for the highest known number.
        //
        nrtm->insert (addr, q.sn ());

        if (--max_elem == 0)
          break;

void ACE_RMCast::Acknowledge::in_start ( In_Element in  )  [virtual]

Definition at line 23 of file Acknowledge.cpp.

  {

void ACE_RMCast::Acknowledge::out_start ( Out_Element out  )  [virtual]

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 29 of file Acknowledge.cpp.

  {
    Element::in_start (in);
  }

void ACE_RMCast::Acknowledge::out_stop (  )  [virtual]

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 37 of file Acknowledge.cpp.

  {
    Element::out_start (out);

    tracker_mgr_.spawn (track_thunk, this);
  }

  void Acknowledge::
  out_stop ()
  {
    {
      Lock l (mutex_);

void ACE_RMCast::Acknowledge::recv ( Message_ptr  m  )  [virtual]

Definition at line 236 of file Acknowledge.cpp.

    {
      if (q.find (sn) == -1)
      {
        q.bind (sn, Descr (1));
      }
    }
  }

  void Acknowledge::recv (Message_ptr m)
  {
    // Handle NRTM. There could be some nasty interaction with code
    // that handles data below (like missing message and NAK). This
    // is why I hold the lock at the beginning (which may be not very
    // efficient).
    //
    Lock l (mutex_);

    if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
    {
      for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
      {
        u64 sn (nrtm->find ((*i).ext_id_));

        if (sn != 0)
        {
          Queue& q = (*i).int_id_;

          u64 old (q.max_sn ());

          if (old < sn)
          {
            // Mark as lost.
            //
            q.bind (sn, Descr (1));
          }
        }
      }
    }

    if (m->find (Data::id) || m->find (NoData::id))
    {
      Address from (
        static_cast<From const*> (m->find (From::id))->address ());

      u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());

      Map::ENTRY* e;

      if (hold_.find (from, e) == -1)
      {
        // First message from this source.
        //
        hold_.bind (from, Queue (sn));
        in_->recv (m);
      }
      else
      {
        Queue& q = e->int_id_;

        if (sn <= q.sn ())
        {
          // Duplicate.
          //
          //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
          //     << endl;
        }
        else if (sn == q.sn () + 1)
        {
          // Next message.
          //

          q.rebind (sn, Descr (m));
          collapse (q);
        }
        else
        {
          // Some messages are missing. Insert this one into the queue.
          //
          q.rebind (sn, Descr (m));
        }
      }
    }
    else

void ACE_RMCast::Acknowledge::send ( Message_ptr  m  )  [virtual]

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 320 of file Acknowledge.cpp.

  {
    if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
    {
      size_t max_payload_size (
        params_.max_packet_size () - max_service_size);

      if (max_payload_size > data->size ())
      {
        u32 max_size (max_payload_size - data->size ());
        u32 max_elem (NRTM::max_count (max_size));

        if (max_elem > 0)
        {
          Lock l (mutex_);

          Profile_ptr nrtm (create_nrtm (max_elem));

          if (nrtm.get ())
            m->add (nrtm);

void ACE_RMCast::Acknowledge::track (  )  [private]

Definition at line 76 of file Acknowledge.cpp.

  {
    while (true)
    {
      Messages msgs;

      {
        Lock l (mutex_);

        if (stop_)
          break;

        if (hold_.current_size () != 0)
        {
          for (Map::iterator i (hold_.begin ()), e (hold_.end ());
               i != e;
               ++i)
          {
            Queue& q = (*i).int_id_;

            if (q.current_size () == 0) continue;

            track_queue ((*i).ext_id_, q, msgs);
          }
        }

        if (--nrtm_timer_ == 0)
        {
          nrtm_timer_ = params_.nrtm_timeout ();

          // Send NRTM.
          //
          unsigned short max_payload_size (
            params_.max_packet_size () - max_service_size);

          u32 max_elem (NRTM::max_count (max_payload_size));

          Profile_ptr nrtm (create_nrtm (max_elem));

          if (!nrtm.null ())
          {
            Message_ptr m (new Message);
            m->add (nrtm);
            msgs.push_back (m);

          }
        }
      }

      // Send stuff off.
      //
      for (Messages::Iterator i (msgs); !i.done (); i.advance ())
      {
        Message_ptr* ppm;
        i.next (ppm);

        //FUZZ: disable check_for_lack_ACE_OS
        send (*ppm);
        //FUZZ: enable check_for_lack_ACE_OS
      }

      // Go to sleep but watch for "manual cancellation" request.
      //
      {
        //FUZZ: disable check_for_lack_ACE_OS
        ACE_Time_Value time (ACE_OS::gettimeofday ());
        //FUZZ: enable check_for_lack_ACE_OS

        time += params_.tick ();

        Lock l (mutex_);

        while (!stop_)
        {
          if (cond_.wait (&time) == -1)
          {
            if (errno != ETIME)
              ACE_OS::abort ();
            else
              break;

void ACE_RMCast::Acknowledge::track_queue ( Address const &  addr,
Queue q,
Messages msgs 
) [private]

Definition at line 166 of file Acknowledge.cpp.

  {
    unsigned short max_payload_size (
      params_.max_packet_size () - max_service_size);

    u32 max_elem (NAK::max_count (max_payload_size));
    u32 count (0);

    Queue::iterator i (q.begin ()), e (q.end ());

    // Track existing losses.
    //
    while (i != e)
    {
      auto_ptr<NAK> nak (new NAK (addr));

      // Inner loop that fills NAK profile with up to max_elem elements.
      //
      for (; i != e && nak->count () < max_elem; ++i)
      {
        u64 sn ((*i).ext_id_);
        Descr& d = (*i).int_id_;

        if (d.lost ())
        {
          d.timer (d.timer () - 1);

          if (d.timer () == 0)
          {
            //@@ Need exp fallback.
            //
            d.nak_count (d.nak_count () + 1);
            d.timer ((d.nak_count () + 1) * params_.nak_timeout ());

            nak->add (sn);

            ++count;

            // cerr << 6 << "NAK # " << d.nak_count () << ": "
            // << addr << " " << sn << endl;
          }
        }
      }

      // Send this NAK.
      //
      if (nak->count ())
      {
        // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
        //     << endl;

        Message_ptr m (new Message);

        m->add (Profile_ptr (nak.release ()));

        msgs.push_back (m);
      }
    }

    // Detect and record new losses.
    //

ACE_THR_FUNC_RETURN ACE_RMCast::Acknowledge::track_thunk ( void *  obj  )  [static, private]

Definition at line 380 of file Acknowledge.cpp.


Member Data Documentation

Definition at line 239 of file Acknowledge.h.

Definition at line 237 of file Acknowledge.h.

Definition at line 238 of file Acknowledge.h.

unsigned long ACE_RMCast::Acknowledge::nrtm_timer_ [private]

Definition at line 241 of file Acknowledge.h.

Definition at line 235 of file Acknowledge.h.

Definition at line 243 of file Acknowledge.h.

Definition at line 244 of file Acknowledge.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Defines