#include <Select_Reactor_Base.h>
Inheritance diagram for ACE_Select_Reactor_Notify:
Public Member Functions | |
ACE_Select_Reactor_Notify (void) | |
Constructor. | |
virtual | ~ACE_Select_Reactor_Notify (void) |
Destructor. | |
virtual int | open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT) |
Initialize. | |
virtual int | close (void) |
Destroy. | |
virtual int | notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0) |
virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
virtual ACE_HANDLE | notify_handle (void) |
virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
Verify whether the buffer has dispatchable info or not. | |
virtual int | handle_input (ACE_HANDLE handle) |
virtual void | max_notify_iterations (int) |
virtual int | max_notify_iterations (void) |
virtual int | purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
virtual void | dump (void) const |
Dump the state of an object. | |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Attributes | |
ACE_Select_Reactor_Impl * | select_reactor_ |
ACE_Pipe | notification_pipe_ |
int | max_notify_iterations_ |
This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Reactor_Mask are passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.
Definition at line 133 of file Select_Reactor_Base.h.
ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify | ( | void | ) |
Constructor.
Definition at line 537 of file Select_Reactor_Base.cpp.
00538 : max_notify_iterations_ (-1) 00539 { 00540 }
ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify | ( | void | ) | [virtual] |
int ACE_Select_Reactor_Notify::close | ( | void | ) | [virtual] |
Destroy.
Implements ACE_Reactor_Notify.
Definition at line 649 of file Select_Reactor_Base.cpp.
References ACE_TRACE, ACE_Pipe::close(), notification_pipe_, ACE_Pipe::read_handle(), and read_notify_pipe().
00650 { 00651 ACE_TRACE ("ACE_Select_Reactor_Notify::close"); 00652 00653 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00654 notification_queue_.reset(); 00655 #else 00656 if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE) 00657 { 00658 // Please see Bug 2820, if we just close the pipe then we break 00659 // the reference counting rules. Basically, all the event 00660 // handlers "stored" in the pipe had their reference counts 00661 // increased. We need to decrease them before closing the 00662 // pipe.... 00663 ACE_Notification_Buffer b; 00664 for (int r = read_notify_pipe(notification_pipe_.read_handle(), b); 00665 r > 0; 00666 r = read_notify_pipe(notification_pipe_.read_handle(), b)) 00667 { 00668 if (b.eh_ != 0) 00669 { 00670 b.eh_->remove_reference(); 00671 } 00672 } 00673 } 00674 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00675 00676 return this->notification_pipe_.close (); 00677 }
int ACE_Select_Reactor_Notify::dispatch_notifications | ( | int & | number_of_active_handles, | |
ACE_Handle_Set & | rd_mask | |||
) | [virtual] |
Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.
Implements ACE_Reactor_Notify.
Definition at line 733 of file Select_Reactor_Base.cpp.
References ACE_TRACE, ACE_Handle_Set::clr_bit(), handle_input(), ACE_Handle_Set::is_set(), notification_pipe_, and ACE_Pipe::read_handle().
00735 { 00736 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications"); 00737 00738 ACE_HANDLE const read_handle = 00739 this->notification_pipe_.read_handle (); 00740 00741 if (read_handle != ACE_INVALID_HANDLE 00742 && rd_mask.is_set (read_handle)) 00743 { 00744 --number_of_active_handles; 00745 rd_mask.clr_bit (read_handle); 00746 return this->handle_input (read_handle); 00747 } 00748 else 00749 return 0; 00750 }
int ACE_Select_Reactor_Notify::dispatch_notify | ( | ACE_Notification_Buffer & | buffer | ) | [virtual] |
Handle one of the notify calls on the handle. This could be because of a thread trying to unblock the Reactor_Impl.
Implements ACE_Reactor_Notify.
Definition at line 787 of file Select_Reactor_Base.cpp.
References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_TEXT, ACE_Notification_Buffer::eh_, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, ACE_Event_Handler::reference_counting_policy(), ACE_Event_Handler::remove_reference(), ACE::send(), and ACE_Event_Handler::WRITE_MASK.
00788 { 00789 int result = 0; 00790 00791 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00792 // Dispatch one message from the notify queue, and put another in 00793 // the pipe if one is available. Remember, the idea is to keep 00794 // exactly one message in the pipe at a time. 00795 00796 bool more_messages_queued = false; 00797 ACE_Notification_Buffer next; 00798 00799 result = notification_queue_.pop_next_notification(buffer, 00800 more_messages_queued, 00801 next); 00802 00803 if (result == 0) 00804 { 00805 return 0; 00806 } 00807 00808 if (result == -1) 00809 { 00810 return -1; 00811 } 00812 00813 if(more_messages_queued) 00814 { 00815 (void) ACE::send(this->notification_pipe_.write_handle(), 00816 (char *)&next, sizeof(ACE_Notification_Buffer)); 00817 } 00818 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00819 00820 // If eh == 0 then another thread is unblocking the 00821 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s 00822 // internal structures. Otherwise, we need to dispatch the 00823 // appropriate handle_* method on the <ACE_Event_Handler> pointer 00824 // we've been passed. 
00825 if (buffer.eh_ != 0) 00826 { 00827 ACE_Event_Handler *event_handler = buffer.eh_; 00828 00829 bool const requires_reference_counting = 00830 event_handler->reference_counting_policy ().value () == 00831 ACE_Event_Handler::Reference_Counting_Policy::ENABLED; 00832 00833 switch (buffer.mask_) 00834 { 00835 case ACE_Event_Handler::READ_MASK: 00836 case ACE_Event_Handler::ACCEPT_MASK: 00837 result = event_handler->handle_input (ACE_INVALID_HANDLE); 00838 break; 00839 case ACE_Event_Handler::WRITE_MASK: 00840 result = event_handler->handle_output (ACE_INVALID_HANDLE); 00841 break; 00842 case ACE_Event_Handler::EXCEPT_MASK: 00843 result = event_handler->handle_exception (ACE_INVALID_HANDLE); 00844 break; 00845 case ACE_Event_Handler::QOS_MASK: 00846 result = event_handler->handle_qos (ACE_INVALID_HANDLE); 00847 break; 00848 case ACE_Event_Handler::GROUP_QOS_MASK: 00849 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE); 00850 break; 00851 default: 00852 // Should we bail out if we get an invalid mask? 00853 ACE_ERROR ((LM_ERROR, 00854 ACE_TEXT ("invalid mask = %d\n"), 00855 buffer.mask_)); 00856 } 00857 00858 if (result == -1) 00859 event_handler->handle_close (ACE_INVALID_HANDLE, 00860 ACE_Event_Handler::EXCEPT_MASK); 00861 00862 if (requires_reference_counting) 00863 { 00864 event_handler->remove_reference (); 00865 } 00866 } 00867 00868 return 1; 00869 }
void ACE_Select_Reactor_Notify::dump | ( | void | ) | const [virtual] |
Dump the state of an object.
Implements ACE_Reactor_Notify.
Definition at line 586 of file Select_Reactor_Base.cpp.
References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.
00587 { 00588 #if defined (ACE_HAS_DUMP) 00589 ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); 00590 00591 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); 00592 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_)); 00593 this->notification_pipe_.dump (); 00594 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); 00595 #endif /* ACE_HAS_DUMP */ 00596 }
int ACE_Select_Reactor_Notify::handle_input | ( | ACE_HANDLE | handle | ) | [virtual] |
Called back by the ACE_Select_Reactor when a thread wants to unblock us.
Reimplemented from ACE_Event_Handler.
Definition at line 909 of file Select_Reactor_Base.cpp.
References ACE_TRACE, ACE_Select_Reactor_Impl::renew(), and select_reactor_.
Referenced by dispatch_notifications().
00910 { 00911 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); 00912 // Precondition: this->select_reactor_.token_.current_owner () == 00913 // ACE_Thread::self (); 00914 00915 int number_dispatched = 0; 00916 int result = 0; 00917 ACE_Notification_Buffer buffer; 00918 00919 while ((result = this->read_notify_pipe (handle, buffer)) > 0) 00920 { 00921 // Dispatch the buffer 00922 // NOTE: We count only if we made any dispatches ie. upcalls. 00923 if (this->dispatch_notify (buffer) > 0) 00924 ++number_dispatched; 00925 00926 // Bail out if we've reached the <notify_threshold_>. Note that 00927 // by default <notify_threshold_> is -1, so we'll loop until all 00928 // the notifications in the pipe have been dispatched. 00929 if (number_dispatched == this->max_notify_iterations_) 00930 break; 00931 } 00932 00933 // Reassign number_dispatched to -1 if things have gone seriously 00934 // wrong. 00935 if (result < 0) 00936 number_dispatched = -1; 00937 00938 // Enqueue ourselves into the list of waiting threads. When we 00939 // reacquire the token we'll be off and running again with ownership 00940 // of the token. The postcondition of this call is that 00941 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. 00942 this->select_reactor_->renew (); 00943 return number_dispatched; 00944 }
int ACE_Select_Reactor_Notify::is_dispatchable | ( | ACE_Notification_Buffer & | buffer | ) | [virtual] |
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify.
Definition at line 763 of file Select_Reactor_Base.cpp.
References ACE_Notification_Buffer::eh_.
00764 { 00765 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00766 ACE_UNUSED_ARG(buffer); 00767 return 1; 00768 #else 00769 // If eh == 0 then another thread is unblocking the 00770 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s 00771 // internal structures. Otherwise, we need to dispatch the 00772 // appropriate handle_* method on the <ACE_Event_Handler> 00773 // pointer we've been passed. 00774 if (buffer.eh_ != 0) 00775 { 00776 return 1; 00777 } 00778 else 00779 { 00780 // has no dispatchable buffer 00781 return 0; 00782 } 00783 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00784 }
int ACE_Select_Reactor_Notify::max_notify_iterations | ( | void | ) | [virtual] |
Get the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop.
Implements ACE_Reactor_Notify.
Definition at line 557 of file Select_Reactor_Base.cpp.
References max_notify_iterations_.
00558 { 00559 return this->max_notify_iterations_; 00560 }
void ACE_Select_Reactor_Notify::max_notify_iterations | ( | int | ) | [virtual] |
Set the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.
Implements ACE_Reactor_Notify.
Definition at line 547 of file Select_Reactor_Base.cpp.
References max_notify_iterations_.
00548 { 00549 // Must always be > 0 or < 0 to optimize the loop exit condition. 00550 if (iterations == 0) 00551 iterations = 1; 00552 00553 this->max_notify_iterations_ = iterations; 00554 }
int ACE_Select_Reactor_Notify::notify | ( | ACE_Event_Handler * | = 0 , |
|
ACE_Reactor_Mask | = ACE_Event_Handler::EXCEPT_MASK , |
|||
ACE_Time_Value * | timeout = 0 | |||
) | [virtual] |
Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakes up the ACE_Select_Reactor if it is currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to block while trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible; otherwise it will wait until the relative time specified in *timeout elapses.
Implements ACE_Reactor_Notify.
Definition at line 680 of file Select_Reactor_Base.cpp.
References ACE_TRACE, ACE_Event_Handler::add_reference(), ACE_Event_Handler_var::release(), and ACE::send().
00683 { 00684 ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); 00685 00686 // Just consider this method a "no-op" if there's no 00687 // <ACE_Select_Reactor> configured. 00688 if (this->select_reactor_ == 0) 00689 return 0; 00690 00691 ACE_Event_Handler_var safe_handler (event_handler); 00692 00693 if (event_handler) 00694 event_handler->add_reference (); 00695 00696 ACE_Notification_Buffer buffer (event_handler, mask); 00697 00698 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00699 int const notification_required = 00700 notification_queue_.push_new_notification(buffer); 00701 00702 if (notification_required == -1) 00703 { 00704 return -1; 00705 } 00706 00707 if (notification_required == 0) 00708 { 00709 // No failures, the handler is now owned by the notification queue 00710 safe_handler.release (); 00711 00712 return 0; 00713 } 00714 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00715 00716 ssize_t const n = ACE::send (this->notification_pipe_.write_handle (), 00717 (char *) &buffer, 00718 sizeof buffer, 00719 timeout); 00720 if (n == -1) 00721 return -1; 00722 00723 // No failures. 00724 safe_handler.release (); 00725 00726 return 0; 00727 }
ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle | ( | void | ) | [virtual] |
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor.
Implements ACE_Reactor_Notify.
Definition at line 754 of file Select_Reactor_Base.cpp.
References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().
00755 { 00756 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); 00757 00758 return this->notification_pipe_.read_handle (); 00759 }
int ACE_Select_Reactor_Notify::open | ( | ACE_Reactor_Impl * | , | |
ACE_Timer_Queue * | = 0 , |
|||
int | disable_notify_pipe = ACE_DISABLE_NOTIFY_PIPE_DEFAULT | |||
) | [virtual] |
Initialize.
Implements ACE_Reactor_Notify.
Definition at line 599 of file Select_Reactor_Base.cpp.
References ACE_NONBLOCK, ACE_TRACE, ACE_OS::fcntl(), ACE_Event_Handler::READ_MASK, ACE_Reactor_Impl::register_handler(), select_reactor_, and ACE::set_flags().
00602 { 00603 ACE_TRACE ("ACE_Select_Reactor_Notify::open"); 00604 00605 if (disable_notify_pipe == 0) 00606 { 00607 this->select_reactor_ = 00608 dynamic_cast<ACE_Select_Reactor_Impl *> (r); 00609 00610 if (select_reactor_ == 0) 00611 { 00612 errno = EINVAL; 00613 return -1; 00614 } 00615 00616 if (this->notification_pipe_.open () == -1) 00617 return -1; 00618 #if defined (F_SETFD) 00619 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); 00620 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); 00621 #endif /* F_SETFD */ 00622 00623 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00624 if (notification_queue_.open() == -1) 00625 { 00626 return -1; 00627 } 00628 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00629 00630 // There seems to be a Win32 bug with this... Set this into 00631 // non-blocking mode. 00632 if (ACE::set_flags (this->notification_pipe_.read_handle (), 00633 ACE_NONBLOCK) == -1) 00634 return -1; 00635 else 00636 return this->select_reactor_->register_handler 00637 (this->notification_pipe_.read_handle (), 00638 this, 00639 ACE_Event_Handler::READ_MASK); 00640 } 00641 else 00642 { 00643 this->select_reactor_ = 0; 00644 return 0; 00645 } 00646 }
int ACE_Select_Reactor_Notify::purge_pending_notifications | ( | ACE_Event_Handler * | sh, | |
ACE_Reactor_Mask | mask = ACE_Event_Handler::ALL_EVENTS_MASK | |||
) | [virtual] |
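Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If the handler pointer (declared as sh above, named eh in the implementation) is 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error; when ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined the operation is not supported and -1 is returned.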
Implements ACE_Reactor_Notify.
Definition at line 569 of file Select_Reactor_Base.cpp.
References ACE_NOTSUP_RETURN, and ACE_TRACE.
00571 { 00572 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); 00573 00574 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00575 00576 return notification_queue_.purge_pending_notifications(eh, mask); 00577 00578 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00579 ACE_UNUSED_ARG (eh); 00580 ACE_UNUSED_ARG (mask); 00581 ACE_NOTSUP_RETURN (-1); 00582 #endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00583 }
int ACE_Select_Reactor_Notify::read_notify_pipe | ( | ACE_HANDLE | handle, | |
ACE_Notification_Buffer & | buffer | |||
) | [virtual] |
Read one of the notify calls on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl.
Implements ACE_Reactor_Notify.
Definition at line 872 of file Select_Reactor_Base.cpp.
References ACE_TRACE, and ACE::recv().
Referenced by close().
00874 { 00875 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe"); 00876 00877 ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer); 00878 00879 if (n > 0) 00880 { 00881 // Check to see if we've got a short read. 00882 if (n != sizeof buffer) 00883 { 00884 ssize_t const remainder = sizeof buffer - n; 00885 00886 // If so, try to recover by reading the remainder. If this 00887 // doesn't work we're in big trouble since the input stream 00888 // won't be aligned correctly. I'm not sure quite what to 00889 // do at this point. It's probably best just to return -1. 00890 if (ACE::recv (handle, 00891 ((char *) &buffer) + n, 00892 remainder) != remainder) 00893 return -1; 00894 } 00895 00896 00897 return 1; 00898 } 00899 00900 // Return -1 if things have gone seriously wrong. 00901 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) 00902 return -1; 00903 00904 return 0; 00905 }
int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected] |
Keeps track of the maximum number of times that the ACE_Select_Reactor_Notify::handle_input() method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."
Definition at line 252 of file Select_Reactor_Base.h.
Referenced by max_notify_iterations().
Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.
Definition at line 243 of file Select_Reactor_Base.h.
Referenced by close(), dispatch_notifications(), dump(), and notify_handle().
Keep a back pointer to the ACE_Select_Reactor. If this value is NULL then the ACE_Select_Reactor has been initialized with disable_notify_pipe.
Definition at line 236 of file Select_Reactor_Base.h.
Referenced by handle_input(), and open().