Public Member Functions | Public Attributes | Protected Attributes

ACE_Select_Reactor_Notify Class Reference

Unblock the ACE_Select_Reactor from its event loop. More...

#include <Select_Reactor_Base.h>

Inheritance diagram for ACE_Select_Reactor_Notify:
Inheritance graph
[legend]
Collaboration diagram for ACE_Select_Reactor_Notify:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ACE_Select_Reactor_Notify (void)
 Constructor.
virtual ~ACE_Select_Reactor_Notify (void)
 Destructor.
virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT)
 Initialize.
virtual int close (void)
 Destroy.
virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.
virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Attributes

ACE_Select_Reactor_Implselect_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the ACE_Select_Reactor from its event loop.

This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Select_Reactor_Mask is passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.

Definition at line 133 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify ( void   ) 

Constructor.

Definition at line 537 of file Select_Reactor_Base.cpp.

ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify ( void   )  [virtual]

Destructor.

Definition at line 542 of file Select_Reactor_Base.cpp.

{
}


Member Function Documentation

int ACE_Select_Reactor_Notify::close ( void   )  [virtual]

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 649 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::close");

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  notification_queue_.reset();
#else
  if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
    {
      // Please see Bug 2820, if we just close the pipe then we break
      // the reference counting rules.  Basically, all the event
      // handlers "stored" in the pipe had their reference counts
      // increased.  We need to decrease them before closing the
      // pipe....
      ACE_Notification_Buffer b;
      for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
           r > 0;
           r = read_notify_pipe(notification_pipe_.read_handle(), b))
        {
          if (b.eh_ != 0)
            {
              b.eh_->remove_reference();
            }
        }
    }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  return this->notification_pipe_.close ();
}

int ACE_Select_Reactor_Notify::dispatch_notifications ( int &  number_of_active_handles,
ACE_Handle_Set rd_mask 
) [virtual]

Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 737 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");

  ACE_HANDLE const read_handle =
    this->notification_pipe_.read_handle ();

  if (read_handle != ACE_INVALID_HANDLE
      && rd_mask.is_set (read_handle))
    {
      --number_of_active_handles;
      rd_mask.clr_bit (read_handle);
      return this->handle_input (read_handle);
    }
  else
    return 0;
}

int ACE_Select_Reactor_Notify::dispatch_notify ( ACE_Notification_Buffer buffer  )  [virtual]

Handle one of the notify call on the handle. This could be because of a thread trying to unblock the Reactor_Impl

Implements ACE_Reactor_Notify.

Definition at line 791 of file Select_Reactor_Base.cpp.

