ACE_Select_Reactor_Notify Class Reference

Unblock the ACE_Select_Reactor from its event loop. More...

#include <Select_Reactor_Base.h>

Inheritance diagram for ACE_Select_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Select_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_Select_Reactor_Notify (void)
 Constructor.

virtual ~ACE_Select_Reactor_Notify (void)
 Destructor.

virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=0)
 Initialize.

virtual int close (void)
 Destroy.

virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.

virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *sh, ACE_Reactor_Mask mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Attributes

ACE_Select_Reactor_Implselect_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the ACE_Select_Reactor from its event loop.

This implementation is necessary for cases where the ACE_Select_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock select or poll when updates occur other than in the main ACE_Select_Reactor thread. To do this, we signal an auto-reset event the ACE_Select_Reactor is listening on. If an ACE_Event_Handler and ACE_Select_Reactor_Mask is passed to notify, the appropriate handle_* method is dispatched in the context of the ACE_Select_Reactor thread.

Definition at line 120 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify void   ) 
 

Constructor.

Definition at line 546 of file Select_Reactor_Base.cpp.

00547   : max_notify_iterations_ (-1)
00548 {
00549 }

ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify void   )  [virtual]
 

Destructor.

Definition at line 551 of file Select_Reactor_Base.cpp.

00552 {
00553 }


Member Function Documentation

int ACE_Select_Reactor_Notify::close void   )  [virtual]
 

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 747 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), ACE_Pipe::close(), and ACE_Unbounded_Queue_Iterator< T >::next().

00748 {
00749   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00750 
00751 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00752   // Free up the dynamically allocated resources.
00753   ACE_Notification_Buffer **b = 0;
00754 
00755   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00756        alloc_iter.next (b) != 0;
00757        alloc_iter.advance ())
00758     {
00759       delete [] *b;
00760       *b = 0;
00761     }
00762 
00763   this->alloc_queue_.reset ();
00764   this->notify_queue_.reset ();
00765   this->free_queue_.reset ();
00766 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00767 
00768   return this->notification_pipe_.close ();
00769 }

int ACE_Select_Reactor_Notify::dispatch_notifications int &  number_of_active_handles,
ACE_Handle_Set rd_mask
[virtual]
 

Handles pending threads (if any) that are waiting to unblock the ACE_Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 862 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Handle_Set::clr_bit(), handle_input(), ACE_Handle_Set::is_set(), and ACE_Pipe::read_handle().

00864 {
00865   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00866 
00867   ACE_HANDLE read_handle =
00868     this->notification_pipe_.read_handle ();
00869 
00870   if (read_handle != ACE_INVALID_HANDLE
00871       && rd_mask.is_set (read_handle))
00872     {
00873       --number_of_active_handles;
00874       rd_mask.clr_bit (read_handle);
00875       return this->handle_input (read_handle);
00876     }
00877   else
00878     return 0;
00879 }

int ACE_Select_Reactor_Notify::dispatch_notify ACE_Notification_Buffer buffer  )  [virtual]
 

Handle one of the notify call on the handle. This could be because of a thread trying to unblock the

Implements ACE_Reactor_Notify.

Definition at line 913 of file Select_Reactor_Base.cpp.

References ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_Notification_Buffer::eh_, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::reference_counting_policy(), ACE_Event_Handler::remove_reference(), and ACE::send().

Referenced by handle_input().

