00001
00002
00003
00004
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/OS_NS_unistd.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #include "ace/Unbounded_Queue.h"
00013
00014 #include "ace/Pipe.h"
00015
00016 #include "Stack.h"
00017 #include "Protocol.h"
00018 #include "Bits.h"
00019
00020 #include "Fragment.h"
00021 #include "Reassemble.h"
00022 #include "Acknowledge.h"
00023 #include "Retransmit.h"
00024 #include "Flow.h"
00025 #include "Link.h"
00026
00027 #include "Socket.h"
00028
00029
00030
00031
00032
00033
00034
00035 namespace ACE_RMCast
00036 {
00037 class Socket_Impl : protected Element
00038 {
00039 public:
00040 ~Socket_Impl ();
00041
00042 Socket_Impl (Address const& a, bool loop, Parameters const& params);
00043
00044 public:
00045 void
00046 send_ (void const* buf, size_t s);
00047
00048 ssize_t
00049 recv_ (void* buf,
00050 size_t s,
00051 ACE_Time_Value const* timeout,
00052 ACE_INET_Addr* from);
00053
00054 ssize_t
00055 size_ (ACE_Time_Value const* timeout);
00056
00057 ACE_HANDLE
00058 get_handle_ ();
00059
00060 private:
00061
00062 virtual void recv (Message_ptr m);
00063
00064
00065 private:
00066 bool loop_;
00067 Parameters const params_;
00068
00069 Mutex mutex_;
00070 Condition cond_;
00071
00072 ACE_Unbounded_Queue<Message_ptr> queue_;
00073
00074 ACE_Pipe signal_pipe_;
00075
00076 ACE_Auto_Ptr<Fragment> fragment_;
00077 ACE_Auto_Ptr<Reassemble> reassemble_;
00078 ACE_Auto_Ptr<Acknowledge> acknowledge_;
00079 ACE_Auto_Ptr<Retransmit> retransmit_;
00080 ACE_Auto_Ptr<Flow> flow_;
00081 ACE_Auto_Ptr<Link> link_;
00082 };
00083
00084
00085 Socket_Impl::
00086 Socket_Impl (Address const& a, bool loop, Parameters const& params)
00087 : loop_ (loop),
00088 params_ (params),
00089 cond_ (mutex_)
00090 {
00091 fragment_.reset (new Fragment (params_));
00092 reassemble_.reset (new Reassemble (params_));
00093 acknowledge_.reset (new Acknowledge (params_));
00094 retransmit_.reset (new Retransmit (params_));
00095 flow_.reset (new Flow (params_));
00096 link_.reset (new Link (a, params_));
00097
00098
00099
00100 in_start (0);
00101 fragment_->in_start (this);
00102 reassemble_->in_start (fragment_.get ());
00103 acknowledge_->in_start (reassemble_.get ());
00104 retransmit_->in_start (acknowledge_.get ());
00105 flow_->in_start (retransmit_.get ());
00106 link_->in_start (flow_.get ());
00107
00108
00109
00110 link_->out_start (0);
00111 flow_->out_start (link_.get ());
00112 retransmit_->out_start (flow_.get ());
00113 acknowledge_->out_start (retransmit_.get ());
00114 reassemble_->out_start (acknowledge_.get ());
00115 fragment_->out_start (reassemble_.get ());
00116 out_start (fragment_.get ());
00117 }
00118
00119 Socket_Impl::
00120 ~Socket_Impl ()
00121 {
00122
00123
00124 out_stop ();
00125 fragment_->out_stop ();
00126 reassemble_->out_stop ();
00127 acknowledge_->out_stop ();
00128 retransmit_->out_stop ();
00129 flow_->out_stop ();
00130 link_->out_stop ();
00131
00132
00133
00134 link_->in_stop ();
00135 flow_->in_stop ();
00136 retransmit_->in_stop ();
00137 acknowledge_->in_stop ();
00138 reassemble_->in_stop ();
00139 fragment_->in_stop ();
00140 in_stop ();
00141
00142
00143
00144 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00145 signal_pipe_.close ();
00146 }
00147
00148
00149 void Socket_Impl::
00150 send_ (void const* buf, size_t s)
00151 {
00152 Message_ptr m (new Message);
00153
00154 m->add (Profile_ptr (new Data (buf, s)));
00155
00156
00157
00158 Element::send (m);
00159 }
00160
00161 ssize_t Socket_Impl::
00162 recv_ (void* buf,
00163 size_t s,
00164 ACE_Time_Value const* timeout,
00165 ACE_INET_Addr* from)
00166 {
00167 ACE_Time_Value abs_time;
00168
00169 if (timeout)
00170 abs_time = ACE_OS::gettimeofday () + *timeout;
00171
00172 Lock l (mutex_);
00173
00174 while (queue_.is_empty ())
00175 {
00176 if (timeout)
00177 {
00178 if (cond_.wait (&abs_time) != -1)
00179 break;
00180 }
00181 else
00182 {
00183 if (cond_.wait () != -1)
00184 break;
00185 }
00186
00187 return -1;
00188 }
00189
00190
00191 Message_ptr m;
00192
00193 if (queue_.dequeue_head (m) == -1)
00194 ACE_OS::abort ();
00195
00196
00197 if (queue_.is_empty ())
00198 {
00199
00200
00201 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00202 {
00203 char c;
00204
00205 if (signal_pipe_.recv (&c, 1) != 1)
00206 {
00207 ACE_OS::perror ("read: ");
00208 ACE_OS::abort ();
00209 }
00210 }
00211 }
00212
00213 if (from)
00214 *from = static_cast<From const*> (m->find (From::id))->address ();
00215
00216 if (m->find (NoData::id) != 0)
00217 {
00218 errno = ENOENT;
00219 return -1;
00220 }
00221
00222 Data const* d = static_cast<Data const*>(m->find (Data::id));
00223
00224 ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00225
00226 ACE_OS::memcpy (buf, d->buf (), r);
00227
00228 return r;
00229 }
00230
00231 ssize_t Socket_Impl::
00232 size_ (ACE_Time_Value const* timeout)
00233 {
00234 ACE_Time_Value abs_time;
00235
00236 if (timeout)
00237 abs_time = ACE_OS::gettimeofday () + *timeout;
00238
00239 Lock l (mutex_);
00240
00241 while (queue_.is_empty ())
00242 {
00243 if (timeout)
00244 {
00245 if (cond_.wait (&abs_time) != -1)
00246 break;
00247 }
00248 else
00249 {
00250 if (cond_.wait () != -1)
00251 break;
00252 }
00253
00254 return -1;
00255 }
00256
00257
00258
00259
00260 Message_ptr m;
00261
00262 if (queue_.dequeue_head (m) == -1)
00263 ACE_OS::abort ();
00264
00265 if (queue_.enqueue_head (m) == -1)
00266 ACE_OS::abort ();
00267
00268 if (m->find (NoData::id) != 0)
00269 {
00270 errno = ENOENT;
00271 return -1;
00272 }
00273
00274 Data const* d = static_cast<Data const*>(m->find (Data::id));
00275
00276 return static_cast<ssize_t> (d->size ());
00277 }
00278
00279 ACE_HANDLE Socket_Impl::
00280 get_handle_ ()
00281 {
00282 if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00283 {
00284 signal_pipe_.open ();
00285 }
00286
00287 return signal_pipe_.read_handle ();
00288 }
00289
00290
00291 void Socket_Impl::recv (Message_ptr m)
00292 {
00293 if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294 {
00295 if (!loop_)
00296 {
00297 Address to (static_cast<To const*> (m->find (To::id))->address ());
00298
00299 Address from (
00300 static_cast<From const*> (m->find (From::id))->address ());
00301
00302 if (to == from)
00303 return;
00304 }
00305
00306 Lock l (mutex_);
00307
00308
00309
00310
00311
00312 bool signal (queue_.is_empty ());
00313
00314
00315 queue_.enqueue_tail (m);
00316
00317 if (signal)
00318 {
00319
00320 if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00321 {
00322 char c;
00323
00324 if (signal_pipe_.send (&c, 1) != 1)
00325 {
00326
00327 ACE_OS::abort ();
00328 }
00329 }
00330
00331 cond_.signal ();
00332 }
00333 }
00334 }
00335
00336
00337
00338
00339
00340 Socket::
00341 ~Socket ()
00342 {
00343 }
00344
00345 Socket::
00346 Socket (Address const& a, bool loop, Parameters const& params)
00347 : impl_ (new Socket_Impl (a, loop, params))
00348 {
00349 }
00350
00351 void Socket::send (void const* buf, size_t s)
00352 {
00353 impl_->send_ (buf, s);
00354 }
00355
00356 ssize_t Socket::recv (void* buf, size_t s)
00357 {
00358 return impl_->recv_ (buf, s, 0, 0);
00359 }
00360
00361 ssize_t Socket::recv (void* buf, size_t s, ACE_INET_Addr& from)
00362 {
00363 return impl_->recv_ (buf, s, 0, &from);
00364 }
00365
00366 ssize_t Socket::recv (void* buf, size_t s, ACE_Time_Value const& timeout)
00367 {
00368 return impl_->recv_ (buf, s, &timeout, 0);
00369 }
00370
00371 ssize_t Socket::recv (void* buf,
00372 size_t s,
00373 ACE_Time_Value const& timeout,
00374 ACE_INET_Addr& from)
00375 {
00376 return impl_->recv_ (buf, s, &timeout, &from);
00377 }
00378
00379 ssize_t Socket::
00380 size ()
00381 {
00382 return impl_->size_ (0);
00383 }
00384
00385 ssize_t Socket::
00386 size (ACE_Time_Value const& timeout)
00387 {
00388 return impl_->size_ (&timeout);
00389 }
00390
00391 ACE_HANDLE Socket::
00392 get_handle ()
00393 {
00394 return impl_->get_handle_ ();
00395 }
00396 }