ACE_RMCast::Link Class Reference

#include <Link.h>

Inheritance diagram for ACE_RMCast::Link:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Link:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ~Link ()
 Link (Address const &addr, Parameters const &params)
virtual void in_start (In_Element *in)
virtual void out_start (Out_Element *out)
virtual void in_stop ()
virtual void send (Message_ptr m)

Private Member Functions

virtual void send_ (Message_ptr m)
void recv ()
virtual void recv (Message_ptr)

Static Private Member Functions

ACE_THR_FUNC_RETURN recv_thunk (void *obj)

Private Attributes

Parameters const & params_
Address addr_
Address self_
ACE_SOCK_Dgram_Mcast rsock_
ACE_SOCK_Dgram ssock_
bool stop_
ACE_Thread_Manager recv_mgr_
Message_ptr hold_
Mutex mutex_

Constructor & Destructor Documentation

ACE_RMCast::Link::~Link  ) 
 

Definition at line 16 of file Link.cpp.

References ACE_SOCK::close(), rsock_, and ssock_.

00017   {
00018     ssock_.close ();
00019     rsock_.close ();
00020   }

ACE_RMCast::Link::Link Address const &  addr,
Parameters const &  params
 

Definition at line 23 of file Link.cpp.

References ACE_OS::abort(), ACE_RMCast::Address, AF_INET, ACE_OS::connect(), ACE_INET_Addr::get_addr(), ACE_INET_Addr::get_addr_size(), ACE_IPC_SAP::get_handle(), ACE_SOCK::get_local_addr(), ACE_SOCK::get_option(), INADDR_ANY, IP_MULTICAST_LOOP, ACE_OS::perror(), rsock_, self_, ACE_SOCK_Dgram_Mcast::set_option(), SO_RCVBUF, SOL_SOCKET, ACE_OS::srand(), and ssock_.

00024       : params_ (params),
00025         addr_ (addr),
00026         ssock_ (Address (static_cast<unsigned short> (0),
00027                          static_cast<ACE_UINT32> (INADDR_ANY)),
00028                 AF_INET,
00029                 IPPROTO_UDP,
00030                 1),
00031         stop_ (false)
00032 
00033   {
00034     ACE_OS::srand ((unsigned int) ACE_OS::time (0));
00035 
00036 
00037     rsock_.set_option (IP_MULTICAST_LOOP, 0);
00038     // rsock_.set_option (IP_MULTICAST_TTL, 0);
00039 
00040     // Set recv/send buffers.
00041     //
00042     {
00043       int r (131070);
00044       int s (sizeof (r));
00045 
00046       static_cast<ACE_SOCK&> (rsock_).set_option (
00047         SOL_SOCKET, SO_RCVBUF, &r, s);
00048 
00049       static_cast<ACE_SOCK&> (ssock_).set_option (
00050         SOL_SOCKET, SO_RCVBUF, &r, s);
00051 
00052       rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00053       //cerr << 5 << "recv buffer size: " << r << endl;
00054 
00055       ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00056       //cerr << 5 << "send buffer size: " << r << endl;
00057 
00058     }
00059 
00060     // Bind address and port.
00061     //
00062     if (ACE_OS::connect (ssock_.get_handle (),
00063                          reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00064                          addr_.get_addr_size ()) == -1)
00065     {
00066       ACE_OS::perror ("connect: ");
00067       ACE_OS::abort ();
00068     }
00069 
00070 
00071     ssock_.get_local_addr (self_);
00072 
00073     //cerr << 5 << "self: " << self_ << endl;
00074   }


Member Function Documentation

void ACE_RMCast::Link::in_start In_Element in  )  [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 77 of file Link.cpp.

References ACE_SOCK_Dgram_Mcast::join(), recv_mgr_, recv_thunk(), rsock_, and ACE_Thread_Manager::spawn().

00078   {
00079     Element::in_start (in);
00080 
00081     rsock_.join (addr_);
00082 
00083     // Start receiving thread.
00084     //
00085     recv_mgr_.spawn (recv_thunk, this);
00086   }

void ACE_RMCast::Link::in_stop  )  [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 95 of file Link.cpp.

References ACE_RMCast::Lock, recv_mgr_, and ACE_Thread_Manager::wait().

00096   {
00097     // Stop receiving thread.
00098     //
00099     {
00100       Lock l (mutex_);
00101       stop_ = true;
00102     }
00103     recv_mgr_.wait ();
00104 
00105     Element::in_stop ();
00106   }

void ACE_RMCast::Link::out_start Out_Element out  )  [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 89 of file Link.cpp.

00090   {
00091     Element::out_start (out);
00092   }

void ACE_RMCast::Link::recv Message_ptr   )  [private, virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 333 of file Link.cpp.

References ACE_OS::abort(), and ACE_RMCast::Message_ptr.

00334   {
00335     ACE_OS::abort ();
00336   }

void ACE_RMCast::Link::recv  )  [private]
 