00914 {
00915   int result = 0;
00916 
00917 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00918   // Dispatch one message from the notify queue, and put another in
00919   // the pipe if one is available.  Remember, the idea is to keep
00920   // exactly one message in the pipe at a time.
00921   {
00922     // We acquire the lock in a block to make sure we're not
00923     // holding the lock while delivering callbacks...
00924     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00925 
00926     ACE_Notification_Buffer *temp = 0;
00927 
00928     if (notify_queue_.is_empty ())
00929       return 0;
00930     else if (notify_queue_.dequeue_head (temp) == -1)
00931       ACE_ERROR_RETURN ((LM_ERROR,
00932                          ACE_LIB_TEXT ("%p\n"),
00933                          ACE_LIB_TEXT ("dequeue_head")),
00934                         -1);
00935     buffer = *temp;
00936     if (free_queue_.enqueue_head (temp) == -1)
00937       ACE_ERROR_RETURN ((LM_ERROR,
00938                          ACE_LIB_TEXT ("%p\n"),
00939                          ACE_LIB_TEXT ("enqueue_head")),
00940                         -1);
00941 
00942     bool write_next_buffer = false;
00943     ACE_Notification_Buffer ** next = 0;
00944 
00945     if(!this->notify_queue_.is_empty())
00946       {
00947         // The queue is not empty, need to queue another message.
00948         this->notify_queue_.get (next, 0);
00949         write_next_buffer = true;
00950       }
00951 
00952     if(write_next_buffer)
00953       {
00954         (void) ACE::send(
00955                          this->notification_pipe_.write_handle(),
00956             (char *)*next, sizeof(ACE_Notification_Buffer));
00957       }
00958   }
00959 
00960 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00961 
00962   // If eh == 0 then another thread is unblocking the
00963   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00964   // internal structures.  Otherwise, we need to dispatch the
00965   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00966   // we've been passed.
00967   if (buffer.eh_ != 0)
00968     {
00969       ACE_Event_Handler *event_handler =
00970         buffer.eh_;
00971 
00972       int requires_reference_counting =
00973         event_handler->reference_counting_policy ().value () ==
00974         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00975 
00976       switch (buffer.mask_)
00977         {
00978         case ACE_Event_Handler::READ_MASK:
00979         case ACE_Event_Handler::ACCEPT_MASK:
00980           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00981           break;
00982         case ACE_Event_Handler::WRITE_MASK:
00983           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00984           break;
00985         case ACE_Event_Handler::EXCEPT_MASK:
00986           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00987           break;
00988         case ACE_Event_Handler::QOS_MASK:
00989           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00990           break;
00991         case ACE_Event_Handler::GROUP_QOS_MASK:
00992           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00993           break;
00994         default:
00995           // Should we bail out if we get an invalid mask?
00996           ACE_ERROR ((LM_ERROR,
00997                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00998                       buffer.mask_));
00999         }
01000 
01001       if (result == -1)
01002         event_handler->handle_close (ACE_INVALID_HANDLE,
01003                                      ACE_Event_Handler::EXCEPT_MASK);
01004 
01005       if (requires_reference_counting)
01006         {
01007           event_handler->remove_reference ();
01008         }
01009     }
01010 
01011   return 1;
01012 }

void ACE_Select_Reactor_Notify::dump void   )  const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 672 of file Select_Reactor_Base.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump(), and LM_DEBUG.

00673 {
00674 #if defined (ACE_HAS_DUMP)
00675   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00676 
00677   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00678   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00679   this->notification_pipe_.dump ();
00680   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00681 #endif /* ACE_HAS_DUMP */
00682 }

int ACE_Select_Reactor_Notify::handle_input ACE_HANDLE  handle  )  [virtual]
 

Called back by the ACE_Select_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 1052 of file Select_Reactor_Base.cpp.

References ACE_TRACE, dispatch_notify(), read_notify_pipe(), and ACE_Select_Reactor_Impl::renew().

Referenced by dispatch_notifications().

01053 {
01054   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01055   // Precondition: this->select_reactor_.token_.current_owner () ==
01056   // ACE_Thread::self ();
01057 
01058   int number_dispatched = 0;
01059   int result = 0;
01060   ACE_Notification_Buffer buffer;
01061 
01062   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01063     {
01064       // Dispatch the buffer
01065       // NOTE: We count only if we made any dispatches ie. upcalls.
01066       if (this->dispatch_notify (buffer) > 0)
01067         ++number_dispatched;
01068 
01069       // Bail out if we've reached the <notify_threshold_>.  Note that
01070       // by default <notify_threshold_> is -1, so we'll loop until all
01071       // the notifications in the pipe have been dispatched.
01072       if (number_dispatched == this->max_notify_iterations_)
01073         break;
01074     }
01075 
01076   // Reassign number_dispatched to -1 if things have gone seriously
01077   // wrong.
01078   if (result < 0)
01079     number_dispatched = -1;
01080 
01081   // Enqueue ourselves into the list of waiting threads.  When we
01082   // reacquire the token we'll be off and running again with ownership
01083   // of the token.  The postcondition of this call is that
01084   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
01085   this->select_reactor_->renew ();
01086   return number_dispatched;
01087 }

