Link.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Link.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : $Id: Link.cpp 78774 2007-07-04 06:06:59Z sowayaa $
00004 
00005 #include "ace/Time_Value.h"        // ACE_Time_Value
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_time.h"
00009 #include "ace/OS_NS_sys_socket.h"
00010 
00011 #include "Link.h"
00012 
00013 namespace ACE_RMCast
00014 {
00015   Link::
00016   ~Link ()
00017   {
00018     ssock_.close ();
00019     rsock_.close ();
00020   }
00021 
00022   Link::
00023   Link (Address const& addr, Parameters const& params)
00024       : params_ (params),
00025         addr_ (addr),
00026         ssock_ (Address (static_cast<unsigned short> (0),
00027                          static_cast<ACE_UINT32> (INADDR_ANY)),
00028                 AF_INET,
00029                 IPPROTO_UDP,
00030                 1),
00031         stop_ (false)
00032 
00033   {
00034     ACE_OS::srand ((unsigned int) ACE_OS::time (0));
00035 
00036 
00037     rsock_.set_option (IP_MULTICAST_LOOP, 0);
00038     // rsock_.set_option (IP_MULTICAST_TTL, 0);
00039 
00040     // Set recv/send buffers.
00041     //
00042     {
00043       int r (131070);
00044       int s (sizeof (r));
00045 
00046       static_cast<ACE_SOCK&> (rsock_).set_option (
00047         SOL_SOCKET, SO_RCVBUF, &r, s);
00048 
00049       static_cast<ACE_SOCK&> (ssock_).set_option (
00050         SOL_SOCKET, SO_RCVBUF, &r, s);
00051 
00052       rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00053       //cerr << 5 << "recv buffer size: " << r << endl;
00054 
00055       ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00056       //cerr << 5 << "send buffer size: " << r << endl;
00057 
00058     }
00059 
00060     // Bind address and port.
00061     //
00062     if (ACE_OS::connect (ssock_.get_handle (),
00063                          reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00064                          addr_.get_addr_size ()) == -1)
00065     {
00066       ACE_OS::perror ("connect: ");
00067       ACE_OS::abort ();
00068     }
00069 
00070 
00071     ssock_.get_local_addr (self_);
00072 
00073     //cerr << 5 << "self: " << self_ << endl;
00074   }
00075 
00076   void Link::
00077   in_start (In_Element* in)
00078   {
00079     Element::in_start (in);
00080 
00081     rsock_.join (addr_);
00082 
00083     // Start receiving thread.
00084     //
00085     recv_mgr_.spawn (recv_thunk, this);
00086   }
00087 
00088   void Link::
00089   out_start (Out_Element* out)
00090   {
00091     Element::out_start (out);
00092   }
00093 
00094   void Link::
00095   in_stop ()
00096   {
00097     // Stop receiving thread.
00098     //
00099     {
00100       Lock l (mutex_);
00101       stop_ = true;
00102     }
00103     recv_mgr_.wait ();
00104 
00105     Element::in_stop ();
00106   }
00107 
00108   void Link::send (Message_ptr m)
00109   {
00110     // Simulate message loss and reordering.
00111     //
00112     if (params_.simulator ())
00113     {
00114       if ((ACE_OS::rand () % 17) != 0)
00115       {
00116         Lock l (mutex_);
00117 
00118         if (hold_.get ())
00119         {
00120           send_ (m);
00121           send_ (hold_);
00122           hold_ = Message_ptr (0);
00123         }
00124         else
00125         {
00126           if ((ACE_OS::rand () % 17) != 0)
00127           {
00128             send_ (m);
00129           }
00130           else
00131           {
00132             hold_ = m;
00133 
00134             // Make a copy in M so that the reliable loop below
00135             // won't add FROM and TO to HOLD_.
00136             //
00137             m = hold_->clone ();
00138           }
00139         }
00140       }
00141     }
00142     else
00143       send_ (m);
00144 
00145     // Reliable loop.
00146     //
00147     m->add (Profile_ptr (new From (self_)));
00148     m->add (Profile_ptr (new To (self_)));
00149 
00150     in_->recv (m);
00151   }
00152 
00153   void Link::
00154   send_ (Message_ptr m)
00155   {
00156     ostream os (m->size (), 1); // Always little-endian.
00157 
00158     os << *m;
00159 
00160     if (os.length () > size_t (params_.max_packet_size ()))
00161     {
00162       ACE_ERROR ((LM_ERROR,
00163                   "packet length (%d) exceeds max_poacket_size (%d)\n",
00164                   os.length (), params_.max_packet_size ()));
00165 
00166       for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167       {
00168         ACE_ERROR ((LM_ERROR,
00169                     "profile id: %d; size: %d\n",
00170                     (*i).ext_id_, (*i).int_id_->size ()));
00171       }
00172 
00173       ACE_OS::abort ();
00174     }
00175 
00176     ssock_.send (os.buffer (), os.length (), addr_);
00177 
00178     /*
00179       if (m->find (nrtm::id))
00180       {
00181         ACE_OS::write (1, os.buffer (), os.length ());
00182         ACE_OS::exit (1);
00183       }
00184     */
00185   }
00186 
00187   void Link::recv ()
00188   {
00189     size_t max_packet_size (params_.max_packet_size ());
00190 
00191     // This is wicked.
00192     //
00193     ACE_Auto_Ptr<char> holder (
00194       reinterpret_cast<char*> (
00195         operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00196 
00197     char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00198 
00199     size_t size (0);
00200 
00201     while (true)
00202     {
00203       //@@ Should I lock here?
00204       //
00205 
00206       Address addr;
00207 
00208       // Block for up to one tick waiting for an incomming message.
00209       //
00210       for (;;)
00211       {
00212         ACE_Time_Value t (params_.tick ());
00213         ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00214 
00215 
00216         // Check for cancellation request.
00217         //
00218         {
00219           Lock l (mutex_);
00220           if (stop_)
00221             return;
00222         }
00223 
00224         if (r == -1)
00225         {
00226           if (errno != ETIME)
00227             ACE_OS::abort ();
00228         }
00229         else
00230         {
00231           size = static_cast<size_t> (r);
00232           break;
00233         }
00234       }
00235 
00236 
00237       if (size != 4 || addr == self_)
00238       {
00239         // Discard bad messages and ones from ourselvs since
00240         // we are using reliable loopback.
00241         //
00242         rsock_.recv (data, 0, addr);
00243         continue;
00244       }
00245 
00246       u32 msg_size;
00247       {
00248         istream is (data, size, 1); // Always little-endian.
00249         is >> msg_size;
00250       }
00251 
00252       if (msg_size <= 4 || msg_size > max_packet_size)
00253       {
00254         // Bad message.
00255         //
00256         rsock_.recv (data, 0, addr);
00257         continue;
00258       }
00259 
00260       size = rsock_.recv (data, max_packet_size, addr);
00261 
00262       if (msg_size != size)
00263       {
00264         // Bad message.
00265         //
00266         continue;
00267       }
00268 
00269       //cerr << 6 << "from: " << addr << endl;
00270 
00271       Message_ptr m (new Message ());
00272 
00273       m->add (Profile_ptr (new From (addr)));
00274       m->add (Profile_ptr (new To (self_)));
00275 
00276       istream is (data, size, 1); // Always little-endian.
00277 
00278       is >> msg_size;
00279 
00280       while (true)
00281       {
00282         u16 id, size;
00283 
00284         if (!((is >> id) && (is >> size))) break;
00285 
00286         //cerr << 6 << "reading profile with id " << id << " "
00287         //     << size << " bytes long" << endl;
00288 
00289         Profile::Header hdr (id, size);
00290 
00291         if (id == SN::id)
00292           {
00293             m->add (Profile_ptr (new SN (hdr, is)));
00294           }
00295         else if (id == Data::id)
00296           {
00297             m->add (Profile_ptr (new Data (hdr, is)));
00298           }
00299         else if (id == NAK::id)
00300           {
00301             m->add (Profile_ptr (new NAK (hdr, is)));
00302           }
00303         else if (id == NRTM::id)
00304           {
00305             m->add (Profile_ptr (new NRTM (hdr, is)));
00306           }
00307         else if (id == NoData::id)
00308           {
00309             m->add (Profile_ptr (new NoData (hdr, is)));
00310           }
00311         else if (id == Part::id)
00312           {
00313             m->add (Profile_ptr (new Part (hdr, is)));
00314           }
00315         else
00316           {
00317             //cerr << 0 << "unknown profile id " << hdr.id () << endl;
00318             ACE_OS::abort ();
00319           }
00320       }
00321 
00322       in_->recv (m);
00323     }
00324   }
00325 
00326   ACE_THR_FUNC_RETURN Link::
00327   recv_thunk (void* obj)
00328   {
00329     reinterpret_cast<Link*> (obj)->recv ();
00330     return 0;
00331   }
00332 
00333   void Link::recv (Message_ptr)
00334   {
00335     ACE_OS::abort ();
00336   }
00337 }

Generated on Sun Jan 27 13:02:56 2008 for ACE_RMCast by doxygen 1.3.6