Definition at line 187 of file Link.cpp.

References ACE_OS::abort(), ACE_ptr_align_binary(), ACE_RMCast::Address, ETIME, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::istream, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, MSG_PEEK, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), ACE_SOCK_Dgram::recv(), rsock_, self_, ssize_t, ACE_RMCast::Parameters::tick(), ACE_RMCast::u16, and ACE_RMCast::u32.

00188   {
00189     size_t max_packet_size (params_.max_packet_size ());
00190 
00191     // This is wicked.
00192     //
00193     ACE_Auto_Ptr<char> holder (
00194       reinterpret_cast<char*> (
00195         operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00196 
00197     char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00198 
00199     size_t size (0);
00200 
00201     while (true)
00202     {
00203       //@@ Should I lock here?
00204       //
00205 
00206       Address addr;
00207 
00208       // Block for up to one tick waiting for an incomming message.
00209       //
00210       for (;;)
00211       {
00212         ACE_Time_Value t (params_.tick ());
00213         ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00214 
00215 
00216         // Check for cancellation request.
00217         //
00218         {
00219           Lock l (mutex_);
00220           if (stop_)
00221             return;
00222         }
00223 
00224         if (r == -1)
00225         {
00226           if (errno != ETIME)
00227             ACE_OS::abort ();
00228         }
00229         else
00230         {
00231           size = static_cast<size_t> (r);
00232           break;
00233         }
00234       }
00235 
00236 
00237       if (size != 4 || addr == self_)
00238       {
00239         // Discard bad messages and ones from ourselvs since
00240         // we are using reliable loopback.
00241         //
00242         rsock_.recv (data, 0, addr);
00243         continue;
00244       }
00245 
00246       u32 msg_size;
00247       {
00248         istream is (data, size, 1); // Always little-endian.
00249         is >> msg_size;
00250       }
00251 
00252       if (msg_size <= 4 || msg_size > max_packet_size)
00253       {
00254         // Bad message.
00255         //
00256         rsock_.recv (data, 0, addr);
00257         continue;
00258       }
00259 
00260       size = rsock_.recv (data, max_packet_size, addr);
00261 
00262       if (msg_size != size)
00263       {
00264         // Bad message.
00265         //
00266         continue;
00267       }
00268 
00269       //cerr << 6 << "from: " << addr << endl;
00270 
00271       Message_ptr m (new Message ());
00272 
00273       m->add (Profile_ptr (new From (addr)));
00274       m->add (Profile_ptr (new To (self_)));
00275 
00276       istream is (data, size, 1); // Always little-endian.
00277 
00278       is >> msg_size;
00279 
00280       while (true)
00281       {
00282         u16 id, size;
00283 
00284         if (!((is >> id) && (is >> size))) break;
00285 
00286         //cerr << 6 << "reading profile with id " << id << " "
00287         //     << size << " bytes long" << endl;
00288 
00289         Profile::Header hdr (id, size);
00290 
00291         if (id == SN::id)
00292           {
00293             m->add (Profile_ptr (new SN (hdr, is)));
00294           }
00295         else if (id == Data::id)
00296           {
00297             m->add (Profile_ptr (new Data (hdr, is)));
00298           }
00299         else if (id == NAK::id)
00300           {
00301             m->add (Profile_ptr (new NAK (hdr, is)));
00302           }
00303         else if (id == NRTM::id)
00304           {
00305             m->add (Profile_ptr (new NRTM (hdr, is)));
00306           }
00307         else if (id == NoData::id)
00308           {
00309             m->add (Profile_ptr (new NoData (hdr, is)));
00310           }
00311         else if (id == Part::id)
00312           {
00313             m->add (Profile_ptr (new Part (hdr, is)));
00314           }
00315         else
00316           {
00317             //cerr << 0 << "unknown profile id " << hdr.id () << endl;
00318             ACE_OS::abort ();
00319           }
00320       }
00321 
00322       in_->recv (m);
00323     }
00324   }

ACE_THR_FUNC_RETURN ACE_RMCast::Link::recv_thunk void *  obj  )  [static, private]
 

Definition at line 327 of file Link.cpp.

Referenced by in_start().

00328   {
00329     reinterpret_cast<Link*> (obj)->recv ();
00330     return 0;
00331   }

void ACE_RMCast::Link::send Message_ptr  m  )  [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 108 of file Link.cpp.

References ACE_Strong_Bound_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_OS::rand(), ACE_RMCast::In_Element::recv(), self_, send_(), and ACE_RMCast::Parameters::simulator().

00109   {
00110     // Simulate message loss and reordering.
00111     //
00112     if (params_.simulator ())
00113     {
00114       if ((ACE_OS::rand () % 17) != 0)
00115       {
00116         Lock l (mutex_);
00117 
00118         if (hold_.get ())
00119         {
00120           send_ (m);
00121           send_ (hold_);
00122           hold_ = Message_ptr (0);
00123         }
00124         else
00125         {
00126           if ((ACE_OS::rand () % 17) != 0)
00127           {
00128             send_ (m);
00129           }
00130           else
00131           {
00132             hold_ = m;
00133 
00134             // Make a copy in M so that the reliable loop below
00135             // won't add FROM and TO to HOLD_.
00136             //
00137             m = hold_->clone ();
00138           }
00139         }
00140       }
00141     }
00142     else
00143       send_ (m);
00144 
00145     // Reliable loop.
00146     //
00147     m->add (Profile_ptr (new From (self_)));
00148     m->add (Profile_ptr (new To (self_)));
00149 
00150     in_->recv (m);
00151   }

void ACE_RMCast::Link::send_ Message_ptr  m  )  [private, virtual]
 

