Public Member Functions | |
~Socket_Impl () | |
Socket_Impl (Address const &a, bool loop, Parameters const &params) | |
void | send_ (void const *buf, size_t s) |
ssize_t | recv_ (void *buf, size_t s, ACE_Time_Value const *timeout, ACE_INET_Addr *from) |
ssize_t | size_ (ACE_Time_Value const *timeout) |
ACE_HANDLE | get_handle_ () |
Private Member Functions | |
virtual void | recv (Message_ptr m) |
Private Attributes | |
bool | loop_ |
Parameters const | params_ |
Mutex | mutex_ |
Condition | cond_ |
ACE_Unbounded_Queue< Message_ptr > | queue_ |
ACE_Pipe | signal_pipe_ |
ACE_Auto_Ptr< Fragment > | fragment_ |
ACE_Auto_Ptr< Reassemble > | reassemble_ |
ACE_Auto_Ptr< Acknowledge > | acknowledge_ |
ACE_Auto_Ptr< Retransmit > | retransmit_ |
ACE_Auto_Ptr< Flow > | flow_ |
ACE_Auto_Ptr< Link > | link_ |
|
Definition at line 119 of file Socket.cpp. References acknowledge_, ACE_Pipe::close(), flow_, fragment_, ACE_RMCast::In_Element::in_stop(), ACE_RMCast::Out_Element::out_stop(), ACE_Pipe::read_handle(), reassemble_, retransmit_, and signal_pipe_.
00120 { 00121 // Stop OUT stack from top to bottom. 00122 // 00123 out_stop (); 00124 fragment_->out_stop (); 00125 reassemble_->out_stop (); 00126 acknowledge_->out_stop (); 00127 retransmit_->out_stop (); 00128 flow_->out_stop (); 00129 link_->out_stop (); 00130 00131 // Stop IN stack from bottom up. 00132 // 00133 link_->in_stop (); 00134 flow_->in_stop (); 00135 retransmit_->in_stop (); 00136 acknowledge_->in_stop (); 00137 reassemble_->in_stop (); 00138 fragment_->in_stop (); 00139 in_stop (); 00140 00141 // Close signal pipe. 00142 // 00143 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) 00144 signal_pipe_.close (); 00145 } |
|
Definition at line 85 of file Socket.cpp. References acknowledge_, ACE_RMCast::Address, flow_, fragment_, ACE_Auto_Basic_Ptr< X >::get(), ACE_RMCast::In_Element::in_start(), ACE_RMCast::Out_Element::out_start(), reassemble_, ACE_Auto_Basic_Ptr< X >::reset(), and retransmit_.
00086 : loop_ (loop), 00087 params_ (params), 00088 cond_ (mutex_) 00089 { 00090 fragment_.reset (new Fragment (params_)); 00091 reassemble_.reset (new Reassemble (params_)); 00092 acknowledge_.reset (new Acknowledge (params_)); 00093 retransmit_.reset (new Retransmit (params_)); 00094 flow_.reset (new Flow (params_)); 00095 link_.reset (new Link (a, params_)); 00096 00097 // Start IN stack from top to bottom. 00098 // 00099 in_start (0); 00100 fragment_->in_start (this); 00101 reassemble_->in_start (fragment_.get ()); 00102 acknowledge_->in_start (reassemble_.get ()); 00103 retransmit_->in_start (acknowledge_.get ()); 00104 flow_->in_start (retransmit_.get ()); 00105 link_->in_start (flow_.get ()); 00106 00107 // Start OUT stack from bottom up. 00108 // 00109 link_->out_start (0); 00110 flow_->out_start (link_.get ()); 00111 retransmit_->out_start (flow_.get ()); 00112 acknowledge_->out_start (retransmit_.get ()); 00113 reassemble_->out_start (acknowledge_.get ()); 00114 fragment_->out_start (reassemble_.get ()); 00115 out_start (fragment_.get ()); 00116 } |
|
Definition at line 279 of file Socket.cpp. References ACE_Pipe::open(), ACE_Pipe::read_handle(), and signal_pipe_.
00280 { 00281 if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE) 00282 { 00283 signal_pipe_.open (); 00284 } 00285 00286 return signal_pipe_.read_handle (); 00287 } |
|
Reimplemented from ACE_RMCast::In_Element. Definition at line 291 of file Socket.cpp. References ACE_OS::abort(), ACE_RMCast::Address, ACE_Unbounded_Queue< Message_ptr >::enqueue_tail(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, loop_, ACE_RMCast::Message_ptr, ACE_Pipe::send(), ACE_Condition< MUTEX >::signal(), signal_pipe_, and ACE_Pipe::write_handle().
00292 { 00293 if (m->find (Data::id) != 0 || m->find (NoData::id) != 0) 00294 { 00295 if (!loop_) 00296 { 00297 Address to (static_cast<To const*> (m->find (To::id))->address ()); 00298 00299 Address from ( 00300 static_cast<From const*> (m->find (From::id))->address ()); 00301 00302 if (to == from) 00303 return; 00304 } 00305 00306 Lock l (mutex_); 00307 00308 //if (queue_.size () != 0) 00309 // cerr << "recv socket queue size: " << queue_.size () << endl; 00310 00311 bool signal (queue_.is_empty ()); 00312 00313 queue_.enqueue_tail (m); 00314 00315 if (signal) 00316 { 00317 // Also write to the pipe. 00318 if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE) 00319 { 00320 char c; 00321 00322 if (signal_pipe_.send (&c, 1) != 1) 00323 { 00324 // perror ("write: "); 00325 ACE_OS::abort (); 00326 } 00327 } 00328 00329 cond_.signal (); 00330 } 00331 } 00332 } |
|
Definition at line 161 of file Socket.cpp. References ACE_OS::abort(), ACE_RMCast::Data::buf(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_OS::memcpy(), ACE_RMCast::Message_ptr, ACE_OS::perror(), ACE_Pipe::read_handle(), ACE_Pipe::recv(), signal_pipe_, ACE_RMCast::Data::size(), ssize_t, and ACE_Condition< MUTEX >::wait().
00165 { 00166 ACE_Time_Value abs_time; 00167 00168 if (timeout) 00169 abs_time = ACE_OS::gettimeofday () + *timeout; 00170 00171 Lock l (mutex_); 00172 00173 while (queue_.is_empty ()) 00174 { 00175 if (timeout) 00176 { 00177 if (cond_.wait (&abs_time) != -1) 00178 break; 00179 } 00180 else 00181 { 00182 if (cond_.wait () != -1) 00183 break; 00184 } 00185 00186 return -1; // errno is already set 00187 } 00188 00189 00190 Message_ptr m; 00191 00192 if (queue_.dequeue_head (m) == -1) 00193 ACE_OS::abort (); 00194 00195 00196 if (queue_.is_empty ()) 00197 { 00198 // Remove data from the pipe. 00199 // 00200 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) 00201 { 00202 char c; 00203 00204 if (signal_pipe_.recv (&c, 1) != 1) 00205 { 00206 ACE_OS::perror ("read: "); 00207 ACE_OS::abort (); 00208 } 00209 } 00210 } 00211 00212 if (from) 00213 *from = static_cast<From const*> (m->find (From::id))->address (); 00214 00215 if (m->find (NoData::id) != 0) 00216 { 00217 errno = ENOENT; 00218 return -1; 00219 } 00220 00221 Data const* d = static_cast<Data const*>(m->find (Data::id)); 00222 00223 ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s)); 00224 00225 ACE_OS::memcpy (buf, d->buf (), r); 00226 00227 return r; 00228 } |
|
Definition at line 149 of file Socket.cpp. References ACE_RMCast::Message_ptr, and ACE_RMCast::Profile_ptr.
00150 { 00151 Message_ptr m (new Message); 00152 00153 m->add (Profile_ptr (new Data (buf, s))); 00154 00155 // Qualification is for VC6 and VxWorks. 00156 // 00157 Element::send (m); 00158 } |
|
Definition at line 231 of file Socket.cpp. References ACE_OS::abort(), ACE_Unbounded_Queue< Message_ptr >::dequeue_head(), ACE_Unbounded_Queue< Message_ptr >::enqueue_head(), ACE_OS::gettimeofday(), ACE_Unbounded_Queue< Message_ptr >::is_empty(), ACE_RMCast::Lock, ACE_RMCast::Message_ptr, ACE_RMCast::Data::size(), and ACE_Condition< MUTEX >::wait().
00232 { 00233 ACE_Time_Value abs_time; 00234 00235 if (timeout) 00236 abs_time = ACE_OS::gettimeofday () + *timeout; 00237 00238 Lock l (mutex_); 00239 00240 while (queue_.is_empty ()) 00241 { 00242 if (timeout) 00243 { 00244 if (cond_.wait (&abs_time) != -1) 00245 break; 00246 } 00247 else 00248 { 00249 if (cond_.wait () != -1) 00250 break; 00251 } 00252 00253 return -1; // errno is already set 00254 } 00255 00256 // I can't get the head of the queue without actually dequeuing 00257 // the element. 00258 // 00259 Message_ptr m; 00260 00261 if (queue_.dequeue_head (m) == -1) 00262 ACE_OS::abort (); 00263 00264 if (queue_.enqueue_head (m) == -1) 00265 ACE_OS::abort (); 00266 00267 if (m->find (NoData::id) != 0) 00268 { 00269 errno = ENOENT; 00270 return -1; 00271 } 00272 00273 Data const* d = static_cast<Data const*>(m->find (Data::id)); 00274 00275 return static_cast<ssize_t> (d->size ()); 00276 } |
|
Definition at line 77 of file Socket.cpp. Referenced by Socket_Impl(), and ~Socket_Impl(). |
|
Definition at line 69 of file Socket.cpp. |
|
Definition at line 79 of file Socket.cpp. Referenced by Socket_Impl(), and ~Socket_Impl(). |
|
Definition at line 75 of file Socket.cpp. Referenced by Socket_Impl(), and ~Socket_Impl(). |
|
Definition at line 80 of file Socket.cpp. |
|
Definition at line 65 of file Socket.cpp. Referenced by recv(). |
|
Definition at line 68 of file Socket.cpp. |
|
Definition at line 66 of file Socket.cpp. |
|
Definition at line 71 of file Socket.cpp. |
|
Definition at line 76 of file Socket.cpp. Referenced by Socket_Impl(), and ~Socket_Impl(). |
|
Definition at line 78 of file Socket.cpp. Referenced by Socket_Impl(), and ~Socket_Impl(). |
|
Definition at line 73 of file Socket.cpp. Referenced by get_handle_(), recv(), recv_(), and ~Socket_Impl(). |