#include <Link.h>
Inheritance diagram for ACE_RMCast::Link:
Public Member Functions | |
~Link () | |
Link (Address const &addr, Parameters const &params) | |
virtual void | in_start (In_Element *in) |
virtual void | out_start (Out_Element *out) |
virtual void | in_stop () |
virtual void | send (Message_ptr m) |
Private Member Functions | |
virtual void | send_ (Message_ptr m) |
void | recv () |
virtual void | recv (Message_ptr) |
Static Private Member Functions | |
ACE_THR_FUNC_RETURN | recv_thunk (void *obj) |
Private Attributes | |
Parameters const & | params_ |
Address | addr_ |
Address | self_ |
ACE_SOCK_Dgram_Mcast | rsock_ |
ACE_SOCK_Dgram | ssock_ |
bool | stop_ |
ACE_Thread_Manager | recv_mgr_ |
Message_ptr | hold_ |
Mutex | mutex_ |
|
Definition at line 15 of file Link.cpp. References ACE_SOCK::close(), rsock_, and ssock_.
|
|
Definition at line 22 of file Link.cpp. References ACE_OS::abort(), ACE_RMCast::Address, ACE_OS::connect(), ACE_INET_Addr::get_addr(), ACE_INET_Addr::get_addr_size(), ACE_IPC_SAP::get_handle(), ACE_SOCK::get_local_addr(), ACE_SOCK::get_option(), IP_MULTICAST_LOOP, ACE_OS::perror(), rsock_, self_, ACE_SOCK_Dgram_Mcast::set_option(), and ssock_.
00023 : params_ (params), 00024 addr_ (addr), 00025 ssock_ (Address (static_cast<unsigned short> (0), 00026 static_cast<ACE_UINT32> (INADDR_ANY)), 00027 AF_INET, 00028 IPPROTO_UDP, 00029 1), 00030 stop_ (false) 00031 00032 { 00033 srand (time (0)); 00034 00035 00036 rsock_.set_option (IP_MULTICAST_LOOP, 0); 00037 // rsock_.set_option (IP_MULTICAST_TTL, 0); 00038 00039 // Set recv/send buffers. 00040 // 00041 { 00042 int r (131070); 00043 int s (sizeof (r)); 00044 00045 static_cast<ACE_SOCK&> (rsock_).set_option ( 00046 SOL_SOCKET, SO_RCVBUF, &r, s); 00047 00048 static_cast<ACE_SOCK&> (ssock_).set_option ( 00049 SOL_SOCKET, SO_RCVBUF, &r, s); 00050 00051 rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); 00052 //cerr << 5 << "recv buffer size: " << r << endl; 00053 00054 ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); 00055 //cerr << 5 << "send buffer size: " << r << endl; 00056 00057 } 00058 00059 // Bind address and port. 00060 // 00061 if (ACE_OS::connect (ssock_.get_handle (), 00062 reinterpret_cast<sockaddr*> (addr_.get_addr ()), 00063 addr_.get_addr_size ()) == -1) 00064 { 00065 ACE_OS::perror ("connect: "); 00066 ACE_OS::abort (); 00067 } 00068 00069 00070 ssock_.get_local_addr (self_); 00071 00072 //cerr << 5 << "self: " << self_ << endl; 00073 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 76 of file Link.cpp. References ACE_SOCK_Dgram_Mcast::join(), recv_mgr_, recv_thunk(), rsock_, and ACE_Thread_Manager::spawn().
|
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 94 of file Link.cpp. References ACE_RMCast::Lock, recv_mgr_, and ACE_Thread_Manager::wait().
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 88 of file Link.cpp.
00089 { 00090 Element::out_start (out); 00091 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 335 of file Link.cpp. References ACE_OS::abort(), and ACE_RMCast::Message_ptr.
00336 { 00337 ACE_OS::abort (); 00338 } |
|
Definition at line 188 of file Link.cpp. References ACE_OS::abort(), ACE_ptr_align_binary, ACE_RMCast::Address, ETIME, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::istream, ACE_RMCast::Lock, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), ACE_SOCK_Dgram::recv(), rsock_, self_, ssize_t, ACE_RMCast::Parameters::tick(), ACE_RMCast::u16, and ACE_RMCast::u32.
00189 { 00190 size_t max_packet_size (params_.max_packet_size ()); 00191 00192 // This is wicked. 00193 // 00194 ACE_Auto_Ptr<char> holder ( 00195 reinterpret_cast<char*> ( 00196 operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT))); 00197 00198 char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT); 00199 00200 size_t size (0); 00201 00202 while (true) 00203 { 00204 //@@ Should I lock here? 00205 // 00206 00207 Address addr; 00208 00209 // Block for up to one tick waiting for an incomming message. 00210 // 00211 for (;;) 00212 { 00213 ACE_Time_Value t (params_.tick ()); 00214 ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); 00215 00216 00217 // Check for cancellation request. 00218 // 00219 { 00220 Lock l (mutex_); 00221 if (stop_) 00222 return; 00223 } 00224 00225 if (r == -1) 00226 { 00227 if (errno != ETIME) 00228 ACE_OS::abort (); 00229 } 00230 else 00231 { 00232 size = static_cast<size_t> (r); 00233 break; 00234 } 00235 } 00236 00237 00238 if (size != 4 || addr == self_) 00239 { 00240 // Discard bad messages and ones from ourselvs since 00241 // we are using reliable loopback. 00242 // 00243 rsock_.recv (data, 0, addr); 00244 continue; 00245 } 00246 00247 u32 msg_size; 00248 { 00249 istream is (data, size, 1); // Always little-endian. 00250 is >> msg_size; 00251 } 00252 00253 if (msg_size <= 4 || msg_size > max_packet_size) 00254 { 00255 // Bad message. 00256 // 00257 rsock_.recv (data, 0, addr); 00258 continue; 00259 } 00260 00261 size = rsock_.recv (data, max_packet_size, addr); 00262 00263 if (msg_size != size) 00264 { 00265 // Bad message. 00266 // 00267 continue; 00268 } 00269 00270 //cerr << 6 << "from: " << addr << endl; 00271 00272 Message_ptr m (new Message ()); 00273 00274 m->add (Profile_ptr (new From (addr))); 00275 m->add (Profile_ptr (new To (self_))); 00276 00277 istream is (data, size, 1); // Always little-endian. 
00278 00279 is >> msg_size; 00280 00281 while (true) 00282 { 00283 u16 id, size; 00284 00285 if (!((is >> id) && (is >> size))) break; 00286 00287 //cerr << 6 << "reading profile with id " << id << " " 00288 // << size << " bytes long" << endl; 00289 00290 Profile::Header hdr (id, size); 00291 00292 if (id == SN::id) 00293 { 00294 m->add (Profile_ptr (new SN (hdr, is))); 00295 } 00296 else if (id == Data::id) 00297 { 00298 m->add (Profile_ptr (new Data (hdr, is))); 00299 } 00300 else if (id == NAK::id) 00301 { 00302 m->add (Profile_ptr (new NAK (hdr, is))); 00303 } 00304 else if (id == NRTM::id) 00305 { 00306 m->add (Profile_ptr (new NRTM (hdr, is))); 00307 } 00308 else if (id == NoData::id) 00309 { 00310 m->add (Profile_ptr (new NoData (hdr, is))); 00311 } 00312 else if (id == Part::id) 00313 { 00314 m->add (Profile_ptr (new Part (hdr, is))); 00315 } 00316 else 00317 { 00318 //cerr << 0 << "unknown profile id " << hdr.id () << endl; 00319 ACE_OS::abort (); 00320 } 00321 } 00322 00323 in_->recv (m); 00324 } 00325 } |
|
Definition at line 328 of file Link.cpp. Referenced by in_start().
00329 {
00330 reinterpret_cast<Link*> (obj)->recv ();
00331 return 0;
00332 }
|
|
Reimplemented from ACE_RMCast::Out_Element. Definition at line 108 of file Link.cpp. References ACE_Refcounted_Auto_Ptr< X, ACE_LOCK >::get(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Profile_ptr, ACE_RMCast::In_Element::recv(), self_, send_(), and ACE_RMCast::Parameters::simulator().
00109 { 00110 // Simulate message loss and reordering. 00111 // 00112 if (params_.simulator ()) 00113 { 00114 if ((rand () % 17) != 0) 00115 { 00116 Lock l (mutex_); 00117 00118 if (hold_.get ()) 00119 { 00120 send_ (m); 00121 send_ (hold_); 00122 hold_ = Message_ptr (0); 00123 } 00124 else 00125 { 00126 if ((rand () % 17) != 0) 00127 { 00128 send_ (m); 00129 } 00130 else 00131 { 00132 hold_ = m; 00133 00134 // Make a copy in M so that the reliable loop below 00135 // won't add FROM and TO to HOLD_. 00136 // 00137 m = hold_->clone (); 00138 } 00139 } 00140 } 00141 } 00142 else 00143 send_ (m); 00144 00145 // Reliable loop. 00146 // 00147 m->add (Profile_ptr (new From (self_))); 00148 m->add (Profile_ptr (new To (self_))); 00149 00150 in_->recv (m); 00151 } |
|
Definition at line 154 of file Link.cpp. References ACE_OS::abort(), ACE_ERROR, ACE_OutputCDR::buffer(), ACE_OutputCDR::length(), LM_ERROR, ACE_RMCast::Parameters::max_packet_size(), ACE_RMCast::Message_ptr, ACE_RMCast::ostream, ACE_RMCast::Message::ProfileIterator, ACE_SOCK_Dgram::send(), and ssock_. Referenced by send().
00155 { 00156 ostream os (m->size (), 1); // Always little-endian. 00157 00158 os << *m; 00159 00160 if (os.length () > size_t (params_.max_packet_size ())) 00161 { 00162 ACE_ERROR ((LM_ERROR, 00163 "packet length (%d) exceeds max_poacket_size (%d)\n", 00164 os.length (), params_.max_packet_size ())); 00165 00166 for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ()) 00167 { 00168 ACE_ERROR ((LM_ERROR, 00169 "profile id: %d; size: %d\n", 00170 (*i).ext_id_, (*i).int_id_->size ())); 00171 } 00172 00173 ACE_OS::abort (); 00174 } 00175 00176 ssock_.send (os.buffer (), os.length (), addr_); 00177 00178 /* 00179 if (m->find (nrtm::id)) 00180 { 00181 write (1, os.buffer (), os.length ()); 00182 exit (1); 00183 } 00184 */ 00185 } |
|
|
|
|
|
|
|
|
|
Definition at line 62 of file Link.h. Referenced by in_start(), and in_stop(). |
|
Definition at line 58 of file Link.h. Referenced by in_start(), Link(), recv(), and ~Link(). |
|
|
|
|
|
|