ACE_RMCast::Socket_Impl Class Reference

Inheritance diagram for ACE_RMCast::Socket_Impl:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Socket_Impl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ~Socket_Impl ()
 Socket_Impl (Address const &a, bool loop, Parameters const &params)
void send_ (void const *buf, size_t s)
ssize_t recv_ (void *buf, size_t s, ACE_Time_Value const *timeout, ACE_INET_Addr *from)
ssize_t size_ (ACE_Time_Value const *timeout)
ACE_HANDLE get_handle_ ()

Private Member Functions

virtual void recv (Message_ptr m)

Private Attributes

bool loop_
Parameters const  params_
Mutex mutex_
Condition cond_
ACE_Unbounded_Queue< Message_ptrqueue_
ACE_Pipe signal_pipe_
ACE_Auto_Ptr< Fragmentfragment_
ACE_Auto_Ptr< Reassemblereassemble_
ACE_Auto_Ptr< Acknowledgeacknowledge_
ACE_Auto_Ptr< Retransmitretransmit_
ACE_Auto_Ptr< Flowflow_
ACE_Auto_Ptr< Linklink_

Constructor & Destructor Documentation

ACE_RMCast::Socket_Impl::~Socket_Impl  ) 
 

Definition at line 119 of file Socket.cpp.

References acknowledge_, ACE_Pipe::close(), flow_, fragment_, ACE_RMCast::In_Element::in_stop(), ACE_RMCast::Out_Element::out_stop(), ACE_Pipe::read_handle(), reassemble_, retransmit_, and signal_pipe_.

00120     {
00121       // Stop OUT stack from top to bottom.
00122       //
00123       out_stop ();
00124       fragment_->out_stop ();
00125       reassemble_->out_stop ();
00126       acknowledge_->out_stop ();
00127       retransmit_->out_stop ();
00128       flow_->out_stop ();
00129       link_->out_stop ();
00130 
00131       // Stop IN stack from bottom up.
00132       //
00133       link_->in_stop ();
00134       flow_->in_stop ();
00135       retransmit_->in_stop ();
00136       acknowledge_->in_stop ();
00137       reassemble_->in_stop ();
00138       fragment_->in_stop ();
00139       in_stop ();
00140 
00141       // Close signal pipe.
00142       //
00143       if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00144         signal_pipe_.close ();
00145     }

ACE_RMCast::Socket_Impl::Socket_Impl Address const &  a,
bool  loop,
Parameters const &  params
 

Definition at line 85 of file Socket.cpp.

References acknowledge_, ACE_RMCast::Address, flow_, fragment_, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::In_Element::in_start(), ACE_RMCast::Out_Element::out_start(), reassemble_, ACE_Auto_Basic_Ptr< X >::reset(), and retransmit_.

00086       : loop_ (loop),
00087         params_ (params),
00088         cond_ (mutex_)
00089   {
00090     fragment_.reset (new Fragment (params_));
00091     reassemble_.reset (new Reassemble (params_));
00092     acknowledge_.reset (new Acknowledge (params_));
00093     retransmit_.reset (new Retransmit (params_));
00094     flow_.reset (new Flow (params_));
00095     link_.reset (new Link (a, params_));
00096 
00097     // Start IN stack from top to bottom.
00098     //
00099     in_start (0);
00100     fragment_->in_start (this);
00101     reassemble_->in_start (fragment_.get ());
00102     acknowledge_->in_start (reassemble_.get ());
00103     retransmit_->in_start (acknowledge_.get ());
00104     flow_->in_start (retransmit_.get ());
00105     link_->in_start (flow_.get ());
00106 
00107     // Start OUT stack from bottom up.
00108     //
00109     link_->out_start (0);
00110     flow_->out_start (link_.get ());
00111     retransmit_->out_start (flow_.get ());
00112     acknowledge_->out_start (retransmit_.get ());
00113     reassemble_->out_start (acknowledge_.get ());
00114     fragment_->out_start (reassemble_.get ());
00115     out_start (fragment_.get ());
00116   }


