ACE_RMCast::Link Class Reference

#include <Link.h>

Inheritance diagram for ACE_RMCast::Link:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast::Link:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ~Link ()
 Link (Address const &addr, Parameters const &params)
virtual void in_start (In_Element *in)
virtual void out_start (Out_Element *out)
virtual void in_stop ()
virtual void send (Message_ptr m)

Private Member Functions

virtual void send_ (Message_ptr m)
void recv ()
virtual void recv (Message_ptr)

Static Private Member Functions

ACE_THR_FUNC_RETURN recv_thunk (void *obj)

Private Attributes

Parameters const & params_
Address addr_
Address self_
ACE_SOCK_Dgram_Mcast rsock_
ACE_SOCK_Dgram ssock_
bool stop_
ACE_Thread_Manager recv_mgr_
Message_ptr hold_
Mutex mutex_

Constructor & Destructor Documentation

ACE_RMCast::Link::~Link ()
 

Definition at line 15 of file Link.cpp.

References ACE_SOCK::close(), rsock_, and ssock_.

00016   {
00017     ssock_.close ();
00018     rsock_.close ();
00019   }

ACE_RMCast::Link::Link (Address const & addr,
Parameters const & params)
 

Definition at line 22 of file Link.cpp.

References ACE_OS::abort(), ACE_RMCast::Address, ACE_OS::connect(), ACE_INET_Addr::get_addr(), ACE_INET_Addr::get_addr_size(), ACE_IPC_SAP::get_handle(), ACE_SOCK::get_local_addr(), ACE_SOCK::get_option(), IP_MULTICAST_LOOP, ACE_OS::perror(), rsock_, self_, ACE_SOCK_Dgram_Mcast::set_option(), and ssock_.

00023       : params_ (params),
00024         addr_ (addr),
00025         ssock_ (Address (static_cast<unsigned short> (0),
00026                          static_cast<ACE_UINT32> (INADDR_ANY)),
00027                 AF_INET,
00028                 IPPROTO_UDP,
00029                 1),
00030         stop_ (false)
00031 
00032   {
00033     srand (time (0));
00034 
00035 
00036     rsock_.set_option (IP_MULTICAST_LOOP, 0);
00037     // rsock_.set_option (IP_MULTICAST_TTL, 0);
00038 
00039     // Set recv/send buffers.
00040     //
00041     {
00042       int r (131070);
00043       int s (sizeof (r));
00044 
00045       static_cast<ACE_SOCK&> (rsock_).set_option (
00046         SOL_SOCKET, SO_RCVBUF, &r, s);
00047 
00048       static_cast<ACE_SOCK&> (ssock_).set_option (
00049         SOL_SOCKET, SO_RCVBUF, &r, s);
00050 
00051       rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00052       //cerr << 5 << "recv buffer size: " << r << endl;
00053 
00054       ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00055       //cerr << 5 << "send buffer size: " << r << endl;
00056 
00057     }
00058 
00059     // Connect the send socket to the group address and port.
00060     //
00061     if (ACE_OS::connect (ssock_.get_handle (),
00062                          reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00063                          addr_.get_addr_size ()) == -1)
00064     {
00065       ACE_OS::perror ("connect: ");
00066       ACE_OS::abort ();
00067     }
00068 
00069 
00070     ssock_.get_local_addr (self_);
00071 
00072     //cerr << 5 << "self: " << self_ << endl;
00073   }


Member Function Documentation