int ACE_Select_Reactor_Notify::is_dispatchable ACE_Notification_Buffer buffer  )  [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 892 of file Select_Reactor_Base.cpp.

References ACE_Notification_Buffer::eh_.

00893 {
00894 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00895   ACE_UNUSED_ARG(buffer);
00896   return 1;
00897 #else
00898   // If eh == 0 then another thread is unblocking the
00899   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00900   // internal structures.  Otherwise, we need to dispatch the
00901   // appropriate handle_* method on the <ACE_Event_Handler>
00902   // pointer we've been passed.
00903   if (buffer.eh_ != 0)
00904     return 1;
00905 
00906 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00907 
00908   // has no dispatchable buffer
00909   return 0;
00910 }

int ACE_Select_Reactor_Notify::max_notify_iterations void   )  [virtual]
 

Get the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop.

Implements ACE_Reactor_Notify.

Definition at line 566 of file Select_Reactor_Base.cpp.

00567 {
00568   return this->max_notify_iterations_;
00569 }

void ACE_Select_Reactor_Notify::max_notify_iterations int   )  [virtual]
 

Set the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 556 of file Select_Reactor_Base.cpp.

00557 {
00558   // Must always be > 0 or < 0 to optimize the loop exit condition.
00559   if (iterations == 0)
00560     iterations = 1;
00561 
00562   this->max_notify_iterations_ = iterations;
00563 }

int ACE_Select_Reactor_Notify::notify ACE_Event_Handler = 0,
ACE_Reactor_Mask  = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value timeout = 0
[virtual]
 

Called by a thread when it wants to unblock the ACE_Select_Reactor. This wakeups the ACE_Select_Reactor if currently blocked in select/poll. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the ACE_Select_Reactor will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the ACE_Select_Reactor. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 772 of file Select_Reactor_Base.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Event_Handler::add_reference(), ACE_Event_Handler_var::release(), ACE::send(), and ssize_t.

00775 {
00776   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00777 
00778   // Just consider this method a "no-op" if there's no
00779   // <ACE_Select_Reactor> configured.
00780   if (this->select_reactor_ == 0)
00781     return 0;
00782 
00783   ACE_Event_Handler_var safe_handler (event_handler);
00784 
00785   if (event_handler)
00786     event_handler->add_reference ();
00787 
00788   ACE_Notification_Buffer buffer (event_handler, mask);
00789 
00790 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00791   // Artificial scope to limit the duration of the mutex.
00792   {
00793     bool notification_required = false;
00794 
00795     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00796 
00797     // No pending notifications.
00798     if (this->notify_queue_.is_empty ())
00799       notification_required = true;
00800 
00801     ACE_Notification_Buffer *temp = 0;
00802 
00803     if (free_queue_.dequeue_head (temp) == -1)
00804       {
00805         // Grow the queue of available buffers.
00806         ACE_Notification_Buffer *temp1 = 0;
00807 
00808         ACE_NEW_RETURN (temp1,
00809                         ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00810                         -1);
00811 
00812         if (this->alloc_queue_.enqueue_head (temp1) == -1)
00813           {
00814             delete [] temp1;
00815             return -1;
00816           }
00817 
00818         // Start at 1 and enqueue only
00819         // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00820         // the first one will be used right now.
00821         for (size_t i = 1;
00822              i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00823              ++i)
00824           this->free_queue_.enqueue_head (temp1 + i);
00825 
00826         temp = temp1;
00827       }
00828 
00829     ACE_ASSERT (temp != 0);
00830     *temp = buffer;
00831 
00832     if (notify_queue_.enqueue_tail (temp) == -1)
00833       return -1;
00834 
00835     if (!notification_required)
00836       {
00837         // No failures.
00838         safe_handler.release ();
00839 
00840         return 0;
00841       }
00842   }
00843 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00844 
00845   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00846                          (char *) &buffer,
00847                          sizeof buffer,
00848                          timeout);
00849   if (n == -1)
00850     return -1;
00851 
00852   // No failures.
00853   safe_handler.release ();
00854 
00855   return 0;
00856 }

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle void   )  [virtual]
 

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor

Implements ACE_Reactor_Notify.

Definition at line 883 of file Select_Reactor_Base.cpp.

References ACE_TRACE, and ACE_Pipe::read_handle().

00884 {
00885   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00886 
00887   return this->notification_pipe_.read_handle ();
00888 }

int ACE_Select_Reactor_Notify::open ACE_Reactor_Impl ,
ACE_Timer_Queue = 0,
int  disable_notify_pipe = 0
[virtual]
 

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 685 of file Select_Reactor_Base.cpp.

References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_Timer_Queue, ACE_TRACE, ACE_OS::fcntl(), ACE_Pipe::open(), ACE_Pipe::read_handle(), ACE_Reactor_Impl::register_handler(), and ACE::set_flags().

00688 {
00689   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00690 
00691   if (disable_notify_pipe == 0)
00692     {
00693       this->select_reactor_ =
00694         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00695 
00696       if (select_reactor_ == 0)
00697         {
00698           errno = EINVAL;
00699           return -1;
00700         }
00701 
00702       if (this->notification_pipe_.open () == -1)
00703         return -1;
00704 #if defined (F_SETFD)
00705       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00706       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00707 #endif /* F_SETFD */
00708 
00709 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00710       ACE_Notification_Buffer *temp = 0;
00711 
00712       ACE_NEW_RETURN (temp,
00713                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00714                       -1);
00715 
00716       if (this->alloc_queue_.enqueue_head (temp) == -1)
00717         {
00718           delete [] temp;
00719           return -1;
00720         }
00721 
00722       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00723         if (free_queue_.enqueue_head (temp + i) == -1)
00724           return -1;
00725 
00726 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00727 
00728       // There seems to be a Win32 bug with this...  Set this into
00729       // non-blocking mode.
00730       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00731                           ACE_NONBLOCK) == -1)
00732         return -1;
00733       else
00734         return this->select_reactor_->register_handler
00735           (this->notification_pipe_.read_handle (),
00736            this,
00737            ACE_Event_Handler::READ_MASK);
00738     }
00739   else
00740     {
00741       this->select_reactor_ = 0;
00742       return 0;
00743     }
00744 }

