Socket.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Socket.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : Socket.cpp,v 1.19 2006/06/28 09:29:26 boris Exp
00004 
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/OS_NS_unistd.h"
00010 #include "ace/OS_NS_sys_time.h" // gettimeofday
00011 
00012 #include "ace/Unbounded_Queue.h"
00013 
00014 #include "ace/Pipe.h"
00015 
00016 #include "Stack.h"
00017 #include "Protocol.h"
00018 #include "Bits.h"
00019 
00020 #include "Fragment.h"
00021 #include "Reassemble.h"
00022 #include "Acknowledge.h"
00023 #include "Retransmit.h"
00024 #include "Flow.h"
00025 #include "Link.h"
00026 
00027 #include "Socket.h"
00028 
00029 /*
00030 #include <iostream>
00031 using std::cerr;
00032 using std::endl;
00033 */
00034 
00035 namespace ACE_RMCast
00036 {
00037   class Socket_Impl : protected Element
00038   {
00039   public:
00040     ~Socket_Impl ();
00041 
00042     Socket_Impl (Address const& a, bool loop, Parameters const& params);
00043 
00044   public:
00045     void
00046     send_ (void const* buf, size_t s);
00047 
00048     ssize_t
00049     recv_ (void* buf,
00050            size_t s,
00051            ACE_Time_Value const* timeout,
00052            ACE_INET_Addr* from);
00053 
00054     ssize_t
00055     size_ (ACE_Time_Value const* timeout);
00056 
00057     ACE_HANDLE
00058     get_handle_ ();
00059 
00060   private:
00061     virtual void
00062     recv (Message_ptr m);
00063 
00064   private:
00065     bool loop_;
00066     Parameters const params_;
00067 
00068     Mutex mutex_;
00069     Condition cond_;
00070 
00071     ACE_Unbounded_Queue<Message_ptr> queue_;
00072 
00073     ACE_Pipe signal_pipe_;
00074 
00075     ACE_Auto_Ptr<Fragment> fragment_;
00076     ACE_Auto_Ptr<Reassemble> reassemble_;
00077     ACE_Auto_Ptr<Acknowledge> acknowledge_;
00078     ACE_Auto_Ptr<Retransmit> retransmit_;
00079     ACE_Auto_Ptr<Flow> flow_;
00080     ACE_Auto_Ptr<Link> link_;
00081   };
00082 
00083 
00084   Socket_Impl::
00085   Socket_Impl (Address const& a, bool loop, Parameters const& params)
00086       : loop_ (loop),
00087         params_ (params),
00088         cond_ (mutex_)
00089   {
00090     fragment_.reset (new Fragment (params_));
00091     reassemble_.reset (new Reassemble (params_));
00092     acknowledge_.reset (new Acknowledge (params_));
00093     retransmit_.reset (new Retransmit (params_));
00094     flow_.reset (new Flow (params_));
00095     link_.reset (new Link (a, params_));
00096 
00097     // Start IN stack from top to bottom.
00098     //
00099     in_start (0);
00100     fragment_->in_start (this);
00101     reassemble_->in_start (fragment_.get ());
00102     acknowledge_->in_start (reassemble_.get ());
00103     retransmit_->in_start (acknowledge_.get ());
00104     flow_->in_start (retransmit_.get ());
00105     link_->in_start (flow_.get ());
00106 
00107     // Start OUT stack from bottom up.
00108     //
00109     link_->out_start (0);
00110     flow_->out_start (link_.get ());
00111     retransmit_->out_start (flow_.get ());
00112     acknowledge_->out_start (retransmit_.get ());
00113     reassemble_->out_start (acknowledge_.get ());
00114     fragment_->out_start (reassemble_.get ());
00115     out_start (fragment_.get ());
00116   }
00117 
00118   Socket_Impl::
00119     ~Socket_Impl ()
00120     {
00121       // Stop OUT stack from top to bottom.
00122       //
00123       out_stop ();
00124       fragment_->out_stop ();
00125       reassemble_->out_stop ();
00126       acknowledge_->out_stop ();
00127       retransmit_->out_stop ();
00128       flow_->out_stop ();
00129       link_->out_stop ();
00130 
00131       // Stop IN stack from bottom up.
00132       //
00133       link_->in_stop ();
00134       flow_->in_stop ();
00135       retransmit_->in_stop ();
00136       acknowledge_->in_stop ();
00137       reassemble_->in_stop ();
00138       fragment_->in_stop ();
00139       in_stop ();
00140 
00141       // Close signal pipe.
00142       //
00143       if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00144         signal_pipe_.close ();
00145     }
00146 
00147 
00148   void Socket_Impl::
00149     send_ (void const* buf, size_t s)
00150     {
00151       Message_ptr m (new Message);
00152 
00153       m->add (Profile_ptr (new Data (buf, s)));
00154 
00155       // Qualification is for VC6 and VxWorks.
00156       //
00157       Element::send (m);
00158     }
00159 
00160   ssize_t Socket_Impl::
00161     recv_ (void* buf,
00162            size_t s,
00163            ACE_Time_Value const* timeout,
00164            ACE_INET_Addr* from)
00165     {
00166       ACE_Time_Value abs_time;
00167 
00168       if (timeout)
00169         abs_time = ACE_OS::gettimeofday () + *timeout;
00170 
00171       Lock l (mutex_);
00172 
00173       while (queue_.is_empty ())
00174         {
00175           if (timeout)
00176             {
00177               if (cond_.wait (&abs_time) != -1)
00178                 break;
00179             }
00180           else
00181             {
00182               if (cond_.wait () != -1)
00183                 break;
00184             }
00185 
00186           return -1; // errno is already set
00187         }
00188 
00189 
00190       Message_ptr m;
00191 
00192       if (queue_.dequeue_head (m) == -1)
00193         ACE_OS::abort ();
00194 
00195 
00196       if (queue_.is_empty ())
00197         {
00198           // Remove data from the pipe.
00199           //
00200           if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00201             {
00202               char c;
00203 
00204               if (signal_pipe_.recv (&c, 1) != 1)
00205                 {
00206                   ACE_OS::perror ("read: ");
00207                   ACE_OS::abort ();
00208                 }
00209             }
00210         }
00211 
00212       if (from)
00213         *from = static_cast<From const*> (m->find (From::id))->address ();
00214 
00215       if (m->find (NoData::id) != 0)
00216         {
00217           errno = ENOENT;
00218           return -1;
00219         }
00220 
00221       Data const* d = static_cast<Data const*>(m->find (Data::id));
00222 
00223       ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00224 
00225       ACE_OS::memcpy (buf, d->buf (), r);
00226 
00227       return r;
00228     }
00229 
00230   ssize_t Socket_Impl::
00231     size_ (ACE_Time_Value const* timeout)
00232     {
00233       ACE_Time_Value abs_time;
00234 
00235       if (timeout)
00236         abs_time = ACE_OS::gettimeofday () + *timeout;
00237 
00238       Lock l (mutex_);
00239 
00240       while (queue_.is_empty ())
00241         {
00242           if (timeout)
00243             {
00244               if (cond_.wait (&abs_time) != -1)
00245                 break;
00246             }
00247           else
00248             {
00249               if (cond_.wait () != -1)
00250                 break;
00251             }
00252 
00253           return -1; // errno is already set
00254         }
00255 
00256       // I can't get the head of the queue without actually dequeuing
00257       // the element.
00258       //
00259       Message_ptr m;
00260 
00261       if (queue_.dequeue_head (m) == -1)
00262         ACE_OS::abort ();
00263 
00264       if (queue_.enqueue_head (m) == -1)
00265         ACE_OS::abort ();
00266 
00267       if (m->find (NoData::id) != 0)
00268         {
00269           errno = ENOENT;
00270           return -1;
00271         }
00272 
00273       Data const* d = static_cast<Data const*>(m->find (Data::id));
00274 
00275       return static_cast<ssize_t> (d->size ());
00276     }
00277 
00278   ACE_HANDLE Socket_Impl::
00279     get_handle_ ()
00280     {
00281       if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00282         {
00283           signal_pipe_.open ();
00284         }
00285 
00286       return signal_pipe_.read_handle ();
00287     }
00288 
00289 
00290   void Socket_Impl::
00291     recv (Message_ptr m)
00292     {
00293       if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294         {
00295           if (!loop_)
00296             {
00297               Address to (static_cast<To const*> (m->find (To::id))->address ());
00298 
00299               Address from (
00300                             static_cast<From const*> (m->find (From::id))->address ());
00301 
00302               if (to == from)
00303                 return;
00304             }
00305 
00306           Lock l (mutex_);
00307 
00308           //if (queue_.size () != 0)
00309           //  cerr << "recv socket queue size: " << queue_.size () << endl;
00310 
00311           bool signal (queue_.is_empty ());
00312 
00313           queue_.enqueue_tail (m);
00314 
00315           if (signal)
00316             {
00317               // Also write to the pipe.
00318               if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00319                 {
00320                   char c;
00321 
00322                   if (signal_pipe_.send (&c, 1) != 1)
00323                     {
00324                       // perror ("write: ");
00325                       ACE_OS::abort ();
00326                     }
00327                 }
00328 
00329               cond_.signal ();
00330             }
00331         }
00332     }
00333 
00334 
00335   // Socket
00336   //
00337   //
00338   Socket::
00339     ~Socket ()
00340     {
00341     }
00342 
00343   Socket::
00344     Socket (Address const& a, bool loop, Parameters const& params)
00345     : impl_ (new Socket_Impl (a, loop, params))
00346     {
00347     }
00348 
00349   void Socket::
00350     send (void const* buf, size_t s)
00351     {
00352       impl_->send_ (buf, s);
00353     }
00354 
00355   ssize_t Socket::
00356     recv (void* buf, size_t s)
00357     {
00358       return impl_->recv_ (buf, s, 0, 0);
00359     }
00360 
00361   ssize_t Socket::
00362     recv (void* buf, size_t s, ACE_INET_Addr& from)
00363     {
00364       return impl_->recv_ (buf, s, 0, &from);
00365     }
00366 
00367   ssize_t Socket::
00368     recv (void* buf, size_t s, ACE_Time_Value const& timeout)
00369     {
00370       return impl_->recv_ (buf, s, &timeout, 0);
00371     }
00372 
00373   ssize_t Socket::
00374     recv (void* buf,
00375           size_t s,
00376           ACE_Time_Value const& timeout,
00377           ACE_INET_Addr& from)
00378     {
00379       return impl_->recv_ (buf, s, &timeout, &from);
00380     }
00381 
00382   ssize_t Socket::
00383     size ()
00384     {
00385       return impl_->size_ (0);
00386     }
00387 
00388   ssize_t Socket::
00389     size (ACE_Time_Value const& timeout)
00390     {
00391       return impl_->size_ (&timeout);
00392     }
00393 
00394   ACE_HANDLE Socket::
00395     get_handle ()
00396     {
00397       return impl_->get_handle_ ();
00398     }
00399 }

Generated on Thu Nov 9 11:40:41 2006 for ACE_RMCast by doxygen 1.3.6