ACE_Select_Reactor_Notify Class Reference

Unblock the ACE_Select_Reactor from its event loop. More...

#include <Select_Reactor_Base.h>

Inheritance diagram for ACE_Select_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Select_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Select_Reactor_Notify (void)
 Constructor.
virtual ~ACE_Select_Reactor_Notify (void)
 Destructor.
virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT)
 Initialize.
virtual int close (void)
 Destroy.
virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.
virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.

Protected Attributes

ACE_Select_Reactor_Implselect_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the ACE_Select_Reactor from its event loop.

This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Select_Reactor_Mask is passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.

Definition at line 133 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify ( void   ) 

Constructor.

Definition at line 537 of file Select_Reactor_Base.cpp.

00538   : max_notify_iterations_ (-1)
00539 {
00540 }

ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify ( void   )  [virtual]

Destructor.

Definition at line 542 of file Select_Reactor_Base.cpp.

00543 {
00544 }


Member Function Documentation

int ACE_Select_Reactor_Notify::close ( void   )  [virtual]

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 649 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Pipe::close(), notification_pipe_, ACE_Pipe::read_handle(), and read_notify_pipe().

00650 {
00651   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00652 
00653 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00654   notification_queue_.reset();
00655 #else
00656   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00657     {
00658       // Please see Bug 2820, if we just close the pipe then we break
00659       // the reference counting rules.  Basically, all the event
00660       // handlers "stored" in the pipe had their reference counts
00661       // increased.  We need to decrease them before closing the
00662       // pipe....
00663       ACE_Notification_Buffer b;
00664       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00665            r > 0;
00666            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00667         {
00668           if (b.eh_ != 0)
00669             {
00670               b.eh_->remove_reference();
00671             }
00672         }
00673     }
00674 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00675 
00676   return this->notification_pipe_.close ();
00677 }

int ACE_Select_Reactor_Notify::dispatch_notifications ( int &  number_of_active_handles,
ACE_Handle_Set rd_mask 
) [virtual]

Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 733 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Handle_Set::clr_bit(), handle_input(), ACE_Handle_Set::is_set(), notification_pipe_, and ACE_Pipe::read_handle().

00735 {
00736   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00737 
00738   ACE_HANDLE const read_handle =
00739     this->notification_pipe_.read_handle ();
00740 
00741   if (read_handle != ACE_INVALID_HANDLE
00742       && rd_mask.is_set (read_handle))
00743     {
00744       --number_of_active_handles;
00745       rd_mask.clr_bit (read_handle);
00746       return this->handle_input (read_handle);
00747     }
00748   else
00749     return 0;
00750 }

int ACE_Select_Reactor_Notify::dispatch_notify ( ACE_Notification_Buffer buffer  )  [virtual]

Handle one of the notify call on the handle. This could be because of a thread trying to unblock the Reactor_Impl

Implements ACE_Reactor_Notify.

Definition at line 787 of file Select_Reactor_Base.cpp.

References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_TEXT, ACE_Notification_Buffer::eh_, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, ACE_Event_Handler::reference_counting_policy(), ACE_Event_Handler::remove_reference(), ACE::send(), and ACE_Event_Handler::WRITE_MASK.

00788 {
00789   int result = 0;
00790 
00791 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00792   // Dispatch one message from the notify queue, and put another in
00793   // the pipe if one is available.  Remember, the idea is to keep
00794   // exactly one message in the pipe at a time.
00795 
00796   bool more_messages_queued = false;
00797   ACE_Notification_Buffer next;
00798 
00799   result = notification_queue_.pop_next_notification(buffer,
00800                                                      more_messages_queued,
00801                                                      next);
00802 
00803   if (result == 0)
00804     {
00805       return 0;
00806     }
00807 
00808   if (result == -1)
00809     {
00810       return -1;
00811     }
00812 
00813   if(more_messages_queued)
00814     {
00815       (void) ACE::send(this->notification_pipe_.write_handle(),
00816             (char *)&next, sizeof(ACE_Notification_Buffer));
00817     }
00818 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00819 
00820   // If eh == 0 then another thread is unblocking the
00821   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00822   // internal structures.  Otherwise, we need to dispatch the
00823   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00824   // we've been passed.
00825   if (buffer.eh_ != 0)
00826     {
00827       ACE_Event_Handler *event_handler = buffer.eh_;
00828 
00829       bool const requires_reference_counting =
00830         event_handler->reference_counting_policy ().value () ==
00831         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00832 
00833       switch (buffer.mask_)
00834         {
00835         case ACE_Event_Handler::READ_MASK:
00836         case ACE_Event_Handler::ACCEPT_MASK:
00837           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00838           break;
00839         case ACE_Event_Handler::WRITE_MASK:
00840           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00841           break;
00842         case ACE_Event_Handler::EXCEPT_MASK:
00843           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00844           break;
00845         case ACE_Event_Handler::QOS_MASK:
00846           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00847           break;
00848         case ACE_Event_Handler::GROUP_QOS_MASK:
00849           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00850           break;
00851         default:
00852           // Should we bail out if we get an invalid mask?
00853           ACE_ERROR ((LM_ERROR,
00854                       ACE_TEXT ("invalid mask = %d\n"),
00855                       buffer.mask_));
00856         }
00857 
00858       if (result == -1)
00859         event_handler->handle_close (ACE_INVALID_HANDLE,
00860                                      ACE_Event_Handler::EXCEPT_MASK);
00861 
00862       if (requires_reference_counting)
00863         {
00864           event_handler->remove_reference ();
00865         }
00866     }
00867 
00868   return 1;
00869 }