int ACE_Select_Reactor_Notify::purge_pending_notifications ACE_Event_Handler sh,
ACE_Reactor_Mask  mask = ACE_Event_Handler::ALL_EVENTS_MASK
[virtual]
 

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 578 of file Select_Reactor_Base.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_NOTSUP_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue< T >::dequeue_head(), ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue< T >::enqueue_head(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::remove_reference(), and ACE_Unbounded_Queue< T >::size().

00580 {
00581   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00582 
00583 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00584 
00585   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00586 
00587   if (this->notify_queue_.is_empty ())
00588     return 0;
00589 
00590   ACE_Notification_Buffer *temp = 0;
00591   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00592 
00593   size_t queue_size = this->notify_queue_.size ();
00594   int number_purged = 0;
00595   size_t i;
00596   for (i = 0; i < queue_size; ++i)
00597     {
00598       if (-1 == this->notify_queue_.dequeue_head (temp))
00599         ACE_ERROR_RETURN ((LM_ERROR,
00600                            ACE_LIB_TEXT ("%p\n"),
00601                            ACE_LIB_TEXT ("dequeue_head")),
00602                           -1);
00603 
00604       // If this is not a Reactor notify (it is for a particular handler),
00605       // and it matches the specified handler (or purging all),
00606       // and applying the mask would totally eliminate the notification, then
00607       // release it and count the number purged.
00608       if ((0 != temp->eh_) &&
00609           (0 == eh || eh == temp->eh_) &&
00610           ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
00611                                                  // is left with nothing when
00612                                                  // applying the mask
00613       {
00614         if (-1 == this->free_queue_.enqueue_head (temp))
00615           ACE_ERROR_RETURN ((LM_ERROR,
00616                              ACE_LIB_TEXT ("%p\n"),
00617                              ACE_LIB_TEXT ("enqueue_head")),
00618                             -1);
00619 
00620         ACE_Event_Handler *event_handler = temp->eh_;
00621         event_handler->remove_reference ();
00622 
00623         ++number_purged;
00624       }
00625       else
00626       {
00627         // To preserve it, move it to the local_queue.
00628         // But first, if this is not a Reactor notify (it is for a particularhandler),
00629         // and it matches the specified handler (or purging all), then
00630         // apply the mask
00631         if ((0 != temp->eh_) &&
00632             (0 == eh || eh == temp->eh_))
00633           ACE_CLR_BITS(temp->mask_, mask);
00634         if (-1 == local_queue.enqueue_head (temp))
00635           return -1;
00636       }
00637     }
00638 
00639   if (this->notify_queue_.size ())
00640     { // should be empty!
00641       ACE_ASSERT (0);
00642       return -1;
00643     }
00644 
00645   // now put it back in the notify queue
00646   queue_size = local_queue.size ();
00647   for (i = 0; i < queue_size; ++i)
00648     {
00649       if (-1 == local_queue.dequeue_head (temp))
00650         ACE_ERROR_RETURN ((LM_ERROR,
00651                            ACE_LIB_TEXT ("%p\n"),
00652                            ACE_LIB_TEXT ("dequeue_head")),
00653                           -1);
00654 
00655       if (-1 == this->notify_queue_.enqueue_head (temp))
00656         ACE_ERROR_RETURN ((LM_ERROR,
00657                            ACE_LIB_TEXT ("%p\n"),
00658                            ACE_LIB_TEXT ("enqueue_head")),
00659                           -1);
00660     }
00661 
00662   return number_purged;
00663 
00664 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00665   ACE_UNUSED_ARG (eh);
00666   ACE_UNUSED_ARG (mask);
00667   ACE_NOTSUP_RETURN (-1);
00668 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00669 }

int ACE_Select_Reactor_Notify::read_notify_pipe ACE_HANDLE  handle,
ACE_Notification_Buffer buffer
[virtual]
 

Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the

Implements ACE_Reactor_Notify.

Definition at line 1015 of file Select_Reactor_Base.cpp.

References ACE_TRACE, EWOULDBLOCK, ACE::recv(), and ssize_t.

Referenced by handle_input().

01017 {
01018   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
01019 
01020   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
01021 
01022   if (n > 0)
01023     {
01024       // Check to see if we've got a short read.
01025       if (n != sizeof buffer)
01026         {
01027           ssize_t remainder = sizeof buffer - n;
01028 
01029           // If so, try to recover by reading the remainder.  If this
01030           // doesn't work we're in big trouble since the input stream
01031           // won't be aligned correctly.  I'm not sure quite what to
01032           // do at this point.  It's probably best just to return -1.
01033           if (ACE::recv (handle,
01034                          ((char *) &buffer) + n,
01035                          remainder) != remainder)
01036             return -1;
01037         }
01038 
01039 
01040       return 1;
01041     }
01042 
01043   // Return -1 if things have gone seriously  wrong.
01044   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
01045     return -1;
01046 
01047   return 0;
01048 }


Member Data Documentation

ACE_Select_Reactor_Notify::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Definition at line 215 of file Select_Reactor_Base.h.

int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected]
 

Keeps track of the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify pipe before breaking out of its loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 239 of file Select_Reactor_Base.h.

ACE_Pipe ACE_Select_Reactor_Notify::notification_pipe_ [protected]
 

Contains the ACE_HANDLE the ACE_Select_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Select_Reactor will write to.

Definition at line 230 of file Select_Reactor_Base.h.

ACE_Select_Reactor_Impl* ACE_Select_Reactor_Notify::select_reactor_ [protected]
 

Keep a back pointer to the ACE_Select_Reactor. If this value if NULL then the ACE_Select_Reactor has been initialized with .

Definition at line 223 of file Select_Reactor_Base.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 11:28:47 2006 for ACE by doxygen 1.3.6