Link.cpp

Go to the documentation of this file.
00001 // file      : ace/RMCast/Link.cpp
00002 // author    : Boris Kolpackov <boris@kolpackov.net>
00003 // cvs-id    : Link.cpp,v 1.22 2006/06/28 09:29:26 boris Exp
00004 
00005 #include "ace/Time_Value.h"        // ACE_Time_Value
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_sys_socket.h"
00009 
00010 #include "Link.h"
00011 
00012 namespace ACE_RMCast
00013 {
00014   Link::
00015   ~Link ()
00016   {
00017     ssock_.close ();
00018     rsock_.close ();
00019   }
00020 
00021   Link::
00022   Link (Address const& addr, Parameters const& params)
00023       : params_ (params),
00024         addr_ (addr),
00025         ssock_ (Address (static_cast<unsigned short> (0),
00026                          static_cast<ACE_UINT32> (INADDR_ANY)),
00027                 AF_INET,
00028                 IPPROTO_UDP,
00029                 1),
00030         stop_ (false)
00031 
00032   {
00033     srand (time (0));
00034 
00035 
00036     rsock_.set_option (IP_MULTICAST_LOOP, 0);
00037     // rsock_.set_option (IP_MULTICAST_TTL, 0);
00038 
00039     // Set recv/send buffers.
00040     //
00041     {
00042       int r (131070);
00043       int s (sizeof (r));
00044 
00045       static_cast<ACE_SOCK&> (rsock_).set_option (
00046         SOL_SOCKET, SO_RCVBUF, &r, s);
00047 
00048       static_cast<ACE_SOCK&> (ssock_).set_option (
00049         SOL_SOCKET, SO_RCVBUF, &r, s);
00050 
00051       rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00052       //cerr << 5 << "recv buffer size: " << r << endl;
00053 
00054       ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00055       //cerr << 5 << "send buffer size: " << r << endl;
00056 
00057     }
00058 
00059     // Bind address and port.
00060     //
00061     if (ACE_OS::connect (ssock_.get_handle (),
00062                          reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00063                          addr_.get_addr_size ()) == -1)
00064     {
00065       ACE_OS::perror ("connect: ");
00066       ACE_OS::abort ();
00067     }
00068 
00069 
00070     ssock_.get_local_addr (self_);
00071 
00072     //cerr << 5 << "self: " << self_ << endl;
00073   }
00074 
00075   void Link::
00076   in_start (In_Element* in)
00077   {
00078     Element::in_start (in);
00079 
00080     rsock_.join (addr_);
00081 
00082     // Start receiving thread.
00083     //
00084     recv_mgr_.spawn (recv_thunk, this);
00085   }
00086 
00087   void Link::
00088   out_start (Out_Element* out)
00089   {
00090     Element::out_start (out);
00091   }
00092 
00093   void Link::
00094   in_stop ()
00095   {
00096     // Stop receiving thread.
00097     //
00098     {
00099       Lock l (mutex_);
00100       stop_ = true;
00101     }
00102     recv_mgr_.wait ();
00103 
00104     Element::in_stop ();
00105   }
00106 
00107   void Link::
00108   send (Message_ptr m)
00109   {
00110     // Simulate message loss and reordering.
00111     //
00112     if (params_.simulator ())
00113     {
00114       if ((rand () % 17) != 0)
00115       {
00116         Lock l (mutex_);
00117 
00118         if (hold_.get ())
00119         {
00120           send_ (m);
00121           send_ (hold_);
00122           hold_ = Message_ptr (0);
00123         }
00124         else
00125         {
00126           if ((rand () % 17) != 0)
00127           {
00128             send_ (m);
00129           }
00130           else
00131           {
00132             hold_ = m;
00133 
00134             // Make a copy in M so that the reliable loop below
00135             // won't add FROM and TO to HOLD_.
00136             //
00137             m = hold_->clone ();
00138           }
00139         }
00140       }
00141     }
00142     else
00143       send_ (m);
00144 
00145     // Reliable loop.
00146     //
00147     m->add (Profile_ptr (new From (self_)));
00148     m->add (Profile_ptr (new To (self_)));
00149 
00150     in_->recv (m);
00151   }
00152 
00153   void Link::
00154   send_ (Message_ptr m)
00155   {
00156     ostream os (m->size (), 1); // Always little-endian.
00157 
00158     os << *m;
00159 
00160     if (os.length () > size_t (params_.max_packet_size ()))
00161     {
00162       ACE_ERROR ((LM_ERROR,
00163                   "packet length (%d) exceeds max_poacket_size (%d)\n",
00164                   os.length (), params_.max_packet_size ()));
00165 
00166       for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167       {
00168         ACE_ERROR ((LM_ERROR,
00169                     "profile id: %d; size: %d\n",
00170                     (*i).ext_id_, (*i).int_id_->size ()));
00171       }
00172 
00173       ACE_OS::abort ();
00174     }
00175 
00176     ssock_.send (os.buffer (), os.length (), addr_);
00177 
00178     /*
00179       if (m->find (nrtm::id))
00180       {
00181       write (1, os.buffer (), os.length ());
00182       exit (1);
00183       }
00184     */
00185   }
00186 
00187   void Link::
00188   recv ()
00189   {
00190     size_t max_packet_size (params_.max_packet_size ());
00191 
00192     // This is wicked.
00193     //
00194     ACE_Auto_Ptr<char> holder (
00195       reinterpret_cast<char*> (
00196         operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00197 
00198     char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00199 
00200     size_t size (0);
00201 
00202     while (true)
00203     {
00204       //@@ Should I lock here?
00205       //
00206 
00207       Address addr;
00208 
00209       // Block for up to one tick waiting for an incomming message.
00210       //
00211       for (;;)
00212       {
00213         ACE_Time_Value t (params_.tick ());
00214         ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00215 
00216 
00217         // Check for cancellation request.
00218         //
00219         {
00220           Lock l (mutex_);
00221           if (stop_)
00222             return;
00223         }
00224 
00225         if (r == -1)
00226         {
00227           if (errno != ETIME)
00228             ACE_OS::abort ();
00229         }
00230         else
00231         {
00232           size = static_cast<size_t> (r);
00233           break;
00234         }
00235       }
00236 
00237 
00238       if (size != 4 || addr == self_)
00239       {
00240         // Discard bad messages and ones from ourselvs since
00241         // we are using reliable loopback.
00242         //
00243         rsock_.recv (data, 0, addr);
00244         continue;
00245       }
00246 
00247       u32 msg_size;
00248       {
00249         istream is (data, size, 1); // Always little-endian.
00250         is >> msg_size;
00251       }
00252 
00253       if (msg_size <= 4 || msg_size > max_packet_size)
00254       {
00255         // Bad message.
00256         //
00257         rsock_.recv (data, 0, addr);
00258         continue;
00259       }
00260 
00261       size = rsock_.recv (data, max_packet_size, addr);
00262 
00263       if (msg_size != size)
00264       {
00265         // Bad message.
00266         //
00267         continue;
00268       }
00269 
00270       //cerr << 6 << "from: " << addr << endl;
00271 
00272       Message_ptr m (new Message ());
00273 
00274       m->add (Profile_ptr (new From (addr)));
00275       m->add (Profile_ptr (new To (self_)));
00276 
00277       istream is (data, size, 1); // Always little-endian.
00278 
00279       is >> msg_size;
00280 
00281       while (true)
00282       {
00283         u16 id, size;
00284 
00285         if (!((is >> id) && (is >> size))) break;
00286 
00287         //cerr << 6 << "reading profile with id " << id << " "
00288         //     << size << " bytes long" << endl;
00289 
00290         Profile::Header hdr (id, size);
00291 
00292         if (id == SN::id)
00293           {
00294             m->add (Profile_ptr (new SN (hdr, is)));
00295           }
00296         else if (id == Data::id)
00297           {
00298             m->add (Profile_ptr (new Data (hdr, is)));
00299           }
00300         else if (id == NAK::id)
00301           {
00302             m->add (Profile_ptr (new NAK (hdr, is)));
00303           }
00304         else if (id == NRTM::id)
00305           {
00306             m->add (Profile_ptr (new NRTM (hdr, is)));
00307           }
00308         else if (id == NoData::id)
00309           {
00310             m->add (Profile_ptr (new NoData (hdr, is)));
00311           }
00312         else if (id == Part::id)
00313           {
00314             m->add (Profile_ptr (new Part (hdr, is)));
00315           }
00316         else
00317           {
00318             //cerr << 0 << "unknown profile id " << hdr.id () << endl;
00319             ACE_OS::abort ();
00320           }
00321       }
00322 
00323       in_->recv (m);
00324     }
00325   }
00326 
00327   ACE_THR_FUNC_RETURN Link::
00328   recv_thunk (void* obj)
00329   {
00330     reinterpret_cast<Link*> (obj)->recv ();
00331     return 0;
00332   }
00333 
00334   void Link::
00335   recv (Message_ptr)
00336   {
00337     ACE_OS::abort ();
00338   }
00339 }

Generated on Thu Nov 9 11:40:40 2006 for ACE_RMCast by doxygen 1.3.6