Unblock the ACE_Select_Reactor from its event loop. More...
#include <Select_Reactor_Base.h>


Public Member Functions | |
| ACE_Select_Reactor_Notify (void) | |
| Constructor. | |
| virtual | ~ACE_Select_Reactor_Notify (void) |
| Destructor. | |
| virtual int | open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT) |
| Initialize. | |
| virtual int | close (void) |
| Destroy. | |
| virtual int | notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0) |
| virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
| virtual ACE_HANDLE | notify_handle (void) |
| virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
| virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
| virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
| Verify whether the buffer has dispatchable info or not. | |
| virtual int | handle_input (ACE_HANDLE handle) |
| virtual void | max_notify_iterations (int) |
| virtual int | max_notify_iterations (void) |
| virtual int | purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
| virtual void | dump (void) const |
| Dump the state of an object. | |
Public Attributes | |
| ACE_ALLOC_HOOK_DECLARE | |
| Declare the dynamic allocation hooks. | |
Protected Attributes | |
| ACE_Select_Reactor_Impl * | select_reactor_ |
| ACE_Pipe | notification_pipe_ |
| int | max_notify_iterations_ |
Unblock the ACE_Select_Reactor from its event loop.
This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Select_Reactor_Mask is passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.
Definition at line 133 of file Select_Reactor_Base.h.
| ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify | ( | void | ) |
Constructor.
Definition at line 537 of file Select_Reactor_Base.cpp.
: max_notify_iterations_ (-1) { }
| ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify | ( | void | ) | [virtual] |
| int ACE_Select_Reactor_Notify::close | ( | void | ) | [virtual] |
Destroy.
Implements ACE_Reactor_Notify.
Definition at line 649 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
notification_queue_.reset();
#else
if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
{
// Please see Bug 2820, if we just close the pipe then we break
// the reference counting rules. Basically, all the event
// handlers "stored" in the pipe had their reference counts
// increased. We need to decrease them before closing the
// pipe....
ACE_Notification_Buffer b;
for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
r > 0;
r = read_notify_pipe(notification_pipe_.read_handle(), b))
{
if (b.eh_ != 0)
{
b.eh_->remove_reference();
}
}
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
}
| int ACE_Select_Reactor_Notify::dispatch_notifications | ( | int & | number_of_active_handles, | |
| ACE_Handle_Set & | rd_mask | |||
| ) | [virtual] |
Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.
Implements ACE_Reactor_Notify.
Definition at line 737 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
ACE_HANDLE const read_handle =
this->notification_pipe_.read_handle ();
if (read_handle != ACE_INVALID_HANDLE
&& rd_mask.is_set (read_handle))
{
--number_of_active_handles;
rd_mask.clr_bit (read_handle);
return this->handle_input (read_handle);
}
else
return 0;
}
| int ACE_Select_Reactor_Notify::dispatch_notify | ( | ACE_Notification_Buffer & | buffer | ) | [virtual] |
Handle one of the notify call on the handle. This could be because of a thread trying to unblock the Reactor_Impl
Implements ACE_Reactor_Notify.
Definition at line 791 of file Select_Reactor_Base.cpp.
{
int result = 0;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
bool more_messages_queued = false;
ACE_Notification_Buffer next;
result = notification_queue_.pop_next_notification(buffer,
more_messages_queued,
next);
if (result == 0 || result == -1)
{
return result;
}
if(more_messages_queued)
{
(void) ACE::send(this->notification_pipe_.write_handle(),
(char *)&next, sizeof(ACE_Notification_Buffer));
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
// <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
// appropriate handle_* method on the <ACE_Event_Handler> pointer
// we've been passed.
if (buffer.eh_ != 0)
{
ACE_Event_Handler *event_handler = buffer.eh_;
bool const requires_reference_counting =
event_handler->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
switch (buffer.mask_)
{
case ACE_Event_Handler::READ_MASK:
case ACE_Event_Handler::ACCEPT_MASK:
result = event_handler->handle_input (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::WRITE_MASK:
result = event_handler->handle_output (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::EXCEPT_MASK:
result = event_handler->handle_exception (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::QOS_MASK:
result = event_handler->handle_qos (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::GROUP_QOS_MASK:
result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
break;
default:
// Should we bail out if we get an invalid mask?
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("invalid mask = %d\n"),
buffer.mask_));
}
if (result == -1)
event_handler->handle_close (ACE_INVALID_HANDLE,
ACE_Event_Handler::EXCEPT_MASK);
if (requires_reference_counting)
{
event_handler->remove_reference ();
}
}
return 1;
}
| void ACE_Select_Reactor_Notify::dump | ( | void | ) | const [virtual] |
Dump the state of an object.
Implements ACE_Reactor_Notify.
Definition at line 586 of file Select_Reactor_Base.cpp.
{
#if defined (ACE_HAS_DUMP)
ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
this->notification_pipe_.dump ();
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}
| int ACE_Select_Reactor_Notify::handle_input | ( | ACE_HANDLE | handle | ) | [virtual] |
Called back by the ACE_Select_Reactor when a thread wants to unblock us.
Reimplemented from ACE_Event_Handler.
Definition at line 927 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
// Precondition: this->select_reactor_.token_.current_owner () ==
// ACE_Thread::self ();
int number_dispatched = 0;
int result = 0;
ACE_Notification_Buffer buffer;
// If there is only one buffer in the pipe, this will loop and call
// read_notify_pipe() twice. The first time will read the buffer, and
// the second will read the fact that the pipe is empty.
while ((result = this->read_notify_pipe (handle, buffer)) > 0)
{
// Dispatch the buffer
// NOTE: We count only if we made any dispatches ie. upcalls.
if (this->dispatch_notify (buffer) > 0)
++number_dispatched;
// Bail out if we've reached the <notify_threshold_>. Note that
// by default <notify_threshold_> is -1, so we'll loop until all
// the notifications in the pipe have been dispatched.
if (number_dispatched == this->max_notify_iterations_)
break;
}
// Reassign number_dispatched to -1 if things have gone seriously
// wrong.
if (result < 0)
number_dispatched = -1;
// Enqueue ourselves into the list of waiting threads. When we
// reacquire the token we'll be off and running again with ownership
// of the token. The postcondition of this call is that
// <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
this->select_reactor_->renew ();
return number_dispatched;
}
| int ACE_Select_Reactor_Notify::is_dispatchable | ( | ACE_Notification_Buffer & | buffer | ) | [virtual] |
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify.
Definition at line 767 of file Select_Reactor_Base.cpp.
{
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
ACE_UNUSED_ARG(buffer);
return 1;
#else
// If eh == 0 then another thread is unblocking the
// <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
// appropriate handle_* method on the <ACE_Event_Handler>
// pointer we've been passed.
if (buffer.eh_ != 0)
{
return 1;
}
else
{
// has no dispatchable buffer
return 0;
}
#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
}
| void ACE_Select_Reactor_Notify::max_notify_iterations | ( | int | iterations | ) | [virtual] |
Set the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.
Implements ACE_Reactor_Notify.
Definition at line 547 of file Select_Reactor_Base.cpp.
{
// Must always be > 0 or < 0 to optimize the loop exit condition.
if (iterations == 0)
iterations = 1;
this->max_notify_iterations_ = iterations;
}
| int ACE_Select_Reactor_Notify::max_notify_iterations | ( | void | ) | [virtual] |
Get the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop.
Implements ACE_Reactor_Notify.
Definition at line 557 of file Select_Reactor_Base.cpp.
{
return this->max_notify_iterations_;
}
| int ACE_Select_Reactor_Notify::notify | ( | ACE_Event_Handler * | event_handler = 0, |
|
| ACE_Reactor_Mask | mask = ACE_Event_Handler::EXCEPT_MASK, |
|||
| ACE_Time_Value * | timeout = 0 | |||
| ) | [virtual] |
Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakeups the ACE_Select_Reactor if currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).
Implements ACE_Reactor_Notify.
Definition at line 680 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
// Just consider this method a "no-op" if there's no
// <ACE_Select_Reactor> configured.
if (this->select_reactor_ == 0)
return 0;
ACE_Event_Handler_var safe_handler (event_handler);
if (event_handler)
{
event_handler->add_reference ();
}
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
int const notification_required =
notification_queue_.push_new_notification(buffer);
if (notification_required == -1)
{
return -1;
}
if (notification_required == 0)
{
// No failures, the handler is now owned by the notification queue
safe_handler.release ();
return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
sizeof buffer,
timeout);
if (n == -1)
{
return -1;
}
// No failures.
safe_handler.release ();
return 0;
}
| ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle | ( | void | ) | [virtual] |
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor
Implements ACE_Reactor_Notify.
Definition at line 758 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
return this->notification_pipe_.read_handle ();
}
| int ACE_Select_Reactor_Notify::open | ( | ACE_Reactor_Impl * | r, | |
| ACE_Timer_Queue * | = 0, |
|||
| int | disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT | |||
| ) | [virtual] |
Initialize.
Implements ACE_Reactor_Notify.
Definition at line 599 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::open");
if (disable_notify_pipe == 0)
{
this->select_reactor_ =
dynamic_cast<ACE_Select_Reactor_Impl *> (r);
if (select_reactor_ == 0)
{
errno = EINVAL;
return -1;
}
if (this->notification_pipe_.open () == -1)
return -1;
#if defined (F_SETFD)
ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
if (notification_queue_.open() == -1)
{
return -1;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
// non-blocking mode.
if (ACE::set_flags (this->notification_pipe_.read_handle (),
ACE_NONBLOCK) == -1)
return -1;
else
return this->select_reactor_->register_handler
(this->notification_pipe_.read_handle (),
this,
ACE_Event_Handler::READ_MASK);
}
else
{
this->select_reactor_ = 0;
return 0;
}
}
| int ACE_Select_Reactor_Notify::purge_pending_notifications | ( | ACE_Event_Handler * | sh, | |
| ACE_Reactor_Mask | mask = ACE_Event_Handler::ALL_EVENTS_MASK | |||
| ) | [virtual] |
Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.
Implements ACE_Reactor_Notify.
Definition at line 569 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
ACE_UNUSED_ARG (mask);
ACE_NOTSUP_RETURN (-1);
#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
}
| int ACE_Select_Reactor_Notify::read_notify_pipe | ( | ACE_HANDLE | handle, | |
| ACE_Notification_Buffer & | buffer | |||
| ) | [virtual] |
Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl
Return value semantics for this are: -1: nothing read, fatal, unrecoverable error 0: nothing read at all 1: complete buffer read
Implements ACE_Reactor_Notify.
Definition at line 871 of file Select_Reactor_Base.cpp.
{
ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
// This is kind of a weird, fragile beast. We first read with a
// regular read. The read side of this socket is non-blocking, so
// the read may end up being short.
//
// If the read is short, then we do a recv_n to insure that we block
// and read the rest of the buffer.
//
// Now, you might be tempted to say, "why don't we just replace the
// first recv with a recv_n?" I was, too. But that doesn't work
// because of how the calling code in handle_input() works. In
// handle_input, the event will only be dispatched if the return
// value from read_notify_pipe() is > 0. That means that we can't
// return zero from this func unless it's an EOF condition.
//
// Thus, the return value semantics for this are:
// -1: nothing read, fatal, unrecoverable error
// 0: nothing read at all
// 1: complete buffer read
ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
if (n > 0)
{
// Check to see if we've got a short read.
if (n != sizeof buffer)
{
ssize_t const remainder = sizeof buffer - n;
// If so, try to recover by reading the remainder. If this
// doesn't work we're in big trouble since the input stream
// won't be aligned correctly. I'm not sure quite what to
// do at this point. It's probably best just to return -1.
if (ACE::recv_n (handle,
((char *) &buffer) + n,
remainder) != remainder)
return -1;
}
return 1;
}
// Return -1 if things have gone seriously wrong.
if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
return -1;
return 0;
}
Declare the dynamic allocation hooks.
Definition at line 233 of file Select_Reactor_Base.h.
int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected] |
Keeps track of the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."
Definition at line 257 of file Select_Reactor_Base.h.
Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.
Definition at line 248 of file Select_Reactor_Base.h.
Keep a back pointer to the ACE_Select_Reactor. If this value if NULL then the ACE_Select_Reactor has been initialized with disable_notify_pipe.
Definition at line 241 of file Select_Reactor_Base.h.
1.7.0