#include <StreamHandler.h>


Classes | |
| class | NotificationStrategyGuard |
Public Types | |
| typedef StreamHandler < ACE_PEER_STREAM, ACE_SYNCH_USE > | this_type |
| typedef ACE_Svc_Handler < ACE_PEER_STREAM, ACE_SYNCH_USE > | base_type |
| typedef ACE_Message_Queue < ACE_SYNCH_USE > | mq_type |
Public Member Functions | |
| StreamHandler (const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, ACE_Thread_Manager *thr_mgr=0, mq_type *mq=0, ACE_Reactor *reactor=ACE_Reactor::instance()) | |
| Constructor. | |
| virtual | ~StreamHandler () |
| Destructor. | |
| virtual int | open (void *=0) |
| Activate the connection. | |
| virtual int | close (u_long flags=0) |
| Close the connection. | |
| virtual int | handle_input (ACE_HANDLE) |
| virtual int | handle_output (ACE_HANDLE) |
| int | read_from_stream (void *buf, size_t length, u_short char_size) |
| Called by streambuffer to read/receive new data from peer. | |
| int | write_to_stream (const void *buf, size_t length, u_short char_size) |
| Called by streambuffer to send new data to peer. | |
| bool | is_connected () const |
| Returns true as long as the connection to peer is active. | |
| bool | using_reactor () const |
| Returns true if StreamHandler has been configured for reactive mode. | |
Private Types | |
| enum | { MAX_INPUT_SIZE = 4096 } |
Private Member Functions | |
| int | handle_output_i (ACE_Time_Value *timeout=0) |
| int | handle_input_i (size_t rdlen, ACE_Time_Value *timeout=0) |
| int | process_input (char *buf, size_t &char_length, u_short char_size, ACE_Time_Value *timeout) |
| processes queued input | |
| bool | use_timeout () const |
| Returns true if a timeout is to be used on IO operations. | |
| bool | char_in_queue (u_short char_size) |
| Returns true is the queued data contains at least char_size bytes. | |
Private Attributes | |
| bool | connected_ |
| ACE_Synch_Options | sync_opt_ |
| bool | send_timeout_ |
| bool | receive_timeout_ |
| ACE_Reactor_Notification_Strategy | notification_strategy_ |
Definition at line 34 of file StreamHandler.h.
| typedef ACE_Svc_Handler<ACE_PEER_STREAM, ACE_SYNCH_USE> ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::base_type |
Definition at line 40 of file StreamHandler.h.
| typedef ACE_Message_Queue<ACE_SYNCH_USE> ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::mq_type |
Definition at line 41 of file StreamHandler.h.
| typedef StreamHandler<ACE_PEER_STREAM, ACE_SYNCH_USE> ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::this_type |
Definition at line 39 of file StreamHandler.h.
anonymous enum [private] |
| ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::StreamHandler | ( | const ACE_Synch_Options & | synch_options = ACE_Synch_Options::defaults, |
|
| ACE_Thread_Manager * | thr_mgr = 0, |
|||
| mq_type * | mq = 0, |
|||
| ACE_Reactor * | reactor = ACE_Reactor::instance () | |||
| ) |
Constructor.
Definition at line 20 of file StreamHandler.cpp.
: ACE_Svc_Handler<ACE_PEER_STREAM, ACE_SYNCH_USE> (thr_mgr, mq, reactor), connected_ (false), send_timeout_ (false), receive_timeout_ (false), notification_strategy_ (reactor, this, ACE_Event_Handler::WRITE_MASK) { INET_TRACE ("ACE_IOS_StreamHandler - ctor"); unsigned long opt = synch_options[ACE_Synch_Options::USE_REACTOR] ? ACE_Synch_Options::USE_REACTOR : 0; if (synch_options[ACE_Synch_Options::USE_TIMEOUT]) opt |= ACE_Synch_Options::USE_TIMEOUT; this->sync_opt_.set (opt, synch_options.timeout (), synch_options.arg ()); }
| ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::~StreamHandler | ( | ) | [virtual] |
Destructor.
Definition at line 45 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler - dtor");
this->connected_ = false;
}
| bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::char_in_queue | ( | u_short | char_size | ) | [private] |
Returns true is the queued data contains at least char_size bytes.
Definition at line 428 of file StreamHandler.cpp.
{
return this->msg_queue ()->message_bytes () >= char_size;
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::close | ( | u_long | flags = 0 |
) | [virtual] |
Close the connection.
Reimplemented from ACE_Svc_Handler< ACE_PEER_STREAM, ACE_SYNCH_USE >.
Definition at line 60 of file StreamHandler.cpp.
{
this->connected_ = false;
return base_type::close (flags);
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::handle_input | ( | ACE_HANDLE | ) | [virtual] |
Called to handle incoming data when using StreamHandler in reactive mode
Definition at line 67 of file StreamHandler.cpp.
{
// always read non-blocking however much there is
ACE_Time_Value to = ACE_Time_Value::zero;
return this->handle_input_i (MAX_INPUT_SIZE, &to);
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::handle_input_i | ( | size_t | rdlen, | |
| ACE_Time_Value * | timeout = 0 | |||
| ) | [private] |
Attempts to send queued data to peer. Called either from handle_output in reactive mode or directly from write_to_stream when non-reactive.
Definition at line 75 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler::handle_input_i");
char buffer[MAX_INPUT_SIZE];
ssize_t recv_cnt;
size_t bytes_in = 0;
// blocking (with or without timeout) or non-blocking?
bool no_wait = timeout && (*timeout == ACE_Time_Value::zero);
recv_cnt = this->peer ().recv_n (buffer,
rdlen <= sizeof(buffer) ? rdlen : sizeof(buffer),
timeout,
&bytes_in);
if (bytes_in > 0)
{
INET_HEX_DUMP (11, (LM_DEBUG, buffer, bytes_in, DLINFO
ACE_TEXT ("ACE_IOS_StreamHandler::handle_input_i <--")));
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb, ACE_Message_Block (bytes_in), -1);
mb->copy (buffer, bytes_in);
ACE_Time_Value nowait (ACE_OS::gettimeofday ());
if (this->putq (mb, &nowait) == -1)
{
INET_ERROR (1, (LM_ERROR, DLINFO
ACE_TEXT ("ACE_IOS_StreamHandler - discarding input data, "),
ACE_TEXT ("enqueue failed (%d)\n"),
ACE_OS::last_error ()));
mb->release ();
this->connected_ = false;
return -1;
}
}
if (recv_cnt == 0 || (recv_cnt < 0 && !no_wait))
{
if (recv_cnt < 0)
{
INET_ERROR (1, (LM_ERROR, DLINFO
ACE_TEXT ("ACE_IOS_StreamHandler - receive failed (%d)\n"),
ACE_OS::last_error ()));
}
this->connected_ = false;
return this->using_reactor () ? -1 : 0;
}
return 0;
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::handle_output | ( | ACE_HANDLE | ) | [virtual] |
Called to handle outgoing data when using StreamHandler in reactive mode
Definition at line 127 of file StreamHandler.cpp.
{
if (this->use_timeout ())
{
ACE_Time_Value to = this->sync_opt_.timeout ();
return this->handle_output_i (&to);
}
else
return this->handle_output_i (0);
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::handle_output_i | ( | ACE_Time_Value * | timeout = 0 |
) | [private] |
Attempts to receive data from peer and queue it. Called either from handle_input in reactive mode or directly from read_from_stream when non-reactive.
Definition at line 139 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler::handle_output_i");
ACE_Message_Block *mb = 0;
ACE_Time_Value nowait (ACE_OS::gettimeofday ());
size_t bytes_out = 0;
if (-1 != this->getq (mb, &nowait))
{
ssize_t send_cnt =
this->peer ().send_n (mb->rd_ptr (), mb->length (), timeout, &bytes_out);
if (bytes_out > 0)
{
INET_HEX_DUMP (11, (LM_DEBUG, mb->rd_ptr (), bytes_out, DLINFO
ACE_TEXT ("ACE_IOS_StreamHandler::handle_output_i -->")));
mb->rd_ptr (static_cast<size_t> (bytes_out));
if (mb->length () > 0)
this->ungetq (mb);
else
mb->release ();
}
if (send_cnt <= 0)
{
INET_ERROR (1, (LM_ERROR, DLINFO
ACE_TEXT ("%p; ACE_IOS_StreamHandler - "),
ACE_TEXT ("send failed\n")));
this->connected_ = false;
return this->using_reactor () ? -1 : 0;
}
}
return (this->msg_queue ()->is_empty ()) ? -1 : 0;
}
| bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::is_connected | ( | ) | const |
Returns true as long as the connection to peer is active.
Definition at line 559 of file StreamHandler.cpp.
{
return this->connected_;
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::open | ( | void * | = 0 |
) | [virtual] |
Activate the connection.
Reimplemented from ACE_Svc_Handler< ACE_PEER_STREAM, ACE_SYNCH_USE >.
Definition at line 53 of file StreamHandler.cpp.
{
this->connected_ = true;
return 0;
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::process_input | ( | char * | buf, | |
| size_t & | char_length, | |||
| u_short | char_size, | |||
| ACE_Time_Value * | timeout | |||
| ) | [private] |
processes queued input
Definition at line 311 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler::process_input");
ACE_Time_Value wait (ACE_OS::gettimeofday ());
// keep track of how much time we use here
ACE_Countdown_Time timeout_countdown (timeout);
// if timeout specified add it to the abs waittime
// otherwise it's a 'nowait'
if (timeout)
{
wait += *timeout;
timeout_countdown.start ();
}
ACE_Message_Block *mb_remain = 0;
size_t recv_char_count = 0;
while (!this->msg_queue ()->is_empty () && char_length > 0)
{
ACE_Message_Block *mb = 0;
if (this->getq (mb, &wait) == -1)
{
if (ACE_OS::last_error () == EWOULDBLOCK)
break; // timeout; queue still empty
else
return -1; // message queue shut down
}
size_t copy_len = 0;
if (mb_remain)
{
if ((mb_remain->length () + mb->length ()) < char_size)
{
ACE_Message_Block *mb_new = 0;
ACE_NEW_NORETURN (mb,
ACE_Message_Block (mb_remain->length () + mb->length ()));
if (mb_new == 0)
{
mb->release ();
mb_remain->release ();
return -1; // out of memory error
}
mb_new->copy (mb_remain->rd_ptr (), mb_remain->length ());
mb_remain->release ();
mb_new->copy (mb->rd_ptr (), mb->length ());
mb->release ();
mb_remain = mb_new;
continue; // check for next msg block
}
copy_len = (mb_remain->length () > char_length) ?
char_length :
mb_remain->length ();
ACE_OS::memmove (&buf[recv_char_count],
mb_remain->rd_ptr (),
copy_len);
char_length -= copy_len;
recv_char_count += copy_len;
mb_remain->rd_ptr (copy_len);
if (mb_remain->length () > 0)
{
continue; // buffer is full
}
// cleanup empty block
mb_remain->release ();
mb_remain = 0;
}
// normalize to total nr of char_size elements available in mb [+ mb_remain]
size_t total_char_len = ((mb->length () + copy_len)/ char_size) * char_size;
// what was the max we could copy?
size_t max_copy_len = (total_char_len > char_length) ?
char_length :
total_char_len;
// subtract what we possibly already copied from mb_remain
copy_len = max_copy_len - copy_len;
ACE_OS::memmove (&buf[recv_char_count],
mb->rd_ptr (),
copy_len);
recv_char_count += copy_len;
char_length -= copy_len;
mb->rd_ptr (copy_len);
if (mb->length () > 0)
{
mb_remain = mb;
}
else
mb->release ();
}
if (mb_remain)
{
this->ungetq (mb_remain);
}
if (timeout)
{
// stop countdown; update timeout value
timeout_countdown.stop ();
}
return ACE_Utils::truncate_cast<int> (recv_char_count);
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::read_from_stream | ( | void * | buf, | |
| size_t | length, | |||
| u_short | char_size | |||
| ) |
Called by streambuffer to read/receive new data from peer.
Definition at line 174 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler::read_from_stream");
size_t recv_char_count = 0;
char* wptr = (char*)buf;
size_t char_length = length * char_size;
ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
int result = 0;
if (this->using_reactor ())
{
ACE_thread_t tid;
this->reactor ()->owner (&tid);
bool reactor_thread =
ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
if (this->connected_)
{
if (this->reactor ()->register_handler(this,
ACE_Event_Handler::READ_MASK) != 0)
{
return -1;
}
}
// run the event loop for the maximum allowed time to get the
// message data in
while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
{
result = 0;
if (reactor_thread && !this->char_in_queue (char_size))
{
// Run the event loop.
result = this->reactor ()->handle_events (this->use_timeout () ?
&max_wait_time : 0);
}
if (result != -1)
{
result = this->process_input (&wptr[recv_char_count],
char_length,
char_size,
this->use_timeout () ?
&max_wait_time : 0);
}
if (result == -1)
{
this->reactor ()->remove_handler (this,
ACE_Event_Handler::READ_MASK);
return -1;
}
recv_char_count += result;
if (recv_char_count > 0)
{
break;
}
if (this->use_timeout () &&
max_wait_time == ACE_Time_Value::zero)
{
this->reactor ()->remove_handler (this,
ACE_Event_Handler::READ_MASK);
this->receive_timeout_ = true;
return -1;
}
}
this->reactor ()->remove_handler (this,
ACE_Event_Handler::READ_MASK);
}
else
{
// non-reactive
// the first read we will try to read as much as possible
// non-blocking
// if that does not result in any data the next read will be
// blocking for 1 char_size data
size_t rdlen = MAX_INPUT_SIZE;
ACE_Time_Value timeout = ACE_Time_Value::zero;
ACE_Time_Value* to = &timeout;
while ((this->connected_ || this->char_in_queue (char_size)) && char_length > 0)
{
if (!this->char_in_queue (char_size))
{
// nothing in queue, so see if there is anything newly arrived
result = this->handle_input_i (rdlen, to);
}
if (result == -1)
return result;
result = this->process_input (&wptr[recv_char_count],
char_length,
char_size,
this->use_timeout () ?
&max_wait_time : 0);
if (result == -1)
return result;
recv_char_count += result;
if (recv_char_count > 0)
{
// if we got any char_size data (either newly read
// or remainder from queue) we quit
break;
}
if (this->use_timeout () &&
max_wait_time == ACE_Time_Value::zero)
{
this->receive_timeout_ = true;
return -1;
}
if (this->connected_ && char_length >0)
{
// nothing has been read the first time round
// now start blocking read 1 char_size data at a time
rdlen = char_size;
to = this->use_timeout () ? &max_wait_time : 0;
}
}
}
return ACE_Utils::truncate_cast<int> (recv_char_count / char_size);
}
| bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::use_timeout | ( | ) | const [private] |
Returns true if a timeout is to be used on IO operations.
Definition at line 422 of file StreamHandler.cpp.
{
return this->sync_opt_[ACE_Synch_Options::USE_TIMEOUT];
}
| bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::using_reactor | ( | ) | const |
Returns true if StreamHandler has been configured for reactive mode.
Definition at line 565 of file StreamHandler.cpp.
{
return this->sync_opt_[ACE_Synch_Options::USE_REACTOR];
}
| int ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::write_to_stream | ( | const void * | buf, | |
| size_t | length, | |||
| u_short | char_size | |||
| ) |
Called by streambuffer to send new data to peer.
Definition at line 434 of file StreamHandler.cpp.
{
INET_TRACE ("ACE_IOS_StreamHandler::write_to_stream");
// check if we're allowed to control the reactor if reactive
bool use_reactor = this->using_reactor ();
if (use_reactor)
{
ACE_thread_t tid;
this->reactor ()->owner (&tid);
use_reactor =
ACE_OS::thr_equal (ACE_Thread::self (), tid) ? true : false;
}
// set notification strategy if reactive
NotificationStrategyGuard ns_guard__(*this,
use_reactor ?
&this->notification_strategy_ : 0);
size_t datasz = length * char_size;
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb, ACE_Message_Block (datasz), -1);
mb->copy ((const char*)buf, datasz);
ACE_Time_Value nowait (ACE_OS::gettimeofday ());
if (this->putq (mb, &nowait) == -1)
{
INET_ERROR (1, (LM_ERROR, DLINFO
ACE_TEXT ("(%d) ACE_IOS_StreamHandler - discarding output data, "),
ACE_TEXT ("enqueue failed\n"),
ACE_OS::last_error ()));
mb->release ();
return 0;
}
ACE_Time_Value max_wait_time = this->sync_opt_.timeout ();
int result = 0;
if (use_reactor)
{
if (this->reactor ()->register_handler(this,
ACE_Event_Handler::WRITE_MASK) != 0)
{
return -1;
}
// run the event loop for the maximum allowed time to get the
// message data out
while (this->connected_)
{
// Run the event loop.
result = this->reactor ()->handle_events (this->use_timeout () ?
&max_wait_time : 0);
if (result == -1)
{
INET_ERROR (1, (LM_ERROR, DLINFO
ACE_TEXT ("(%d) ACE_IOS_StreamHandler::write_to_stream - ")
ACE_TEXT ("handle_events failed\n"),
ACE_OS::last_error ()));
}
// If we got our message out, no need to run the event loop any
// further.
if (this->msg_queue ()->is_empty ())
{
break;
}
// Did we timeout? If so, stop running the loop.
if (result == 0
&& this->use_timeout ()
&& max_wait_time == ACE_Time_Value::zero)
{
this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
this->send_timeout_ = true;
return ACE_Utils::truncate_cast<int>
(length - (this->msg_queue ()->message_bytes () / char_size));
}
// Other errors? If so, stop running the loop.
if (result == -1)
{
this->reactor ()->remove_handler (this, ACE_Event_Handler::WRITE_MASK);
return -1;
}
// Otherwise, keep going...
}
}
else
{
while (this->connected_)
{
result = this->handle_output_i (this->use_timeout () ?
&max_wait_time : 0);
// If we got our message out, no need to run the event loop any
// further.
if (this->msg_queue ()->is_empty ())
{
break;
}
// Did we timeout? If so, stop running the loop.
if (result == 0
&& this->use_timeout ()
&& max_wait_time == ACE_Time_Value::zero)
{
this->send_timeout_ = true;
return ACE_Utils::truncate_cast<int>
(length - (this->msg_queue ()->message_bytes () / char_size));
}
// Otherwise, keep going...
}
}
if (this->connected_)
return ACE_Utils::truncate_cast<int> (length); // all sent
else
return ACE_Utils::truncate_cast<int>
(length - (this->msg_queue ()->message_bytes () / char_size));
}
bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::connected_ [private] |
Definition at line 106 of file StreamHandler.h.
ACE_Reactor_Notification_Strategy ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::notification_strategy_ [private] |
Definition at line 110 of file StreamHandler.h.
bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::receive_timeout_ [private] |
Definition at line 109 of file StreamHandler.h.
bool ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::send_timeout_ [private] |
Definition at line 108 of file StreamHandler.h.
ACE_Synch_Options ACE::IOS::StreamHandler< ACE_PEER_STREAM_1, ACE_SYNCH_DECL >::sync_opt_ [private] |
Definition at line 107 of file StreamHandler.h.
1.7.0