Dev_Poll_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: Dev_Poll_Reactor.cpp 81315 2008-04-10 07:14:15Z johnnyw $
00002 
00003 #include "ace/OS_NS_errno.h"
00004 #include "ace/Dev_Poll_Reactor.h"
00005 #include "ace/Signal.h"
00006 #include "ace/Sig_Handler.h"
00007 
00008 ACE_RCSID (ace,
00009            Dev_Poll_Reactor,
00010            "$Id: Dev_Poll_Reactor.cpp 81315 2008-04-10 07:14:15Z johnnyw $")
00011 
00012 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
00013 
00014 # include "ace/OS_NS_unistd.h"
00015 # include "ace/OS_NS_fcntl.h"
00016 # include "ace/OS_NS_stropts.h"
00017 
00018 # if defined (ACE_HAS_EVENT_POLL) && defined (linux)
00019 #  include /**/ <sys/epoll.h>
00020 # elif defined (ACE_HAS_DEV_POLL)
00021 #    if defined (linux)
00022 #      include /**/ <linux/devpoll.h>
00023 #    elif defined (HPUX_VERS) && HPUX_VERS < 1123
00024 #      include /**/ <devpoll.h>
00025 #    else
00026 #      include /**/ <sys/devpoll.h>
00027 #    endif  /* linux */
00028 # endif  /* ACE_HAS_DEV_POLL */
00029 
00030 #if !defined (__ACE_INLINE__)
00031 # include "ace/Dev_Poll_Reactor.inl"
00032 #endif /* __ACE_INLINE__ */
00033 
00034 
00035 #include "ace/Handle_Set.h"
00036 #include "ace/Reactor.h"
00037 #include "ace/Timer_Heap.h"
00038 #include "ace/Timer_Queue.h"
00039 #include "ace/ACE.h"
00040 #include "ace/Reverse_Lock_T.h"
00041 #include "ace/Recursive_Thread_Mutex.h"
00042 #include "ace/Null_Mutex.h"
00043 #include "ace/os_include/os_poll.h"
00044 #include "ace/OS_NS_sys_mman.h"
00045 #include "ace/Guard_T.h"
00046 #include "ace/OS_NS_string.h"
00047 #include "ace/OS_NS_sys_time.h"
00048 
00049 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00050 
00051 ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
00052   : dp_reactor_ (0)
00053   , notification_pipe_ ()
00054   , max_notify_iterations_ (-1)
00055 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00056   , notification_queue_ ()
00057 #endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00058 {
00059 }
00060 
00061 int
00062 ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
00063                                    ACE_Timer_Queue * /* timer_queue */,
00064                                    int disable_notify_pipe)
00065 {
00066   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open");
00067 
00068   if (disable_notify_pipe == 0)
00069     {
00070       this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r);
00071 
00072       if (this->dp_reactor_ == 0)
00073         {
00074           errno = EINVAL;
00075           return -1;
00076         }
00077 
00078       if (this->notification_pipe_.open () == -1)
00079         return -1;
00080 
00081 #if defined (F_SETFD)
00082       // close-on-exec
00083       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00084       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00085 #endif /* F_SETFD */
00086 
00087 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00088       if (notification_queue_.open () == -1)
00089         {
00090           return -1;
00091         }
00092 
00093       if (ACE::set_flags (this->notification_pipe_.write_handle (),
00094                           ACE_NONBLOCK) == -1)
00095         return -1;
00096 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00097 
00098       // Set the read handle into non-blocking mode since we need to
00099       // perform a "speculative" read when determining if there are
00100       // notifications to dispatch.
00101       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00102                           ACE_NONBLOCK) == -1)
00103         return -1;
00104     }
00105 
00106   return 0;
00107 }
00108 
00109 int
00110 ACE_Dev_Poll_Reactor_Notify::close (void)
00111 {
00112   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
00113 
00114 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00115   notification_queue_.reset ();
00116 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00117 
00118   return this->notification_pipe_.close ();
00119 }
00120 
00121 int
00122 ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
00123                                      ACE_Reactor_Mask mask,
00124                                      ACE_Time_Value *timeout)
00125 {
00126   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify");
00127 
00128   // Just consider this method a "no-op" if there's no
00129   // ACE_Dev_Poll_Reactor configured.
00130   if (this->dp_reactor_ == 0)
00131     return 0;
00132 
00133   ACE_Notification_Buffer buffer (eh, mask);
00134 
00135 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00136   ACE_UNUSED_ARG (timeout);
00137   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00138 
00139   int notification_required =
00140     notification_queue_.push_new_notification (buffer);
00141 
00142   if (notification_required == -1)
00143     return -1;             // Also decrement eh's reference count
00144 
00145   // The notification has been queued, so it will be delivered at some
00146   // point (and may have been already); release the refcnt guard.
00147   eh_guard.release ();
00148 
00149   if (notification_required == 0)
00150     return 0;
00151 
00152   // Now pop the pipe to force the callback for dispatching when ready. If
00153   // the send fails due to a full pipe, don't fail - assume the already-sent
00154   // pipe bytes will cause the entire notification queue to be processed.
00155   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00156                          (char *) &buffer,
00157                          1,             // Only need one byte to pop the pipe
00158                          &ACE_Time_Value::zero);
00159   if (n == -1 && (errno != ETIME && errno != EAGAIN))
00160     return -1;
00161 
00162   return 0;
00163 #else
00164 
00165   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00166 
00167   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00168                          (char *) &buffer,
00169                          sizeof buffer,
00170                          timeout);
00171   if (n == -1)
00172     return -1;
00173 
00174   eh_guard.release ();
00175 
00176   return 0;
00177 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00178 }
00179 
00180 int
00181 ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
00182   int & /* number_of_active_handles */,
00183   ACE_Handle_Set & /* rd_mask */)
00184 {
00185   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications");
00186 
00187   // This method is unimplemented in the ACE_Dev_Poll_Reactor.
00188   // Instead, the notification handler is invoked as part of the IO
00189   // event set.  Doing so alters the some documented semantics that
00190   // state that the notifications are handled before IO events.
00191   // Enforcing such semantics does not appear to be beneficial, and
00192   // also serves to slow down event dispatching particularly with this
00193   // ACE_Dev_Poll_Reactor.
00194 
00195   ACE_NOTSUP_RETURN (-1);
00196 }
00197 
00198 int
00199 ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00200                                                ACE_Notification_Buffer &buffer)
00201 {
00202   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe");
00203 
00204   // This is a (non-blocking) "speculative" read, i.e., we attempt to
00205   // read even if no event was polled on the read handle.  A
00206   // speculative read is necessary since notifications must be
00207   // dispatched before IO events.  We can avoid the speculative read
00208   // by "walking" the array of pollfd structures returned from
00209   // `/dev/poll' or `/dev/epoll' but that is potentially much more
00210   // expensive than simply checking for an EWOULDBLOCK.
00211   size_t to_read;
00212   char *read_p;
00213   bool have_one = false;
00214 
00215 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00216   // For the queued case, we'll try to read one byte (since that's what
00217   // the notify () tried to put in) but we don't need it - notifications can
00218   // be queued even if the pipe fills, so there may be more notifications
00219   // queued than there are bytes in the pipe.
00220   char b;
00221   read_p = &b;
00222   to_read = 1;
00223 
00224   // Before reading the byte, pop a message from the queue and queue a
00225   // new message unless the queue is now empty.  The protocol is to
00226   // keep a byte in the pipe as long as the queue is not empty.
00227   bool more_messages_queued = false;
00228   ACE_Notification_Buffer next;
00229 
00230   int result = notification_queue_.pop_next_notification (buffer,
00231                                                           more_messages_queued,
00232                                                           next);
00233 
00234   if (result == 0)
00235     {
00236       // remove the notification byte from the pipe, avoiding notification loop
00237       ACE::recv (handle, read_p, to_read);
00238       return 0;
00239     }
00240 
00241   if (result == -1)
00242     return -1;
00243 
00244   if (more_messages_queued)
00245     (void) ACE::send (this->notification_pipe_.write_handle (),
00246                       (char *)&next,
00247                       1 /* one byte is enough */,
00248                       &ACE_Time_Value::zero);
00249 #else
00250   to_read = sizeof buffer;
00251   read_p = (char *)&buffer;
00252 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00253 
00254   ssize_t n = ACE::recv (handle, read_p, to_read);
00255 
00256   if (n > 0)
00257     {
00258       // Check to see if we've got a short read.
00259       if (static_cast<size_t> (n) != to_read)
00260         {
00261           size_t remainder = to_read - n;
00262 
00263           // If so, try to recover by reading the remainder.  If this
00264           // doesn't work we're in big trouble since the input stream
00265           // won't be aligned correctly.  I'm not sure quite what to
00266           // do at this point.  It's probably best just to return -1.
00267           if (ACE::recv (handle, &read_p[n], remainder) <= 0)
00268             return -1;
00269         }
00270 
00271       return 1;
00272     }
00273 
00274   // Return -1 if things have gone seriously wrong.
00275   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00276     return -1;
00277 
00278   return have_one ? 1 : 0;
00279 }
00280 
00281 
00282 int
00283 ACE_Dev_Poll_Reactor_Notify::handle_input (ACE_HANDLE handle)
00284 {
00285   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input");
00286 
00287   // @@ We may end up dispatching this event handler twice:  once when
00288   //    performing the speculative read on the notification pipe
00289   //    handle, and once more when dispatching the IO events.
00290 
00291   // Precondition: this->select_reactor_.token_.current_owner () ==
00292   // ACE_Thread::self ();
00293 
00294   int number_dispatched = 0;
00295   int result = 0;
00296   ACE_Notification_Buffer buffer;
00297 
00298   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00299     {
00300       // Dispatch the buffer
00301       // NOTE: We count only if we made any dispatches ie. upcalls.
00302       if (this->dispatch_notify (buffer) > 0)
00303         ++number_dispatched;
00304 
00305       // Bail out if we've reached the <notify_threshold_>.  Note that
00306       // by default <notify_threshold_> is -1, so we'll loop until all
00307       // the available notifications have been dispatched.
00308       if (number_dispatched == this->max_notify_iterations_)
00309         break;
00310     }
00311 
00312   if (result == -1)
00313     {
00314       // Reassign number_dispatched to -1 if things have gone
00315       // seriously wrong.
00316       number_dispatched = -1;
00317     }
00318 
00319   // Enqueue ourselves into the list of waiting threads.  When we
00320   // reacquire the token we'll be off and running again with ownership
00321   // of the token.  The postcondition of this call is that
00322   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00323   //this->select_reactor_->renew ();
00324 
00325   return number_dispatched;
00326 }
00327 
00328 ACE_HANDLE
00329 ACE_Dev_Poll_Reactor_Notify::notify_handle (void)
00330 {
00331   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle");
00332 
00333   return this->notification_pipe_.read_handle ();
00334 }
00335 
00336 int
00337 ACE_Dev_Poll_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &)
00338 {
00339   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable");
00340 
00341   ACE_NOTSUP_RETURN (-1);
00342 }
00343 
00344 int
00345 ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00346 {
00347   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify");
00348 
00349   // If eh == 0 then another thread is unblocking the
00350   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00351   // internal structures.  Otherwise, we need to dispatch the
00352   // appropriate handle_* method on the ACE_Event_Handler
00353   // pointer we've been passed.
00354   if (buffer.eh_ != 0)
00355     {
00356       int result = 0;
00357 
00358       // Guard the handler's refcount. Recall that when the notify
00359       // was queued, the refcount was incremented, so it need not be
00360       // now. The guard insures that it is decremented properly.
00361       ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false);
00362 
00363       switch (buffer.mask_)
00364         {
00365         case ACE_Event_Handler::READ_MASK:
00366         case ACE_Event_Handler::ACCEPT_MASK:
00367           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00368           break;
00369         case ACE_Event_Handler::WRITE_MASK:
00370           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00371           break;
00372         case ACE_Event_Handler::EXCEPT_MASK:
00373           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00374           break;
00375         default:
00376           // Should we bail out if we get an invalid mask?
00377           ACE_ERROR ((LM_ERROR,
00378                       ACE_TEXT ("dispatch_notify invalid mask = %d\n"),
00379                       buffer.mask_));
00380         }
00381       if (result == -1)
00382         buffer.eh_->handle_close (ACE_INVALID_HANDLE, buffer.mask_);
00383     }
00384 
00385   return 1;
00386 }
00387 
00388 void
00389 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (int iterations)
00390 {
00391   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00392 
00393   // Must always be > 0 or < 0 to optimize the loop exit condition.
00394   if (iterations == 0)
00395     iterations = 1;
00396 
00397   this->max_notify_iterations_ = iterations;
00398 }
00399 
00400 int
00401 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (void)
00402 {
00403   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00404 
00405   return this->max_notify_iterations_;
00406 }
00407 
00408 int
00409 ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
00410   ACE_Event_Handler *eh,
00411   ACE_Reactor_Mask mask)
00412 {
00413   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications");
00414 
00415 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00416 
00417   return notification_queue_.purge_pending_notifications (eh, mask);
00418 
00419 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00420   ACE_UNUSED_ARG (eh);
00421   ACE_UNUSED_ARG (mask);
00422   ACE_NOTSUP_RETURN (-1);
00423 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00424 }
00425 
00426 void
00427 ACE_Dev_Poll_Reactor_Notify::dump (void) const
00428 {
00429 #if defined (ACE_HAS_DUMP)
00430   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump");
00431 
00432   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00433   ACE_DEBUG ((LM_DEBUG,
00434               ACE_TEXT ("dp_reactor_ = %@"),
00435               this->dp_reactor_));
00436   this->notification_pipe_.dump ();
00437   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00438 #endif /* ACE_HAS_DUMP */
00439 }
00440 
00441 // -----------------------------------------------------------------
00442 
00443 ACE_Dev_Poll_Reactor_Handler_Repository::
00444 ACE_Dev_Poll_Reactor_Handler_Repository (void)
00445   : max_size_ (0),
00446     handlers_ (0)
00447 {
00448   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::ACE_Dev_Poll_Reactor_Handler_Repository");
00449 }
00450 
00451 int
00452 ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle (
00453   ACE_HANDLE handle) const
00454 {
00455   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle");
00456 
00457   if (handle < 0 || handle >= this->max_size_)
00458     {
00459       errno = EINVAL;
00460       return 1;
00461     }
00462   else
00463     return 0;
00464 }
00465 
00466 int
00467 ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range (
00468   ACE_HANDLE handle) const
00469 {
00470   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range");
00471 
00472   if (handle >= 0 && handle < this->max_size_)
00473     return 1;
00474   else
00475     {
00476       errno = EINVAL;
00477       return 0;
00478     }
00479 }
00480 
00481 int
00482 ACE_Dev_Poll_Reactor_Handler_Repository::open (size_t size)
00483 {
00484   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::open");
00485 
00486   this->max_size_ = size;
00487 
00488   // Try to allocate the memory.
00489   ACE_NEW_RETURN (this->handlers_,
00490                   ACE_Dev_Poll_Event_Tuple[size],
00491                   -1);
00492 
00493   // Try to increase the number of handles if <size> is greater than
00494   // the current limit.
00495   return ACE::set_handle_limit (size);
00496 }
00497 
00498 int
00499 ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all (void)
00500 {
00501   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all");
00502 
00503   // Unbind all of the event handlers.
00504   for (int handle = 0;
00505        handle < this->max_size_;
00506        ++handle)
00507     this->unbind (handle);
00508 
00509   return 0;
00510 }
00511 
00512 int
00513 ACE_Dev_Poll_Reactor_Handler_Repository::close (void)
00514 {
00515   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::close");
00516 
00517   if (this->handlers_ != 0)
00518     {
00519       this->unbind_all ();
00520 
00521       delete [] this->handlers_;
00522       this->handlers_ = 0;
00523     }
00524 
00525   return 0;
00526 }
00527 
00528 ACE_Event_Handler *
00529 ACE_Dev_Poll_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00530                                                size_t *index_p)
00531 {
00532   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::find");
00533 
00534   ACE_Event_Handler *eh = 0;
00535 
00536   // Only bother to search for the <handle> if it's in range.
00537   if (this->handle_in_range (handle))
00538     {
00539       eh = this->handlers_[handle].event_handler;
00540       if (eh != 0)
00541         {
00542           if (index_p != 0)
00543             *index_p = handle;
00544         }
00545       else
00546         errno = ENOENT;
00547     }
00548 
00549   return eh;
00550 }
00551 
00552 int
00553 ACE_Dev_Poll_Reactor_Handler_Repository::bind (
00554   ACE_HANDLE handle,
00555   ACE_Event_Handler *event_handler,
00556   ACE_Reactor_Mask mask)
00557 {
00558   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::bind");
00559 
00560   if (event_handler == 0)
00561     return -1;
00562 
00563   if (handle == ACE_INVALID_HANDLE)
00564     handle = event_handler->get_handle ();
00565 
00566   if (this->invalid_handle (handle))
00567     return -1;
00568 
00569   this->handlers_[handle].event_handler = event_handler;
00570   this->handlers_[handle].mask = mask;
00571   event_handler->add_reference ();
00572 
00573   return 0;
00574 }
00575 
00576 int
00577 ACE_Dev_Poll_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00578                                                  bool decr_refcnt)
00579 {
00580   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind");
00581 
00582   if (this->find (handle) == 0)
00583     return -1;
00584 
00585   if (decr_refcnt)
00586     this->handlers_[handle].event_handler->remove_reference ();
00587   this->handlers_[handle].event_handler = 0;
00588   this->handlers_[handle].mask = ACE_Event_Handler::NULL_MASK;
00589   this->handlers_[handle].suspended = 0;
00590 
00591   return 0;
00592 }
00593 
00594 // -----------------------------------------------------------------
00595 
00596 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (ACE_Sig_Handler *sh,
00597                                             ACE_Timer_Queue *tq,
00598                                             int disable_notify_pipe,
00599                                             ACE_Reactor_Notify *notify,
00600                                             int mask_signals,
00601                                             int s_queue)
00602   : initialized_ (false)
00603   , poll_fd_ (ACE_INVALID_HANDLE)
00604   , size_ (0)
00605   // , ready_set_ ()
00606 #if defined (ACE_HAS_EVENT_POLL)
00607   , events_ (0)
00608   , start_pevents_ (0)
00609   , end_pevents_ (0)
00610 #else
00611   , dp_fds_ (0)
00612   , start_pfds_ (0)
00613   , end_pfds_ (0)
00614 #endif  /* ACE_HAS_EVENT_POLL */
00615   , deactivated_ (0)
00616   , token_ (*this, s_queue)
00617   , lock_adapter_ (token_)
00618   , timer_queue_ (0)
00619   , delete_timer_queue_ (false)
00620   , signal_handler_ (0)
00621   , delete_signal_handler_ (false)
00622   , notify_handler_ (0)
00623   , delete_notify_handler_ (false)
00624   , mask_signals_ (mask_signals)
00625   , restart_ (0)
00626 {
00627   ACE_TRACE ("ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor");
00628 
00629   if (this->open (ACE::max_handles (),
00630                   0,
00631                   sh,
00632                   tq,
00633                   disable_notify_pipe,
00634                   notify) == -1)
00635     ACE_ERROR ((LM_ERROR,
00636                 ACE_TEXT ("%p\n"),
00637                 ACE_TEXT ("ACE_Dev_Poll_Reactor::open ")
00638                 ACE_TEXT ("failed inside ")
00639                 ACE_TEXT ("ACE_Dev_Poll_Reactor::CTOR")));
00640 }
00641 
00642 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (size_t size,
00643                                             int rs,
00644                                             ACE_Sig_Handler *sh,
00645                                             ACE_Timer_Queue *tq,
00646                                             int disable_notify_pipe,
00647                                             ACE_Reactor_Notify *notify,
00648                                             int mask_signals,
00649                                             int s_queue)
00650   : initialized_ (false)
00651   , poll_fd_ (ACE_INVALID_HANDLE)
00652   , size_ (0)
00653   // , ready_set_ ()
00654 #if defined (ACE_HAS_EVENT_POLL)
00655   , events_ (0)
00656   , start_pevents_ (0)
00657   , end_pevents_ (0)
00658 #else
00659   , dp_fds_ (0)
00660   , start_pfds_ (0)
00661   , end_pfds_ (0)
00662 #endif  /* ACE_HAS_EVENT_POLL */
00663   , deactivated_ (0)
00664   , token_ (*this, s_queue)
00665   , lock_adapter_ (token_)
00666   , timer_queue_ (0)
00667   , delete_timer_queue_ (false)
00668   , signal_handler_ (0)
00669   , delete_signal_handler_ (false)
00670   , notify_handler_ (0)
00671   , delete_notify_handler_ (false)
00672   , mask_signals_ (mask_signals)
00673   , restart_ (0)
00674 {
00675   if (this->open (size,
00676                   rs,
00677                   sh,
00678                   tq,
00679                   disable_notify_pipe,
00680                   notify) == -1)
00681     ACE_ERROR ((LM_ERROR,
00682                 ACE_TEXT ("%p\n"),
00683                 ACE_TEXT ("ACE_Dev_Poll_Reactor::open ")
00684                 ACE_TEXT ("failed inside ACE_Dev_Poll_Reactor::CTOR")));
00685 }
00686 
00687 ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor (void)
00688 {
00689   ACE_TRACE ("ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor");
00690 
00691  (void) this->close ();
00692 }
00693 
00694 int
00695 ACE_Dev_Poll_Reactor::open (size_t size,
00696                             int restart,
00697                             ACE_Sig_Handler *sh,
00698                             ACE_Timer_Queue *tq,
00699                             int disable_notify_pipe,
00700                             ACE_Reactor_Notify *notify)
00701 {
00702   ACE_TRACE ("ACE_Dev_Poll_Reactor::open");
00703 
00704   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
00705 
00706   // Can't initialize ourselves more than once.
00707   if (this->initialized_)
00708     return -1;
00709 
00710   this->restart_ = restart;
00711   this->signal_handler_ = sh;
00712   this->timer_queue_ = tq;
00713   this->notify_handler_ = notify;
00714 
00715   int result = 0;
00716 
00717   // Allows the signal handler to be overridden.
00718   if (this->signal_handler_ == 0)
00719     {
00720       ACE_NEW_RETURN (this->signal_handler_,
00721                       ACE_Sig_Handler,
00722                       -1);
00723 
00724       if (this->signal_handler_ == 0)
00725         result = -1;
00726       else
00727         this->delete_signal_handler_ = true;
00728     }
00729 
00730   // Allows the timer queue to be overridden.
00731   if (result != -1 && this->timer_queue_ == 0)
00732     {
00733       ACE_NEW_RETURN (this->timer_queue_,
00734                       ACE_Timer_Heap,
00735                       -1);
00736 
00737       if (this->timer_queue_ == 0)
00738         result = -1;
00739       else
00740         this->delete_timer_queue_ = true;
00741     }
00742 
00743   // Allows the Notify_Handler to be overridden.
00744   if (result != -1 && this->notify_handler_ == 0)
00745     {
00746       ACE_NEW_RETURN (this->notify_handler_,
00747                       ACE_Dev_Poll_Reactor_Notify,
00748                       -1);
00749 
00750       if (this->notify_handler_ == 0)
00751         result = -1;
00752       else
00753         this->delete_notify_handler_ = true;
00754     }
00755 
00756 #if defined (ACE_HAS_EVENT_POLL)
00757 
00758   // Allocating event table:
00759   ACE_NEW_RETURN (this->events_, epoll_event[size], -1);
00760 
00761   // Initializing epoll:
00762   this->poll_fd_ = ::epoll_create (size);
00763   if (this->poll_fd_ == -1)
00764     result = -1;
00765 
00766 #else
00767 
00768   // Allocate the array before opening the device to avoid a potential
00769   // resource leak if allocation fails.
00770   ACE_NEW_RETURN (this->dp_fds_,
00771                   pollfd[size],
00772                   -1);
00773 
00774   // Open the `/dev/poll' character device.
00775   this->poll_fd_ = ACE_OS::open ("/dev/poll", O_RDWR);
00776   if (this->poll_fd_ == ACE_INVALID_HANDLE)
00777     result = -1;
00778 
00779 #endif  /* ACE_HAS_EVENT_POLL */
00780 
00781   if (result != -1 && this->handler_rep_.open (size) == -1)
00782     result = -1;
00783 
00784   // Registration of the notification handler must be done after the
00785   // /dev/poll device has been fully initialized.
00786   else if (this->notify_handler_->open (this,
00787                                         0,
00788                                         disable_notify_pipe) == -1
00789            || (disable_notify_pipe == 0
00790                && this->register_handler_i (
00791                                             this->notify_handler_->notify_handle (),
00792                                             this->notify_handler_,
00793                                             ACE_Event_Handler::READ_MASK) == -1))
00794     result = -1;
00795 
00796   this->size_ = size;
00797 
00798   if (result != -1)
00799     // We're all set to go.
00800     this->initialized_ = true;
00801   else
00802     // This will close down all the allocated resources properly.
00803  (void) this->close ();
00804 
00805   return result;
00806 }
00807 
00808 int
00809 ACE_Dev_Poll_Reactor::current_info (ACE_HANDLE, size_t & /* size */)
00810 {
00811   ACE_NOTSUP_RETURN (-1);
00812 }
00813 
00814 
00815 int
00816 ACE_Dev_Poll_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
00817 {
00818   if (this->delete_signal_handler_)
00819     delete this->signal_handler_;
00820 
00821   this->signal_handler_ = signal_handler;
00822   this->delete_signal_handler_ = false;
00823 
00824   return 0;
00825 }
00826 
00827 int
00828 ACE_Dev_Poll_Reactor::timer_queue (ACE_Timer_Queue *tq)
00829 {
00830   if (this->delete_timer_queue_)
00831     delete this->timer_queue_;
00832 
00833   this->timer_queue_ = tq;
00834   this->delete_timer_queue_ = false;
00835 
00836   return 0;
00837 
00838 }
00839 
00840 ACE_Timer_Queue *
00841 ACE_Dev_Poll_Reactor::timer_queue (void) const
00842 {
00843   return this->timer_queue_;
00844 }
00845 
00846 int
00847 ACE_Dev_Poll_Reactor::close (void)
00848 {
00849   ACE_TRACE ("ACE_Dev_Poll_Reactor::close");
00850 
00851   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
00852 
00853   int result = 0;
00854 
00855   if (this->poll_fd_ != ACE_INVALID_HANDLE)
00856     {
00857       result = ACE_OS::close (this->poll_fd_);
00858     }
00859 
00860 #if defined (ACE_HAS_EVENT_POLL)
00861 
00862   delete [] this->events_;
00863   this->events_ = 0;
00864 
00865 #else
00866 
00867   delete [] this->dp_fds_;
00868   this->dp_fds_ = 0;
00869 
00870 #endif  /* ACE_HAS_EVENT_POLL */
00871 
00872   if (this->delete_signal_handler_)
00873     {
00874       delete this->signal_handler_;
00875       this->signal_handler_ = 0;
00876       this->delete_signal_handler_ = false;
00877     }
00878 
00879  (void) this->handler_rep_.close ();
00880 
00881   if (this->delete_timer_queue_)
00882     {
00883       delete this->timer_queue_;
00884       this->timer_queue_ = 0;
00885       this->delete_timer_queue_ = false;
00886     }
00887 
00888   if (this->notify_handler_ != 0)
00889     this->notify_handler_->close ();
00890 
00891   if (this->delete_notify_handler_)
00892     {
00893       delete this->notify_handler_;
00894       this->notify_handler_ = 0;
00895       this->delete_notify_handler_ = false;
00896     }
00897 
00898   this->poll_fd_ = ACE_INVALID_HANDLE;
00899 
00900 #if defined (ACE_HAS_EVENT_POLL)
00901   this->start_pevents_ = 0;
00902   this->end_pevents_   = 0;
00903 #else
00904   this->start_pfds_ = 0;
00905   this->end_pfds_ = 0;
00906 #endif /* ACE_HAS_EVENT_POLL */
00907 
00908   this->initialized_ = false;
00909 
00910   return result;
00911 }
00912 
00913 int
00914 ACE_Dev_Poll_Reactor::work_pending (const ACE_Time_Value & max_wait_time)
00915 {
00916   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending");
00917 
00918   // Stash the current time
00919   //
00920   // The destructor of this object will automatically compute how much
00921   // time elapsed since this method was called.
00922   ACE_Time_Value mwt (max_wait_time);
00923   ACE_MT (ACE_Countdown_Time countdown (&mwt));
00924 
00925   Token_Guard guard (this->token_);
00926   int const result = guard.acquire_quietly (&mwt);
00927 
00928   // If the guard is NOT the owner just return the retval
00929   if (!guard.is_owner ())
00930     return result;
00931 
00932   // Update the countdown to reflect time waiting for the mutex.
00933   ACE_MT (countdown.update ());
00934 
00935   return this->work_pending_i (&mwt);
00936 }
00937 
00938 int
00939 ACE_Dev_Poll_Reactor::work_pending_i (ACE_Time_Value * max_wait_time)
00940 {
00941   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i");
00942 
00943   if (this->deactivated_)
00944     return 0;
00945 
00946 #if defined (ACE_HAS_EVENT_POLL)
00947   if (this->start_pevents_ != this->end_pevents_)
00948 #else
00949   if (this->start_pfds_ != this->end_pfds_)
00950 #endif /* ACE_HAS_EVENT_POLL */
00951     return 1;  // We still have work_pending (). Do not poll for
00952                // additional events.
00953 
00954   ACE_Time_Value timer_buf (0);
00955   ACE_Time_Value *this_timeout =
00956     this->timer_queue_->calculate_timeout (max_wait_time, &timer_buf);
00957 
00958   // Check if we have timers to fire.
00959   int const timers_pending =
00960  ((this_timeout != 0 && max_wait_time == 0)
00961      || (this_timeout != 0 && max_wait_time != 0
00962          && *this_timeout != *max_wait_time) ? 1 : 0);
00963 
00964   long const timeout =
00965  (this_timeout == 0
00966      ? -1 /* Infinity */
00967      : static_cast<long> (this_timeout->msec ()));
00968 
00969 #if defined (ACE_HAS_EVENT_POLL)
00970 
00971    // Wait for events.
00972    int const nfds = ::epoll_wait (this->poll_fd_,
00973                                   this->events_,
00974                                   this->size_,
00975                                   static_cast<int> (timeout));
00976 
00977   if (nfds > 0)
00978     {
00979       this->start_pevents_ = this->events_;
00980       this->end_pevents_ = this->start_pevents_ + nfds;
00981     }
00982 
00983 #else
00984 
00985   struct dvpoll dvp;
00986 
00987   dvp.dp_fds = this->dp_fds_;
00988   dvp.dp_nfds = this->size_;
00989   dvp.dp_timeout = timeout;  // Milliseconds
00990 
00991   // Poll for events
00992   int const nfds = ACE_OS::ioctl (this->poll_fd_, DP_POLL, &dvp);
00993 
00994   // Retrieve the results from the pollfd array.
00995   this->start_pfds_ = dvp.dp_fds;
00996 
00997   // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is
00998   // no work pending.  If nfds > 0 then there is work pending.
00999   // Otherwise an error occurred.
01000   if (nfds > -1)
01001     this->end_pfds_ = this->start_pfds_ + nfds;
01002 #endif  /* ACE_HAS_EVENT_POLL */
01003 
01004   // If timers are pending, override any timeout from the poll.
01005   return (nfds == 0 && timers_pending != 0 ? 1 : nfds);
01006 }
01007 
01008 
01009 int
01010 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value *max_wait_time)
01011 {
01012   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01013 
01014   // Stash the current time
01015   //
01016   // The destructor of this object will automatically compute how much
01017   // time elapsed since this method was called.
01018   ACE_MT (ACE_Countdown_Time countdown (max_wait_time));
01019 
01020   Token_Guard guard (this->token_);
01021   int const result = guard.acquire_quietly (max_wait_time);
01022 
01023   // If the guard is NOT the owner just return the retval
01024   if (!guard.is_owner ())
01025     return result;
01026 
01027   if (this->deactivated_)
01028     return -1;
01029 
01030   // Update the countdown to reflect time waiting for the mutex.
01031   ACE_MT (countdown.update ());
01032 
01033   return this->handle_events_i (max_wait_time, guard);
01034 }
01035 
01036 int
01037 ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time,
01038                                        Token_Guard &guard)
01039 {
01040   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events_i");
01041 
01042   int result = 0;
01043   // int active_handle_count = 0;
01044 
01045   // Poll for events
01046   //
01047   // If the underlying ioctl () call was interrupted via the interrupt
01048   // signal (i.e. returned -1 with errno == EINTR) then the loop will
01049   // be restarted if so desired.
01050   do
01051     {
01052       result = this->work_pending_i (max_wait_time);
01053       if (result == -1)
01054         ACE_ERROR ((LM_ERROR, "%t: %p\n", "work_pending_i"));
01055     }
01056   while (result == -1 && this->restart_ != 0 && errno == EINTR);
01057 
01058   if (result == 0 || (result == -1 && errno == ETIME))
01059     return 0;
01060   else if (result == -1)
01061     {
01062       if (errno != EINTR)
01063         return -1;
01064 
01065       // Bail out -- we got here since the poll was interrupted.
01066       // If it was due to a signal registered through our ACE_Sig_Handler,
01067       // then it was dispatched, so we count it in the number of events
01068       // handled rather than cause an error return.
01069       if (ACE_Sig_Handler::sig_pending () != 0)
01070         {
01071           ACE_Sig_Handler::sig_pending (0);
01072           return 1;
01073         }
01074       return -1;
01075     }
01076 
01077   // Dispatch an event.
01078   return this->dispatch (guard);
01079 }
01080 
01081 // Dispatch an event. On entry, the token is held by the caller. If an
01082 // event is found to dispatch, the token is released before dispatching it.
01083 int
01084 ACE_Dev_Poll_Reactor::dispatch (Token_Guard &guard)
01085 {
01086   ACE_TRACE ("ACE_Dev_Poll_Reactor::dispatch");
01087 
01088   // Perform the Template Method for dispatching the first located event.
01089   // We dispatch only one to effectively dispatch events concurrently.
01090   // As soon as an event is located, the token is released, allowing the
01091   // next waiter to begin getting an event while we dispatch one here.
01092   int result = 0;
01093 
01094   // Handle timers early since they may have higher latency
01095   // constraints than I/O handlers.  Ideally, the order of
01096   // dispatching should be a strategy...
01097   if ((result = this->dispatch_timer_handler (guard)) != 0)
01098     return result;
01099 
01100   // Check to see if there are no more I/O handles left to
01101   // dispatch AFTER we've handled the timers.
01102 
01103   // Finally, dispatch the I/O handlers.
01104   result = this->dispatch_io_event (guard);
01105 
01106   return result;
01107 }
01108 
01109 int
01110 ACE_Dev_Poll_Reactor::dispatch_timer_handler (Token_Guard &guard)
01111 {
01112   if (this->timer_queue_->is_empty ())
01113     return 0;       // Empty timer queue so cannot have any expired timers.
01114 
01115   // Get the current time
01116   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
01117                            this->timer_queue_->timer_skew ());
01118 
01119   // Look for a node in the timer queue whose timer <= the present
01120   // time.
01121   ACE_Timer_Node_Dispatch_Info info;
01122   if (this->timer_queue_->dispatch_info (cur_time, info))
01123     {
01124       const void *upcall_act = 0;
01125 
01126       // Preinvoke (handles refcount if needed, etc.)
01127       this->timer_queue_->preinvoke (info, cur_time, upcall_act);
01128 
01129       // Release the token before expiration upcall.
01130       guard.release_token ();
01131 
01132       // call the functor
01133       this->timer_queue_->upcall (info, cur_time);
01134 
01135       // Postinvoke (undo refcount if needed, etc.)
01136       this->timer_queue_->postinvoke (info, cur_time, upcall_act);
01137 
01138       // We have dispatched a timer
01139       return 1;
01140     }
01141 
01142   return 0;
01143 }
01144 
#if 0
// NOTE: this function is compiled out.
//
// Dispatch pending notifications through the notify handler, adding
// the number dispatched to @a number_of_handlers_dispatched.  Returns
// -1 if the notify handler reports an error, 0 otherwise.
int
ACE_Dev_Poll_Reactor::dispatch_notification_handlers (
  ACE_Select_Reactor_Handle_Set &dispatch_set,
  int &number_of_active_handles,
  int &number_of_handlers_dispatched)
{
  // If the ACE_HANDLE associated with the Dev_Poll_Reactor's notify
  // hook is enabled, one or more other threads are trying to update
  // the reactor's internal tables or the notify pipe is enabled.
  // Handle all of those, then return so the event loop can continue.
  const int dispatched =
    this->notify_handler_->dispatch_notifications (number_of_active_handles,
                                                   dispatch_set.rd_mask_);

  if (dispatched == -1)
    return -1;

  number_of_handlers_dispatched += dispatched;

  return /* this->state_changed_ ? -1 : */ 0;
}
#endif  /* 0 */
01171 
01172 int
01173 ACE_Dev_Poll_Reactor::dispatch_io_event (Token_Guard &guard)
01174 {
01175 
01176   // Define bits to check for while dispatching.
01177 #if defined (ACE_HAS_EVENT_POLL)
01178   const __uint32_t out_event = EPOLLOUT;
01179   const __uint32_t exc_event = EPOLLPRI;
01180   const __uint32_t in_event  = EPOLLIN;
01181   const __uint32_t err_event = EPOLLHUP | EPOLLERR;
01182 #else
01183   const short out_event = POLLOUT;
01184   const short exc_event = POLLPRI;
01185   const short in_event  = POLLIN;
01186   const short err_event = 0;              // No known bits for this
01187 #endif /* ACE_HAS_EVENT_POLL */
01188 
01189   // Since the underlying event demultiplexing mechansim (`/dev/poll'
01190   // or '/dev/epoll') is stateful, and since only one result buffer is
01191   // used, all pending events (i.e. those retrieved from a previous
01192   // poll) must be dispatched before any additional event can be
01193   // polled.  As such, the Dev_Poll_Reactor keeps track of the
01194   // progress of events that have been dispatched.
01195 
01196   // Dispatch the events.
01197   //
01198   // Select the first available handle with event (s) pending. Check for
01199   // event type in defined order of dispatch: output, exception, input.
01200   // When an event is located, clear its bit in the dispatch set. If there
01201   // are no more events for the handle, also increment the pfds pointer
01202   // to move to the next handle ready.
01203   //
01204   // Notice that pfds only contains file descriptors that have
01205   // received events.
01206 #if defined (ACE_HAS_EVENT_POLL)
01207   struct epoll_event *& pfds = this->start_pevents_;
01208   if (pfds < this->end_pevents_)
01209 #else
01210   struct pollfd *& pfds = this->start_pfds_;
01211   if (pfds < this->end_pfds_)
01212 #endif /* ACE_HAS_EVENT_POLL */
01213     {
01214 #if defined (ACE_HAS_EVENT_POLL)
01215       const ACE_HANDLE handle   = pfds->data.fd;
01216       __uint32_t &revents       = pfds->events;
01217 #else
01218       const ACE_HANDLE handle = pfds->fd;
01219       short &revents          = pfds->revents;
01220 #endif /* ACE_HAS_EVENT_POLL */
01221 
01222       // Figure out what to do first in order to make it easier to manage
01223       // the bit twiddling and possible pfds increment before releasing
01224       // the token for dispatch.
01225       // Note that if there's an error (such as the handle was closed
01226       // without being removed from the event set) the EPOLLHUP and/or
01227       // EPOLLERR bits will be set in revents.
01228       bool disp_out = false;
01229       bool disp_exc = false;
01230       bool disp_in  = false;
01231       if (ACE_BIT_ENABLED (revents, out_event))
01232         {
01233           disp_out = true;
01234           ACE_CLR_BITS (revents, out_event);
01235         }
01236       else if (ACE_BIT_ENABLED (revents, exc_event))
01237         {
01238           disp_exc = true;
01239           ACE_CLR_BITS (revents, exc_event);
01240         }
01241       else if (ACE_BIT_ENABLED (revents, in_event))
01242         {
01243           disp_in = true;
01244           ACE_CLR_BITS (revents, in_event);
01245         }
01246       else if (ACE_BIT_ENABLED (revents, err_event))
01247         {
01248           this->remove_handler_i (handle, ACE_Event_Handler::ALL_EVENTS_MASK);
01249           ++pfds;
01250           return 1;
01251         }
01252       else
01253         {
01254           ACE_ERROR ((LM_ERROR, ACE_TEXT (" (%t) dispatch_io h %d unknown events 0x%x\n"), handle, revents));
01255           // ACE_ASSERT (0);
01256         }
01257 
01258       // Increment the pointer to the next element before we
01259       // release the token.  Otherwise event handlers end up being
01260       // dispatched multiple times for the same poll.
01261       if (revents == 0)
01262         ++pfds;
01263 
01264       /* When using sys_epoll, we can attach arbitrary user
01265          data to the descriptor, so it can be delivered when
01266          activity is detected. Perhaps we should store event
01267          handler together with descriptor, instead of looking
01268          it up in a repository ? Could it boost performance ?
01269       */
01270       ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01271 
01272       if (eh)
01273         {
01274           // Modify the reference count in an exception-safe way.
01275           // Note that eh could be the notify handler. It's not strictly
01276           // necessary to manage its refcount, but since we don't enable
01277           // the counting policy, it won't do much. Management of the
01278           // notified handlers themselves is done in the notify handler.
01279           ACE_Dev_Poll_Handler_Guard eh_guard (eh);
01280 
01281           // Release the reactor token before upcall.
01282           guard.release_token ();
01283 
01284           // Dispatch the detected event
01285           if (disp_out)
01286             {
01287               const int status =
01288                 this->upcall (eh, &ACE_Event_Handler::handle_output, handle);
01289 
01290               if (status < 0)
01291                 // Note that the token is reacquired in remove_handler ().
01292                 this->remove_handler (handle, ACE_Event_Handler::WRITE_MASK);
01293               return 1;
01294             }
01295 
01296           if (disp_exc)
01297             {
01298               const int status =
01299                 this->upcall (eh, &ACE_Event_Handler::handle_exception, handle);
01300 
01301               if (status < 0)
01302                 // Note that the token is reacquired in remove_handler ().
01303                 this->remove_handler (handle, ACE_Event_Handler::EXCEPT_MASK);
01304               return 1;
01305             }
01306 
01307           if (disp_in)
01308             {
01309               const int status =
01310                 this->upcall (eh, &ACE_Event_Handler::handle_input, handle);
01311 
01312               if (status < 0)
01313                 // Note that the token is reacquired in remove_handler ().
01314                 this->remove_handler (handle, ACE_Event_Handler::READ_MASK);
01315               return 1;
01316             }
01317         } // The reactor token is reacquired upon leaving this scope.
01318     }
01319 
01320   return 0;
01321 }
01322 
01323 int
01324 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value *max_wait_time)
01325 {
01326   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01327 
01328   return this->handle_events (max_wait_time);
01329 }
01330 
01331 int
01332 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value &max_wait_time)
01333 {
01334   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01335 
01336   return this->handle_events (&max_wait_time);
01337 }
01338 
01339 int
01340 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value &max_wait_time)
01341 {
01342   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01343 
01344   return this->handle_events (max_wait_time);
01345 }
01346 
01347 int
01348 ACE_Dev_Poll_Reactor::deactivated (void)
01349 {
01350   return this->deactivated_;
01351 }
01352 
01353 void
01354 ACE_Dev_Poll_Reactor::deactivate (int do_stop)
01355 {
01356   this->deactivated_ = do_stop;
01357   this->wakeup_all_threads ();
01358 }
01359 
01360 int
01361 ACE_Dev_Poll_Reactor::register_handler (ACE_Event_Handler *handler,
01362                                         ACE_Reactor_Mask mask)
01363 {
01364   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01365 
01366   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01367 
01368   return this->register_handler_i (handler->get_handle (),
01369                                    handler,
01370                                    mask);
01371 }
01372 
01373 int
01374 ACE_Dev_Poll_Reactor::register_handler (ACE_HANDLE handle,
01375                                         ACE_Event_Handler *event_handler,
01376                                         ACE_Reactor_Mask mask)
01377 {
01378   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01379 
01380   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01381 
01382   return this->register_handler_i (handle,
01383                                    event_handler,
01384                                    mask);
01385 }
01386 
01387 int
01388 ACE_Dev_Poll_Reactor::register_handler_i (ACE_HANDLE handle,
01389                                           ACE_Event_Handler *event_handler,
01390                                           ACE_Reactor_Mask mask)
01391 {
01392   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler_i");
01393 
01394   if (handle == ACE_INVALID_HANDLE
01395       || mask == ACE_Event_Handler::NULL_MASK)
01396     {
01397       errno = EINVAL;
01398       return -1;
01399     }
01400 
01401  if (this->handler_rep_.find (handle) == 0)
01402    {
01403      // Handler not present in the repository.  Bind it.
01404      if (this->handler_rep_.bind (handle, event_handler, mask) != 0)
01405        return -1;
01406 
01407 #if defined (ACE_HAS_EVENT_POLL)
01408 
01409      struct epoll_event epev;
01410      ACE_OS::memset (&epev, 0, sizeof (epev));
01411      static const int op = EPOLL_CTL_ADD;
01412 
01413      epev.events  = this->reactor_mask_to_poll_event (mask);
01414      epev.data.fd = handle;
01415 
01416      if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01417        {
01418          ACE_ERROR ((LM_ERROR, "%p\n", "epoll_ctl"));
01419  (void) this->handler_rep_.unbind (handle);
01420          return -1;
01421        }
01422 
01423 #endif /* ACE_HAS_EVENT_POLL */
01424    }
01425  else
01426    {
01427      // Handler is already present in the repository, so register it
01428      // again, possibly for different event.  Add new mask to the
01429      // current one.
01430      if (this->mask_ops_i (handle, mask, ACE_Reactor::ADD_MASK) == -1)
01431        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "mask_ops_i"), -1);
01432    }
01433 
01434 #ifndef  ACE_HAS_EVENT_POLL
01435 
01436   struct pollfd pfd;
01437 
01438   pfd.fd      = handle;
01439   pfd.events  = this->reactor_mask_to_poll_event (mask);
01440   pfd.revents = 0;
01441 
01442   // Add file descriptor to the "interest set."
01443   if (ACE_OS::write (this->poll_fd_, &pfd, sizeof (pfd)) != sizeof (pfd))
01444     {
01445  (void) this->handler_rep_.unbind (handle);
01446       return -1;
01447     }
01448 #endif /*ACE_HAS_EVENT_POLL*/
01449 
01450   // Note the fact that we've changed the state of the wait_set_,
01451   // which is used by the dispatching loop to determine whether it can
01452   // keep going or if it needs to reconsult select ().
01453   // this->state_changed_ = 1;
01454 
01455   return 0;
01456 }
01457 
01458 int
01459 ACE_Dev_Poll_Reactor::register_handler (
01460   ACE_HANDLE /* event_handle */,
01461   ACE_HANDLE /* io_handle */,
01462   ACE_Event_Handler * /* event_handler */,
01463   ACE_Reactor_Mask /* mask */)
01464 {
01465   ACE_NOTSUP_RETURN (-1);
01466 }
01467 
01468 int
01469 ACE_Dev_Poll_Reactor::register_handler (const ACE_Handle_Set &handle_set,
01470                                         ACE_Event_Handler *event_handler,
01471                                         ACE_Reactor_Mask mask)
01472 {
01473   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01474 
01475   ACE_Handle_Set_Iterator handle_iter (handle_set);
01476 
01477   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01478 
01479   // @@ It might be more efficient to construct a pollfd array and
01480   //    pass it to the write () call in register_handler_i () only once,
01481   //    instead of calling write () (a system call) once for each file
01482   //    descriptor.
01483 
01484   for (ACE_HANDLE h = handle_iter ();
01485        h != ACE_INVALID_HANDLE;
01486        h = handle_iter ())
01487     if (this->register_handler_i (h, event_handler, mask) == -1)
01488       return -1;
01489 
01490   return 0;
01491 }
01492 
01493 int
01494 ACE_Dev_Poll_Reactor::register_handler (int signum,
01495                                         ACE_Event_Handler *new_sh,
01496                                         ACE_Sig_Action *new_disp,
01497                                         ACE_Event_Handler **old_sh,
01498                                         ACE_Sig_Action *old_disp)
01499 {
01500   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01501 
01502   return this->signal_handler_->register_handler (signum,
01503                                                   new_sh,
01504                                                   new_disp,
01505                                                   old_sh,
01506                                                   old_disp);
01507 }
01508 
01509 int
01510 ACE_Dev_Poll_Reactor::register_handler (const ACE_Sig_Set &sigset,
01511                                         ACE_Event_Handler *new_sh,
01512                                         ACE_Sig_Action *new_disp)
01513 {
01514   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01515 
01516   int result = 0;
01517 
01518 #if (ACE_NSIG > 0)
01519 
01520   for (int s = 1; s < ACE_NSIG; ++s)
01521     if ((sigset.is_member (s) == 1)
01522         && this->signal_handler_->register_handler (s,
01523                                                     new_sh,
01524                                                     new_disp) == -1)
01525       result = -1;
01526 
01527 #else  /* ACE_NSIG <= 0 */
01528 
01529   ACE_UNUSED_ARG (sigset);
01530   ACE_UNUSED_ARG (new_sh);
01531   ACE_UNUSED_ARG (new_disp);
01532 
01533 #endif /* ACE_NSIG <= 0  */
01534 
01535   return result;
01536 }
01537 
01538 int
01539 ACE_Dev_Poll_Reactor::remove_handler (ACE_Event_Handler *handler,
01540                                       ACE_Reactor_Mask mask)
01541 {
01542   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01543 
01544   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01545 
01546   return this->remove_handler_i (handler->get_handle (), mask);
01547 }
01548 
01549 int
01550 ACE_Dev_Poll_Reactor::remove_handler (ACE_HANDLE handle,
01551                                       ACE_Reactor_Mask mask)
01552 {
01553   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01554 
01555   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01556 
01557   return this->remove_handler_i (handle, mask);
01558 }
01559 
01560 int
01561 ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle,
01562                                         ACE_Reactor_Mask mask)
01563 {
01564   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler_i");
01565 
01566   ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01567 
01568   if (eh == 0 ||
01569       this->mask_ops_i (handle, mask, ACE_Reactor::CLR_MASK) == -1)
01570     return -1;
01571 
01572   // Check for ref counting now - handle_close () may delete eh.
01573   bool const requires_reference_counting =
01574     eh->reference_counting_policy ().value () ==
01575     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
01576 
01577   if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL))
01578  (void) eh->handle_close (handle, mask);
01579 
01580   // If there are no longer any outstanding events on the given handle
01581   // then remove it from the handler repository.
01582   if (this->handler_rep_.mask (handle) == ACE_Event_Handler::NULL_MASK)
01583     this->handler_rep_.unbind (handle, requires_reference_counting);
01584 
01585   // Note the fact that we've changed the state of the wait_set,
01586   // i.e. the "interest set," which is used by the dispatching loop to
01587   // determine whether it can keep going or if it needs to reconsult
01588   // /dev/poll or /dev/epoll.
01589   // this->state_changed_ = 1;
01590 
01591   return 0;
01592 }
01593 
01594 int
01595 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Handle_Set &handle_set,
01596                                       ACE_Reactor_Mask mask)
01597 {
01598   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01599 
01600   ACE_Handle_Set_Iterator handle_iter (handle_set);
01601 
01602   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01603 
01604   // @@ It might be more efficient to construct a pollfd array and
01605   //    pass it to the write () call in register_handler_i () only once,
01606   //    instead of calling write () (a system call) once for each file
01607   //    descriptor.
01608 
01609   for (ACE_HANDLE h = handle_iter ();
01610        h != ACE_INVALID_HANDLE;
01611        h = handle_iter ())
01612     if (this->remove_handler_i (h, mask) == -1)
01613       return -1;
01614 
01615   return 0;
01616 }
01617 
01618 int
01619 ACE_Dev_Poll_Reactor::remove_handler (int signum,
01620                                       ACE_Sig_Action *new_disp,
01621                                       ACE_Sig_Action *old_disp,
01622                                       int sigkey)
01623 {
01624   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01625 
01626   return this->signal_handler_->remove_handler (signum,
01627                                                 new_disp,
01628                                                 old_disp,
01629                                                 sigkey);
01630 }
01631 
01632 int
01633 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Sig_Set &sigset)
01634 {
01635   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01636 
01637   int result = 0;
01638 
01639 #if (ACE_NSIG > 0)
01640 
01641   for (int s = 1; s < ACE_NSIG; ++s)
01642     if ((sigset.is_member (s) == 1)
01643         && this->signal_handler_->remove_handler (s) == -1)
01644       result = -1;
01645 
01646 #else  /* ACE_NSIG <= 0 */
01647 
01648   ACE_UNUSED_ARG (sigset);
01649 
01650 #endif /* ACE_NSIG <= 0 */
01651 
01652   return result;
01653 }
01654 
01655 int
01656 ACE_Dev_Poll_Reactor::suspend_handler (ACE_Event_Handler *event_handler)
01657 {
01658   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01659 
01660   if (event_handler == 0)
01661     {
01662       errno = EINVAL;
01663       return -1;
01664     }
01665 
01666   ACE_HANDLE handle = event_handler->get_handle ();
01667 
01668   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01669 
01670   return this->suspend_handler_i (handle);
01671 }
01672 
01673 int
01674 ACE_Dev_Poll_Reactor::suspend_handler (ACE_HANDLE handle)
01675 {
01676   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01677 
01678   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01679 
01680   return this->suspend_handler_i (handle);
01681 }
01682 
01683 int
01684 ACE_Dev_Poll_Reactor::suspend_handler (const ACE_Handle_Set &handles)
01685 {
01686   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01687 
01688   ACE_Handle_Set_Iterator handle_iter (handles);
01689   ACE_HANDLE h;
01690 
01691   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01692 
01693   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01694     if (this->suspend_handler_i (h) == -1)
01695       return -1;
01696 
01697   return 0;
01698 }
01699 
01700 int
01701 ACE_Dev_Poll_Reactor::suspend_handlers (void)
01702 {
01703   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handlers");
01704 
01705   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01706 
01707   size_t const len = this->handler_rep_.size ();
01708 
01709   for (size_t i = 0; i < len; ++i)
01710     if (this->handler_rep_.suspended (i) == 0
01711         && this->suspend_handler_i (i) != 0)
01712       return -1;
01713 
01714   return 0;
01715 }
01716 
01717 int
01718 ACE_Dev_Poll_Reactor::suspend_handler_i (ACE_HANDLE handle)
01719 {
01720   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler_i");
01721 
01722   if (this->handler_rep_.find (handle) == 0)
01723     return -1;
01724 
01725   if (this->handler_rep_.suspended (handle))
01726     return 0;  // Already suspended.  @@ Should this be an error?
01727 
01728   // Remove the handle from the "interest set."
01729   //
01730   // Note that the associated event handler is still in the handler
01731   // repository, but no events will be polled on the given handle thus
01732   // no event will be dispatched to the event handler.
01733 
01734 #if defined (ACE_HAS_EVENT_POLL)
01735 
01736   struct epoll_event epev;
01737   ACE_OS::memset (&epev, 0, sizeof (epev));
01738   static const int op = EPOLL_CTL_DEL;
01739 
01740   epev.events  = 0;
01741   epev.data.fd = handle;
01742 
01743   if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01744     return -1;
01745 
01746 #else
01747 
01748   struct pollfd pfd[1];
01749 
01750   pfd[0].fd      = handle;
01751   pfd[0].events  = POLLREMOVE;
01752   pfd[0].revents = 0;
01753 
01754   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01755     return -1;
01756 
01757 #endif  /* ACE_HAS_EVENT_POLL */
01758 
01759   this->handler_rep_.suspend (handle);
01760 
01761   return 0;
01762 }
01763 
01764 int
01765 ACE_Dev_Poll_Reactor::resume_handler (ACE_Event_Handler *event_handler)
01766 {
01767   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01768 
01769   if (event_handler == 0)
01770     {
01771       errno = EINVAL;
01772       return -1;
01773     }
01774 
01775   ACE_HANDLE handle = event_handler->get_handle ();
01776 
01777   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01778 
01779   return this->resume_handler_i (handle);
01780 }
01781 
01782 int
01783 ACE_Dev_Poll_Reactor::resume_handler (ACE_HANDLE handle)
01784 {
01785   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01786 
01787   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01788 
01789   return this->resume_handler_i (handle);
01790 }
01791 
01792 int
01793 ACE_Dev_Poll_Reactor::resume_handler (const ACE_Handle_Set &handles)
01794 {
01795   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01796 
01797   ACE_Handle_Set_Iterator handle_iter (handles);
01798   ACE_HANDLE h;
01799 
01800   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01801 
01802   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01803     if (this->resume_handler_i (h) == -1)
01804       return -1;
01805 
01806   return 0;
01807 }
01808 
01809 int
01810 ACE_Dev_Poll_Reactor::resume_handlers (void)
01811 {
01812   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handlers");
01813 
01814   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01815 
01816   size_t const len = this->handler_rep_.size ();
01817 
01818   for (size_t i = 0; i < len; ++i)
01819     if (this->handler_rep_.suspended (i)
01820         && this->resume_handler_i (i) != 0)
01821       return -1;
01822 
01823   return 0;
01824 }
01825 
01826 int
01827 ACE_Dev_Poll_Reactor::resume_handler_i (ACE_HANDLE handle)
01828 {
01829   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler_i");
01830 
01831   if (this->handler_rep_.find (handle) == 0
01832       && this->handler_rep_.suspended (handle) == 0)
01833     return -1;
01834 
01835   ACE_Reactor_Mask mask = this->handler_rep_.mask (handle);
01836 
01837   if (mask == ACE_Event_Handler::NULL_MASK)
01838     return -1;
01839 
01840   // Place the handle back in to the "interest set."
01841   //
01842   // Events for the given handle will once again be polled.
01843 
01844 #if defined (ACE_HAS_EVENT_POLL)
01845 
01846   struct epoll_event epev;
01847   ACE_OS::memset (&epev, 0, sizeof (epev));
01848   static const int op = EPOLL_CTL_ADD;
01849 
01850   epev.events  = this->reactor_mask_to_poll_event (mask);
01851   epev.data.fd = handle;
01852 
01853   if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01854     return -1;
01855 
01856 #else
01857 
01858   struct pollfd pfd[1];
01859 
01860   pfd[0].fd      = handle;
01861   pfd[0].events  = this->reactor_mask_to_poll_event (mask);
01862   pfd[0].revents = 0;
01863 
01864   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01865     return -1;
01866 
01867 #endif  /* ACE_HAS_EVENT_POLL */
01868 
01869   this->handler_rep_.resume (handle);
01870 
01871   return 0;
01872 }
01873 
01874 int
01875 ACE_Dev_Poll_Reactor::resumable_handler (void)
01876 {
01877   // @@ Is this correct?
01878 
01879   return 0;
01880 }
01881 
01882 int
01883 ACE_Dev_Poll_Reactor::uses_event_associations (void)
01884 {
01885   // Since the Dev_Poll_Reactor does not do any event associations,
01886   // this method always return zero.
01887   return 0;
01888 }
01889 
01890 long
01891 ACE_Dev_Poll_Reactor::schedule_timer (ACE_Event_Handler *event_handler,
01892                                       const void *arg,
01893                                       const ACE_Time_Value &delay,
01894                                       const ACE_Time_Value &interval)
01895 {
01896   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_timer");
01897 
01898   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01899 
01900   if (0 != this->timer_queue_)
01901     return this->timer_queue_->schedule
01902       (event_handler,
01903        arg,
01904        this->timer_queue_->gettimeofday () + delay,
01905        interval);
01906 
01907   errno = ESHUTDOWN;
01908   return -1;
01909 }
01910 
01911 int
01912 ACE_Dev_Poll_Reactor::reset_timer_interval (long timer_id,
01913                                             const ACE_Time_Value &interval)
01914 {
01915   ACE_TRACE ("ACE_Dev_Poll_Reactor::reset_timer_interval");
01916 
01917   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01918 
01919   if (0 != this->timer_queue_)
01920     return this->timer_queue_->reset_interval (timer_id, interval);
01921 
01922   errno = ESHUTDOWN;
01923   return -1;
01924 }
01925 
01926 int
01927 ACE_Dev_Poll_Reactor::cancel_timer (ACE_Event_Handler *event_handler,
01928                                     int dont_call_handle_close)
01929 {
01930   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01931 
01932   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01933 
01934   return (this->timer_queue_ == 0
01935           ? 0
01936           : this->timer_queue_->cancel (event_handler,
01937                                         dont_call_handle_close));
01938 }
01939 
01940 int
01941 ACE_Dev_Poll_Reactor::cancel_timer (long timer_id,
01942                                     const void **arg,
01943                                     int dont_call_handle_close)
01944 {
01945   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01946 
01947   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01948 
01949   return (this->timer_queue_ == 0
01950           ? 0
01951           : this->timer_queue_->cancel (timer_id,
01952                                         arg,
01953                                         dont_call_handle_close));
01954 }
01955 
01956 int
01957 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_Event_Handler *eh,
01958                                        ACE_Reactor_Mask mask)
01959 {
01960   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
01961 
01962   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK);
01963 }
01964 
01965 int
01966 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_HANDLE handle,
01967                                        ACE_Reactor_Mask mask)
01968 {
01969   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
01970 
01971   return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK);
01972 }
01973 
01974 int
01975 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_Event_Handler *eh,
01976                                      ACE_Reactor_Mask mask)
01977 {
01978   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
01979 
01980   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK);
01981 }
01982 
01983 int
01984 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_HANDLE handle,
01985                                      ACE_Reactor_Mask mask)
01986 {
01987   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
01988 
01989   return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK);
01990 }
01991 
01992 int
01993 ACE_Dev_Poll_Reactor::notify (ACE_Event_Handler *eh,
01994                               ACE_Reactor_Mask mask,
01995                               ACE_Time_Value *timeout)
01996 {
01997   ACE_TRACE ("ACE_Dev_Poll_Reactor::notify");
01998 
01999   ssize_t n = 0;
02000 
02001   // Pass over both the Event_Handler *and* the mask to allow the
02002   // caller to dictate which Event_Handler method the receiver
02003   // invokes.  Note that this call can timeout.
02004 
02005   n = this->notify_handler_->notify (eh, mask, timeout);
02006 
02007   return n == -1 ? -1 : 0;
02008 }
02009 
02010 void
02011 ACE_Dev_Poll_Reactor::max_notify_iterations (int iterations)
02012 {
02013   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02014 
02015   ACE_MT (ACE_GUARD (ACE_Dev_Poll_Reactor_Token, mon, this->token_));
02016 
02017   this->notify_handler_->max_notify_iterations (iterations);
02018 }
02019 
02020 int
02021 ACE_Dev_Poll_Reactor::max_notify_iterations (void)
02022 {
02023   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02024 
02025   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02026 
02027   return this->notify_handler_->max_notify_iterations ();
02028 }
02029 
02030 int
02031 ACE_Dev_Poll_Reactor::purge_pending_notifications (ACE_Event_Handler * eh,
02032                                                    ACE_Reactor_Mask mask)
02033 {
02034   if (this->notify_handler_ == 0)
02035     return 0;
02036 
02037   return this->notify_handler_->purge_pending_notifications (eh, mask);
02038 }
02039 
02040 ACE_Event_Handler *
02041 ACE_Dev_Poll_Reactor::find_handler (ACE_HANDLE handle)
02042 {
02043   ACE_MT (ACE_READ_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, 0));
02044 
02045   ACE_Event_Handler *event_handler = this->handler_rep_.find (handle);
02046   if (event_handler)
02047     event_handler->add_reference ();
02048   return event_handler;
02049 }
02050 
02051 int
02052 ACE_Dev_Poll_Reactor::handler (ACE_HANDLE handle,
02053                                ACE_Reactor_Mask mask,
02054                                ACE_Event_Handler **event_handler)
02055 {
02056   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02057 
02058   ACE_MT (ACE_READ_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02059 
02060   ACE_Event_Handler *h = this->handler_rep_.find (handle);
02061 
02062   if (h != 0
02063       && ACE_BIT_CMP_MASK (this->handler_rep_.mask (handle),
02064                            mask,  // Compare all bits in the mask
02065                            mask))
02066     {
02067       if (event_handler != 0)
02068         *event_handler = h;
02069 
02070       return 0;
02071     }
02072 
02073   return -1;
02074 }
02075 
02076 int
02077 ACE_Dev_Poll_Reactor::handler (int signum,
02078                                ACE_Event_Handler **eh)
02079 {
02080   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02081 
02082   ACE_Event_Handler *handler = this->signal_handler_->handler (signum);
02083 
02084   if (handler == 0)
02085     return -1;
02086   else if (eh != 0)
02087     *eh = handler;
02088 
02089   return 0;
02090 }
02091 
02092 bool
02093 ACE_Dev_Poll_Reactor::initialized (void)
02094 {
02095   ACE_TRACE ("ACE_Dev_Poll_Reactor::initialized");
02096 
02097   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, false));
02098 
02099   return this->initialized_;
02100 }
02101 
02102 size_t
02103 ACE_Dev_Poll_Reactor::size (void) const
02104 {
02105   return this->size_;
02106 }
02107 
02108 ACE_Lock &
02109 ACE_Dev_Poll_Reactor::lock (void)
02110 {
02111   ACE_TRACE ("ACE_Dev_Poll_Reactor::lock");
02112 
02113   return this->lock_adapter_;
02114 }
02115 
02116 void
02117 ACE_Dev_Poll_Reactor::wakeup_all_threads (void)
02118 {
02119   ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads");
02120 
02121   // Send a notification, but don't block if there's no one to receive
02122   // it.
02123   this->notify (0,
02124                 ACE_Event_Handler::NULL_MASK,
02125  (ACE_Time_Value *) &ACE_Time_Value::zero);
02126 }
02127 
02128 int
02129 ACE_Dev_Poll_Reactor::owner (ACE_thread_t /* new_owner */,
02130                              ACE_thread_t * /* old_owner */)
02131 {
02132   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02133 
02134   // There is no need to set the owner of the event loop.  Multiple
02135   // threads may invoke the event loop simulataneously.
02136 
02137   return 0;
02138 }
02139 
02140 int
02141 ACE_Dev_Poll_Reactor::owner (ACE_thread_t * /* owner */)
02142 {
02143   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02144 
02145   // There is no need to set the owner of the event loop.  Multiple
02146   // threads may invoke the event loop simulataneously.
02147 
02148   return 0;
02149 }
02150 
02151 int
02152 ACE_Dev_Poll_Reactor::restart (void)
02153 {
02154   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02155 
02156   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02157 
02158   return this->restart_;
02159 }
02160 
02161 int
02162 ACE_Dev_Poll_Reactor::restart (int r)
02163 {
02164   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02165 
02166   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02167 
02168   int current_value = this->restart_;
02169   this->restart_ = r;
02170   return current_value;
02171 }
02172 
02173 void
02174 ACE_Dev_Poll_Reactor::requeue_position (int)
02175 {
02176   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02177 }
02178 
02179 int
02180 ACE_Dev_Poll_Reactor::requeue_position (void)
02181 {
02182   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02183 
02184   ACE_NOTSUP_RETURN (-1);
02185 }
02186 
02187 int
02188 ACE_Dev_Poll_Reactor::mask_ops (ACE_Event_Handler *event_handler,
02189                                 ACE_Reactor_Mask mask,
02190                                 int ops)
02191 {
02192   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02193 
02194   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02195 
02196   return this->mask_ops_i (event_handler->get_handle (), mask, ops);
02197 }
02198 
02199 int
02200 ACE_Dev_Poll_Reactor::mask_ops (ACE_HANDLE handle,
02201                                 ACE_Reactor_Mask mask,
02202                                 int ops)
02203 {
02204   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02205 
02206   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02207 
02208   return this->mask_ops_i (handle, mask, ops);
02209 }
02210 
02211 int
02212 ACE_Dev_Poll_Reactor::mask_ops_i (ACE_HANDLE handle,
02213                                   ACE_Reactor_Mask mask,
02214                                   int ops)
02215 {
02216   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops_i");
02217 
02218   if (this->handler_rep_.handle_in_range (handle) == 0)
02219     return -1;
02220 
02221   // Block out all signals until method returns.
02222   ACE_Sig_Guard sb;
02223 
02224   ACE_Reactor_Mask const old_mask = this->handler_rep_.mask (handle);
02225   ACE_Reactor_Mask new_mask = old_mask;
02226 
02227   // Perform GET, CLR, SET, and ADD operations on the interest/wait
02228   // set and the suspend set (if necessary).
02229   //
02230   // GET = 1, Retrieve current value
02231   // SET = 2, Set value of bits to new mask (changes the entire mask)
02232   // ADD = 3, Bitwise "or" the value into the mask (only changes
02233   //          enabled bits)
02234   // CLR = 4  Bitwise "and" the negation of the value out of the mask
02235   // (only changes enabled bits)
02236   //
02237   // Returns the original mask.
02238 
02239   switch (ops)
02240     {
02241     case ACE_Reactor::GET_MASK:
02242       // The work for this operation is done in all cases at the
02243       // begining of the function.
02244       return old_mask;
02245 
02246     case ACE_Reactor::CLR_MASK:
02247       ACE_CLR_BITS (new_mask, mask);
02248       break;
02249 
02250     case ACE_Reactor::SET_MASK:
02251       new_mask = mask;
02252       break;
02253 
02254     case ACE_Reactor::ADD_MASK:
02255       ACE_SET_BITS (new_mask, mask);
02256       break;
02257 
02258     default:
02259       return -1;
02260     }
02261 
02262   /// Reset the mask for the given handle.
02263   this->handler_rep_.mask (handle, new_mask);
02264 
02265   if (this->handler_rep_.suspended (handle) == 0)
02266     {
02267       // Only attempt to alter events for the handle from the
02268       // "interest set" if it hasn't been suspended.
02269 
02270       short const events = this->reactor_mask_to_poll_event (new_mask);
02271 
02272 #if defined (sun)
02273       // Apparently events cannot be updated on-the-fly on Solaris so
02274       // remove the existing events, and then add the new ones.
02275       struct pollfd pfd[2];
02276 
02277       pfd[0].fd      = handle;
02278       pfd[0].events  = POLLREMOVE;
02279       pfd[0].revents = 0;
02280       pfd[1].fd      = (events == POLLREMOVE ? ACE_INVALID_HANDLE : handle);
02281       pfd[1].events  = events;
02282       pfd[1].revents = 0;
02283 
02284       // Change the events associated with the given file descriptor.
02285       if (ACE_OS::write (this->poll_fd_,
02286                          pfd,
02287                          sizeof (pfd)) != sizeof (pfd))
02288         return -1;
02289 #elif defined (ACE_HAS_EVENT_POLL)
02290 
02291       struct epoll_event epev;
02292       ACE_OS::memset (&epev, 0, sizeof (epev));
02293       int op;
02294 
02295       // ACE_Event_Handler::NULL_MASK ???
02296       if (new_mask == 0)
02297         {
02298           op          = EPOLL_CTL_DEL;
02299           epev.events = 0;
02300         }
02301       else
02302         {
02303           op          = EPOLL_CTL_MOD;
02304           epev.events = events;
02305         }
02306 
02307       epev.data.fd = handle;
02308 
02309       if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
02310         {
02311           // If a handle is closed, epoll removes it from the poll set
02312           // automatically - we may not know about it yet. If that's the
02313           // case, a mod operation will fail with ENOENT. Retry it as
02314           // an add.
02315           if (op == EPOLL_CTL_MOD && errno == ENOENT &&
02316               ::epoll_ctl (this->poll_fd_, EPOLL_CTL_ADD, handle, &epev) == -1)
02317             return -1;
02318         }
02319 
02320 #else
02321       pollfd pfd[1];
02322 
02323       pfd[0].fd      = handle;
02324       pfd[0].events  = events;
02325       pfd[0].revents = 0;
02326 
02327       // Change the events associated with the given file descriptor.
02328       if (ACE_OS::write (this->poll_fd_,
02329                          pfd,
02330                          sizeof (pfd)) != sizeof (pfd))
02331         return -1;
02332 #endif /*ACE_HAS_EVENT_POLL  */
02333     }
02334 
02335   return old_mask;
02336 }
02337 
02338 int
02339 ACE_Dev_Poll_Reactor::ready_ops (ACE_Event_Handler * /* event_handler */,
02340                                  ACE_Reactor_Mask /* mask */,
02341                                  int /* ops */)
02342 {
02343   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02344 
02345   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02346   // ready_set cannot be directly manipulated outside of the event
02347   // loop.
02348   ACE_NOTSUP_RETURN (-1);
02349 }
02350 
02351 int
02352 ACE_Dev_Poll_Reactor::ready_ops (ACE_HANDLE /* handle */,
02353                                  ACE_Reactor_Mask /* mask */,
02354                                  int /* ops */)
02355 {
02356   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02357 
02358   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02359   // ready_set cannot be directly manipulated outside of the event
02360   // loop.
02361   ACE_NOTSUP_RETURN (-1);
02362 }
02363 
02364 void
02365 ACE_Dev_Poll_Reactor::dump (void) const
02366 {
02367 #if defined (ACE_HAS_DUMP)
02368   ACE_TRACE ("ACE_Dev_Poll_Reactor::dump");
02369 
02370   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02371   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("restart_ = %d\n"), this->restart_));
02372   ACE_DEBUG ((LM_DEBUG,
02373               ACE_TEXT ("initialized_ = %d"),
02374               this->initialized_));
02375   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("poll_fd_ = %d"), this->poll_fd_));
02376   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("size_ = %u"), this->size_));
02377   ACE_DEBUG ((LM_DEBUG,
02378               ACE_TEXT ("deactivated_ = %d"),
02379               this->deactivated_));
02380   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02381 #endif /* ACE_HAS_DUMP */
02382 }
02383 
02384 short
02385 ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event (ACE_Reactor_Mask mask)
02386 {
02387   ACE_TRACE ("ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event");
02388 
02389   if (mask == ACE_Event_Handler::NULL_MASK)
02390     // No event.  Remove from interest set.
02391 #if defined (ACE_HAS_EVENT_POLL)
02392     return EPOLL_CTL_DEL;
02393 #else
02394     return POLLREMOVE;
02395 #endif /* ACE_HAS_EVENT_POLL */
02396 
02397   short events = 0;
02398 
02399   // READ, ACCEPT, and CONNECT flag will place the handle in the
02400   // read set.
02401   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
02402       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
02403       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02404     {
02405 #if defined (ACE_HAS_EVENT_POLL)
02406       ACE_SET_BITS (events, EPOLLIN);
02407 #else
02408       ACE_SET_BITS (events, POLLIN);
02409 #endif /*ACE_HAS_EVENT_POLL*/
02410     }
02411 
02412   // WRITE and CONNECT flag will place the handle in the write set.
02413   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)
02414       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02415     {
02416 #if defined (ACE_HAS_EVENT_POLL)
02417       ACE_SET_BITS (events, EPOLLOUT);
02418 #else
02419       ACE_SET_BITS (events, POLLOUT);
02420 #endif /*ACE_HAS_EVENT_POLL*/
02421     }
02422 
02423   // EXCEPT flag will place the handle in the except set.
02424   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
02425     {
02426 #if defined (ACE_HAS_EVENT_POLL)
02427       ACE_SET_BITS (events, EPOLLPRI);
02428 #else
02429       ACE_SET_BITS (events, POLLPRI);
02430 #endif /*ACE_HAS_EVENT_POLL*/
02431     }
02432 
02433   return events;
02434 }
02435 
namespace
{
  // Sleep hook that deliberately does nothing.  It is the hook passed
  // to the token's acquire_read() by Token_Guard::acquire_quietly().
  void
  polite_sleep_hook (void * /* unused */)
  {
  }
}
02439 
02440 int
02441 ACE_Dev_Poll_Reactor::Token_Guard::acquire_quietly (ACE_Time_Value *max_wait)
02442 {
02443   ACE_TRACE ("ACE_Dev_Poll_Reactor::Token_Guard::acquire_quietly");
02444 
02445   // Acquire the token but don't ping any waiters; just queue up politely.
02446   int result = 0;
02447   if (max_wait)
02448     {
02449       ACE_Time_Value tv = ACE_OS::gettimeofday ();
02450       tv += *max_wait;
02451 
02452       ACE_MT (result = this->token_.acquire_read (&polite_sleep_hook,
02453                                                   0,
02454                                                   &tv));
02455     }
02456   else
02457     {
02458       ACE_MT (result = this->token_.acquire_read (&polite_sleep_hook));
02459     }
02460 
02461   // Check for timeouts and errors.
02462   if (result == -1)
02463     {
02464       if (errno == ETIME)
02465         return 0;
02466       else
02467         {
02468           ACE_ERROR ((LM_ERROR, "%t: %p\n", "token acquire_read"));
02469           return -1;
02470         }
02471     }
02472 
02473   // We got the token and so let us mark ourselves as owner
02474   this->owner_ = 1;
02475 
02476   return result;
02477 }
02478 
02479 int
02480 ACE_Dev_Poll_Reactor::Token_Guard::acquire (ACE_Time_Value *max_wait)
02481 {
02482   ACE_TRACE ("ACE_Dev_Poll_Reactor::Token_Guard::acquire");
02483 
02484   // Try to grab the token.  If someone if already there, don't wake
02485   // them up, just queue up in the thread pool.
02486   int result = 0;
02487   if (max_wait)
02488     {
02489       ACE_Time_Value tv = ACE_OS::gettimeofday ();
02490       tv += *max_wait;
02491 
02492       ACE_MT (result = this->token_.acquire (0, 0, &tv));
02493     }
02494   else
02495     {
02496       ACE_MT (result = this->token_.acquire ());
02497     }
02498 
02499   // Check for timeouts and errors.
02500   if (result == -1)
02501     {
02502       if (errno == ETIME)
02503         return 0;
02504       else
02505         return -1;
02506     }
02507 
02508   // We got the token and so let us mark ourseleves as owner
02509   this->owner_ = 1;
02510 
02511   return result;
02512 }
02513 
02514 ACE_END_VERSIONED_NAMESPACE_DECL
02515 
02516 #endif  /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */

Generated on Tue Feb 2 17:18:39 2010 for ACE by  doxygen 1.4.7