#include <Select_Reactor_Base.h>
Inheritance diagram for ACE_Select_Reactor_Notify:
Public Member Functions | |
ACE_Select_Reactor_Notify (void) | |
Constructor. | |
virtual | ~ACE_Select_Reactor_Notify (void) |
Destructor. | |
virtual int | open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=ACE_DISABLE_NOTIFY_PIPE_DEFAULT) |
Initialize. | |
virtual int | close (void) |
Destroy. | |
virtual int | notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0) |
virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
virtual ACE_HANDLE | notify_handle (void) |
virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
Verify whether the buffer has dispatchable info or not. | |
virtual int | handle_input (ACE_HANDLE handle) |
virtual void | max_notify_iterations (int) |
virtual int | max_notify_iterations (void) |
virtual int | purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
virtual void | dump (void) const |
Dump the state of an object. | |
Public Attributes | |
ACE_ALLOC_HOOK_DECLARE | |
Declare the dynamic allocation hooks. | |
Protected Attributes | |
ACE_Select_Reactor_Impl * | select_reactor_ |
ACE_Pipe | notification_pipe_ |
int | max_notify_iterations_ |
This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and an ACE_Reactor_Mask are passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.
Definition at line 134 of file Select_Reactor_Base.h.
|
Constructor.
Definition at line 525 of file Select_Reactor_Base.cpp.
00526 : max_notify_iterations_ (-1) 00527 { 00528 } |
|
Destructor.
Definition at line 530 of file Select_Reactor_Base.cpp.
00531 { 00532 } |
|
Destroy.
Implements ACE_Reactor_Notify. Definition at line 637 of file Select_Reactor_Base.cpp. References ACE_TRACE, ACE_Pipe::close(), ACE_Notification_Buffer::eh_, notification_pipe_, ACE_Pipe::read_handle(), read_notify_pipe(), and ACE_Event_Handler::remove_reference().
00638 { 00639 ACE_TRACE ("ACE_Select_Reactor_Notify::close"); 00640 00641 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00642 notification_queue_.reset(); 00643 #else 00644 if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE) 00645 { 00646 // Please see Bug 2820, if we just close the pipe then we break 00647 // the reference counting rules. Basically, all the event 00648 // handlers "stored" in the pipe had their reference counts 00649 // increased. We need to decrease them before closing the 00650 // pipe.... 00651 ACE_Notification_Buffer b; 00652 for (int r = read_notify_pipe(notification_pipe_.read_handle(), b); 00653 r > 0; 00654 r = read_notify_pipe(notification_pipe_.read_handle(), b)) 00655 { 00656 if (b.eh_ == 0) continue; 00657 b.eh_->remove_reference(); 00658 } 00659 } 00660 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00661 00662 return this->notification_pipe_.close (); 00663 } |
|
Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor. Implements ACE_Reactor_Notify. Definition at line 719 of file Select_Reactor_Base.cpp. References ACE_TRACE, ACE_Handle_Set::clr_bit(), handle_input(), ACE_Handle_Set::is_set(), notification_pipe_, and ACE_Pipe::read_handle().
00721 { 00722 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications"); 00723 00724 ACE_HANDLE const read_handle = 00725 this->notification_pipe_.read_handle (); 00726 00727 if (read_handle != ACE_INVALID_HANDLE 00728 && rd_mask.is_set (read_handle)) 00729 { 00730 --number_of_active_handles; 00731 rd_mask.clr_bit (read_handle); 00732 return this->handle_input (read_handle); 00733 } 00734 else 00735 return 0; 00736 } |
|
Handle one of the notify calls delivered in the buffer. Implements ACE_Reactor_Notify. Definition at line 773 of file Select_Reactor_Base.cpp. References ACE_ERROR, ACE_TEXT, ACE_Notification_Buffer::eh_, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::reference_counting_policy(), ACE_Event_Handler::remove_reference(), and ACE::send(). Referenced by handle_input().
00774 { 00775 int result = 0; 00776 00777 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00778 // Dispatch one message from the notify queue, and put another in 00779 // the pipe if one is available. Remember, the idea is to keep 00780 // exactly one message in the pipe at a time. 00781 00782 bool more_messages_queued = false; 00783 ACE_Notification_Buffer next; 00784 00785 result = notification_queue_.pop_next_notification(buffer, 00786 more_messages_queued, 00787 next); 00788 00789 if (result == 0) 00790 { 00791 return 0; 00792 } 00793 00794 if (result == -1) 00795 { 00796 return -1; 00797 } 00798 00799 if(more_messages_queued) 00800 { 00801 (void) ACE::send(this->notification_pipe_.write_handle(), 00802 (char *)&next, sizeof(ACE_Notification_Buffer)); 00803 } 00804 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00805 00806 // If eh == 0 then another thread is unblocking the 00807 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s 00808 // internal structures. Otherwise, we need to dispatch the 00809 // appropriate handle_* method on the <ACE_Event_Handler> pointer 00810 // we've been passed. 
00811 if (buffer.eh_ != 0) 00812 { 00813 ACE_Event_Handler *event_handler = 00814 buffer.eh_; 00815 00816 bool const requires_reference_counting = 00817 event_handler->reference_counting_policy ().value () == 00818 ACE_Event_Handler::Reference_Counting_Policy::ENABLED; 00819 00820 switch (buffer.mask_) 00821 { 00822 case ACE_Event_Handler::READ_MASK: 00823 case ACE_Event_Handler::ACCEPT_MASK: 00824 result = event_handler->handle_input (ACE_INVALID_HANDLE); 00825 break; 00826 case ACE_Event_Handler::WRITE_MASK: 00827 result = event_handler->handle_output (ACE_INVALID_HANDLE); 00828 break; 00829 case ACE_Event_Handler::EXCEPT_MASK: 00830 result = event_handler->handle_exception (ACE_INVALID_HANDLE); 00831 break; 00832 case ACE_Event_Handler::QOS_MASK: 00833 result = event_handler->handle_qos (ACE_INVALID_HANDLE); 00834 break; 00835 case ACE_Event_Handler::GROUP_QOS_MASK: 00836 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE); 00837 break; 00838 default: 00839 // Should we bail out if we get an invalid mask? 00840 ACE_ERROR ((LM_ERROR, 00841 ACE_TEXT ("invalid mask = %d\n"), 00842 buffer.mask_)); 00843 } 00844 00845 if (result == -1) 00846 event_handler->handle_close (ACE_INVALID_HANDLE, 00847 ACE_Event_Handler::EXCEPT_MASK); 00848 00849 if (requires_reference_counting) 00850 { 00851 event_handler->remove_reference (); 00852 } 00853 } 00854 00855 return 1; 00856 } |
|
Dump the state of an object.
Implements ACE_Reactor_Notify. Definition at line 574 of file Select_Reactor_Base.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.
00575 { 00576 #if defined (ACE_HAS_DUMP) 00577 ACE_TRACE ("ACE_Select_Reactor_Notify::dump"); 00578 00579 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); 00580 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_)); 00581 this->notification_pipe_.dump (); 00582 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); 00583 #endif /* ACE_HAS_DUMP */ 00584 } |
|
Called back by the ACE_Select_Reactor when a thread wants to unblock us. Reimplemented from ACE_Event_Handler. Definition at line 896 of file Select_Reactor_Base.cpp. References ACE_TRACE, dispatch_notify(), max_notify_iterations_, read_notify_pipe(), and ACE_Select_Reactor_Impl::renew(). Referenced by dispatch_notifications().
00897 { 00898 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); 00899 // Precondition: this->select_reactor_.token_.current_owner () == 00900 // ACE_Thread::self (); 00901 00902 int number_dispatched = 0; 00903 int result = 0; 00904 ACE_Notification_Buffer buffer; 00905 00906 while ((result = this->read_notify_pipe (handle, buffer)) > 0) 00907 { 00908 // Dispatch the buffer 00909 // NOTE: We count only if we made any dispatches ie. upcalls. 00910 if (this->dispatch_notify (buffer) > 0) 00911 ++number_dispatched; 00912 00913 // Bail out if we've reached the <notify_threshold_>. Note that 00914 // by default <notify_threshold_> is -1, so we'll loop until all 00915 // the notifications in the pipe have been dispatched. 00916 if (number_dispatched == this->max_notify_iterations_) 00917 break; 00918 } 00919 00920 // Reassign number_dispatched to -1 if things have gone seriously 00921 // wrong. 00922 if (result < 0) 00923 number_dispatched = -1; 00924 00925 // Enqueue ourselves into the list of waiting threads. When we 00926 // reacquire the token we'll be off and running again with ownership 00927 // of the token. The postcondition of this call is that 00928 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. 00929 this->select_reactor_->renew (); 00930 return number_dispatched; 00931 } |
|
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify. Definition at line 749 of file Select_Reactor_Base.cpp. References ACE_Notification_Buffer::eh_.
00750 { 00751 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00752 ACE_UNUSED_ARG(buffer); 00753 return 1; 00754 #else 00755 // If eh == 0 then another thread is unblocking the 00756 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s 00757 // internal structures. Otherwise, we need to dispatch the 00758 // appropriate handle_* method on the <ACE_Event_Handler> 00759 // pointer we've been passed. 00760 if (buffer.eh_ != 0) 00761 { 00762 return 1; 00763 } 00764 else 00765 { 00766 // has no dispatchable buffer 00767 return 0; 00768 } 00769 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00770 } |
|
Get the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the event handlers that are passed in via the notify pipe before breaking out of its loop. Implements ACE_Reactor_Notify. Definition at line 545 of file Select_Reactor_Base.cpp. References max_notify_iterations_.
00546 { 00547 return this->max_notify_iterations_; 00548 } |
|
Set the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the event handlers that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead. Implements ACE_Reactor_Notify. Definition at line 535 of file Select_Reactor_Base.cpp. References max_notify_iterations_.
00536 { 00537 // Must always be > 0 or < 0 to optimize the loop exit condition. 00538 if (iterations == 0) 00539 iterations = 1; 00540 00541 this->max_notify_iterations_ = iterations; 00542 } |
|
Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakes up the ACE_Select_Reactor if it is currently blocked in select or poll. Implements ACE_Reactor_Notify. Definition at line 666 of file Select_Reactor_Base.cpp. References ACE_Reactor_Mask, ACE_TRACE, ACE_Event_Handler::add_reference(), ACE_Event_Handler_var::release(), ACE::send(), and ssize_t.
00669 { 00670 ACE_TRACE ("ACE_Select_Reactor_Notify::notify"); 00671 00672 // Just consider this method a "no-op" if there's no 00673 // <ACE_Select_Reactor> configured. 00674 if (this->select_reactor_ == 0) 00675 return 0; 00676 00677 ACE_Event_Handler_var safe_handler (event_handler); 00678 00679 if (event_handler) 00680 event_handler->add_reference (); 00681 00682 ACE_Notification_Buffer buffer (event_handler, mask); 00683 00684 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00685 int notification_required = 00686 notification_queue_.push_new_notification(buffer); 00687 00688 if (notification_required == -1) 00689 { 00690 return -1; 00691 } 00692 00693 if (notification_required == 0) 00694 { 00695 // No failures, the handler is now owned by the notification queue 00696 safe_handler.release (); 00697 00698 return 0; 00699 } 00700 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00701 00702 ssize_t const n = ACE::send (this->notification_pipe_.write_handle (), 00703 (char *) &buffer, 00704 sizeof buffer, 00705 timeout); 00706 if (n == -1) 00707 return -1; 00708 00709 // No failures. 00710 safe_handler.release (); 00711 00712 return 0; 00713 } |
|
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor. Implements ACE_Reactor_Notify. Definition at line 740 of file Select_Reactor_Base.cpp. References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().
00741 { 00742 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle"); 00743 00744 return this->notification_pipe_.read_handle (); 00745 } |
|
Initialize.
Implements ACE_Reactor_Notify. Definition at line 587 of file Select_Reactor_Base.cpp. References ACE_NONBLOCK, ACE_Timer_Queue, ACE_TRACE, ACE_OS::fcntl(), notification_pipe_, ACE_Pipe::open(), ACE_Pipe::read_handle(), ACE_Reactor_Impl::register_handler(), and ACE::set_flags().
00590 { 00591 ACE_TRACE ("ACE_Select_Reactor_Notify::open"); 00592 00593 if (disable_notify_pipe == 0) 00594 { 00595 this->select_reactor_ = 00596 dynamic_cast<ACE_Select_Reactor_Impl *> (r); 00597 00598 if (select_reactor_ == 0) 00599 { 00600 errno = EINVAL; 00601 return -1; 00602 } 00603 00604 if (this->notification_pipe_.open () == -1) 00605 return -1; 00606 #if defined (F_SETFD) 00607 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); 00608 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); 00609 #endif /* F_SETFD */ 00610 00611 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00612 if (notification_queue_.open() == -1) 00613 { 00614 return -1; 00615 } 00616 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00617 00618 // There seems to be a Win32 bug with this... Set this into 00619 // non-blocking mode. 00620 if (ACE::set_flags (this->notification_pipe_.read_handle (), 00621 ACE_NONBLOCK) == -1) 00622 return -1; 00623 else 00624 return this->select_reactor_->register_handler 00625 (this->notification_pipe_.read_handle (), 00626 this, 00627 ACE_Event_Handler::READ_MASK); 00628 } 00629 else 00630 { 00631 this->select_reactor_ = 0; 00632 return 0; 00633 } 00634 } |
|
Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error. Implements ACE_Reactor_Notify. Definition at line 557 of file Select_Reactor_Base.cpp. References ACE_NOTSUP_RETURN, ACE_Reactor_Mask, and ACE_TRACE.
00559 { 00560 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); 00561 00562 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00563 00564 return notification_queue_.purge_pending_notifications(eh, mask); 00565 00566 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00567 ACE_UNUSED_ARG (eh); 00568 ACE_UNUSED_ARG (mask); 00569 ACE_NOTSUP_RETURN (-1); 00570 #endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00571 } |
|
Read one of the notify calls on the handle into the buffer. This could be because of a thread trying to unblock the reactor. Implements ACE_Reactor_Notify. Definition at line 859 of file Select_Reactor_Base.cpp. References ACE_TRACE, EWOULDBLOCK, ACE::recv(), and ssize_t. Referenced by close(), and handle_input().
00861 { 00862 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe"); 00863 00864 ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer); 00865 00866 if (n > 0) 00867 { 00868 // Check to see if we've got a short read. 00869 if (n != sizeof buffer) 00870 { 00871 ssize_t const remainder = sizeof buffer - n; 00872 00873 // If so, try to recover by reading the remainder. If this 00874 // doesn't work we're in big trouble since the input stream 00875 // won't be aligned correctly. I'm not sure quite what to 00876 // do at this point. It's probably best just to return -1. 00877 if (ACE::recv (handle, 00878 ((char *) &buffer) + n, 00879 remainder) != remainder) 00880 return -1; 00881 } 00882 00883 00884 return 1; 00885 } 00886 00887 // Return -1 if things have gone seriously wrong. 00888 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) 00889 return -1; 00890 00891 return 0; 00892 } |
|
Declare the dynamic allocation hooks.
Definition at line 229 of file Select_Reactor_Base.h. |
|
Keeps track of the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the event handlers that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty." Definition at line 253 of file Select_Reactor_Base.h. Referenced by handle_input(), and max_notify_iterations(). |
|
Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to. Definition at line 244 of file Select_Reactor_Base.h. Referenced by close(), dispatch_notifications(), dump(), notify_handle(), and open(). |
|
Keep a back pointer to the ACE_Select_Reactor. If this value is NULL then the ACE_Select_Reactor has been initialized with disable_notify_pipe set. Definition at line 237 of file Select_Reactor_Base.h. |