{
  int result = 0;

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  // Dispatch one message from the notify queue, and put another in
  // the pipe if one is available.  Remember, the idea is to keep
  // exactly one message in the pipe at a time.

  bool more_messages_queued = false;
  ACE_Notification_Buffer next;

  result = notification_queue_.pop_next_notification(buffer,
                                                     more_messages_queued,
                                                     next);

  if (result == 0 || result == -1)
    {
      return result;
    }

  if(more_messages_queued)
    {
      (void) ACE::send(this->notification_pipe_.write_handle(),
            (char *)&next, sizeof(ACE_Notification_Buffer));
    }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  // If eh == 0 then another thread is unblocking the
  // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
  // internal structures.  Otherwise, we need to dispatch the
  // appropriate handle_* method on the <ACE_Event_Handler> pointer
  // we've been passed.
  if (buffer.eh_ != 0)
    {
      ACE_Event_Handler *event_handler = buffer.eh_;

      bool const requires_reference_counting =
        event_handler->reference_counting_policy ().value () ==
        ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

      switch (buffer.mask_)
        {
        case ACE_Event_Handler::READ_MASK:
        case ACE_Event_Handler::ACCEPT_MASK:
          result = event_handler->handle_input (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::WRITE_MASK:
          result = event_handler->handle_output (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::EXCEPT_MASK:
          result = event_handler->handle_exception (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::QOS_MASK:
          result = event_handler->handle_qos (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::GROUP_QOS_MASK:
          result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
          break;
        default:
          // Should we bail out if we get an invalid mask?
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("invalid mask = %d\n"),
                      buffer.mask_));
        }

      if (result == -1)
        event_handler->handle_close (ACE_INVALID_HANDLE,
                                     ACE_Event_Handler::EXCEPT_MASK);

      if (requires_reference_counting)
        {
          event_handler->remove_reference ();
        }
    }

  return 1;
}

void ACE_Select_Reactor_Notify::dump ( void   )  const [virtual]

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 586 of file Select_Reactor_Base.cpp.

{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Select_Reactor_Notify::dump");

  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
  this->notification_pipe_.dump ();
  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}

int ACE_Select_Reactor_Notify::handle_input ( ACE_HANDLE  handle  )  [virtual]

Called back by the ACE_Select_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 927 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
  // Precondition: this->select_reactor_.token_.current_owner () ==
  // ACE_Thread::self ();

  int number_dispatched = 0;
  int result = 0;
  ACE_Notification_Buffer buffer;

  // If there is only one buffer in the pipe, this will loop and call
  // read_notify_pipe() twice.  The first time will read the buffer, and
  // the second will read the fact that the pipe is empty.
  while ((result = this->read_notify_pipe (handle, buffer)) > 0)
    {
      // Dispatch the buffer
      // NOTE: We count only if we made any dispatches ie. upcalls.
      if (this->dispatch_notify (buffer) > 0)
        ++number_dispatched;

      // Bail out if we've reached the <notify_threshold_>.  Note that
      // by default <notify_threshold_> is -1, so we'll loop until all
      // the notifications in the pipe have been dispatched.
      if (number_dispatched == this->max_notify_iterations_)
        break;
    }

  // Reassign number_dispatched to -1 if things have gone seriously
  // wrong.
  if (result < 0)
    number_dispatched = -1;

  // Enqueue ourselves into the list of waiting threads.  When we
  // reacquire the token we'll be off and running again with ownership
  // of the token.  The postcondition of this call is that
  // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
  this->select_reactor_->renew ();
  return number_dispatched;
}

int ACE_Select_Reactor_Notify::is_dispatchable ( ACE_Notification_Buffer buffer  )  [virtual]

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 767 of file Select_Reactor_Base.cpp.

{
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  ACE_UNUSED_ARG(buffer);
  return 1;
#else
  // If eh == 0 then another thread is unblocking the
  // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
  // internal structures.  Otherwise, we need to dispatch the
  // appropriate handle_* method on the <ACE_Event_Handler>
  // pointer we've been passed.
  if (buffer.eh_ != 0)
    {
      return 1;
    }
  else
    {
      // has no dispatchable buffer
      return 0;
    }
#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
}

void ACE_Select_Reactor_Notify::max_notify_iterations ( int  iterations  )  [virtual]

Set the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 547 of file Select_Reactor_Base.cpp.

{
  // Must always be > 0 or < 0 to optimize the loop exit condition.
  if (iterations == 0)
    iterations = 1;

  this->max_notify_iterations_ = iterations;
}

int ACE_Select_Reactor_Notify::max_notify_iterations ( void   )  [virtual]

Get the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop.

Implements ACE_Reactor_Notify.

Definition at line 557 of file Select_Reactor_Base.cpp.

{
  return this->max_notify_iterations_;
}

int ACE_Select_Reactor_Notify::notify ( ACE_Event_Handler event_handler = 0,
ACE_Reactor_Mask  mask = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value timeout = 0 
) [virtual]

Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakeups the ACE_Select_Reactor if currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 680 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::notify");

  // Just consider this method a "no-op" if there's no
  // <ACE_Select_Reactor> configured.
  if (this->select_reactor_ == 0)
    return 0;

  ACE_Event_Handler_var safe_handler (event_handler);

  if (event_handler)
    {
      event_handler->add_reference ();
    }

  ACE_Notification_Buffer buffer (event_handler, mask);

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  int const notification_required =
    notification_queue_.push_new_notification(buffer);

  if (notification_required == -1)
    {
      return -1;
    }

  if (notification_required == 0)
    {
      // No failures, the handler is now owned by the notification queue
      safe_handler.release ();

      return 0;
    }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
                               (char *) &buffer,
                               sizeof buffer,
                               timeout);
  if (n == -1)
    {
      return -1;
    }

  // No failures.
  safe_handler.release ();

  return 0;
}

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle ( void   )  [virtual]

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor

Implements ACE_Reactor_Notify.

Definition at line 758 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");

  return this->notification_pipe_.read_handle ();
}

int ACE_Select_Reactor_Notify::open ( ACE_Reactor_Impl r,
ACE_Timer_Queue = 0,
int  disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT 
) [virtual]

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 599 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::open");

  if (disable_notify_pipe == 0)
    {
      this->select_reactor_ =
        dynamic_cast<ACE_Select_Reactor_Impl *> (r);

      if (select_reactor_ == 0)
        {
          errno = EINVAL;
          return -1;
        }

      if (this->notification_pipe_.open () == -1)
        return -1;
#if defined (F_SETFD)
      ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
      ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
#endif /* F_SETFD */

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
      if (notification_queue_.open() == -1)
        {
          return -1;
        }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

      // There seems to be a Win32 bug with this...  Set this into
      // non-blocking mode.
      if (ACE::set_flags (this->notification_pipe_.read_handle (),
                          ACE_NONBLOCK) == -1)
        return -1;
      else
        return this->select_reactor_->register_handler
          (this->notification_pipe_.read_handle (),
           this,
           ACE_Event_Handler::READ_MASK);
    }
  else
    {
      this->select_reactor_ = 0;
      return 0;
    }
}

int ACE_Select_Reactor_Notify::purge_pending_notifications ( ACE_Event_Handler sh,
ACE_Reactor_Mask  mask = ACE_Event_Handler::ALL_EVENTS_MASK 
) [virtual]

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 569 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)

  return notification_queue_.purge_pending_notifications(eh, mask);

#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
  ACE_UNUSED_ARG (eh);
  ACE_UNUSED_ARG (mask);
  ACE_NOTSUP_RETURN (-1);
#endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
}

int ACE_Select_Reactor_Notify::read_notify_pipe ( ACE_HANDLE  handle,
ACE_Notification_Buffer buffer 
) [virtual]

Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl

Return value semantics for this are: -1: nothing read, fatal, unrecoverable error 0: nothing read at all 1: complete buffer read

Implements ACE_Reactor_Notify.

Definition at line 871 of file Select_Reactor_Base.cpp.

{
  ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");

  // This is kind of a weird, fragile beast.  We first read with a
  // regular read.  The read side of this socket is non-blocking, so
  // the read may end up being short.
  //
  // If the read is short, then we do a recv_n to insure that we block
  // and read the rest of the buffer.
  //
  // Now, you might be tempted to say, "why don't we just replace the
  // first recv with a recv_n?"  I was, too.  But that doesn't work
  // because of how the calling code in handle_input() works.  In
  // handle_input, the event will only be dispatched if the return
  // value from read_notify_pipe() is > 0.  That means that we can't
  // return zero from this func unless it's an EOF condition.
  //
  // Thus, the return value semantics for this are:
  // -1: nothing read, fatal, unrecoverable error
  // 0: nothing read at all
  // 1: complete buffer read

  ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);

  if (n > 0)
    {
      // Check to see if we've got a short read.
      if (n != sizeof buffer)
        {
          ssize_t const remainder = sizeof buffer - n;

          // If so, try to recover by reading the remainder.  If this
          // doesn't work we're in big trouble since the input stream
          // won't be aligned correctly.  I'm not sure quite what to
          // do at this point.  It's probably best just to return -1.
          if (ACE::recv_n (handle,
                           ((char *) &buffer) + n,
                           remainder) != remainder)
            return -1;
        }


      return 1;
    }

  // Return -1 if things have gone seriously  wrong.
  if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
    return -1;

  return 0;
}


Member Data Documentation

Declare the dynamic allocation hooks.

Definition at line 233 of file Select_Reactor_Base.h.

Keeps track of the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 257 of file Select_Reactor_Base.h.

Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.

Definition at line 248 of file Select_Reactor_Base.h.

Keep a back pointer to the ACE_Select_Reactor. If this value if NULL then the ACE_Select_Reactor has been initialized with disable_notify_pipe.

Definition at line 241 of file Select_Reactor_Base.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines