ACE_Dev_Poll_Reactor_Notify Class Reference

Event handler used for unblocking the ACE_Dev_Poll_Reactor from its event loop. More...

#include <Dev_Poll_Reactor.h>

Inheritance diagram for ACE_Dev_Poll_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dev_Poll_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Initialization and Termination Methods

Methods called when initializing and terminating this event handler.

virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *timer_queue=0, int disable_notify=0)
virtual int close (void)
virtual int notify (ACE_Event_Handler *eh=0, ACE_Reactor_Mask mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
virtual ACE_HANDLE notify_handle (void)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.

virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
virtual int handle_input (ACE_HANDLE handle)
virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.

ACE_Dev_Poll_Reactordp_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Public Member Functions

 ACE_Dev_Poll_Reactor_Notify (void)
 Constructor.


Detailed Description

Event handler used for unblocking the ACE_Dev_Poll_Reactor from its event loop.

This event handler is used internally by the ACE_Dev_Poll_Reactor as a means to allow a thread other then the one running the event loop to unblock the event loop.

Definition at line 166 of file Dev_Poll_Reactor.h.


Constructor & Destructor Documentation

ACE_BEGIN_VERSIONED_NAMESPACE_DECL ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify void   ) 
 

Constructor.

Definition at line 45 of file Dev_Poll_Reactor.cpp.

00046   : dp_reactor_ (0)
00047   , notification_pipe_ ()
00048   , max_notify_iterations_ (-1)
00049 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00050   , alloc_queue_ ()
00051   , notify_queue_ ()
00052   , free_queue_ ()
00053   , notify_queue_lock_ ()
00054 #endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00055 {
00056 }


Member Function Documentation

int ACE_Dev_Poll_Reactor_Notify::close void   )  [virtual]
 

Implements ACE_Reactor_Notify.

Definition at line 115 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), ACE_Pipe::close(), ACE_Unbounded_Queue_Iterator< T >::next(), and notification_pipe_.

00116 {
00117   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
00118 
00119 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00120   // Free up the dynamically allocated resources.
00121   ACE_Notification_Buffer **b;
00122 
00123   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00124        alloc_iter.next (b) != 0;
00125        alloc_iter.advance ())
00126     {
00127       delete [] *b;
00128       *b = 0;
00129     }
00130 
00131   this->alloc_queue_.reset ();
00132   this->notify_queue_.reset ();
00133   this->free_queue_.reset ();
00134 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00135 
00136   return this->notification_pipe_.close ();
00137 }

int ACE_Dev_Poll_Reactor_Notify::dispatch_notifications int &  number_of_active_handles,
ACE_Handle_Set rd_mask
[virtual]
 

This method's interface is not very compatibile with this Reactor's design. It's not clear why this method is pure virtual either.

Implements ACE_Reactor_Notify.

Definition at line 225 of file Dev_Poll_Reactor.cpp.

References ACE_NOTSUP_RETURN, ACE_TRACE, handle_input(), notification_pipe_, and ACE_Pipe::read_handle().

00228 {
00229   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications");
00230 
00231   // This method is unimplemented in the ACE_Dev_Poll_Reactor.
00232   // Instead, the notification handler is invoked as part of the IO
00233   // event set.  Doing so alters the some documented semantics that
00234   // state that the notifications are handled before IO events.
00235   // Enforcing such semantics does not appear to be beneficial, and
00236   // also serves to slow down event dispatching particularly with this
00237   // ACE_Dev_Poll_Reactor.
00238 
00239 #if 0
00240   ACE_HANDLE read_handle =
00241     this->notification_pipe_.read_handle ();
00242 
00243   // Note that we do not check if the handle has received any events.
00244   // Instead a non-blocking "speculative" read is performed.  If the
00245   // read returns with errno == EWOULDBLOCK then no notifications are
00246   // dispatched.  See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
00247   // for details.
00248   if (read_handle != ACE_INVALID_HANDLE)
00249     {
00250       --number_of_active_handles;
00251 
00252       return this->handle_input (read_handle);
00253     }
00254   else
00255     return 0;
00256 #else
00257   ACE_NOTSUP_RETURN (-1);
00258 #endif  /* 0 */
00259 }

int ACE_Dev_Poll_Reactor_Notify::dispatch_notify ACE_Notification_Buffer buffer  )  [virtual]
 

Handle one notify call represented in buffer. This could be because of a thread trying to unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 403 of file Dev_Poll_Reactor.cpp.

References ACE_ERROR, ACE_LIB_TEXT, ACE_TRACE, ACE_Notification_Buffer::eh_, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), LM_ERROR, and ACE_Notification_Buffer::mask_.

Referenced by handle_input().

00404 {
00405   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify");
00406 
00407   // If eh == 0 then another thread is unblocking the
00408   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00409   // internal structures.  Otherwise, we need to dispatch the
00410   // appropriate handle_* method on the ACE_Event_Handler
00411   // pointer we've been passed.
00412   if (buffer.eh_ != 0)
00413     {
00414       int result = 0;
00415 
00416       // Guard the handler's refcount. Recall that when the notify
00417       // was queued, the refcount was incremented, so it need not be
00418       // now. The guard insures that it is decremented properly.
00419       ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false);
00420 
00421       switch (buffer.mask_)
00422         {
00423         case ACE_Event_Handler::READ_MASK:
00424         case ACE_Event_Handler::ACCEPT_MASK:
00425           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00426           break;
00427         case ACE_Event_Handler::WRITE_MASK:
00428           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00429           break;
00430         case ACE_Event_Handler::EXCEPT_MASK:
00431           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00432           break;
00433         default:
00434           // Should we bail out if we get an invalid mask?
00435           ACE_ERROR ((LM_ERROR,
00436                       ACE_LIB_TEXT ("dispatch_notify invalid mask = %d\n"),
00437                       buffer.mask_));
00438         }
00439       if (result == -1)
00440         buffer.eh_->handle_close (ACE_INVALID_HANDLE, buffer.mask_);
00441     }
00442 
00443   return 1;
00444 }

void ACE_Dev_Poll_Reactor_Notify::dump void   )  const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 561 of file Dev_Poll_Reactor.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.

00562 {
00563 #if defined (ACE_HAS_DUMP)
00564   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump");
00565 
00566   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00567   ACE_DEBUG ((LM_DEBUG,
00568               ACE_LIB_TEXT ("dp_reactor_ = %@"),
00569               this->dp_reactor_));
00570   this->notification_pipe_.dump ();
00571   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00572 #endif /* ACE_HAS_DUMP */
00573 }

int ACE_Dev_Poll_Reactor_Notify::handle_input ACE_HANDLE  handle  )  [virtual]
 

Called back by the ACE_Dev_Poll_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 341 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, dispatch_notify(), max_notify_iterations_, and read_notify_pipe().

Referenced by dispatch_notifications().

00342 {
00343   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input");
00344 
00345   // @@ We may end up dispatching this event handler twice:  once when
00346   //    performing the speculative read on the notification pipe
00347   //    handle, and once more when dispatching the IO events.
00348 
00349   // Precondition: this->select_reactor_.token_.current_owner () ==
00350   // ACE_Thread::self ();
00351 
00352   int number_dispatched = 0;
00353   int result = 0;
00354   ACE_Notification_Buffer buffer;
00355 
00356   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00357     {
00358       // Dispatch the buffer
00359       // NOTE: We count only if we made any dispatches ie. upcalls.
00360       if (this->dispatch_notify (buffer) > 0)
00361         ++number_dispatched;
00362 
00363       // Bail out if we've reached the <notify_threshold_>.  Note that
00364       // by default <notify_threshold_> is -1, so we'll loop until all
00365       // the available notifications have been dispatched.
00366       if (number_dispatched == this->max_notify_iterations_)
00367         break;
00368     }
00369 
00370   if (result == -1)
00371     {
00372       // Reassign number_dispatched to -1 if things have gone
00373       // seriously wrong.
00374       number_dispatched = -1;
00375     }
00376 
00377   // Enqueue ourselves into the list of waiting threads.  When we
00378   // reacquire the token we'll be off and running again with ownership
00379   // of the token.  The postcondition of this call is that
00380   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00381   //this->select_reactor_->renew ();
00382 
00383   return number_dispatched;
00384 }

int ACE_Dev_Poll_Reactor_Notify::is_dispatchable ACE_Notification_Buffer buffer  )  [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 395 of file Dev_Poll_Reactor.cpp.

References ACE_NOTSUP_RETURN, and ACE_TRACE.

00396 {
00397   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable");
00398 
00399   ACE_NOTSUP_RETURN (-1);
00400 }

int ACE_Dev_Poll_Reactor_Notify::max_notify_iterations void   )  [virtual]
 

Get the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its event loop.

Implements ACE_Reactor_Notify.

Definition at line 459 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

