ACE_Select_Reactor_Notify Class Reference

Unblock the ACE_Select_Reactor from its event loop. More...

#include <Select_Reactor_Base.h>

Inheritance diagram for ACE_Select_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Select_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Select_Reactor_Notify (void)
 Constructor.

virtual ~ACE_Select_Reactor_Notify (void)
 Destructor.

virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT)
 Initialize.

virtual int close (void)
 Destroy.

virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.

virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Attributes

ACE_Select_Reactor_Implselect_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the ACE_Select_Reactor from its event loop.

This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Select_Reactor_Mask is passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.

Definition at line 134 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify void   ) 
 

Constructor.

Definition at line 525 of file Select_Reactor_Base.cpp.

00526   : max_notify_iterations_ (-1)
00527 {
00528 }

ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify void   )  [virtual]
 

Destructor.

Definition at line 530 of file Select_Reactor_Base.cpp.

00531 {
00532 }


Member Function Documentation

int ACE_Select_Reactor_Notify::close void   )  [virtual]
 

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 637 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Pipe::close(), ACE_Notification_Buffer::eh_, notification_pipe_, ACE_Pipe::read_handle(), read_notify_pipe(), and ACE_Event_Handler::remove_reference().

00638 {
00639   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00640 
00641 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00642   notification_queue_.reset();
00643 #else
00644   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00645     {
00646       // Please see Bug 2820, if we just close the pipe then we break
00647       // the reference counting rules.  Basically, all the event
00648       // handlers "stored" in the pipe had their reference counts
00649       // increased.  We need to decrease them before closing the
00650       // pipe....
00651       ACE_Notification_Buffer b;
00652       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00653            r > 0;
00654            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00655         {
00656           if (b.eh_ == 0) continue;
00657           b.eh_->remove_reference();
00658         }
00659     }
00660 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00661 
00662   return this->notification_pipe_.close ();
00663 }

int ACE_Select_Reactor_Notify::dispatch_notifications int &  number_of_active_handles,
ACE_Handle_Set rd_mask
[virtual]
 

Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 719 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Handle_Set::clr_bit(), handle_input(), ACE_Handle_Set::is_set(), notification_pipe_, and ACE_Pipe::read_handle().

00721 {
00722   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00723 
00724   ACE_HANDLE const read_handle =
00725     this->notification_pipe_.read_handle ();
00726 
00727   if (read_handle != ACE_INVALID_HANDLE
00728       && rd_mask.is_set (read_handle))
00729     {
00730       --number_of_active_handles;
00731       rd_mask.clr_bit (read_handle);
00732       return this->handle_input (read_handle);
00733     }
00734   else
00735     return 0;
00736 }

int ACE_Select_Reactor_Notify::dispatch_notify ACE_Notification_Buffer buffer  )  [virtual]
 

Handle one of the notify call on the handle. This could be because of a thread trying to unblock the

Implements ACE_Reactor_Notify.

Definition at line 773 of file Select_Reactor_Base.cpp.

References ACE_ERROR, ACE_TEXT, ACE_Notification_Buffer::eh_, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::reference_counting_policy(), ACE_Event_Handler::remove_reference(), and ACE::send().

Referenced by handle_input().

00774 {
00775   int result = 0;
00776 
00777 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00778   // Dispatch one message from the notify queue, and put another in
00779   // the pipe if one is available.  Remember, the idea is to keep
00780   // exactly one message in the pipe at a time.
00781 
00782   bool more_messages_queued = false;
00783   ACE_Notification_Buffer next;
00784 
00785   result = notification_queue_.pop_next_notification(buffer, 
00786                                                      more_messages_queued, 
00787                                                      next);
00788 
00789   if (result == 0)
00790     {
00791       return 0;
00792     }
00793 
00794   if (result == -1)
00795     {
00796       return -1;
00797     }
00798 
00799   if(more_messages_queued)
00800     {
00801       (void) ACE::send(this->notification_pipe_.write_handle(),
00802             (char *)&next, sizeof(ACE_Notification_Buffer));
00803     }
00804 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00805 
00806   // If eh == 0 then another thread is unblocking the
00807   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00808   // internal structures.  Otherwise, we need to dispatch the
00809   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00810   // we've been passed.
00811   if (buffer.eh_ != 0)
00812     {
00813       ACE_Event_Handler *event_handler =
00814         buffer.eh_;
00815 
00816       bool const requires_reference_counting =
00817         event_handler->reference_counting_policy ().value () ==
00818         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00819 
00820       switch (buffer.mask_)
00821         {
00822         case ACE_Event_Handler::READ_MASK:
00823         case ACE_Event_Handler::ACCEPT_MASK:
00824           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00825           break;
00826         case ACE_Event_Handler::WRITE_MASK:
00827           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00828           break;
00829         case ACE_Event_Handler::EXCEPT_MASK:
00830           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00831           break;
00832         case ACE_Event_Handler::QOS_MASK:
00833           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00834           break;
00835         case ACE_Event_Handler::GROUP_QOS_MASK:
00836           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00837           break;
00838         default:
00839           // Should we bail out if we get an invalid mask?
00840           ACE_ERROR ((LM_ERROR,
00841                       ACE_TEXT ("invalid mask = %d\n"),
00842                       buffer.mask_));
00843         }
00844 
00845       if (result == -1)
00846         event_handler->handle_close (ACE_INVALID_HANDLE,
00847                                      ACE_Event_Handler::EXCEPT_MASK);
00848 
00849       if (requires_reference_counting)
00850         {
00851           event_handler->remove_reference ();
00852         }
00853     }
00854 
00855   return 1;
00856 }

