ACE_RMCast::Socket_Impl Class Reference

Inheritance diagram for ACE_RMCast::Socket_Impl:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Socket_Impl:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ~Socket_Impl ()
 Socket_Impl (Address const &a, bool loop, Parameters const &params)
void send_ (void const *buf, size_t s)
ssize_t recv_ (void *buf, size_t s, ACE_Time_Value const *timeout, ACE_INET_Addr *from)
ssize_t size_ (ACE_Time_Value const *timeout)
ACE_HANDLE get_handle_ ()

Private Member Functions

virtual void recv (Message_ptr m)

Private Attributes

bool loop_
Parameters const  params_
Mutex mutex_
Condition cond_
ACE_Unbounded_Queue< Message_ptrqueue_
ACE_Pipe signal_pipe_
ACE_Auto_Ptr< Fragmentfragment_
ACE_Auto_Ptr< Reassemblereassemble_
ACE_Auto_Ptr< Acknowledgeacknowledge_
ACE_Auto_Ptr< Retransmitretransmit_
ACE_Auto_Ptr< Flowflow_
ACE_Auto_Ptr< Linklink_

Constructor & Destructor Documentation

ACE_RMCast::Socket_Impl::~Socket_Impl  ) 
 

Definition at line 120 of file Socket.cpp.

References acknowledge_, ACE_Pipe::close(), flow_, fragment_, ACE_RMCast::In_Element::in_stop(), ACE_RMCast::Out_Element::out_stop(), ACE_Pipe::read_handle(), reassemble_, retransmit_, and signal_pipe_.

00121     {
00122       // Stop OUT stack from top to bottom.
00123       //
00124       out_stop ();
00125       fragment_->out_stop ();
00126       reassemble_->out_stop ();
00127       acknowledge_->out_stop ();
00128       retransmit_->out_stop ();
00129       flow_->out_stop ();
00130       link_->out_stop ();
00131 
00132       // Stop IN stack from bottom up.
00133       //
00134       link_->in_stop ();
00135       flow_->in_stop ();
00136       retransmit_->in_stop ();
00137       acknowledge_->in_stop ();
00138       reassemble_->in_stop ();
00139       fragment_->in_stop ();
00140       in_stop ();
00141 
00142       // Close signal pipe.
00143       //
00144       if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00145         signal_pipe_.close ();
00146     }

ACE_RMCast::Socket_Impl::Socket_Impl Address const &  a,
bool  loop,
Parameters const &  params
 

Definition at line 86 of file Socket.cpp.

References acknowledge_, ACE_RMCast::Address, flow_, fragment_, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::In_Element::in_start(), ACE_RMCast::Out_Element::out_start(), reassemble_, ACE_Auto_Basic_Ptr< X >::reset(), and retransmit_.

00087       : loop_ (loop),
00088         params_ (params),
00089         cond_ (mutex_)
00090   {
00091     fragment_.reset (new Fragment (params_));
00092     reassemble_.reset (new Reassemble (params_));
00093     acknowledge_.reset (new Acknowledge (params_));
00094     retransmit_.reset (new Retransmit (params_));
00095     flow_.reset (new Flow (params_));
00096     link_.reset (new Link (a, params_));
00097 
00098     // Start IN stack from top to bottom.
00099     //
00100     in_start (0);
00101     fragment_->in_start (this);
00102     reassemble_->in_start (fragment_.get ());
00103     acknowledge_->in_start (reassemble_.get ());
00104     retransmit_->in_start (acknowledge_.get ());
00105     flow_->in_start (retransmit_.get ());
00106     link_->in_start (flow_.get ());
00107 
00108     // Start OUT stack from bottom up.
00109     //
00110     link_->out_start (0);
00111     flow_->out_start (link_.get ());
00112     retransmit_->out_start (flow_.get ());
00113     acknowledge_->out_start (retransmit_.get ());
00114     reassemble_->out_start (acknowledge_.get ());
00115     fragment_->out_start (reassemble_.get ());
00116     out_start (fragment_.get ());
00117   }


Member Function Documentation