00460 {
00461   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00462 
00463   return this->max_notify_iterations_;
00464 }

void ACE_Dev_Poll_Reactor_Notify::max_notify_iterations int   )  [virtual]
 

Set the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of the event loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 447 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

00448 {
00449   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00450 
00451   // Must always be > 0 or < 0 to optimize the loop exit condition.
00452   if (iterations == 0)
00453     iterations = 1;
00454 
00455   this->max_notify_iterations_ = iterations;
00456 }

int ACE_Dev_Poll_Reactor_Notify::notify ACE_Event_Handler eh = 0,
ACE_Reactor_Mask  mask = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value timeout = 0
[virtual]
 

Called by a thread when it wants to unblock the Reactor_Impl. This wakes up the Reactor_Impl if currently blocked. Pass over both the Event_Handler and the mask to allow the caller to dictate which Event_Handler method the Reactor_Impl will invoke. The ACE_Time_Value indicates how long to block trying to notify the Reactor_Impl. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 140 of file Dev_Poll_Reactor.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, dp_reactor_, ETIME, ACE_Dev_Poll_Handler_Guard::release(), ACE::send(), and ssize_t.

00143 {
00144   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify");
00145 
00146   // Just consider this method a "no-op" if there's no
00147   // ACE_Dev_Poll_Reactor configured.
00148   if (this->dp_reactor_ == 0)
00149     return 0;
00150 
00151   ACE_Notification_Buffer buffer (eh, mask);
00152 
00153 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00154 
00155   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00156 
00157   // Locate a free buffer in the queue. Enlarge the queue if needed.
00158   ACE_Notification_Buffer *temp = 0;
00159 
00160   if (free_queue_.dequeue_head (temp) == -1)
00161     {
00162       // Grow the queue of available buffers.
00163       ACE_Notification_Buffer *temp1;
00164 
00165       ACE_NEW_RETURN (temp1,
00166                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00167                       -1);
00168 
00169       if (this->alloc_queue_.enqueue_head (temp1) == -1)
00170         return -1;
00171 
00172       // Start at 1 and enqueue only
00173       // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00174       // the first one will be used right now.
00175       for (size_t i = 1;
00176            i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00177            ++i)
00178         this->free_queue_.enqueue_head (temp1 + i);
00179 
00180       temp = temp1;
00181     }
00182 
00183   ACE_ASSERT (temp != 0);
00184   *temp = buffer;
00185 
00186   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00187 
00188   if (notify_queue_.enqueue_tail (temp) == -1)
00189     return -1;
00190 
00191   // Now pop the pipe to force the callback for dispatching when ready. If
00192   // the send fails due to a full pipe, don't fail - assume the already-sent
00193   // pipe bytes will cause the entire notification queue to be processed.
00194   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00195                          (char *) &buffer,
00196                          1,             // Only need one byte to pop the pipe
00197                          timeout);
00198   if (n == -1 && (errno != ETIME && errno != EAGAIN))
00199     return -1;
00200 
00201   // Since the notify is queued (and maybe already delivered by now)
00202   // we can simply release the guard. The dispatch of this notification
00203   // will decrement the reference count.
00204   eh_guard.release ();
00205 
00206   return 0;
00207 #else
00208 
00209   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00210 
00211   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00212                          (char *) &buffer,
00213                          sizeof buffer,
00214                          timeout);
00215   if (n == -1)
00216     return -1;
00217 
00218   eh_guard.release ();
00219 
00220   return 0;
00221 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00222 }

ACE_HANDLE ACE_Dev_Poll_Reactor_Notify::notify_handle void   )  [virtual]
 

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 387 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().

00388 {
00389   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle");
00390 
00391   return this->notification_pipe_.read_handle ();
00392 }

int ACE_Dev_Poll_Reactor_Notify::open ACE_Reactor_Impl ,
ACE_Timer_Queue timer_queue = 0,
int  disable_notify = 0
[virtual]
 

Implements ACE_Reactor_Notify.

Definition at line 59 of file Dev_Poll_Reactor.cpp.

References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_Timer_Queue, ACE_TRACE, dp_reactor_, ACE_OS::fcntl(), notification_pipe_, ACE_Pipe::open(), and ACE::set_flags().

00062 {
00063   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open");
00064 
00065   if (disable_notify_pipe == 0)
00066     {
00067       this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r);
00068 
00069       if (this->dp_reactor_ == 0)
00070         {
00071           errno = EINVAL;
00072           return -1;
00073         }
00074 
00075       if (this->notification_pipe_.open () == -1)
00076         return -1;
00077 
00078 #if defined (F_SETFD)
00079       // close-on-exec
00080       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00081       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00082 #endif /* F_SETFD */
00083 
00084 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00085       ACE_Notification_Buffer *temp;
00086 
00087       ACE_NEW_RETURN (temp,
00088                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00089                       -1);
00090 
00091       if (this->alloc_queue_.enqueue_head (temp) == -1)
00092         return -1;
00093 
00094       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00095         if (free_queue_.enqueue_head (temp + i) == -1)
00096           return -1;
00097 
00098       if (ACE::set_flags (this->notification_pipe_.write_handle (),
00099                           ACE_NONBLOCK) == -1)
00100         return -1;
00101 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00102 
00103       // Set the read handle into non-blocking mode since we need to
00104       // perform a "speculative" read when determining if their are
00105       // notifications to dispatch.
00106       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00107                           ACE_NONBLOCK) == -1)
00108         return -1;
00109     }
00110 
00111   return 0;
00112 }

int ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications ACE_Event_Handler = 0,
ACE_Reactor_Mask  = ACE_Event_Handler::ALL_EVENTS_MASK
[virtual]
 

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 467 of file Dev_Poll_Reactor.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_NOTSUP_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue< T >::dequeue_head(), ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue< T >::enqueue_head(), LM_ERROR, ACE_Notification_Buffer::mask_, and ACE_Unbounded_Queue< T >::size().