void ACE_Select_Reactor_Notify::dump void   )  const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 574 of file Select_Reactor_Base.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.

00575 {
00576 #if defined (ACE_HAS_DUMP)
00577   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00578 
00579   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00580   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00581   this->notification_pipe_.dump ();
00582   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00583 #endif /* ACE_HAS_DUMP */
00584 }

int ACE_Select_Reactor_Notify::handle_input ACE_HANDLE  handle  )  [virtual]
 

Called back by the ACE_Select_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 896 of file Select_Reactor_Base.cpp.

References ACE_TRACE, dispatch_notify(), max_notify_iterations_, read_notify_pipe(), and ACE_Select_Reactor_Impl::renew().

Referenced by dispatch_notifications().

00897 {
00898   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00899   // Precondition: this->select_reactor_.token_.current_owner () ==
00900   // ACE_Thread::self ();
00901 
00902   int number_dispatched = 0;
00903   int result = 0;
00904   ACE_Notification_Buffer buffer;
00905 
00906   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00907     {
00908       // Dispatch the buffer
00909       // NOTE: We count only if we made any dispatches ie. upcalls.
00910       if (this->dispatch_notify (buffer) > 0)
00911         ++number_dispatched;
00912 
00913       // Bail out if we've reached the <notify_threshold_>.  Note that
00914       // by default <notify_threshold_> is -1, so we'll loop until all
00915       // the notifications in the pipe have been dispatched.
00916       if (number_dispatched == this->max_notify_iterations_)
00917         break;
00918     }
00919 
00920   // Reassign number_dispatched to -1 if things have gone seriously
00921   // wrong.
00922   if (result < 0)
00923     number_dispatched = -1;
00924 
00925   // Enqueue ourselves into the list of waiting threads.  When we
00926   // reacquire the token we'll be off and running again with ownership
00927   // of the token.  The postcondition of this call is that
00928   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00929   this->select_reactor_->renew ();
00930   return number_dispatched;
00931 }

int ACE_Select_Reactor_Notify::is_dispatchable ACE_Notification_Buffer buffer  )  [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 749 of file Select_Reactor_Base.cpp.

References ACE_Notification_Buffer::eh_.

00750 {
00751 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00752   ACE_UNUSED_ARG(buffer);
00753   return 1;
00754 #else
00755   // If eh == 0 then another thread is unblocking the
00756   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00757   // internal structures.  Otherwise, we need to dispatch the
00758   // appropriate handle_* method on the <ACE_Event_Handler>
00759   // pointer we've been passed.
00760   if (buffer.eh_ != 0)
00761     {
00762       return 1;
00763     }
00764   else
00765     {
00766       // has no dispatchable buffer
00767       return 0;
00768     }
00769 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00770 }

int ACE_Select_Reactor_Notify::max_notify_iterations void   )  [virtual]
 

Get the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop.

Implements ACE_Reactor_Notify.

Definition at line 545 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00546 {
00547   return this->max_notify_iterations_;
00548 }

void ACE_Select_Reactor_Notify::max_notify_iterations int   )  [virtual]
 

Set the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 535 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00536 {
00537   // Must always be > 0 or < 0 to optimize the loop exit condition.
00538   if (iterations == 0)
00539     iterations = 1;
00540 
00541   this->max_notify_iterations_ = iterations;
00542 }

int ACE_Select_Reactor_Notify::notify ACE_Event_Handler = 0,
ACE_Reactor_Mask  = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value timeout = 0
[virtual]
 

Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakeups the ACE_Select_Reactor if currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 666 of file Select_Reactor_Base.cpp.

References ACE_Reactor_Mask, ACE_TRACE, ACE_Event_Handler::add_reference(), ACE_Event_Handler_var::release(), ACE::send(), and ssize_t.

00669 {
00670   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00671 
00672   // Just consider this method a "no-op" if there's no
00673   // <ACE_Select_Reactor> configured.
00674   if (this->select_reactor_ == 0)
00675     return 0;
00676 
00677   ACE_Event_Handler_var safe_handler (event_handler);
00678 
00679   if (event_handler)
00680     event_handler->add_reference ();
00681 
00682   ACE_Notification_Buffer buffer (event_handler, mask);
00683 
00684 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00685   int notification_required =
00686     notification_queue_.push_new_notification(buffer);
00687 
00688   if (notification_required == -1)
00689   {
00690     return -1;
00691   }
00692 
00693   if (notification_required == 0)
00694   {
00695     // No failures, the handler is now owned by the notification queue
00696     safe_handler.release ();
00697 
00698     return 0;
00699   }
00700 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00701 
00702   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00703                                (char *) &buffer,
00704                                sizeof buffer,
00705                                timeout);
00706   if (n == -1)
00707     return -1;
00708 
00709   // No failures.
00710   safe_handler.release ();
00711 
00712   return 0;
00713 }

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle void   )  [virtual]
 

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor

Implements ACE_Reactor_Notify.

Definition at line 740 of file Select_Reactor_Base.cpp.

References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().

00741 {
00742   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00743 
00744   return this->notification_pipe_.read_handle ();
00745 }

int ACE_Select_Reactor_Notify::open ACE_Reactor_Impl ,
ACE_Timer_Queue = 0,
int  disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT
[virtual]
 

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 587 of file Select_Reactor_Base.cpp.

References ACE_NONBLOCK, ACE_Timer_Queue, ACE_TRACE, ACE_OS::fcntl(), notification_pipe_, ACE_Pipe::open(), ACE_Pipe::read_handle(), ACE_Reactor_Impl::register_handler(), and ACE::set_flags().

00590 {
00591   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00592 
00593   if (disable_notify_pipe == 0)
00594     {
00595       this->select_reactor_ =
00596         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00597 
00598       if (select_reactor_ == 0)
00599         {
00600           errno = EINVAL;
00601           return -1;
00602         }
00603 
00604       if (this->notification_pipe_.open () == -1)
00605         return -1;
00606 #if defined (F_SETFD)
00607       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00608       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00609 #endif /* F_SETFD */
00610 
00611 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00612       if (notification_queue_.open() == -1)
00613         {
00614           return -1;
00615         }
00616 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00617 
00618       // There seems to be a Win32 bug with this...  Set this into
00619       // non-blocking mode.
00620       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00621                           ACE_NONBLOCK) == -1)
00622         return -1;
00623       else
00624         return this->select_reactor_->register_handler
00625           (this->notification_pipe_.read_handle (),
00626            this,
00627            ACE_Event_Handler::READ_MASK);
00628     }
00629   else
00630     {
00631       this->select_reactor_ = 0;
00632       return 0;
00633     }
00634 }

int ACE_Select_Reactor_Notify::purge_pending_notifications ACE_Event_Handler sh,
ACE_Reactor_Mask  mask = ACE_Event_Handler::ALL_EVENTS_MASK
[virtual]
 

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 557 of file Select_Reactor_Base.cpp.

References ACE_NOTSUP_RETURN, ACE_Reactor_Mask, and ACE_TRACE.

00559 {
00560   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00561 
00562 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00563 
00564   return notification_queue_.purge_pending_notifications(eh, mask);
00565 
00566 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00567   ACE_UNUSED_ARG (eh);
00568   ACE_UNUSED_ARG (mask);
00569   ACE_NOTSUP_RETURN (-1);
00570 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00571 }

int ACE_Select_Reactor_Notify::read_notify_pipe ACE_HANDLE  handle,
ACE_Notification_Buffer buffer
[virtual]
 

Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the

Implements ACE_Reactor_Notify.

Definition at line 859 of file Select_Reactor_Base.cpp.

References ACE_TRACE, EWOULDBLOCK, ACE::recv(), and ssize_t.

Referenced by close(), and handle_input().

00861 {
00862   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00863 
00864   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00865 
00866   if (n > 0)
00867     {
00868       // Check to see if we've got a short read.
00869       if (n != sizeof buffer)
00870         {
00871           ssize_t const remainder = sizeof buffer - n;
00872 
00873           // If so, try to recover by reading the remainder.  If this
00874           // doesn't work we're in big trouble since the input stream
00875           // won't be aligned correctly.  I'm not sure quite what to
00876           // do at this point.  It's probably best just to return -1.
00877           if (ACE::recv (handle,
00878                          ((char *) &buffer) + n,
00879                          remainder) != remainder)
00880             return -1;
00881         }
00882 
00883 
00884       return 1;
00885     }
00886 
00887   // Return -1 if things have gone seriously  wrong.
00888   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00889     return -1;
00890 
00891   return 0;
00892 }


Member Data Documentation

ACE_Select_Reactor_Notify::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Definition at line 229 of file Select_Reactor_Base.h.

int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected]
 

Keeps track of the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 253 of file Select_Reactor_Base.h.

Referenced by handle_input(), and max_notify_iterations().

ACE_Pipe ACE_Select_Reactor_Notify::notification_pipe_ [protected]
 

Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.

Definition at line 244 of file Select_Reactor_Base.h.

Referenced by close(), dispatch_notifications(), dump(), notify_handle(), and open().

ACE_Select_Reactor_Impl* ACE_Select_Reactor_Notify::select_reactor_ [protected]
 

Keep a back pointer to the ACE_Select_Reactor. If this value if NULL then the ACE_Select_Reactor has been initialized with .

Definition at line 237 of file Select_Reactor_Base.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 12:57:11 2008 for ACE by doxygen 1.3.6