00001
00002
00003
00004
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_stdio.h"
00007 #include "ace/OS_NS_stdlib.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/OS_NS_unistd.h"
00010 #include "ace/OS_NS_sys_time.h"
00011
00012 #include "ace/Unbounded_Queue.h"
00013
00014 #include "ace/Pipe.h"
00015
00016 #include "Stack.h"
00017 #include "Protocol.h"
00018 #include "Bits.h"
00019
00020 #include "Fragment.h"
00021 #include "Reassemble.h"
00022 #include "Acknowledge.h"
00023 #include "Retransmit.h"
00024 #include "Flow.h"
00025 #include "Link.h"
00026
00027 #include "Socket.h"
00028
00029
00030
00031
00032
00033
00034
00035 namespace ACE_RMCast
00036 {
00037 class Socket_Impl : protected Element
00038 {
00039 public:
00040 ~Socket_Impl ();
00041
00042 Socket_Impl (Address const& a, bool loop, Parameters const& params);
00043
00044 public:
00045 void
00046 send_ (void const* buf, size_t s);
00047
00048 ssize_t
00049 recv_ (void* buf,
00050 size_t s,
00051 ACE_Time_Value const* timeout,
00052 ACE_INET_Addr* from);
00053
00054 ssize_t
00055 size_ (ACE_Time_Value const* timeout);
00056
00057 ACE_HANDLE
00058 get_handle_ ();
00059
00060 private:
00061 virtual void
00062 recv (Message_ptr m);
00063
00064 private:
00065 bool loop_;
00066 Parameters const params_;
00067
00068 Mutex mutex_;
00069 Condition cond_;
00070
00071 ACE_Unbounded_Queue<Message_ptr> queue_;
00072
00073 ACE_Pipe signal_pipe_;
00074
00075 ACE_Auto_Ptr<Fragment> fragment_;
00076 ACE_Auto_Ptr<Reassemble> reassemble_;
00077 ACE_Auto_Ptr<Acknowledge> acknowledge_;
00078 ACE_Auto_Ptr<Retransmit> retransmit_;
00079 ACE_Auto_Ptr<Flow> flow_;
00080 ACE_Auto_Ptr<Link> link_;
00081 };
00082
00083
00084 Socket_Impl::
00085 Socket_Impl (Address const& a, bool loop, Parameters const& params)
00086 : loop_ (loop),
00087 params_ (params),
00088 cond_ (mutex_)
00089 {
00090 fragment_.reset (new Fragment (params_));
00091 reassemble_.reset (new Reassemble (params_));
00092 acknowledge_.reset (new Acknowledge (params_));
00093 retransmit_.reset (new Retransmit (params_));
00094 flow_.reset (new Flow (params_));
00095 link_.reset (new Link (a, params_));
00096
00097
00098
00099 in_start (0);
00100 fragment_->in_start (this);
00101 reassemble_->in_start (fragment_.get ());
00102 acknowledge_->in_start (reassemble_.get ());
00103 retransmit_->in_start (acknowledge_.get ());
00104 flow_->in_start (retransmit_.get ());
00105 link_->in_start (flow_.get ());
00106
00107
00108
00109 link_->out_start (0);
00110 flow_->out_start (link_.get ());
00111 retransmit_->out_start (flow_.get ());
00112 acknowledge_->out_start (retransmit_.get ());
00113 reassemble_->out_start (acknowledge_.get ());
00114 fragment_->out_start (reassemble_.get ());
00115 out_start (fragment_.get ());
00116 }
00117
00118 Socket_Impl::
00119 ~Socket_Impl ()
00120 {
00121
00122
00123 out_stop ();
00124 fragment_->out_stop ();
00125 reassemble_->out_stop ();
00126 acknowledge_->out_stop ();
00127 retransmit_->out_stop ();
00128 flow_->out_stop ();
00129 link_->out_stop ();
00130
00131
00132
00133 link_->in_stop ();
00134 flow_->in_stop ();
00135 retransmit_->in_stop ();
00136 acknowledge_->in_stop ();
00137 reassemble_->in_stop ();
00138 fragment_->in_stop ();
00139 in_stop ();
00140
00141
00142
00143 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00144 signal_pipe_.close ();
00145 }
00146
00147
00148 void Socket_Impl::
00149 send_ (void const* buf, size_t s)
00150 {
00151 Message_ptr m (new Message);
00152
00153 m->add (Profile_ptr (new Data (buf, s)));
00154
00155
00156
00157 Element::send (m);
00158 }
00159
00160 ssize_t Socket_Impl::
00161 recv_ (void* buf,
00162 size_t s,
00163 ACE_Time_Value const* timeout,
00164 ACE_INET_Addr* from)
00165 {
00166 ACE_Time_Value abs_time;
00167
00168 if (timeout)
00169 abs_time = ACE_OS::gettimeofday () + *timeout;
00170
00171 Lock l (mutex_);
00172
00173 while (queue_.is_empty ())
00174 {
00175 if (timeout)
00176 {
00177 if (cond_.wait (&abs_time) != -1)
00178 break;
00179 }
00180 else
00181 {
00182 if (cond_.wait () != -1)
00183 break;
00184 }
00185
00186 return -1;
00187 }
00188
00189
00190 Message_ptr m;
00191
00192 if (queue_.dequeue_head (m) == -1)
00193 ACE_OS::abort ();
00194
00195
00196 if (queue_.is_empty ())
00197 {
00198
00199
00200 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
00201 {
00202 char c;
00203
00204 if (signal_pipe_.recv (&c, 1) != 1)
00205 {
00206 ACE_OS::perror ("read: ");
00207 ACE_OS::abort ();
00208 }
00209 }
00210 }
00211
00212 if (from)
00213 *from = static_cast<From const*> (m->find (From::id))->address ();
00214
00215 if (m->find (NoData::id) != 0)
00216 {
00217 errno = ENOENT;
00218 return -1;
00219 }
00220
00221 Data const* d = static_cast<Data const*>(m->find (Data::id));
00222
00223 ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
00224
00225 ACE_OS::memcpy (buf, d->buf (), r);
00226
00227 return r;
00228 }
00229
00230 ssize_t Socket_Impl::
00231 size_ (ACE_Time_Value const* timeout)
00232 {
00233 ACE_Time_Value abs_time;
00234
00235 if (timeout)
00236 abs_time = ACE_OS::gettimeofday () + *timeout;
00237
00238 Lock l (mutex_);
00239
00240 while (queue_.is_empty ())
00241 {
00242 if (timeout)
00243 {
00244 if (cond_.wait (&abs_time) != -1)
00245 break;
00246 }
00247 else
00248 {
00249 if (cond_.wait () != -1)
00250 break;
00251 }
00252
00253 return -1;
00254 }
00255
00256
00257
00258
00259 Message_ptr m;
00260
00261 if (queue_.dequeue_head (m) == -1)
00262 ACE_OS::abort ();
00263
00264 if (queue_.enqueue_head (m) == -1)
00265 ACE_OS::abort ();
00266
00267 if (m->find (NoData::id) != 0)
00268 {
00269 errno = ENOENT;
00270 return -1;
00271 }
00272
00273 Data const* d = static_cast<Data const*>(m->find (Data::id));
00274
00275 return static_cast<ssize_t> (d->size ());
00276 }
00277
00278 ACE_HANDLE Socket_Impl::
00279 get_handle_ ()
00280 {
00281 if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
00282 {
00283 signal_pipe_.open ();
00284 }
00285
00286 return signal_pipe_.read_handle ();
00287 }
00288
00289
00290 void Socket_Impl::
00291 recv (Message_ptr m)
00292 {
00293 if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
00294 {
00295 if (!loop_)
00296 {
00297 Address to (static_cast<To const*> (m->find (To::id))->address ());
00298
00299 Address from (
00300 static_cast<From const*> (m->find (From::id))->address ());
00301
00302 if (to == from)
00303 return;
00304 }
00305
00306 Lock l (mutex_);
00307
00308
00309
00310
00311 bool signal (queue_.is_empty ());
00312
00313 queue_.enqueue_tail (m);
00314
00315 if (signal)
00316 {
00317
00318 if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
00319 {
00320 char c;
00321
00322 if (signal_pipe_.send (&c, 1) != 1)
00323 {
00324
00325 ACE_OS::abort ();
00326 }
00327 }
00328
00329 cond_.signal ();
00330 }
00331 }
00332 }
00333
00334
00335
00336
00337
00338 Socket::
00339 ~Socket ()
00340 {
00341 }
00342
00343 Socket::
00344 Socket (Address const& a, bool loop, Parameters const& params)
00345 : impl_ (new Socket_Impl (a, loop, params))
00346 {
00347 }
00348
00349 void Socket::
00350 send (void const* buf, size_t s)
00351 {
00352 impl_->send_ (buf, s);
00353 }
00354
00355 ssize_t Socket::
00356 recv (void* buf, size_t s)
00357 {
00358 return impl_->recv_ (buf, s, 0, 0);
00359 }
00360
00361 ssize_t Socket::
00362 recv (void* buf, size_t s, ACE_INET_Addr& from)
00363 {
00364 return impl_->recv_ (buf, s, 0, &from);
00365 }
00366
00367 ssize_t Socket::
00368 recv (void* buf, size_t s, ACE_Time_Value const& timeout)
00369 {
00370 return impl_->recv_ (buf, s, &timeout, 0);
00371 }
00372
00373 ssize_t Socket::
00374 recv (void* buf,
00375 size_t s,
00376 ACE_Time_Value const& timeout,
00377 ACE_INET_Addr& from)
00378 {
00379 return impl_->recv_ (buf, s, &timeout, &from);
00380 }
00381
00382 ssize_t Socket::
00383 size ()
00384 {
00385 return impl_->size_ (0);
00386 }
00387
00388 ssize_t Socket::
00389 size (ACE_Time_Value const& timeout)
00390 {
00391 return impl_->size_ (&timeout);
00392 }
00393
00394 ACE_HANDLE Socket::
00395 get_handle ()
00396 {
00397 return impl_->get_handle_ ();
00398 }
00399 }