Public Member Functions | Private Member Functions | Private Attributes

ACE_RMCast::Socket_Impl Class Reference

Inheritance diagram for ACE_RMCast::Socket_Impl:
Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Socket_Impl:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ~Socket_Impl ()
 Socket_Impl (Address const &a, bool loop, Parameters const &params)
void send_ (void const *buf, size_t s)
ssize_t recv_ (void *buf, size_t s, ACE_Time_Value const *timeout, ACE_INET_Addr *from)
ssize_t size_ (ACE_Time_Value const *timeout)
ACE_HANDLE get_handle_ ()

Private Member Functions

virtual void recv (Message_ptr m)

Private Attributes

bool loop_
Parameters const params_
Mutex mutex_
Condition cond_
ACE_Unbounded_Queue< Message_ptrqueue_
ACE_Pipe signal_pipe_
ACE_Auto_Ptr< Fragmentfragment_
ACE_Auto_Ptr< Reassemblereassemble_
ACE_Auto_Ptr< Acknowledgeacknowledge_
ACE_Auto_Ptr< Retransmitretransmit_
ACE_Auto_Ptr< Flowflow_
ACE_Auto_Ptr< Linklink_

Detailed Description

Definition at line 29 of file Socket.cpp.


Constructor & Destructor Documentation

ACE_RMCast::Socket_Impl::~Socket_Impl (  ) 

Definition at line 112 of file Socket.cpp.

    {
      // Stop OUT stack from top to bottom.
      //
      out_stop ();
      fragment_->out_stop ();
      reassemble_->out_stop ();
      acknowledge_->out_stop ();
      retransmit_->out_stop ();
      flow_->out_stop ();
      link_->out_stop ();

      // Stop IN stack from bottom up.
      //
      link_->in_stop ();
      flow_->in_stop ();
      retransmit_->in_stop ();
      acknowledge_->in_stop ();
      reassemble_->in_stop ();

ACE_RMCast::Socket_Impl::Socket_Impl ( Address const &  a,
bool  loop,
Parameters const &  params 
)

Definition at line 78 of file Socket.cpp.

      : loop_ (loop),
        params_ (params),
        cond_ (mutex_)
  {
    fragment_.reset (new Fragment (params_));
    reassemble_.reset (new Reassemble (params_));
    acknowledge_.reset (new Acknowledge (params_));
    retransmit_.reset (new Retransmit (params_));
    flow_.reset (new Flow (params_));
    link_.reset (new Link (a, params_));

    // Start IN stack from top to bottom.
    //
    in_start (0);
    fragment_->in_start (this);
    reassemble_->in_start (fragment_.get ());
    acknowledge_->in_start (reassemble_.get ());
    retransmit_->in_start (acknowledge_.get ());
    flow_->in_start (retransmit_.get ());
    link_->in_start (flow_.get ());

    // Start OUT stack from bottom up.
    //


Member Function Documentation

ACE_HANDLE ACE_RMCast::Socket_Impl::get_handle_ (  ) 

Definition at line 272 of file Socket.cpp.

    {

void ACE_RMCast::Socket_Impl::recv ( Message_ptr  m  )  [private, virtual]

Definition at line 283 of file Socket.cpp.

        {
          signal_pipe_.open ();
        }

      return signal_pipe_.read_handle ();
    }


  void Socket_Impl::recv (Message_ptr m)
    {
      if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
        {
          if (!loop_)
            {
              Address to (static_cast<To const*> (m->find (To::id))->address ());

              Address from (
                            static_cast<From const*> (m->find (From::id))->address ());

              if (to == from)
                return;
            }

          Lock l (mutex_);

          //if (queue_.size () != 0)
          //  cerr << "recv socket queue size: " << queue_.size () << endl;

          //FUZZ: disable check_for_lack_ACE_OS
          bool signal (queue_.is_empty ());
          //FUZZ: enable check_for_lack_ACE_OS

          queue_.enqueue_tail (m);

          if (signal)
            {
              // Also write to the pipe.
              if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
                {
                  char c;

                  if (signal_pipe_.send (&c, 1) != 1)
                    {
                      // perror ("write: ");

ssize_t ACE_RMCast::Socket_Impl::recv_ ( void *  buf,
size_t  s,
ACE_Time_Value const *  timeout,
ACE_INET_Addr from 
)

Definition at line 154 of file Socket.cpp.

    {
      ACE_Time_Value abs_time;

      if (timeout)
        abs_time = ACE_OS::gettimeofday () + *timeout;

      Lock l (mutex_);

      while (queue_.is_empty ())
        {
          if (timeout)
            {
              if (cond_.wait (&abs_time) != -1)
                break;
            }
          else
            {
              if (cond_.wait () != -1)
                break;
            }

          return -1; // errno is already set
        }


      Message_ptr m;

      if (queue_.dequeue_head (m) == -1)
        ACE_OS::abort ();


      if (queue_.is_empty ())
        {
          // Remove data from the pipe.
          //
          if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
            {
              char c;

              if (signal_pipe_.recv (&c, 1) != 1)
                {
                  ACE_OS::perror ("read: ");
                  ACE_OS::abort ();
                }
            }
        }

      if (from)
        *from = static_cast<From const*> (m->find (From::id))->address ();

      if (m->find (NoData::id) != 0)
        {
          errno = ENOENT;
          return -1;
        }

void ACE_RMCast::Socket_Impl::send_ ( void const *  buf,
size_t  s 
)

Definition at line 142 of file Socket.cpp.

    {

ssize_t ACE_RMCast::Socket_Impl::size_ ( ACE_Time_Value const *  timeout  ) 

Definition at line 224 of file Socket.cpp.

                                                                   : s));

      ACE_OS::memcpy (buf, d->buf (), r);

      return r;
    }

  ssize_t Socket_Impl::
    size_ (ACE_Time_Value const* timeout)
    {
      ACE_Time_Value abs_time;

      if (timeout)
        abs_time = ACE_OS::gettimeofday () + *timeout;

      Lock l (mutex_);

      while (queue_.is_empty ())
        {
          if (timeout)
            {
              if (cond_.wait (&abs_time) != -1)
                break;
            }
          else
            {
              if (cond_.wait () != -1)
                break;
            }

          return -1; // errno is already set
        }

      // I can't get the head of the queue without actually dequeuing
      // the element.
      //
      Message_ptr m;

      if (queue_.dequeue_head (m) == -1)
        ACE_OS::abort ();

      if (queue_.enqueue_head (m) == -1)
        ACE_OS::abort ();

      if (m->find (NoData::id) != 0)
        {


Member Data Documentation

Definition at line 68 of file Socket.cpp.

Definition at line 60 of file Socket.cpp.

Definition at line 70 of file Socket.cpp.

Definition at line 66 of file Socket.cpp.

Definition at line 71 of file Socket.cpp.

Definition at line 56 of file Socket.cpp.

Definition at line 59 of file Socket.cpp.

Definition at line 57 of file Socket.cpp.

Definition at line 62 of file Socket.cpp.

Definition at line 67 of file Socket.cpp.

Definition at line 69 of file Socket.cpp.

Definition at line 64 of file Socket.cpp.


The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Defines