void ACE_Select_Reactor_Notify::dump ( void   )  const [virtual]

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 586 of file Select_Reactor_Base.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.

00587 {
00588 #if defined (ACE_HAS_DUMP)
00589   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00590 
00591   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00592   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00593   this->notification_pipe_.dump ();
00594   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00595 #endif /* ACE_HAS_DUMP */
00596 }

int ACE_Select_Reactor_Notify::handle_input ( ACE_HANDLE  handle  )  [virtual]

Called back by the ACE_Select_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 909 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Select_Reactor_Impl::renew(), and select_reactor_.

Referenced by dispatch_notifications().

00910 {
00911   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00912   // Precondition: this->select_reactor_.token_.current_owner () ==
00913   // ACE_Thread::self ();
00914 
00915   int number_dispatched = 0;
00916   int result = 0;
00917   ACE_Notification_Buffer buffer;
00918 
00919   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00920     {
00921       // Dispatch the buffer
00922       // NOTE: We count only if we made any dispatches ie. upcalls.
00923       if (this->dispatch_notify (buffer) > 0)
00924         ++number_dispatched;
00925 
00926       // Bail out if we've reached the <notify_threshold_>.  Note that
00927       // by default <notify_threshold_> is -1, so we'll loop until all
00928       // the notifications in the pipe have been dispatched.
00929       if (number_dispatched == this->max_notify_iterations_)
00930         break;
00931     }
00932 
00933   // Reassign number_dispatched to -1 if things have gone seriously
00934   // wrong.
00935   if (result < 0)
00936     number_dispatched = -1;
00937 
00938   // Enqueue ourselves into the list of waiting threads.  When we
00939   // reacquire the token we'll be off and running again with ownership
00940   // of the token.  The postcondition of this call is that
00941   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00942   this->select_reactor_->renew ();
00943   return number_dispatched;
00944 }

int ACE_Select_Reactor_Notify::is_dispatchable ( ACE_Notification_Buffer buffer  )  [virtual]

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 763 of file Select_Reactor_Base.cpp.

References ACE_Notification_Buffer::eh_.

00764 {
00765 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00766   ACE_UNUSED_ARG(buffer);
00767   return 1;
00768 #else
00769   // If eh == 0 then another thread is unblocking the
00770   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00771   // internal structures.  Otherwise, we need to dispatch the
00772   // appropriate handle_* method on the <ACE_Event_Handler>
00773   // pointer we've been passed.
00774   if (buffer.eh_ != 0)
00775     {
00776       return 1;
00777     }
00778   else
00779     {
00780       // has no dispatchable buffer
00781       return 0;
00782     }
00783 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00784 }

int ACE_Select_Reactor_Notify::max_notify_iterations ( void   )  [virtual]

Get the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop.

Implements ACE_Reactor_Notify.

Definition at line 557 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00558 {
00559   return this->max_notify_iterations_;
00560 }

void ACE_Select_Reactor_Notify::max_notify_iterations ( int   )  [virtual]

Set the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 547 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00548 {
00549   // Must always be > 0 or < 0 to optimize the loop exit condition.
00550   if (iterations == 0)
00551     iterations = 1;
00552 
00553   this->max_notify_iterations_ = iterations;
00554 }

int ACE_Select_Reactor_Notify::notify ( ACE_Event_Handler = 0,
ACE_Reactor_Mask  = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value timeout = 0 
) [virtual]

Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakeups the ACE_Select_Reactor if currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 680 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Event_Handler::add_reference(), ACE_Event_Handler_var::release(), and ACE::send().

00683 {
00684   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00685 
00686   // Just consider this method a "no-op" if there's no
00687   // <ACE_Select_Reactor> configured.
00688   if (this->select_reactor_ == 0)
00689     return 0;
00690 
00691   ACE_Event_Handler_var safe_handler (event_handler);
00692 
00693   if (event_handler)
00694     event_handler->add_reference ();
00695 
00696   ACE_Notification_Buffer buffer (event_handler, mask);
00697 
00698 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00699   int const notification_required =
00700     notification_queue_.push_new_notification(buffer);
00701 
00702   if (notification_required == -1)
00703     {
00704       return -1;
00705     }
00706 
00707   if (notification_required == 0)
00708     {
00709       // No failures, the handler is now owned by the notification queue
00710       safe_handler.release ();
00711 
00712       return 0;
00713     }
00714 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00715 
00716   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00717                                (char *) &buffer,
00718                                sizeof buffer,
00719                                timeout);
00720   if (n == -1)
00721     return -1;
00722 
00723   // No failures.
00724   safe_handler.release ();
00725 
00726   return 0;
00727 }

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle ( void   )  [virtual]

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor

Implements ACE_Reactor_Notify.

Definition at line 754 of file Select_Reactor_Base.cpp.

References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().

00755 {
00756   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00757 
00758   return this->notification_pipe_.read_handle ();
00759 }

int ACE_Select_Reactor_Notify::open ( ACE_Reactor_Impl ,
ACE_Timer_Queue = 0,
int  disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT 
) [virtual]

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 599 of file Select_Reactor_Base.cpp.

References ACE_NONBLOCK, ACE_TRACE, ACE_OS::fcntl(), ACE_Event_Handler::READ_MASK, ACE_Reactor_Impl::register_handler(), select_reactor_, and ACE::set_flags().

00602 {
00603   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00604 
00605   if (disable_notify_pipe == 0)
00606     {
00607       this->select_reactor_ =
00608         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00609 
00610       if (select_reactor_ == 0)
00611         {
00612           errno = EINVAL;
00613           return -1;
00614         }
00615 
00616       if (this->notification_pipe_.open () == -1)
00617         return -1;
00618 #if defined (F_SETFD)
00619       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00620       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00621 #endif /* F_SETFD */
00622 
00623 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00624       if (notification_queue_.open() == -1)
00625         {
00626           return -1;
00627         }
00628 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00629 
00630       // There seems to be a Win32 bug with this...  Set this into
00631       // non-blocking mode.
00632       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00633                           ACE_NONBLOCK) == -1)
00634         return -1;
00635       else
00636         return this->select_reactor_->register_handler
00637           (this->notification_pipe_.read_handle (),
00638            this,
00639            ACE_Event_Handler::READ_MASK);
00640     }
00641   else
00642     {
00643       this->select_reactor_ = 0;
00644       return 0;
00645     }
00646 }

int ACE_Select_Reactor_Notify::purge_pending_notifications ( ACE_Event_Handler sh,
ACE_Reactor_Mask  mask = ACE_Event_Handler::ALL_EVENTS_MASK 
) [virtual]

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 569 of file Select_Reactor_Base.cpp.

References ACE_NOTSUP_RETURN, and ACE_TRACE.

00571 {
00572   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00573 
00574 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00575 
00576   return notification_queue_.purge_pending_notifications(eh, mask);
00577 
00578 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00579   ACE_UNUSED_ARG (eh);
00580   ACE_UNUSED_ARG (mask);
00581   ACE_NOTSUP_RETURN (-1);
00582 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00583 }

int ACE_Select_Reactor_Notify::read_notify_pipe ( ACE_HANDLE  handle,
ACE_Notification_Buffer buffer 
) [virtual]

Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl

Implements ACE_Reactor_Notify.

Definition at line 872 of file Select_Reactor_Base.cpp.

References ACE_TRACE, and ACE::recv().

Referenced by close().

00874 {
00875   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00876 
00877   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00878 
00879   if (n > 0)
00880     {
00881       // Check to see if we've got a short read.
00882       if (n != sizeof buffer)
00883         {
00884           ssize_t const remainder = sizeof buffer - n;
00885 
00886           // If so, try to recover by reading the remainder.  If this
00887           // doesn't work we're in big trouble since the input stream
00888           // won't be aligned correctly.  I'm not sure quite what to
00889           // do at this point.  It's probably best just to return -1.
00890           if (ACE::recv (handle,
00891                          ((char *) &buffer) + n,
00892                          remainder) != remainder)
00893             return -1;
00894         }
00895 
00896 
00897       return 1;
00898     }
00899 
00900   // Return -1 if things have gone seriously  wrong.
00901   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00902     return -1;
00903 
00904   return 0;
00905 }


Member Data Documentation

ACE_Select_Reactor_Notify::ACE_ALLOC_HOOK_DECLARE

Declare the dynamic allocation hooks.

Definition at line 228 of file Select_Reactor_Base.h.

int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected]

Keeps track of the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 252 of file Select_Reactor_Base.h.

Referenced by max_notify_iterations().

ACE_Pipe ACE_Select_Reactor_Notify::notification_pipe_ [protected]

Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.

Definition at line 243 of file Select_Reactor_Base.h.

Referenced by close(), dispatch_notifications(), dump(), and notify_handle().

ACE_Select_Reactor_Impl* ACE_Select_Reactor_Notify::select_reactor_ [protected]

Keep a back pointer to the ACE_Select_Reactor. If this value if NULL then the ACE_Select_Reactor has been initialized with disable_notify_pipe.

Definition at line 236 of file Select_Reactor_Base.h.

Referenced by handle_input(), and open().


The documentation for this class was generated from the following files:
Generated on Tue Feb 2 17:35:33 2010 for ACE by  doxygen 1.4.7