void ACE_RMCast::Link::in_start (In_Element * in) [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 76 of file Link.cpp.

References ACE_SOCK_Dgram_Mcast::join(), recv_mgr_, recv_thunk(), rsock_, and ACE_Thread_Manager::spawn().

00077   {
00078     Element::in_start (in);
00079 
00080     rsock_.join (addr_);
00081 
00082     // Start receiving thread.
00083     //
00084     recv_mgr_.spawn (recv_thunk, this);
00085   }

void ACE_RMCast::Link::in_stop () [virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 94 of file Link.cpp.

References ACE_RMCast::Lock, recv_mgr_, and ACE_Thread_Manager::wait().

00095   {
00096     // Stop receiving thread.
00097     //
00098     {
00099       Lock l (mutex_);
00100       stop_ = true;
00101     }
00102     recv_mgr_.wait ();
00103 
00104     Element::in_stop ();
00105   }

void ACE_RMCast::Link::out_start (Out_Element * out) [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 88 of file Link.cpp.

00089   {
00090     Element::out_start (out);
00091   }

void ACE_RMCast::Link::recv (Message_ptr) [private, virtual]
 

Reimplemented from ACE_RMCast::In_Element.

Definition at line 335 of file Link.cpp.

References ACE_OS::abort(), and ACE_RMCast::Message_ptr.

00336   {
00337     ACE_OS::abort ();
00338   }

void ACE_RMCast::Link::recv () [private]
 

Definition at line 188 of file Link.cpp.

References ACE_OS::abort(), ACE_ptr_align_binary, ACE_RMCast::Address, ETIME, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::istream, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), ACE_SOCK_Dgram::recv(), rsock_, self_, ssize_t, ACE_RMCast::Parameters::tick(), ACE_RMCast::u16, and ACE_RMCast::u32.

00189   {
00190     size_t max_packet_size (params_.max_packet_size ());
00191 
00192     // This is wicked.
00193     //
00194     ACE_Auto_Ptr<char> holder (
00195       reinterpret_cast<char*> (
00196         operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00197 
00198     char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00199 
00200     size_t size (0);
00201 
00202     while (true)
00203     {
00204       //@@ Should I lock here?
00205       //
00206 
00207       Address addr;
00208 
00209       // Block for up to one tick waiting for an incoming message.
00210       //
00211       for (;;)
00212       {
00213         ACE_Time_Value t (params_.tick ());
00214         ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00215 
00216 
00217         // Check for cancellation request.
00218         //
00219         {
00220           Lock l (mutex_);
00221           if (stop_)
00222             return;
00223         }
00224 
00225         if (r == -1)
00226         {
00227           if (errno != ETIME)
00228             ACE_OS::abort ();
00229         }
00230         else
00231         {
00232           size = static_cast<size_t> (r);
00233           break;
00234         }
00235       }
00236 
00237 
00238       if (size != 4 || addr == self_)
00239       {
00240         // Discard bad messages and ones from ourselves since
00241         // we are using reliable loopback.
00242         //
00243         rsock_.recv (data, 0, addr);
00244         continue;
00245       }
00246 
00247       u32 msg_size;
00248       {
00249         istream is (data, size, 1); // Always little-endian.
00250         is >> msg_size;
00251       }
00252 
00253       if (msg_size <= 4 || msg_size > max_packet_size)
00254       {
00255         // Bad message.
00256         //
00257         rsock_.recv (data, 0, addr);
00258         continue;
00259       }
00260 
00261       size = rsock_.recv (data, max_packet_size, addr);
00262 
00263       if (msg_size != size)
00264       {
00265         // Bad message.
00266         //
00267         continue;
00268       }
00269 
00270       //cerr << 6 << "from: " << addr << endl;
00271 
00272       Message_ptr m (new Message ());
00273 
00274       m->add (Profile_ptr (new From (addr)));
00275       m->add (Profile_ptr (new To (self_)));
00276 
00277       istream is (data, size, 1); // Always little-endian.
00278 
00279       is >> msg_size;
00280 
00281       while (true)
00282       {
00283         u16 id, size;
00284 
00285         if (!((is >> id) && (is >> size))) break;
00286 
00287         //cerr << 6 << "reading profile with id " << id << " "
00288         //     << size << " bytes long" << endl;
00289 
00290         Profile::Header hdr (id, size);
00291 
00292         if (id == SN::id)
00293           {
00294             m->add (Profile_ptr (new SN (hdr, is)));
00295           }
00296         else if (id == Data::id)
00297           {
00298             m->add (Profile_ptr (new Data (hdr, is)));
00299           }
00300         else if (id == NAK::id)
00301           {
00302             m->add (Profile_ptr (new NAK (hdr, is)));
00303           }
00304         else if (id == NRTM::id)
00305           {
00306             m->add (Profile_ptr (new NRTM (hdr, is)));
00307           }
00308         else if (id == NoData::id)
00309           {
00310             m->add (Profile_ptr (new NoData (hdr, is)));
00311           }
00312         else if (id == Part::id)
00313           {
00314             m->add (Profile_ptr (new Part (hdr, is)));
00315           }
00316         else
00317           {
00318             //cerr << 0 << "unknown profile id " << hdr.id () << endl;
00319             ACE_OS::abort ();
00320           }
00321       }
00322 
00323       in_->recv (m);
00324     }
00325   }

ACE_THR_FUNC_RETURN ACE_RMCast::Link::recv_thunk (void * obj) [static, private]
 

Definition at line 328 of file Link.cpp.

Referenced by in_start().

00329   {
00330     reinterpret_cast<Link*> (obj)->recv ();
00331     return 0;
00332   }

void ACE_RMCast::Link::send (Message_ptr m) [virtual]
 

Reimplemented from ACE_RMCast::Out_Element.

Definition at line 108 of file Link.cpp.

References ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), self_, send_(), and ACE_RMCast::Parameters::simulator().

00109   {
00110     // Simulate message loss and reordering.
00111     //
00112     if (params_.simulator ())
00113     {
00114       if ((rand () % 17) != 0)
00115       {
00116         Lock l (mutex_);
00117 
00118         if (hold_.get ())
00119         {
00120           send_ (m);
00121           send_ (hold_);
00122           hold_ = Message_ptr (0);
00123         }
00124         else
00125         {
00126           if ((rand () % 17) != 0)
00127           {
00128             send_ (m);
00129           }
00130           else
00131           {
00132             hold_ = m;
00133 
00134             // Make a copy in M so that the reliable loop below
00135             // won't add FROM and TO to HOLD_.
00136             //
00137             m = hold_->clone ();
00138           }
00139         }
00140       }
00141     }
00142     else
00143       send_ (m);
00144 
00145     // Reliable loop.
00146     //
00147     m->add (Profile_ptr (new From (self_)));
00148     m->add (Profile_ptr (new To (self_)));
00149 
00150     in_->recv (m);
00151   }

void ACE_RMCast::Link::send_ (Message_ptr m) [private, virtual]
 

Definition at line 154 of file Link.cpp.

References ACE_OS::abort(), ACE_ERROR, ACE_OutputCDR::buffer(), ACE_OutputCDR::length(), LM_ERROR, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, ACE_RMCast::ostream, ACE_RMCast::Message::ProfileIterator, ACE_SOCK_Dgram::send(), and ssock_.

Referenced by send().

00155   {
00156     ostream os (m->size (), 1); // Always little-endian.
00157 
00158     os << *m;
00159 
00160     if (os.length () > size_t (params_.max_packet_size ()))
00161     {
00162       ACE_ERROR ((LM_ERROR,
00163                   "packet length (%d) exceeds max_poacket_size (%d)\n",
00164                   os.length (), params_.max_packet_size ()));
00165 
00166       for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167       {
00168         ACE_ERROR ((LM_ERROR,
00169                     "profile id: %d; size: %d\n",
00170                     (*i).ext_id_, (*i).int_id_->size ()));
00171       }
00172 
00173       ACE_OS::abort ();
00174     }
00175 
00176     ssock_.send (os.buffer (), os.length (), addr_);
00177 
00178     /*
00179       if (m->find (nrtm::id))
00180       {
00181       write (1, os.buffer (), os.length ());
00182       exit (1);
00183       }
00184     */
00185   }


Member Data Documentation

Address ACE_RMCast::Link::addr_ [private]
 

Definition at line 57 of file Link.h.

Message_ptr ACE_RMCast::Link::hold_ [private]
 

Definition at line 66 of file Link.h.

Mutex ACE_RMCast::Link::mutex_ [private]
 

Definition at line 68 of file Link.h.

Parameters const& ACE_RMCast::Link::params_ [private]
 

Definition at line 55 of file Link.h.

ACE_Thread_Manager ACE_RMCast::Link::recv_mgr_ [private]
 

Definition at line 62 of file Link.h.

Referenced by in_start(), and in_stop().

ACE_SOCK_Dgram_Mcast ACE_RMCast::Link::rsock_ [private]
 

Definition at line 58 of file Link.h.

Referenced by in_start(), Link(), recv(), and ~Link().

Address ACE_RMCast::Link::self_ [private]
 

Definition at line 57 of file Link.h.

Referenced by Link(), recv(), and send().

ACE_SOCK_Dgram ACE_RMCast::Link::ssock_ [private]
 

Definition at line 59 of file Link.h.

Referenced by Link(), send_(), and ~Link().

bool ACE_RMCast::Link::stop_ [private]
 

Definition at line 61 of file Link.h.


The documentation for this class was generated from the following files: Link.h and Link.cpp
Generated on Thu Nov 9 11:41:09 2006 for ACE_RMCast by doxygen 1.3.6