00001
00002
00003
00004
00005 #include "ace/Time_Value.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_sys_socket.h"
00009
00010 #include "Link.h"
00011
00012 namespace ACE_RMCast
00013 {
00014 Link::
00015 ~Link ()
00016 {
00017 ssock_.close ();
00018 rsock_.close ();
00019 }
00020
00021 Link::
00022 Link (Address const& addr, Parameters const& params)
00023 : params_ (params),
00024 addr_ (addr),
00025 ssock_ (Address (static_cast<unsigned short> (0),
00026 static_cast<ACE_UINT32> (INADDR_ANY)),
00027 AF_INET,
00028 IPPROTO_UDP,
00029 1),
00030 stop_ (false)
00031
00032 {
00033 srand (time (0));
00034
00035
00036 rsock_.set_option (IP_MULTICAST_LOOP, 0);
00037
00038
00039
00040
00041 {
00042 int r (131070);
00043 int s (sizeof (r));
00044
00045 static_cast<ACE_SOCK&> (rsock_).set_option (
00046 SOL_SOCKET, SO_RCVBUF, &r, s);
00047
00048 static_cast<ACE_SOCK&> (ssock_).set_option (
00049 SOL_SOCKET, SO_RCVBUF, &r, s);
00050
00051 rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00052
00053
00054 ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00055
00056
00057 }
00058
00059
00060
00061 if (ACE_OS::connect (ssock_.get_handle (),
00062 reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00063 addr_.get_addr_size ()) == -1)
00064 {
00065 ACE_OS::perror ("connect: ");
00066 ACE_OS::abort ();
00067 }
00068
00069
00070 ssock_.get_local_addr (self_);
00071
00072
00073 }
00074
00075 void Link::
00076 in_start (In_Element* in)
00077 {
00078 Element::in_start (in);
00079
00080 rsock_.join (addr_);
00081
00082
00083
00084 recv_mgr_.spawn (recv_thunk, this);
00085 }
00086
00087 void Link::
00088 out_start (Out_Element* out)
00089 {
00090 Element::out_start (out);
00091 }
00092
00093 void Link::
00094 in_stop ()
00095 {
00096
00097
00098 {
00099 Lock l (mutex_);
00100 stop_ = true;
00101 }
00102 recv_mgr_.wait ();
00103
00104 Element::in_stop ();
00105 }
00106
00107 void Link::
00108 send (Message_ptr m)
00109 {
00110
00111
00112 if (params_.simulator ())
00113 {
00114 if ((rand () % 17) != 0)
00115 {
00116 Lock l (mutex_);
00117
00118 if (hold_.get ())
00119 {
00120 send_ (m);
00121 send_ (hold_);
00122 hold_ = Message_ptr (0);
00123 }
00124 else
00125 {
00126 if ((rand () % 17) != 0)
00127 {
00128 send_ (m);
00129 }
00130 else
00131 {
00132 hold_ = m;
00133
00134
00135
00136
00137 m = hold_->clone ();
00138 }
00139 }
00140 }
00141 }
00142 else
00143 send_ (m);
00144
00145
00146
00147 m->add (Profile_ptr (new From (self_)));
00148 m->add (Profile_ptr (new To (self_)));
00149
00150 in_->recv (m);
00151 }
00152
00153 void Link::
00154 send_ (Message_ptr m)
00155 {
00156 ostream os (m->size (), 1);
00157
00158 os << *m;
00159
00160 if (os.length () > size_t (params_.max_packet_size ()))
00161 {
00162 ACE_ERROR ((LM_ERROR,
00163 "packet length (%d) exceeds max_poacket_size (%d)\n",
00164 os.length (), params_.max_packet_size ()));
00165
00166 for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167 {
00168 ACE_ERROR ((LM_ERROR,
00169 "profile id: %d; size: %d\n",
00170 (*i).ext_id_, (*i).int_id_->size ()));
00171 }
00172
00173 ACE_OS::abort ();
00174 }
00175
00176 ssock_.send (os.buffer (), os.length (), addr_);
00177
00178
00179
00180
00181
00182
00183
00184
00185 }
00186
00187 void Link::
00188 recv ()
00189 {
00190 size_t max_packet_size (params_.max_packet_size ());
00191
00192
00193
00194 ACE_Auto_Ptr<char> holder (
00195 reinterpret_cast<char*> (
00196 operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00197
00198 char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00199
00200 size_t size (0);
00201
00202 while (true)
00203 {
00204
00205
00206
00207 Address addr;
00208
00209
00210
00211 for (;;)
00212 {
00213 ACE_Time_Value t (params_.tick ());
00214 ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00215
00216
00217
00218
00219 {
00220 Lock l (mutex_);
00221 if (stop_)
00222 return;
00223 }
00224
00225 if (r == -1)
00226 {
00227 if (errno != ETIME)
00228 ACE_OS::abort ();
00229 }
00230 else
00231 {
00232 size = static_cast<size_t> (r);
00233 break;
00234 }
00235 }
00236
00237
00238 if (size != 4 || addr == self_)
00239 {
00240
00241
00242
00243 rsock_.recv (data, 0, addr);
00244 continue;
00245 }
00246
00247 u32 msg_size;
00248 {
00249 istream is (data, size, 1);
00250 is >> msg_size;
00251 }
00252
00253 if (msg_size <= 4 || msg_size > max_packet_size)
00254 {
00255
00256
00257 rsock_.recv (data, 0, addr);
00258 continue;
00259 }
00260
00261 size = rsock_.recv (data, max_packet_size, addr);
00262
00263 if (msg_size != size)
00264 {
00265
00266
00267 continue;
00268 }
00269
00270
00271
00272 Message_ptr m (new Message ());
00273
00274 m->add (Profile_ptr (new From (addr)));
00275 m->add (Profile_ptr (new To (self_)));
00276
00277 istream is (data, size, 1);
00278
00279 is >> msg_size;
00280
00281 while (true)
00282 {
00283 u16 id, size;
00284
00285 if (!((is >> id) && (is >> size))) break;
00286
00287
00288
00289
00290 Profile::Header hdr (id, size);
00291
00292 if (id == SN::id)
00293 {
00294 m->add (Profile_ptr (new SN (hdr, is)));
00295 }
00296 else if (id == Data::id)
00297 {
00298 m->add (Profile_ptr (new Data (hdr, is)));
00299 }
00300 else if (id == NAK::id)
00301 {
00302 m->add (Profile_ptr (new NAK (hdr, is)));
00303 }
00304 else if (id == NRTM::id)
00305 {
00306 m->add (Profile_ptr (new NRTM (hdr, is)));
00307 }
00308 else if (id == NoData::id)
00309 {
00310 m->add (Profile_ptr (new NoData (hdr, is)));
00311 }
00312 else if (id == Part::id)
00313 {
00314 m->add (Profile_ptr (new Part (hdr, is)));
00315 }
00316 else
00317 {
00318
00319 ACE_OS::abort ();
00320 }
00321 }
00322
00323 in_->recv (m);
00324 }
00325 }
00326
00327 ACE_THR_FUNC_RETURN Link::
00328 recv_thunk (void* obj)
00329 {
00330 reinterpret_cast<Link*> (obj)->recv ();
00331 return 0;
00332 }
00333
00334 void Link::
00335 recv (Message_ptr)
00336 {
00337 ACE_OS::abort ();
00338 }
00339 }