00001
00002
00003
00004
00005 #include "ace/Time_Value.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_time.h"
00009 #include "ace/OS_NS_sys_socket.h"
00010
00011 #include "Link.h"
00012
00013 namespace ACE_RMCast
00014 {
00015 Link::
00016 ~Link ()
00017 {
00018 ssock_.close ();
00019 rsock_.close ();
00020 }
00021
00022 Link::
00023 Link (Address const& addr, Parameters const& params)
00024 : params_ (params),
00025 addr_ (addr),
00026 ssock_ (Address (static_cast<unsigned short> (0),
00027 static_cast<ACE_UINT32> (INADDR_ANY)),
00028 AF_INET,
00029 IPPROTO_UDP,
00030 1),
00031 stop_ (false)
00032
00033 {
00034 ACE_OS::srand ((unsigned int) ACE_OS::time (0));
00035
00036
00037 rsock_.set_option (IP_MULTICAST_LOOP, 0);
00038
00039
00040
00041
00042 {
00043 int r (131070);
00044 int s (sizeof (r));
00045
00046 static_cast<ACE_SOCK&> (rsock_).set_option (
00047 SOL_SOCKET, SO_RCVBUF, &r, s);
00048
00049 static_cast<ACE_SOCK&> (ssock_).set_option (
00050 SOL_SOCKET, SO_RCVBUF, &r, s);
00051
00052 rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00053
00054
00055 ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
00056
00057
00058 }
00059
00060
00061
00062 if (ACE_OS::connect (ssock_.get_handle (),
00063 reinterpret_cast<sockaddr*> (addr_.get_addr ()),
00064 addr_.get_addr_size ()) == -1)
00065 {
00066 ACE_OS::perror ("connect: ");
00067 ACE_OS::abort ();
00068 }
00069
00070
00071 ssock_.get_local_addr (self_);
00072
00073
00074 }
00075
00076 void Link::
00077 in_start (In_Element* in)
00078 {
00079 Element::in_start (in);
00080
00081 rsock_.join (addr_);
00082
00083
00084
00085 recv_mgr_.spawn (recv_thunk, this);
00086 }
00087
00088 void Link::
00089 out_start (Out_Element* out)
00090 {
00091 Element::out_start (out);
00092 }
00093
00094 void Link::
00095 in_stop ()
00096 {
00097
00098
00099 {
00100 Lock l (mutex_);
00101 stop_ = true;
00102 }
00103 recv_mgr_.wait ();
00104
00105 Element::in_stop ();
00106 }
00107
00108 void Link::send (Message_ptr m)
00109 {
00110
00111
00112 if (params_.simulator ())
00113 {
00114 if ((ACE_OS::rand () % 17) != 0)
00115 {
00116 Lock l (mutex_);
00117
00118 if (hold_.get ())
00119 {
00120 send_ (m);
00121 send_ (hold_);
00122 hold_ = Message_ptr (0);
00123 }
00124 else
00125 {
00126 if ((ACE_OS::rand () % 17) != 0)
00127 {
00128 send_ (m);
00129 }
00130 else
00131 {
00132 hold_ = m;
00133
00134
00135
00136
00137 m = hold_->clone ();
00138 }
00139 }
00140 }
00141 }
00142 else
00143 send_ (m);
00144
00145
00146
00147 m->add (Profile_ptr (new From (self_)));
00148 m->add (Profile_ptr (new To (self_)));
00149
00150 in_->recv (m);
00151 }
00152
00153 void Link::
00154 send_ (Message_ptr m)
00155 {
00156 ostream os (m->size (), 1);
00157
00158 os << *m;
00159
00160 if (os.length () > size_t (params_.max_packet_size ()))
00161 {
00162 ACE_ERROR ((LM_ERROR,
00163 "packet length (%d) exceeds max_poacket_size (%d)\n",
00164 os.length (), params_.max_packet_size ()));
00165
00166 for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
00167 {
00168 ACE_ERROR ((LM_ERROR,
00169 "profile id: %d; size: %d\n",
00170 (*i).ext_id_, (*i).int_id_->size ()));
00171 }
00172
00173 ACE_OS::abort ();
00174 }
00175
00176 ssock_.send (os.buffer (), os.length (), addr_);
00177
00178
00179
00180
00181
00182
00183
00184
00185 }
00186
00187 void Link::recv ()
00188 {
00189 size_t max_packet_size (params_.max_packet_size ());
00190
00191
00192
00193 ACE_Auto_Ptr<char> holder (
00194 reinterpret_cast<char*> (
00195 operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
00196
00197 char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
00198
00199 size_t size (0);
00200
00201 while (true)
00202 {
00203
00204
00205
00206 Address addr;
00207
00208
00209
00210 for (;;)
00211 {
00212 ACE_Time_Value t (params_.tick ());
00213 ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
00214
00215
00216
00217
00218 {
00219 Lock l (mutex_);
00220 if (stop_)
00221 return;
00222 }
00223
00224 if (r == -1)
00225 {
00226 if (errno != ETIME)
00227 ACE_OS::abort ();
00228 }
00229 else
00230 {
00231 size = static_cast<size_t> (r);
00232 break;
00233 }
00234 }
00235
00236
00237 if (size != 4 || addr == self_)
00238 {
00239
00240
00241
00242 rsock_.recv (data, 0, addr);
00243 continue;
00244 }
00245
00246 u32 msg_size;
00247 {
00248 istream is (data, size, 1);
00249 is >> msg_size;
00250 }
00251
00252 if (msg_size <= 4 || msg_size > max_packet_size)
00253 {
00254
00255
00256 rsock_.recv (data, 0, addr);
00257 continue;
00258 }
00259
00260 size = rsock_.recv (data, max_packet_size, addr);
00261
00262 if (msg_size != size)
00263 {
00264
00265
00266 continue;
00267 }
00268
00269
00270
00271 Message_ptr m (new Message ());
00272
00273 m->add (Profile_ptr (new From (addr)));
00274 m->add (Profile_ptr (new To (self_)));
00275
00276 istream is (data, size, 1);
00277
00278 is >> msg_size;
00279
00280 while (true)
00281 {
00282 u16 id, size;
00283
00284 if (!((is >> id) && (is >> size))) break;
00285
00286
00287
00288
00289 Profile::Header hdr (id, size);
00290
00291 if (id == SN::id)
00292 {
00293 m->add (Profile_ptr (new SN (hdr, is)));
00294 }
00295 else if (id == Data::id)
00296 {
00297 m->add (Profile_ptr (new Data (hdr, is)));
00298 }
00299 else if (id == NAK::id)
00300 {
00301 m->add (Profile_ptr (new NAK (hdr, is)));
00302 }
00303 else if (id == NRTM::id)
00304 {
00305 m->add (Profile_ptr (new NRTM (hdr, is)));
00306 }
00307 else if (id == NoData::id)
00308 {
00309 m->add (Profile_ptr (new NoData (hdr, is)));
00310 }
00311 else if (id == Part::id)
00312 {
00313 m->add (Profile_ptr (new Part (hdr, is)));
00314 }
00315 else
00316 {
00317
00318 ACE_OS::abort ();
00319 }
00320 }
00321
00322 in_->recv (m);
00323 }
00324 }
00325
00326 ACE_THR_FUNC_RETURN Link::
00327 recv_thunk (void* obj)
00328 {
00329 reinterpret_cast<Link*> (obj)->recv ();
00330 return 0;
00331 }
00332
00333 void Link::recv (Message_ptr)
00334 {
00335 ACE_OS::abort ();
00336 }
00337 }