Definition at line 154 of file Link.cpp.

References ACE_OS::abort(), ACE_ERROR, ACE_OutputCDR::buffer(), ACE_OutputCDR::length(), LM_ERROR, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, ACE_RMCast::ostream, ACE_RMCast::Message::ProfileIterator, ACE_SOCK_Dgram::send(), and ssock_.

Referenced by send().

00155   {
00156     ostream os (m->size (), 1); // Always little-endian.
00157 
00158     os << *m;
00159 
00160     if (os.length () > size_t (params_.max_packet_size ()))
00161     {
00162       ACE_ERROR ((LM_ERROR,
00163                   "packet length (%d) exceeds max_poacket_size (%d)\n",
00164                   os.length (), params_.max_packet_size ()));
00165 
00166       for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167       {
00168         ACE_ERROR ((LM_ERROR,
00169                     "profile id: %d; size: %d\n",
00170                     (*i).ext_id_, (*i).int_id_->size ()));
00171       }
00172 
00173       ACE_OS::abort ();
00174     }
00175 
00176     ssock_.send (os.buffer (), os.length (), addr_);
00177 
00178     /*
00179       if (m->find (nrtm::id))
00180       {
00181         ACE_OS::write (1, os.buffer (), os.length ());
00182         ACE_OS::exit (1);
00183       }
00184     */
00185   }


Member Data Documentation

Address ACE_RMCast::Link::addr_ [private]
 

Definition at line 57 of file Link.h.

Message_ptr ACE_RMCast::Link::hold_ [private]
 

Definition at line 66 of file Link.h.

Mutex ACE_RMCast::Link::mutex_ [private]
 

Definition at line 68 of file Link.h.

Parameters const& ACE_RMCast::Link::params_ [private]
 

Definition at line 55 of file Link.h.

ACE_Thread_Manager ACE_RMCast::Link::recv_mgr_ [private]
 

Definition at line 62 of file Link.h.

Referenced by in_start(), and in_stop().

ACE_SOCK_Dgram_Mcast ACE_RMCast::Link::rsock_ [private]
 

Definition at line 58 of file Link.h.

Referenced by in_start(), Link(), recv(), and ~Link().

Address ACE_RMCast::Link::self_ [private]
 

Definition at line 57 of file Link.h.

Referenced by Link(), recv(), and send().

ACE_SOCK_Dgram ACE_RMCast::Link::ssock_ [private]
 

Definition at line 59 of file Link.h.

Referenced by Link(), send_(), and ~Link().

bool ACE_RMCast::Link::stop_ [private]
 

Definition at line 61 of file Link.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 13:03:06 2008 for ACE_RMCast by doxygen 1.3.6