#include <Link.h>
Inheritance diagram for ACE_RMCast::Link:
Public Member Functions | |
~Link () | |
Link (Address const &addr, Parameters const &params) | |
virtual void | in_start (In_Element *in) |
virtual void | out_start (Out_Element *out) |
virtual void | in_stop () |
virtual void | send (Message_ptr m) |
Private Member Functions | |
virtual void | send_ (Message_ptr m) |
void | recv () |
virtual void | recv (Message_ptr) |
Static Private Member Functions | |
static ACE_THR_FUNC_RETURN | recv_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Address | addr_ |
Address | self_ |
ACE_SOCK_Dgram_Mcast | rsock_ |
ACE_SOCK_Dgram | ssock_ |
bool | stop_ |
ACE_Thread_Manager | recv_mgr_ |
Message_ptr | hold_ |
Mutex | mutex_ |
Definition at line 19 of file Link.h.
ACE_RMCast::Link::~Link | ( | ) |
ACE_RMCast::Link::Link | ( | Address const & | addr, | |
Parameters const & | params | |||
) |
Definition at line 23 of file Link.cpp.
References ACE_OS::abort(), addr_, ACE_OS::connect(), ACE_INET_Addr::get_addr(), ACE_INET_Addr::get_addr_size(), ACE_IPC_SAP::get_handle(), ACE_SOCK::get_local_addr(), ACE_SOCK::get_option(), IP_MULTICAST_LOOP, ACE_OS::perror(), rsock_, self_, ACE_SOCK_Dgram_Mcast::set_option(), SO_RCVBUF, SOL_SOCKET, ACE_OS::srand(), ssock_, and ACE_OS::time().
00024 : params_ (params), 00025 addr_ (addr), 00026 ssock_ (Address (static_cast<unsigned short> (0), 00027 static_cast<ACE_UINT32> (INADDR_ANY)), 00028 AF_INET, 00029 IPPROTO_UDP, 00030 1), 00031 stop_ (false) 00032 00033 { 00034 ACE_OS::srand ((unsigned int) ACE_OS::time (0)); 00035 00036 00037 rsock_.set_option (IP_MULTICAST_LOOP, 0); 00038 // rsock_.set_option (IP_MULTICAST_TTL, 0); 00039 00040 // Set recv/send buffers. 00041 // 00042 { 00043 int r (131070); 00044 int s (sizeof (r)); 00045 00046 static_cast<ACE_SOCK&> (rsock_).set_option ( 00047 SOL_SOCKET, SO_RCVBUF, &r, s); 00048 00049 static_cast<ACE_SOCK&> (ssock_).set_option ( 00050 SOL_SOCKET, SO_RCVBUF, &r, s); 00051 00052 rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); 00053 //cerr << 5 << "recv buffer size: " << r << endl; 00054 00055 ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); 00056 //cerr << 5 << "send buffer size: " << r << endl; 00057 00058 } 00059 00060 // Bind address and port. 00061 // 00062 if (ACE_OS::connect (ssock_.get_handle (), 00063 reinterpret_cast<sockaddr*> (addr_.get_addr ()), 00064 addr_.get_addr_size ()) == -1) 00065 { 00066 ACE_OS::perror ("connect: "); 00067 ACE_OS::abort (); 00068 } 00069 00070 00071 ssock_.get_local_addr (self_); 00072 00073 //cerr << 5 << "self: " << self_ << endl; 00074 }
void ACE_RMCast::Link::in_start | ( | In_Element * | in | ) | [virtual] |
Reimplemented from ACE_RMCast::In_Element.
Definition at line 77 of file Link.cpp.
References addr_, ACE_RMCast::In_Element::in_start(), ACE_SOCK_Dgram_Mcast::join(), recv_mgr_, recv_thunk(), rsock_, and ACE_Thread_Manager::spawn().
00078 { 00079 Element::in_start (in); 00080 00081 rsock_.join (addr_); 00082 00083 // Start receiving thread. 00084 // 00085 recv_mgr_.spawn (recv_thunk, this); 00086 }
void ACE_RMCast::Link::in_stop | ( | ) | [virtual] |
Reimplemented from ACE_RMCast::In_Element.
Definition at line 95 of file Link.cpp.
References ACE_RMCast::In_Element::in_stop(), mutex_, recv_mgr_, stop_, and ACE_Thread_Manager::wait().
00096 { 00097 // Stop receiving thread. 00098 // 00099 { 00100 Lock l (mutex_); 00101 stop_ = true; 00102 } 00103 recv_mgr_.wait (); 00104 00105 Element::in_stop (); 00106 }
void ACE_RMCast::Link::out_start | ( | Out_Element * | out | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 89 of file Link.cpp.
References ACE_RMCast::Out_Element::out_start().
00090 { 00091 Element::out_start (out); 00092 }
void ACE_RMCast::Link::recv | ( | Message_ptr | ) | [private, virtual] |
Reimplemented from ACE_RMCast::In_Element.
Definition at line 333 of file Link.cpp.
References ACE_OS::abort().
00334 { 00335 ACE_OS::abort (); 00336 }
void ACE_RMCast::Link::recv | ( | ) | [private] |
Definition at line 187 of file Link.cpp.
References ACE_OS::abort(), ACE_ptr_align_binary(), ACE_RMCast::Profile::id(), ACE_RMCast::In_Element::in_, ACE_CDR::MAX_ALIGNMENT, ACE_RMCast::Parameters::max_packet_size(), MSG_PEEK, mutex_, params_, ACE_RMCast::In_Element::recv(), ACE_SOCK_Dgram::recv(), rsock_, self_, stop_, and ACE_RMCast::Parameters::tick().
Referenced by recv_thunk().
00188 { 00189 size_t max_packet_size (params_.max_packet_size ()); 00190 00191 // This is wicked. 00192 // 00193 ACE_Auto_Ptr<char> holder ( 00194 reinterpret_cast<char*> ( 00195 operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT))); 00196 00197 char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT); 00198 00199 size_t size (0); 00200 00201 while (true) 00202 { 00203 //@@ Should I lock here? 00204 // 00205 00206 Address addr; 00207 00208 // Block for up to one tick waiting for an incomming message. 00209 // 00210 for (;;) 00211 { 00212 ACE_Time_Value t (params_.tick ()); 00213 ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); 00214 00215 00216 // Check for cancellation request. 00217 // 00218 { 00219 Lock l (mutex_); 00220 if (stop_) 00221 return; 00222 } 00223 00224 if (r == -1) 00225 { 00226 if (errno != ETIME) 00227 ACE_OS::abort (); 00228 } 00229 else 00230 { 00231 size = static_cast<size_t> (r); 00232 break; 00233 } 00234 } 00235 00236 00237 if (size != 4 || addr == self_) 00238 { 00239 // Discard bad messages and ones from ourselvs since 00240 // we are using reliable loopback. 00241 // 00242 rsock_.recv (data, 0, addr); 00243 continue; 00244 } 00245 00246 u32 msg_size; 00247 { 00248 istream is (data, size, 1); // Always little-endian. 00249 is >> msg_size; 00250 } 00251 00252 if (msg_size <= 4 || msg_size > max_packet_size) 00253 { 00254 // Bad message. 00255 // 00256 rsock_.recv (data, 0, addr); 00257 continue; 00258 } 00259 00260 size = rsock_.recv (data, max_packet_size, addr); 00261 00262 if (msg_size != size) 00263 { 00264 // Bad message. 00265 // 00266 continue; 00267 } 00268 00269 //cerr << 6 << "from: " << addr << endl; 00270 00271 Message_ptr m (new Message ()); 00272 00273 m->add (Profile_ptr (new From (addr))); 00274 m->add (Profile_ptr (new To (self_))); 00275 00276 istream is (data, size, 1); // Always little-endian. 
00277 00278 is >> msg_size; 00279 00280 while (true) 00281 { 00282 u16 id, size; 00283 00284 if (!((is >> id) && (is >> size))) break; 00285 00286 //cerr << 6 << "reading profile with id " << id << " " 00287 // << size << " bytes long" << endl; 00288 00289 Profile::Header hdr (id, size); 00290 00291 if (id == SN::id) 00292 { 00293 m->add (Profile_ptr (new SN (hdr, is))); 00294 } 00295 else if (id == Data::id) 00296 { 00297 m->add (Profile_ptr (new Data (hdr, is))); 00298 } 00299 else if (id == NAK::id) 00300 { 00301 m->add (Profile_ptr (new NAK (hdr, is))); 00302 } 00303 else if (id == NRTM::id) 00304 { 00305 m->add (Profile_ptr (new NRTM (hdr, is))); 00306 } 00307 else if (id == NoData::id) 00308 { 00309 m->add (Profile_ptr (new NoData (hdr, is))); 00310 } 00311 else if (id == Part::id) 00312 { 00313 m->add (Profile_ptr (new Part (hdr, is))); 00314 } 00315 else 00316 { 00317 //cerr << 0 << "unknown profile id " << hdr.id () << endl; 00318 ACE_OS::abort (); 00319 } 00320 } 00321 00322 in_->recv (m); 00323 } 00324 }
ACE_THR_FUNC_RETURN ACE_RMCast::Link::recv_thunk | ( | void * | obj | ) | [static, private] |
Definition at line 327 of file Link.cpp.
References recv().
Referenced by in_start().
00328 { 00329 reinterpret_cast<Link*> (obj)->recv (); 00330 return 0; 00331 }
void ACE_RMCast::Link::send | ( | Message_ptr | m | ) | [virtual] |
Reimplemented from ACE_RMCast::Out_Element.
Definition at line 108 of file Link.cpp.
References ACE_Strong_Bound_Ptr< X, ACE_LOCK >::get(), hold_, ACE_RMCast::In_Element::in_, mutex_, params_, ACE_OS::rand(), ACE_RMCast::In_Element::recv(), self_, send_(), and ACE_RMCast::Parameters::simulator().
00109 { 00110 // Simulate message loss and reordering. 00111 // 00112 if (params_.simulator ()) 00113 { 00114 if ((ACE_OS::rand () % 17) != 0) 00115 { 00116 Lock l (mutex_); 00117 00118 if (hold_.get ()) 00119 { 00120 send_ (m); 00121 send_ (hold_); 00122 hold_ = Message_ptr (0); 00123 } 00124 else 00125 { 00126 if ((ACE_OS::rand () % 17) != 0) 00127 { 00128 send_ (m); 00129 } 00130 else 00131 { 00132 hold_ = m; 00133 00134 // Make a copy in M so that the reliable loop below 00135 // won't add FROM and TO to HOLD_. 00136 // 00137 m = hold_->clone (); 00138 } 00139 } 00140 } 00141 } 00142 else 00143 send_ (m); 00144 00145 // Reliable loop. 00146 // 00147 m->add (Profile_ptr (new From (self_))); 00148 m->add (Profile_ptr (new To (self_))); 00149 00150 in_->recv (m); 00151 }
void ACE_RMCast::Link::send_ | ( | Message_ptr | m | ) | [private, virtual] |
Definition at line 154 of file Link.cpp.
References ACE_OS::abort(), ACE_ERROR, addr_, LM_ERROR, ACE_RMCast::Parameters::max_packet_size(), params_, ACE_SOCK_Dgram::send(), and ssock_.
Referenced by send().
00155 { 00156 ostream os (m->size (), 1); // Always little-endian. 00157 00158 os << *m; 00159 00160 if (os.length () > size_t (params_.max_packet_size ())) 00161 { 00162 ACE_ERROR ((LM_ERROR, 00163 "packet length (%d) exceeds max_poacket_size (%d)\n", 00164 os.length (), params_.max_packet_size ())); 00165 00166 for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ()) 00167 { 00168 ACE_ERROR ((LM_ERROR, 00169 "profile id: %d; size: %d\n", 00170 (*i).ext_id_, (*i).int_id_->size ())); 00171 } 00172 00173 ACE_OS::abort (); 00174 } 00175 00176 ssock_.send (os.buffer (), os.length (), addr_); 00177 00178 /* 00179 if (m->find (nrtm::id)) 00180 { 00181 ACE_OS::write (1, os.buffer (), os.length ()); 00182 ACE_OS::exit (1); 00183 } 00184 */ 00185 }
Address ACE_RMCast::Link::addr_ [private] |
Message_ptr ACE_RMCast::Link::hold_ [private] |
Mutex ACE_RMCast::Link::mutex_ [private] |
Parameters const& ACE_RMCast::Link::params_ [private] |
ACE_SOCK_Dgram_Mcast ACE_RMCast::Link::rsock_ [private] |
Address ACE_RMCast::Link::self_ [private] |
ACE_SOCK_Dgram ACE_RMCast::Link::ssock_ [private] |
bool ACE_RMCast::Link::stop_ [private] |