ACE_HANDLE ACE_RMCast::Socket_Impl::get_handle_  ) 
 

Definition at line 280 of file Socket.cpp.

References ACE_Pipe::open(), ACE_Pipe::read_handle(), and signal_pipe_.

00281     {
00282       if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00283         {
00284           signal_pipe_.open ();
00285         }
00286 
00287       return signal_pipe_.read_handle ();
00288     }

void ACE_RMCast::Socket_Impl::recv Message_ptr  m  )  [private, virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 291 of file Socket.cpp.

References ACE_OS::abort(), ACE_RMCast::Address, ACE_Unbounded_Queue< Message_ptr >::enqueue_tail(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, loop_, ACE_RMCast::Message_ptr, ACE_Pipe::send(), ACE_Condition< MUTEX >::signal(), signal_pipe_, and ACE_Pipe::write_handle().

00292     {
00293       if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294         {
00295           if (!loop_)
00296             {
00297               Address to (static_cast<To const*> (m->find (To::id))->address ());
00298 
00299               Address from (
00300                             static_cast<From const*> (m->find (From::id))->address ());
00301 
00302               if (to == from)
00303                 return;
00304             }
00305 
00306           Lock l (mutex_);
00307 
00308           //if (queue_.size () != 0)
00309           //  cerr << "recv socket queue size: " << queue_.size () << endl;
00310 
00311           //FUZZ: disable check_for_lack_ACE_OS
00312           bool signal (queue_.is_empty ());
00313           //FUZZ: enable check_for_lack_ACE_OS
00314 
00315           queue_.enqueue_tail (m);
00316 
00317           if (signal)
00318             {
00319               // Also write to the pipe.
00320               if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00321                 {
00322                   char c;
00323 
00324                   if (signal_pipe_.send (&c, 1) != 1)
00325                     {
00326                       // perror ("write: ");
00327                       ACE_OS::abort ();
00328                     }
00329                 }
00330 
00331               cond_.signal ();
00332             }
00333         }
00334     }

ssize_t ACE_RMCast::Socket_Impl::recv_ void *  buf,
size_t  s,
ACE_Time_Value const *  timeout,
ACE_INET_Addr from
 

Definition at line 162 of file Socket.cpp.

References ACE_OS::abort(), ACE_RMCast::Data::buf(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_OS::memcpy(), ACE_RMCast::Message_ptr, ACE_OS::perror(), ACE_Pipe::read_handle(), ACE_Pipe::recv(), signal_pipe_, ACE_RMCast::Data::size(), ssize_t, and ACE_Condition< MUTEX >::wait().

00166     {
00167       ACE_Time_Value abs_time;
00168 
00169       if (timeout)
00170         abs_time = ACE_OS::gettimeofday () + *timeout;
00171 
00172       Lock l (mutex_);
00173 
00174       while (queue_.is_empty ())
00175         {
00176           if (timeout)
00177             {
00178               if (cond_.wait (&abs_time) != -1)
00179                 break;
00180             }
00181           else
00182             {
00183               if (cond_.wait () != -1)
00184                 break;
00185             }
00186 
00187           return -1; // errno is already set
00188         }
00189 
00190 
00191       Message_ptr m;
00192 
00193       if (queue_.dequeue_head (m) == -1)
00194         ACE_OS::abort ();
00195 
00196 
00197       if (queue_.is_empty ())
00198         {
00199           // Remove data from the pipe.
00200           //
00201           if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00202             {
00203               char c;
00204 
00205               if (signal_pipe_.recv (&c, 1) != 1)
00206                 {
00207                   ACE_OS::perror ("read: ");
00208                   ACE_OS::abort ();
00209                 }
00210             }
00211         }
00212 
00213       if (from)
00214         *from = static_cast<From const*> (m->find (From::id))->address ();
00215 
00216       if (m->find (NoData::id) != 0)
00217         {
00218           errno = ENOENT;
00219           return -1;
00220         }
00221 
00222       Data const* d = static_cast<Data const*>(m->find (Data::id));
00223 
00224       ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00225 
00226       ACE_OS::memcpy (buf, d->buf (), r);
00227 
00228       return r;
00229     }

void ACE_RMCast::Socket_Impl::send_ void const *  buf,
size_t  s
 

Definition at line 150 of file Socket.cpp.

References ACE_RMCast::Message_ptr, and ACE_RMCast::Profile_ptr.

00151     {
00152       Message_ptr m (new Message);
00153 
00154       m->add (Profile_ptr (new Data (buf, s)));
00155 
00156       // Qualification is for VC6 and VxWorks.
00157       //
00158       Element::send (m);
00159     }

ssize_t ACE_RMCast::Socket_Impl::size_ ACE_Time_Value const *  timeout  ) 
 

Definition at line 232 of file Socket.cpp.

References ACE_OS::abort(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_Unbounded_Queue< Message_ptr >::enqueue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Data::size(), and ACE_Condition< MUTEX >::wait().

00233     {
00234       ACE_Time_Value abs_time;
00235 
00236       if (timeout)
00237         abs_time = ACE_OS::gettimeofday () + *timeout;
00238 
00239       Lock l (mutex_);
00240 
00241       while (queue_.is_empty ())
00242         {
00243           if (timeout)
00244             {
00245               if (cond_.wait (&abs_time) != -1)
00246                 break;
00247             }
00248           else
00249             {
00250               if (cond_.wait () != -1)
00251                 break;
00252             }
00253 
00254           return -1; // errno is already set
00255         }
00256 
00257       // I can't get the head of the queue without actually dequeuing
00258       // the element.
00259       //
00260       Message_ptr m;
00261 
00262       if (queue_.dequeue_head (m) == -1)
00263         ACE_OS::abort ();
00264 
00265       if (queue_.enqueue_head (m) == -1)
00266         ACE_OS::abort ();
00267 
00268       if (m->find (NoData::id) != 0)
00269         {
00270           errno = ENOENT;
00271           return -1;
00272         }
00273 
00274       Data const* d = static_cast<Data const*>(m->find (Data::id));
00275 
00276       return static_cast<ssize_t> (d->size ());
00277     }


Member Data Documentation

ACE_Auto_Ptr<Acknowledge> ACE_RMCast::Socket_Impl::acknowledge_ [private]
 

Definition at line 78 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

Condition ACE_RMCast::Socket_Impl::cond_ [private]
 

Definition at line 70 of file Socket.cpp.

ACE_Auto_Ptr<Flow> ACE_RMCast::Socket_Impl::flow_ [private]
 

Definition at line 80 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Fragment> ACE_RMCast::Socket_Impl::fragment_ [private]
 

Definition at line 76 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Link> ACE_RMCast::Socket_Impl::link_ [private]
 

Definition at line 81 of file Socket.cpp.

bool ACE_RMCast::Socket_Impl::loop_ [private]
 

Definition at line 66 of file Socket.cpp.

Referenced by recv().

Mutex ACE_RMCast::Socket_Impl::mutex_ [private]
 

Definition at line 69 of file Socket.cpp.

Parameters const ACE_RMCast::Socket_Impl::params_ [private]
 

Definition at line 67 of file Socket.cpp.

ACE_Unbounded_Queue<Message_ptr> ACE_RMCast::Socket_Impl::queue_ [private]
 

Definition at line 72 of file Socket.cpp.

ACE_Auto_Ptr<Reassemble> ACE_RMCast::Socket_Impl::reassemble_ [private]
 

Definition at line 77 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Auto_Ptr<Retransmit> ACE_RMCast::Socket_Impl::retransmit_ [private]
 

Definition at line 79 of file Socket.cpp.

Referenced by Socket_Impl(), and ~Socket_Impl().

ACE_Pipe ACE_RMCast::Socket_Impl::signal_pipe_ [private]
 

Definition at line 74 of file Socket.cpp.

Referenced by get_handle_(), recv(), recv_(), and ~Socket_Impl().


The documentation for this class was generated from the following file:
Generated on Sun Jan 27 13:03:11 2008 for ACE_RMCast by doxygen 1.3.6