00470 {
00471   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications");
00472 
00473 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00474 
00475   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00476 
00477   if (this->notify_queue_.is_empty ())
00478     return 0;
00479 
00480   ACE_Notification_Buffer *temp;
00481   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00482 
00483   size_t queue_size = this->notify_queue_.size ();
00484   int number_purged = 0;
00485   size_t i;
00486   for (i = 0; i < queue_size; ++i)
00487     {
00488       if (-1 == this->notify_queue_.dequeue_head (temp))
00489         ACE_ERROR_RETURN ((LM_ERROR,
00490                            ACE_LIB_TEXT ("%p\n"),
00491                            ACE_LIB_TEXT ("dequeue_head")),
00492                           -1);
00493 
00494       // If this is not a Reactor notify (it is for a particular
00495       // handler), and it matches the specified handler (or purging
00496       // all), and applying the mask would totally eliminate the
00497       // notification, then release it and count the number purged.
00498       if ((0 != temp->eh_) &&
00499           (0 == eh || eh == temp->eh_) &&
00500           ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
00501                                                  // notification mask
00502                                                  // is left with
00503                                                  // nothing when
00504                                                  // applying the mask.
00505         {
00506           if (this->free_queue_.enqueue_head (temp) == -1)
00507             ACE_ERROR_RETURN ((LM_ERROR,
00508                                ACE_LIB_TEXT ("%p\n"),
00509                                ACE_LIB_TEXT ("enqueue_head")),
00510                               -1);
00511           ++number_purged;
00512         }
00513       else
00514         {
00515           // To preserve it, move it to the local_queue.
00516           // But first, if this is not a Reactor notify (it is for a
00517           // particular handler), and it matches the specified handler
00518           // (or purging all), then apply the mask.
00519           if ((0 != temp->eh_) &&
00520               (0 == eh || eh == temp->eh_))
00521             ACE_CLR_BITS(temp->mask_, mask);
00522           if (-1 == local_queue.enqueue_head (temp))
00523             return -1;
00524         }
00525     }
00526 
00527   if (this->notify_queue_.size ())
00528     {
00529       // Should be empty!
00530       ACE_ASSERT (0);
00531       return -1;
00532     }
00533 
00534   // Now put it back in the notify queue.
00535   queue_size = local_queue.size ();
00536   for (i = 0; i < queue_size; ++i)
00537     {
00538       if (-1 == local_queue.dequeue_head (temp))
00539         ACE_ERROR_RETURN ((LM_ERROR,
00540                            ACE_LIB_TEXT ("%p\n"),
00541                            ACE_LIB_TEXT ("dequeue_head")),
00542                           -1);
00543 
00544       if (-1 == this->notify_queue_.enqueue_head (temp))
00545         ACE_ERROR_RETURN ((LM_ERROR,
00546                            ACE_LIB_TEXT ("%p\n"),
00547                            ACE_LIB_TEXT ("enqueue_head")),
00548                           -1);
00549     }
00550 
00551   return number_purged;
00552 
00553 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00554   ACE_UNUSED_ARG (eh);
00555   ACE_UNUSED_ARG (mask);
00556   ACE_NOTSUP_RETURN (-1);
00557 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00558 }

int ACE_Dev_Poll_Reactor_Notify::read_notify_pipe ACE_HANDLE  handle,
ACE_Notification_Buffer buffer
[virtual]
 

Read one notify call on the handle into buffer. This could be because of a thread trying to unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 262 of file Dev_Poll_Reactor.cpp.

References ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_TRACE, EWOULDBLOCK, LM_ERROR, ACE::recv(), and ssize_t.

Referenced by handle_input().

00264 {
00265   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe");
00266 
00267   // This is a (non-blocking) "speculative" read, i.e., we attempt to
00268   // read even if no event was polled on the read handle.  A
00269   // speculative read is necessary since notifications must be
00270   // dispatched before IO events.  We can avoid the speculative read
00271   // by "walking" the array of pollfd structures returned from
00272   // `/dev/poll' or `/dev/epoll' but that is potentially much more
00273   // expensive than simply checking for an EWOULDBLOCK.
00274   size_t to_read;
00275   char *read_p;
00276   bool have_one = false;
00277 
00278 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00279   // For the queued case, we'll try to read one byte (since that's what
00280   // the notify() tried to put in) but we don't need it - notifications can
00281   // be queued even if the pipe fills, so there may be more notifications
00282   // queued than there are bytes in the pipe.
00283   char b;
00284   read_p = &b;
00285   to_read = 1;
00286   ACE_Notification_Buffer *temp;
00287 
00288   // New scope to release the guard before trying the recv().
00289   {
00290     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00291 
00292     if (notify_queue_.is_empty ())
00293       return 0;
00294     else if (notify_queue_.dequeue_head (temp) == -1)
00295       ACE_ERROR_RETURN ((LM_ERROR,
00296                          ACE_LIB_TEXT ("%p\n"),
00297                          ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")),
00298                         -1);
00299     buffer = *temp;
00300     have_one = true;
00301     if (free_queue_.enqueue_head (temp) == -1)
00302       ACE_ERROR ((LM_ERROR,
00303                   ACE_LIB_TEXT ("%p\n"),
00304                   ACE_LIB_TEXT ("read_notify_pipe: enqueue_head")));
00305   }
00306 
00307 #else
00308   to_read = sizeof buffer;
00309   read_p = (char *)&buffer;
00310 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00311 
00312   ssize_t n = ACE::recv (handle, read_p, to_read);
00313 
00314   if (n > 0)
00315     {
00316       // Check to see if we've got a short read.
00317       if (static_cast<size_t>(n) != to_read)
00318         {
00319           size_t remainder = to_read - n;
00320 
00321           // If so, try to recover by reading the remainder.  If this
00322           // doesn't work we're in big trouble since the input stream
00323           // won't be aligned correctly.  I'm not sure quite what to
00324           // do at this point.  It's probably best just to return -1.
00325           if (ACE::recv (handle, &read_p[n], remainder) <= 0)
00326             return -1;
00327         }
00328 
00329       return 1;
00330     }
00331 
00332   // Return -1 if things have gone seriously wrong.
00333   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00334     return -1;
00335 
00336   return have_one ? 1 : 0;
00337 }


Member Data Documentation

ACE_Dev_Poll_Reactor* ACE_Dev_Poll_Reactor_Notify::dp_reactor_ [protected]
 

Keep a back pointer to the ACE_Dev_Poll_Reactor. If this value if NULL then the ACE_Dev_Poll_Reactor has been initialized with disable_notify_pipe.

Definition at line 267 of file Dev_Poll_Reactor.h.

Referenced by notify(), and open().

int ACE_Dev_Poll_Reactor_Notify::max_notify_iterations_ [protected]
 

Keeps track of the maximum number of times that the ACE_Dev_Poll_Reactor_Notify::handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 283 of file Dev_Poll_Reactor.h.

Referenced by handle_input(), and max_notify_iterations().

ACE_Pipe ACE_Dev_Poll_Reactor_Notify::notification_pipe_ [protected]
 

Contains the ACE_HANDLE the ACE_Dev_Poll_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Dev_Poll_Reactor will write to.

Definition at line 274 of file Dev_Poll_Reactor.h.

Referenced by close(), dispatch_notifications(), dump(), notify_handle(), and open().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 11:21:36 2006 for ACE by doxygen 1.3.6