Socket.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Socket.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : $Id: Socket.cpp 78774 2007-07-04 06:06:59Z sowayaa $
00004 
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/OS_NS_unistd.h"
00010 #include "ace/OS_NS_sys_time.h" // gettimeofday
00011 
00012 #include "ace/Unbounded_Queue.h"
00013 
00014 #include "ace/Pipe.h"
00015 
00016 #include "Stack.h"
00017 #include "Protocol.h"
00018 #include "Bits.h"
00019 
00020 #include "Fragment.h"
00021 #include "Reassemble.h"
00022 #include "Acknowledge.h"
00023 #include "Retransmit.h"
00024 #include "Flow.h"
00025 #include "Link.h"
00026 
00027 #include "Socket.h"
00028 
00029 /*
00030 #include <iostream>
00031 using std::cerr;
00032 using std::endl;
00033 */
00034 
00035 namespace ACE_RMCast
00036 {
00037   class Socket_Impl : protected Element
00038   {
00039   public:
00040     ~Socket_Impl ();
00041 
00042     Socket_Impl (Address const& a, bool loop, Parameters const& params);
00043 
00044   public:
00045     void
00046     send_ (void const* buf, size_t s);
00047 
00048     ssize_t
00049     recv_ (void* buf,
00050            size_t s,
00051            ACE_Time_Value const* timeout,
00052            ACE_INET_Addr* from);
00053 
00054     ssize_t
00055     size_ (ACE_Time_Value const* timeout);
00056 
00057     ACE_HANDLE
00058     get_handle_ ();
00059 
00060   private:
00061     //FUZZ: disable check_for_lack_ACE_OS
00062     virtual void recv (Message_ptr m);
00063     //FUZZ: enable check_for_lack_ACE_OS
00064 
00065   private:
00066     bool loop_;
00067     Parameters const params_;
00068 
00069     Mutex mutex_;
00070     Condition cond_;
00071 
00072     ACE_Unbounded_Queue<Message_ptr> queue_;
00073 
00074     ACE_Pipe signal_pipe_;
00075 
00076     ACE_Auto_Ptr<Fragment> fragment_;
00077     ACE_Auto_Ptr<Reassemble> reassemble_;
00078     ACE_Auto_Ptr<Acknowledge> acknowledge_;
00079     ACE_Auto_Ptr<Retransmit> retransmit_;
00080     ACE_Auto_Ptr<Flow> flow_;
00081     ACE_Auto_Ptr<Link> link_;
00082   };
00083 
00084 
00085   Socket_Impl::
00086   Socket_Impl (Address const& a, bool loop, Parameters const& params)
00087       : loop_ (loop),
00088         params_ (params),
00089         cond_ (mutex_)
00090   {
00091     fragment_.reset (new Fragment (params_));
00092     reassemble_.reset (new Reassemble (params_));
00093     acknowledge_.reset (new Acknowledge (params_));
00094     retransmit_.reset (new Retransmit (params_));
00095     flow_.reset (new Flow (params_));
00096     link_.reset (new Link (a, params_));
00097 
00098     // Start IN stack from top to bottom.
00099     //
00100     in_start (0);
00101     fragment_->in_start (this);
00102     reassemble_->in_start (fragment_.get ());
00103     acknowledge_->in_start (reassemble_.get ());
00104     retransmit_->in_start (acknowledge_.get ());
00105     flow_->in_start (retransmit_.get ());
00106     link_->in_start (flow_.get ());
00107 
00108     // Start OUT stack from bottom up.
00109     //
00110     link_->out_start (0);
00111     flow_->out_start (link_.get ());
00112     retransmit_->out_start (flow_.get ());
00113     acknowledge_->out_start (retransmit_.get ());
00114     reassemble_->out_start (acknowledge_.get ());
00115     fragment_->out_start (reassemble_.get ());
00116     out_start (fragment_.get ());
00117   }
00118 
00119   Socket_Impl::
00120     ~Socket_Impl ()
00121     {
00122       // Stop OUT stack from top to bottom.
00123       //
00124       out_stop ();
00125       fragment_->out_stop ();
00126       reassemble_->out_stop ();
00127       acknowledge_->out_stop ();
00128       retransmit_->out_stop ();
00129       flow_->out_stop ();
00130       link_->out_stop ();
00131 
00132       // Stop IN stack from bottom up.
00133       //
00134       link_->in_stop ();
00135       flow_->in_stop ();
00136       retransmit_->in_stop ();
00137       acknowledge_->in_stop ();
00138       reassemble_->in_stop ();
00139       fragment_->in_stop ();
00140       in_stop ();
00141 
00142       // Close signal pipe.
00143       //
00144       if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00145         signal_pipe_.close ();
00146     }
00147 
00148 
00149   void Socket_Impl::
00150     send_ (void const* buf, size_t s)
00151     {
00152       Message_ptr m (new Message);
00153 
00154       m->add (Profile_ptr (new Data (buf, s)));
00155 
00156       // Qualification is for VC6 and VxWorks.
00157       //
00158       Element::send (m);
00159     }
00160 
00161   ssize_t Socket_Impl::
00162     recv_ (void* buf,
00163            size_t s,
00164            ACE_Time_Value const* timeout,
00165            ACE_INET_Addr* from)
00166     {
00167       ACE_Time_Value abs_time;
00168 
00169       if (timeout)
00170         abs_time = ACE_OS::gettimeofday () + *timeout;
00171 
00172       Lock l (mutex_);
00173 
00174       while (queue_.is_empty ())
00175         {
00176           if (timeout)
00177             {
00178               if (cond_.wait (&abs_time) != -1)
00179                 break;
00180             }
00181           else
00182             {
00183               if (cond_.wait () != -1)
00184                 break;
00185             }
00186 
00187           return -1; // errno is already set
00188         }
00189 
00190 
00191       Message_ptr m;
00192 
00193       if (queue_.dequeue_head (m) == -1)
00194         ACE_OS::abort ();
00195 
00196 
00197       if (queue_.is_empty ())
00198         {
00199           // Remove data from the pipe.
00200           //
00201           if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00202             {
00203               char c;
00204 
00205               if (signal_pipe_.recv (&c, 1) != 1)
00206                 {
00207                   ACE_OS::perror ("read: ");
00208                   ACE_OS::abort ();
00209                 }
00210             }
00211         }
00212 
00213       if (from)
00214         *from = static_cast<From const*> (m->find (From::id))->address ();
00215 
00216       if (m->find (NoData::id) != 0)
00217         {
00218           errno = ENOENT;
00219           return -1;
00220         }
00221 
00222       Data const* d = static_cast<Data const*>(m->find (Data::id));
00223 
00224       ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00225 
00226       ACE_OS::memcpy (buf, d->buf (), r);
00227 
00228       return r;
00229     }
00230 
00231   ssize_t Socket_Impl::
00232     size_ (ACE_Time_Value const* timeout)
00233     {
00234       ACE_Time_Value abs_time;
00235 
00236       if (timeout)
00237         abs_time = ACE_OS::gettimeofday () + *timeout;
00238 
00239       Lock l (mutex_);
00240 
00241       while (queue_.is_empty ())
00242         {
00243           if (timeout)
00244             {
00245               if (cond_.wait (&abs_time) != -1)
00246                 break;
00247             }
00248           else
00249             {
00250               if (cond_.wait () != -1)
00251                 break;
00252             }
00253 
00254           return -1; // errno is already set
00255         }
00256 
00257       // I can't get the head of the queue without actually dequeuing
00258       // the element.
00259       //
00260       Message_ptr m;
00261 
00262       if (queue_.dequeue_head (m) == -1)
00263         ACE_OS::abort ();
00264 
00265       if (queue_.enqueue_head (m) == -1)
00266         ACE_OS::abort ();
00267 
00268       if (m->find (NoData::id) != 0)
00269         {
00270           errno = ENOENT;
00271           return -1;
00272         }
00273 
00274       Data const* d = static_cast<Data const*>(m->find (Data::id));
00275 
00276       return static_cast<ssize_t> (d->size ());
00277     }
00278 
00279   ACE_HANDLE Socket_Impl::
00280     get_handle_ ()
00281     {
00282       if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00283         {
00284           signal_pipe_.open ();
00285         }
00286 
00287       return signal_pipe_.read_handle ();
00288     }
00289 
00290 
00291   void Socket_Impl::recv (Message_ptr m)
00292     {
00293       if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294         {
00295           if (!loop_)
00296             {
00297               Address to (static_cast<To const*> (m->find (To::id))->address ());
00298 
00299               Address from (
00300                             static_cast<From const*> (m->find (From::id))->address ());
00301 
00302               if (to == from)
00303                 return;
00304             }
00305 
00306           Lock l (mutex_);
00307 
00308           //if (queue_.size () != 0)
00309           //  cerr << "recv socket queue size: " << queue_.size () << endl;
00310 
00311           //FUZZ: disable check_for_lack_ACE_OS
00312           bool signal (queue_.is_empty ());
00313           //FUZZ: enable check_for_lack_ACE_OS
00314 
00315           queue_.enqueue_tail (m);
00316 
00317           if (signal)
00318             {
00319               // Also write to the pipe.
00320               if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00321                 {
00322                   char c;
00323 
00324                   if (signal_pipe_.send (&c, 1) != 1)
00325                     {
00326                       // perror ("write: ");
00327                       ACE_OS::abort ();
00328                     }
00329                 }
00330 
00331               cond_.signal ();
00332             }
00333         }
00334     }
00335 
00336 
00337   // Socket
00338   //
00339   //
00340   Socket::
00341     ~Socket ()
00342     {
00343     }
00344 
00345   Socket::
00346     Socket (Address const& a, bool loop, Parameters const& params)
00347     : impl_ (new Socket_Impl (a, loop, params))
00348     {
00349     }
00350 
00351   void Socket::send (void const* buf, size_t s)
00352     {
00353       impl_->send_ (buf, s);
00354     }
00355 
00356   ssize_t Socket::recv (void* buf, size_t s)
00357     {
00358       return impl_->recv_ (buf, s, 0, 0);
00359     }
00360 
00361   ssize_t Socket::recv (void* buf, size_t s, ACE_INET_Addr& from)
00362     {
00363       return impl_->recv_ (buf, s, 0, &from);
00364     }
00365 
00366   ssize_t Socket::recv (void* buf, size_t s, ACE_Time_Value const& timeout)
00367     {
00368       return impl_->recv_ (buf, s, &timeout, 0);
00369     }
00370 
00371   ssize_t Socket::recv (void* buf,
00372                         size_t s,
00373                         ACE_Time_Value const& timeout,
00374                         ACE_INET_Addr& from)
00375     {
00376       return impl_->recv_ (buf, s, &timeout, &from);
00377     }
00378 
00379   ssize_t Socket::
00380     size ()
00381     {
00382       return impl_->size_ (0);
00383     }
00384 
00385   ssize_t Socket::
00386     size (ACE_Time_Value const& timeout)
00387     {
00388       return impl_->size_ (&timeout);
00389     }
00390 
00391   ACE_HANDLE Socket::
00392     get_handle ()
00393     {
00394       return impl_->get_handle_ ();
00395     }
00396 }

Generated on Sun Jan 27 13:02:56 2008 for ACE_RMCast by doxygen 1.3.6