Member Function Documentation

ACE_HANDLE ACE_RMCast::Socket_Impl::get_handle_  ) 
 

Definition at line 279 of file Socket.cpp.

References ACE_Pipe::open(), ACE_Pipe::read_handle(), and signal_pipe_.

00280     {
00281       if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00282         {
00283           signal_pipe_.open ();
00284         }
00285 
00286       return signal_pipe_.read_handle ();
00287     }

void ACE_RMCast::Socket_Impl::recv Message_ptr  m  )  [private, virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 291 of file Socket.cpp.

References ACE_OS::abort(), ACE_RMCast::Address, ACE_Unbounded_Queue< Message_ptr >::enqueue_tail(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, loop_, ACE_RMCast::Message_ptr, ACE_Pipe::send(), ACE_Condition< MUTEX >::signal(), signal_pipe_, and ACE_Pipe::write_handle().

00292     {
00293       if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294         {
00295           if (!loop_)
00296             {
00297               Address to (static_cast<To const*> (m->find (To::id))->address ());
00298 
00299               Address from (
00300                             static_cast<From const*> (m->find (From::id))->address ());
00301 
00302               if (to == from)
00303                 return;
00304             }
00305 
00306           Lock l (mutex_);
00307 
00308           //if (queue_.size () != 0)
00309           //  cerr << "recv socket queue size: " << queue_.size () << endl;
00310 
00311           bool signal (queue_.is_empty ());
00312 
00313           queue_.enqueue_tail (m);
00314 
00315           if (signal)
00316             {
00317               // Also write to the pipe.
00318               if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00319                 {
00320                   char c;
00321 
00322                   if (signal_pipe_.send (&c, 1) != 1)
00323                     {
00324                       // perror ("write: ");
00325                       ACE_OS::abort ();
00326                     }
00327                 }
00328 
00329               cond_.signal ();
00330             }
00331         }
00332     }

ssize_t ACE_RMCast::Socket_Impl::recv_ void *  buf,
size_t  s,
ACE_Time_Value const *  timeout,
ACE_INET_Addr from
 

Definition at line 161 of file Socket.cpp.

References ACE_OS::abort(), ACE_RMCast::Data::buf(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_OS::memcpy(), ACE_RMCast::Message_ptr, ACE_OS::perror(), ACE_Pipe::read_handle(), ACE_Pipe::recv(), signal_pipe_, ACE_RMCast::Data::size(), ssize_t, and ACE_Condition< MUTEX >::wait().

00165     {
00166       ACE_Time_Value abs_time;
00167 
00168       if (timeout)
00169         abs_time = ACE_OS::gettimeofday () + *timeout;
00170 
00171       Lock l (mutex_);
00172 
00173       while (queue_.is_empty ())
00174         {
00175           if (timeout)
00176             {
00177               if (cond_.wait (&abs_time) != -1)
00178                 break;
00179             }
00180           else
00181             {
00182               if (cond_.wait () != -1)
00183                 break;
00184             }
00185 
00186           return -1; // errno is already set
00187         }
00188 
00189 
00190       Message_ptr m;
00191 
00192       if (queue_.dequeue_head (m) == -1)
00193         ACE_OS::abort ();
00194 
00195 
00196       if (queue_.is_empty ())
00197         {
00198           // Remove data from the pipe.
00199           //
00200           if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00201             {
00202               char c;
00203 
00204               if (signal_pipe_.recv (&c, 1) != 1)
00205                 {
00206                   ACE_OS::perror ("read: ");
00207                   ACE_OS::abort ();
00208                 }
00209             }
00210         }
00211 
00212       if (from)
00213         *from = static_cast<From const*> (m->find (From::id))->address ();
00214 
00215       if (m->find (NoData::id) != 0)
00216         {
00217           errno = ENOENT;
00218           return -1;
00219         }
00220 
00221       Data const* d = static_cast<Data const*>(m->find (Data::id));
00222 
00223       ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00224 
00225       ACE_OS::memcpy (buf, d->buf (), r);
00226 
00227       return r;
00228     }

void ACE_RMCast::Socket_Impl::send_ void const *  buf,
size_t  s
 

Definition at line 149 of file Socket.cpp.

References ACE_RMCast::Message_ptr, and ACE_RMCast::Profile_ptr.

00150     {
00151       Message_ptr m (new Message);
00152 
00153       m->add (Profile_ptr (new Data (buf, s)));
00154 
00155       // Qualification is for VC6 and VxWorks.
00156       //
00157       Element::send (m);
00158     }

ssize_t ACE_RMCast::Socket_Impl::size_ ACE_Time_Value const *  timeout  ) 
 

Definition at line 231 of file Socket.cpp.

References ACE_OS::abort(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_Unbounded_Queue< Message_ptr >::enqueue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Data::size(), and ACE_Condition< MUTEX >::wait().

00232     {
00233       ACE_Time_Value abs_time;
00234 
00235       if (timeout)
00236         abs_time = ACE_OS::gettimeofday () + *timeout;
00237 
00238       Lock l (mutex_);
00239 
00240       while (queue_.is_empty ())
00241         {
00242           if (timeout)
00243             {
00244               if (cond_.wait (&abs_time) != -1)
00245                 break;
00246             }
00247           else
00248             {
00249               if (cond_.wait () != -1)
00250                 break;
00251             }
00252 
00253           return -1; // errno is already set
00254         }
00255 
00256       // I can't get the head of the queue without actually dequeuing
00257       // the element.
00258       //
00259       Message_ptr m;
00260 
00261       if (queue_.dequeue_head (m) == -1)
00262         ACE_OS::abort ();
00263 
00264       if (queue_.enqueue_head (m) == -1)
00265         ACE_OS::abort ();
00266 
00267       if (m->find (NoData::id) != 0)
00268         {
00269           errno = ENOENT;
00270           return -1;
00271         }
00272 
00273       Data const* d = static_cast<Data const*>(m->find (Data::id));
00274 
00275       return static_cast<ssize_t> (d->size ());
00276     }


Member Data Documentation

ACE_Auto_Ptr<Acknowledge> ACE_RMCast::Socket_Impl::acknowledge_ [private]
 

Definition at line 77 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

Condition ACE_RMCast::Socket_Impl::cond_ [private]
 

Definition at line 69 of file Socket.cpp.

ACE_Auto_Ptr<Flow> ACE_RMCast::Socket_Impl::flow_ [private]
 

Definition at line 79 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Fragment> ACE_RMCast::Socket_Impl::fragment_ [private]
 

Definition at line 75 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Link> ACE_RMCast::Socket_Impl::link_ [private]
 

Definition at line 80 of file Socket.cpp.

bool ACE_RMCast::Socket_Impl::loop_ [private]
 

Definition at line 65 of file Socket.cpp.

Referenced by recv().

Mutex ACE_RMCast::Socket_Impl::mutex_ [private]
 

Definition at line 68 of file Socket.cpp.

Parameters const ACE_RMCast::Socket_Impl::params_ [private]
 

Definition at line 66 of file Socket.cpp.

ACE_Unbounded_Queue<Message_ptr> ACE_RMCast::Socket_Impl::queue_ [private]
 

Definition at line 71 of file Socket.cpp.

ACE_Auto_Ptr<Reassemble> ACE_RMCast::Socket_Impl::reassemble_ [private]
 

Definition at line 76 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Retransmit> ACE_RMCast::Socket_Impl::retransmit_ [private]
 

Definition at line 78 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Pipe ACE_RMCast::Socket_Impl::signal_pipe_ [private]
 

Definition at line 73 of file Socket.cpp.

Referenced by get_handle_(), recv(), recv_(), and ~Socket_Impl().


The documentation for this class was generated from the following file:
Generated on Thu Nov 9 11:41:25 2006 for ACE_RMCast by doxygen 1.3.6