Dev_Poll_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: Dev_Poll_Reactor.cpp 79332 2007-08-13 20:30:44Z sowayaa $
00002 
00003 #include "ace/OS_NS_errno.h"
00004 #include "ace/Dev_Poll_Reactor.h"
00005 #include "ace/Signal.h"
00006 #include "ace/Sig_Handler.h"
00007 
00008 ACE_RCSID (ace,
00009            Dev_Poll_Reactor,
00010            "$Id: Dev_Poll_Reactor.cpp 79332 2007-08-13 20:30:44Z sowayaa $")
00011 
00012 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
00013 
00014 # include "ace/OS_NS_unistd.h"
00015 # include "ace/OS_NS_fcntl.h"
00016 # include "ace/OS_NS_stropts.h"
00017 
00018 # if defined (ACE_HAS_EVENT_POLL) && defined (linux)
00019 #  include /**/ <sys/epoll.h>
00020 # elif defined (ACE_HAS_DEV_POLL)
00021 #    if defined (linux)
00022 #      include /**/ <linux/devpoll.h>
00023 #    elif defined (HPUX_VERS) && HPUX_VERS < 1123
00024 #      include /**/ <devpoll.h>
00025 #    else
00026 #      include /**/ <sys/devpoll.h>
00027 #    endif  /* linux */
00028 # endif  /* ACE_HAS_DEV_POLL */
00029 
00030 #if !defined (__ACE_INLINE__)
00031 # include "ace/Dev_Poll_Reactor.inl"
00032 #endif /* __ACE_INLINE__ */
00033 
00034 
00035 #include "ace/Handle_Set.h"
00036 #include "ace/Reactor.h"
00037 #include "ace/Timer_Heap.h"
00038 #include "ace/Timer_Queue.h"
00039 #include "ace/ACE.h"
00040 #include "ace/Reverse_Lock_T.h"
00041 #include "ace/Recursive_Thread_Mutex.h"
00042 #include "ace/Null_Mutex.h"
00043 #include "ace/os_include/os_poll.h"
00044 #include "ace/OS_NS_sys_mman.h"
00045 
00046 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00047 
00048 ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
00049   : dp_reactor_ (0)
00050   , notification_pipe_ ()
00051   , max_notify_iterations_ (-1)
00052 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00053   , notification_queue_ ()
00054 #endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00055 {
00056 }
00057 
00058 int
00059 ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
00060                                    ACE_Timer_Queue * /* timer_queue */,
00061                                    int disable_notify_pipe)
00062 {
00063   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open");
00064 
00065   if (disable_notify_pipe == 0)
00066     {
00067       this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r);
00068 
00069       if (this->dp_reactor_ == 0)
00070         {
00071           errno = EINVAL;
00072           return -1;
00073         }
00074 
00075       if (this->notification_pipe_.open () == -1)
00076         return -1;
00077 
00078 #if defined (F_SETFD)
00079       // close-on-exec
00080       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00081       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00082 #endif /* F_SETFD */
00083 
00084 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00085       if (notification_queue_.open () == -1)
00086         {
00087           return -1;
00088         }
00089 
00090       if (ACE::set_flags (this->notification_pipe_.write_handle (),
00091                           ACE_NONBLOCK) == -1)
00092         return -1;
00093 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00094 
00095       // Set the read handle into non-blocking mode since we need to
00096       // perform a "speculative" read when determining if there are
00097       // notifications to dispatch.
00098       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00099                           ACE_NONBLOCK) == -1)
00100         return -1;
00101     }
00102 
00103   return 0;
00104 }
00105 
00106 int
00107 ACE_Dev_Poll_Reactor_Notify::close (void)
00108 {
00109   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
00110 
00111 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00112   notification_queue_.reset ();
00113 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00114 
00115   return this->notification_pipe_.close ();
00116 }
00117 
00118 int
00119 ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
00120                                      ACE_Reactor_Mask mask,
00121                                      ACE_Time_Value *timeout)
00122 {
00123   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify");
00124 
00125   // Just consider this method a "no-op" if there's no
00126   // ACE_Dev_Poll_Reactor configured.
00127   if (this->dp_reactor_ == 0)
00128     return 0;
00129 
00130   ACE_Notification_Buffer buffer (eh, mask);
00131 
00132 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00133   ACE_UNUSED_ARG (timeout);
00134   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00135 
00136   int notification_required =
00137     notification_queue_.push_new_notification (buffer);
00138 
00139   if (notification_required == -1)
00140     return -1;             // Also decrement eh's reference count
00141 
00142   // The notification has been queued, so it will be delivered at some
00143   // point (and may have been already); release the refcnt guard.
00144   eh_guard.release ();
00145 
00146   if (notification_required == 0)
00147     return 0;
00148 
00149   // Now pop the pipe to force the callback for dispatching when ready. If
00150   // the send fails due to a full pipe, don't fail - assume the already-sent
00151   // pipe bytes will cause the entire notification queue to be processed.
00152   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00153                          (char *) &buffer,
00154                          1,             // Only need one byte to pop the pipe
00155                          &ACE_Time_Value::zero);
00156   if (n == -1 && (errno != ETIME && errno != EAGAIN))
00157     return -1;
00158 
00159   return 0;
00160 #else
00161 
00162   ACE_Dev_Poll_Handler_Guard eh_guard (eh);
00163 
00164   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00165                          (char *) &buffer,
00166                          sizeof buffer,
00167                          timeout);
00168   if (n == -1)
00169     return -1;
00170 
00171   eh_guard.release ();
00172 
00173   return 0;
00174 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00175 }
00176 
00177 int
00178 ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
00179   int & /* number_of_active_handles */,
00180   ACE_Handle_Set & /* rd_mask */)
00181 {
00182   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications");
00183 
00184   // This method is unimplemented in the ACE_Dev_Poll_Reactor.
00185   // Instead, the notification handler is invoked as part of the IO
00186   // event set.  Doing so alters the some documented semantics that
00187   // state that the notifications are handled before IO events.
00188   // Enforcing such semantics does not appear to be beneficial, and
00189   // also serves to slow down event dispatching particularly with this
00190   // ACE_Dev_Poll_Reactor.
00191 
00192   ACE_NOTSUP_RETURN (-1);
00193 }
00194 
00195 int
00196 ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00197                                                ACE_Notification_Buffer &buffer)
00198 {
00199   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe");
00200 
00201   // This is a (non-blocking) "speculative" read, i.e., we attempt to
00202   // read even if no event was polled on the read handle.  A
00203   // speculative read is necessary since notifications must be
00204   // dispatched before IO events.  We can avoid the speculative read
00205   // by "walking" the array of pollfd structures returned from
00206   // `/dev/poll' or `/dev/epoll' but that is potentially much more
00207   // expensive than simply checking for an EWOULDBLOCK.
00208   size_t to_read;
00209   char *read_p;
00210   bool have_one = false;
00211 
00212 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00213   // For the queued case, we'll try to read one byte (since that's what
00214   // the notify () tried to put in) but we don't need it - notifications can
00215   // be queued even if the pipe fills, so there may be more notifications
00216   // queued than there are bytes in the pipe.
00217   char b;
00218   read_p = &b;
00219   to_read = 1;
00220 
00221   // Before reading the byte, pop a message from the queue and queue a
00222   // new message unless the queue is now empty.  The protocol is to
00223   // keep a byte in the pipe as long as the queue is not empty.
00224   bool more_messages_queued = false;
00225   ACE_Notification_Buffer next;
00226 
00227   int result = notification_queue_.pop_next_notification (buffer,
00228                                                           more_messages_queued,
00229                                                           next);
00230 
00231   if (result == 0)
00232     {
00233       // remove the notification byte from the pipe, avoiding notification loop
00234       ACE::recv (handle, read_p, to_read);
00235       return 0;
00236     }
00237 
00238   if (result == -1)
00239     return -1;
00240 
00241   if (more_messages_queued)
00242     (void) ACE::send (this->notification_pipe_.write_handle (),
00243                       (char *)&next,
00244                       1 /* one byte is enough */,
00245                       &ACE_Time_Value::zero);
00246 #else
00247   to_read = sizeof buffer;
00248   read_p = (char *)&buffer;
00249 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00250 
00251   ssize_t n = ACE::recv (handle, read_p, to_read);
00252 
00253   if (n > 0)
00254     {
00255       // Check to see if we've got a short read.
00256       if (static_cast<size_t> (n) != to_read)
00257         {
00258           size_t remainder = to_read - n;
00259 
00260           // If so, try to recover by reading the remainder.  If this
00261           // doesn't work we're in big trouble since the input stream
00262           // won't be aligned correctly.  I'm not sure quite what to
00263           // do at this point.  It's probably best just to return -1.
00264           if (ACE::recv (handle, &read_p[n], remainder) <= 0)
00265             return -1;
00266         }
00267 
00268       return 1;
00269     }
00270 
00271   // Return -1 if things have gone seriously wrong.
00272   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00273     return -1;
00274 
00275   return have_one ? 1 : 0;
00276 }
00277 
00278 
00279 int
00280 ACE_Dev_Poll_Reactor_Notify::handle_input (ACE_HANDLE handle)
00281 {
00282   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input");
00283 
00284   // @@ We may end up dispatching this event handler twice:  once when
00285   //    performing the speculative read on the notification pipe
00286   //    handle, and once more when dispatching the IO events.
00287 
00288   // Precondition: this->select_reactor_.token_.current_owner () ==
00289   // ACE_Thread::self ();
00290 
00291   int number_dispatched = 0;
00292   int result = 0;
00293   ACE_Notification_Buffer buffer;
00294 
00295   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00296     {
00297       // Dispatch the buffer
00298       // NOTE: We count only if we made any dispatches ie. upcalls.
00299       if (this->dispatch_notify (buffer) > 0)
00300         ++number_dispatched;
00301 
00302       // Bail out if we've reached the <notify_threshold_>.  Note that
00303       // by default <notify_threshold_> is -1, so we'll loop until all
00304       // the available notifications have been dispatched.
00305       if (number_dispatched == this->max_notify_iterations_)
00306         break;
00307     }
00308 
00309   if (result == -1)
00310     {
00311       // Reassign number_dispatched to -1 if things have gone
00312       // seriously wrong.
00313       number_dispatched = -1;
00314     }
00315 
00316   // Enqueue ourselves into the list of waiting threads.  When we
00317   // reacquire the token we'll be off and running again with ownership
00318   // of the token.  The postcondition of this call is that
00319   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00320   //this->select_reactor_->renew ();
00321 
00322   return number_dispatched;
00323 }
00324 
00325 ACE_HANDLE
00326 ACE_Dev_Poll_Reactor_Notify::notify_handle (void)
00327 {
00328   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle");
00329 
00330   return this->notification_pipe_.read_handle ();
00331 }
00332 
00333 int
00334 ACE_Dev_Poll_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &)
00335 {
00336   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable");
00337 
00338   ACE_NOTSUP_RETURN (-1);
00339 }
00340 
00341 int
00342 ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00343 {
00344   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify");
00345 
00346   // If eh == 0 then another thread is unblocking the
00347   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00348   // internal structures.  Otherwise, we need to dispatch the
00349   // appropriate handle_* method on the ACE_Event_Handler
00350   // pointer we've been passed.
00351   if (buffer.eh_ != 0)
00352     {
00353       int result = 0;
00354 
00355       // Guard the handler's refcount. Recall that when the notify
00356       // was queued, the refcount was incremented, so it need not be
00357       // now. The guard insures that it is decremented properly.
00358       ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false);
00359 
00360       switch (buffer.mask_)
00361         {
00362         case ACE_Event_Handler::READ_MASK:
00363         case ACE_Event_Handler::ACCEPT_MASK:
00364           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00365           break;
00366         case ACE_Event_Handler::WRITE_MASK:
00367           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00368           break;
00369         case ACE_Event_Handler::EXCEPT_MASK:
00370           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00371           break;
00372         default:
00373           // Should we bail out if we get an invalid mask?
00374           ACE_ERROR ((LM_ERROR,
00375                       ACE_TEXT ("dispatch_notify invalid mask = %d\n"),
00376                       buffer.mask_));
00377         }
00378       if (result == -1)
00379         buffer.eh_->handle_close (ACE_INVALID_HANDLE, buffer.mask_);
00380     }
00381 
00382   return 1;
00383 }
00384 
00385 void
00386 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (int iterations)
00387 {
00388   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00389 
00390   // Must always be > 0 or < 0 to optimize the loop exit condition.
00391   if (iterations == 0)
00392     iterations = 1;
00393 
00394   this->max_notify_iterations_ = iterations;
00395 }
00396 
00397 int
00398 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (void)
00399 {
00400   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00401 
00402   return this->max_notify_iterations_;
00403 }
00404 
00405 int
00406 ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
00407   ACE_Event_Handler *eh,
00408   ACE_Reactor_Mask mask)
00409 {
00410   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications");
00411 
00412 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00413 
00414   return notification_queue_.purge_pending_notifications (eh, mask);
00415 
00416 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00417   ACE_UNUSED_ARG (eh);
00418   ACE_UNUSED_ARG (mask);
00419   ACE_NOTSUP_RETURN (-1);
00420 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00421 }
00422 
00423 void
00424 ACE_Dev_Poll_Reactor_Notify::dump (void) const
00425 {
00426 #if defined (ACE_HAS_DUMP)
00427   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump");
00428 
00429   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00430   ACE_DEBUG ((LM_DEBUG,
00431               ACE_TEXT ("dp_reactor_ = %@"),
00432               this->dp_reactor_));
00433   this->notification_pipe_.dump ();
00434   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00435 #endif /* ACE_HAS_DUMP */
00436 }
00437 
00438 // -----------------------------------------------------------------
00439 
00440 ACE_Dev_Poll_Reactor_Handler_Repository::
00441 ACE_Dev_Poll_Reactor_Handler_Repository (void)
00442   : max_size_ (0),
00443     handlers_ (0)
00444 {
00445   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::ACE_Dev_Poll_Reactor_Handler_Repository");
00446 }
00447 
00448 int
00449 ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle (
00450   ACE_HANDLE handle) const
00451 {
00452   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle");
00453 
00454   if (handle < 0 || handle >= this->max_size_)
00455     {
00456       errno = EINVAL;
00457       return 1;
00458     }
00459   else
00460     return 0;
00461 }
00462 
00463 int
00464 ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range (
00465   ACE_HANDLE handle) const
00466 {
00467   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range");
00468 
00469   if (handle >= 0 && handle < this->max_size_)
00470     return 1;
00471   else
00472     {
00473       errno = EINVAL;
00474       return 0;
00475     }
00476 }
00477 
00478 int
00479 ACE_Dev_Poll_Reactor_Handler_Repository::open (size_t size)
00480 {
00481   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::open");
00482 
00483   this->max_size_ = size;
00484 
00485   // Try to allocate the memory.
00486   ACE_NEW_RETURN (this->handlers_,
00487                   ACE_Dev_Poll_Event_Tuple[size],
00488                   -1);
00489 
00490   // Try to increase the number of handles if <size> is greater than
00491   // the current limit.
00492   return ACE::set_handle_limit (size);
00493 }
00494 
00495 int
00496 ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all (void)
00497 {
00498   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all");
00499 
00500   // Unbind all of the event handlers.
00501   for (int handle = 0;
00502        handle < this->max_size_;
00503        ++handle)
00504     this->unbind (handle);
00505 
00506   return 0;
00507 }
00508 
00509 int
00510 ACE_Dev_Poll_Reactor_Handler_Repository::close (void)
00511 {
00512   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::close");
00513 
00514   if (this->handlers_ != 0)
00515     {
00516       this->unbind_all ();
00517 
00518       delete [] this->handlers_;
00519       this->handlers_ = 0;
00520     }
00521 
00522   return 0;
00523 }
00524 
00525 ACE_Event_Handler *
00526 ACE_Dev_Poll_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00527                                                size_t *index_p)
00528 {
00529   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::find");
00530 
00531   ACE_Event_Handler *eh = 0;
00532 
00533   // Only bother to search for the <handle> if it's in range.
00534   if (this->handle_in_range (handle))
00535     {
00536       eh = this->handlers_[handle].event_handler;
00537       if (eh != 0)
00538         {
00539           if (index_p != 0)
00540             *index_p = handle;
00541         }
00542       else
00543         errno = ENOENT;
00544     }
00545 
00546   return eh;
00547 }
00548 
00549 int
00550 ACE_Dev_Poll_Reactor_Handler_Repository::bind (
00551   ACE_HANDLE handle,
00552   ACE_Event_Handler *event_handler,
00553   ACE_Reactor_Mask mask)
00554 {
00555   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::bind");
00556 
00557   if (event_handler == 0)
00558     return -1;
00559 
00560   if (handle == ACE_INVALID_HANDLE)
00561     handle = event_handler->get_handle ();
00562 
00563   if (this->invalid_handle (handle))
00564     return -1;
00565 
00566   this->handlers_[handle].event_handler = event_handler;
00567   this->handlers_[handle].mask = mask;
00568   event_handler->add_reference ();
00569 
00570   return 0;
00571 }
00572 
00573 int
00574 ACE_Dev_Poll_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00575                                                  bool decr_refcnt)
00576 {
00577   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind");
00578 
00579   if (this->find (handle) == 0)
00580     return -1;
00581 
00582   if (decr_refcnt)
00583     this->handlers_[handle].event_handler->remove_reference ();
00584   this->handlers_[handle].event_handler = 0;
00585   this->handlers_[handle].mask = ACE_Event_Handler::NULL_MASK;
00586   this->handlers_[handle].suspended = 0;
00587 
00588   return 0;
00589 }
00590 
00591 // -----------------------------------------------------------------
00592 
00593 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (ACE_Sig_Handler *sh,
00594                                             ACE_Timer_Queue *tq,
00595                                             int disable_notify_pipe,
00596                                             ACE_Reactor_Notify *notify,
00597                                             int mask_signals,
00598                                             int s_queue)
00599   : initialized_ (false)
00600   , poll_fd_ (ACE_INVALID_HANDLE)
00601   , size_ (0)
00602   // , ready_set_ ()
00603 #if defined (ACE_HAS_EVENT_POLL)
00604   , events_ (0)
00605   , start_pevents_ (0)
00606   , end_pevents_ (0)
00607 #else
00608   , dp_fds_ (0)
00609   , start_pfds_ (0)
00610   , end_pfds_ (0)
00611 #endif  /* ACE_HAS_EVENT_POLL */
00612   , deactivated_ (0)
00613   , token_ (*this, s_queue)
00614   , lock_adapter_ (token_)
00615   , timer_queue_ (0)
00616   , delete_timer_queue_ (0)
00617   , signal_handler_ (0)
00618   , delete_signal_handler_ (0)
00619   , notify_handler_ (0)
00620   , delete_notify_handler_ (0)
00621   , mask_signals_ (mask_signals)
00622   , restart_ (0)
00623 {
00624   ACE_TRACE ("ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor");
00625 
00626   if (this->open (ACE::max_handles (),
00627                   0,
00628                   sh,
00629                   tq,
00630                   disable_notify_pipe,
00631                   notify) == -1)
00632     ACE_ERROR ((LM_ERROR,
00633                 ACE_TEXT ("%p\n"),
00634                 ACE_TEXT ("ACE_Dev_Poll_Reactor::open ")
00635                 ACE_TEXT ("failed inside ")
00636                 ACE_TEXT ("ACE_Dev_Poll_Reactor::CTOR")));
00637 }
00638 
00639 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (size_t size,
00640                                             int rs,
00641                                             ACE_Sig_Handler *sh,
00642                                             ACE_Timer_Queue *tq,
00643                                             int disable_notify_pipe,
00644                                             ACE_Reactor_Notify *notify,
00645                                             int mask_signals,
00646                                             int s_queue)
00647   : initialized_ (false)
00648   , poll_fd_ (ACE_INVALID_HANDLE)
00649   , size_ (0)
00650   // , ready_set_ ()
00651 #if defined (ACE_HAS_EVENT_POLL)
00652   , events_ (0)
00653   , start_pevents_ (0)
00654   , end_pevents_ (0)
00655 #else
00656   , dp_fds_ (0)
00657   , start_pfds_ (0)
00658   , end_pfds_ (0)
00659 #endif  /* ACE_HAS_EVENT_POLL */
00660   , deactivated_ (0)
00661   , token_ (*this, s_queue)
00662   , lock_adapter_ (token_)
00663   , timer_queue_ (0)
00664   , delete_timer_queue_ (0)
00665   , signal_handler_ (0)
00666   , delete_signal_handler_ (0)
00667   , notify_handler_ (0)
00668   , delete_notify_handler_ (0)
00669   , mask_signals_ (mask_signals)
00670   , restart_ (0)
00671 {
00672   if (this->open (size,
00673                   rs,
00674                   sh,
00675                   tq,
00676                   disable_notify_pipe,
00677                   notify) == -1)
00678     ACE_ERROR ((LM_ERROR,
00679                 ACE_TEXT ("%p\n"),
00680                 ACE_TEXT ("ACE_Dev_Poll_Reactor::open ")
00681                 ACE_TEXT ("failed inside ACE_Dev_Poll_Reactor::CTOR")));
00682 }
00683 
00684 ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor (void)
00685 {
00686   ACE_TRACE ("ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor");
00687 
00688  (void) this->close ();
00689 }
00690 
00691 int
00692 ACE_Dev_Poll_Reactor::open (size_t size,
00693                             int restart,
00694                             ACE_Sig_Handler *sh,
00695                             ACE_Timer_Queue *tq,
00696                             int disable_notify_pipe,
00697                             ACE_Reactor_Notify *notify)
00698 {
00699   ACE_TRACE ("ACE_Dev_Poll_Reactor::open");
00700 
00701   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
00702 
00703   // Can't initialize ourselves more than once.
00704   if (this->initialized_)
00705     return -1;
00706 
00707   this->restart_ = restart;
00708   this->signal_handler_ = sh;
00709   this->timer_queue_ = tq;
00710   this->notify_handler_ = notify;
00711 
00712   int result = 0;
00713 
00714   // Allows the signal handler to be overridden.
00715   if (this->signal_handler_ == 0)
00716     {
00717       ACE_NEW_RETURN (this->signal_handler_,
00718                       ACE_Sig_Handler,
00719                       -1);
00720 
00721       if (this->signal_handler_ == 0)
00722         result = -1;
00723       else
00724         this->delete_signal_handler_ = 1;
00725     }
00726 
00727   // Allows the timer queue to be overridden.
00728   if (result != -1 && this->timer_queue_ == 0)
00729     {
00730       ACE_NEW_RETURN (this->timer_queue_,
00731                       ACE_Timer_Heap,
00732                       -1);
00733 
00734       if (this->timer_queue_ == 0)
00735         result = -1;
00736       else
00737         this->delete_timer_queue_ = 1;
00738     }
00739 
00740   // Allows the Notify_Handler to be overridden.
00741   if (result != -1 && this->notify_handler_ == 0)
00742     {
00743       ACE_NEW_RETURN (this->notify_handler_,
00744                       ACE_Dev_Poll_Reactor_Notify,
00745                       -1);
00746 
00747       if (this->notify_handler_ == 0)
00748         result = -1;
00749       else
00750         this->delete_notify_handler_ = 1;
00751     }
00752 
00753 #if defined (ACE_HAS_EVENT_POLL)
00754 
00755   // Allocating event table:
00756   ACE_NEW_RETURN (this->events_, epoll_event[size], -1);
00757 
00758   // Initializing epoll:
00759   this->poll_fd_ = ::epoll_create (size);
00760   if (this->poll_fd_ == -1)
00761     result = -1;
00762 
00763 #else
00764 
00765   // Allocate the array before opening the device to avoid a potential
00766   // resource leak if allocation fails.
00767   ACE_NEW_RETURN (this->dp_fds_,
00768                   pollfd[size],
00769                   -1);
00770 
00771   // Open the `/dev/poll' character device.
00772   this->poll_fd_ = ACE_OS::open ("/dev/poll", O_RDWR);
00773   if (this->poll_fd_ == ACE_INVALID_HANDLE)
00774     result = -1;
00775 
00776 #endif  /* ACE_HAS_EVENT_POLL */
00777 
00778   if (result != -1 && this->handler_rep_.open (size) == -1)
00779     result = -1;
00780 
00781   // Registration of the notification handler must be done after the
00782   // /dev/poll device has been fully initialized.
00783   else if (this->notify_handler_->open (this,
00784                                         0,
00785                                         disable_notify_pipe) == -1
00786            || (disable_notify_pipe == 0
00787                && this->register_handler_i (
00788                                             this->notify_handler_->notify_handle (),
00789                                             this->notify_handler_,
00790                                             ACE_Event_Handler::READ_MASK) == -1))
00791     result = -1;
00792 
00793   this->size_ = size;
00794 
00795   if (result != -1)
00796     // We're all set to go.
00797     this->initialized_ = true;
00798   else
00799     // This will close down all the allocated resources properly.
00800  (void) this->close ();
00801 
00802   return result;
00803 }
00804 
00805 int
00806 ACE_Dev_Poll_Reactor::current_info (ACE_HANDLE, size_t & /* size */)
00807 {
00808   ACE_NOTSUP_RETURN (-1);
00809 }
00810 
00811 
00812 int
00813 ACE_Dev_Poll_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
00814 {
00815   if (this->delete_signal_handler_ && this->signal_handler_)
00816     delete this->signal_handler_;
00817 
00818   this->signal_handler_ = signal_handler;
00819   this->delete_signal_handler_ = 0;
00820 
00821   return 0;
00822 }
00823 
00824 int
00825 ACE_Dev_Poll_Reactor::timer_queue (ACE_Timer_Queue *tq)
00826 {
00827   if (this->delete_timer_queue_ && this->timer_queue_)
00828     delete this->timer_queue_;
00829 
00830   this->timer_queue_ = tq;
00831   this->delete_timer_queue_ = 0;
00832 
00833   return 0;
00834 
00835 }
00836 
00837 ACE_Timer_Queue *
00838 ACE_Dev_Poll_Reactor::timer_queue (void) const
00839 {
00840   return this->timer_queue_;
00841 }
00842 
00843 int
00844 ACE_Dev_Poll_Reactor::close (void)
00845 {
00846   ACE_TRACE ("ACE_Dev_Poll_Reactor::close");
00847 
00848   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
00849 
00850   int result = 0;
00851 
00852   if (this->poll_fd_ != ACE_INVALID_HANDLE)
00853     {
00854       result = ACE_OS::close (this->poll_fd_);
00855     }
00856 
00857 #if defined (ACE_HAS_EVENT_POLL)
00858 
00859   delete [] this->events_;
00860   this->events_ = 0;
00861 
00862 #else
00863 
00864   delete [] this->dp_fds_;
00865   this->dp_fds_ = 0;
00866 
00867 #endif  /* ACE_HAS_EVENT_POLL */
00868 
00869   if (this->delete_signal_handler_)
00870     {
00871       delete this->signal_handler_;
00872       this->signal_handler_ = 0;
00873       this->delete_signal_handler_ = 0;
00874     }
00875 
00876  (void) this->handler_rep_.close ();
00877 
00878   if (this->delete_timer_queue_)
00879     {
00880       delete this->timer_queue_;
00881       this->timer_queue_ = 0;
00882       this->delete_timer_queue_ = 0;
00883     }
00884 
00885   if (this->notify_handler_ != 0)
00886     this->notify_handler_->close ();
00887 
00888   if (this->delete_notify_handler_)
00889     {
00890       delete this->notify_handler_;
00891       this->notify_handler_ = 0;
00892       this->delete_notify_handler_ = 0;
00893     }
00894 
00895   this->poll_fd_ = ACE_INVALID_HANDLE;
00896 
00897 #if defined (ACE_HAS_EVENT_POLL)
00898   this->start_pevents_ = 0;
00899   this->end_pevents_   = 0;
00900 #else
00901   this->start_pfds_ = 0;
00902   this->end_pfds_ = 0;
00903 #endif /* ACE_HAS_EVENT_POLL */
00904 
00905   this->initialized_ = false;
00906 
00907   return result;
00908 }
00909 
00910 int
00911 ACE_Dev_Poll_Reactor::work_pending (const ACE_Time_Value & max_wait_time)
00912 {
00913   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending");
00914 
00915   // Stash the current time
00916   //
00917   // The destructor of this object will automatically compute how much
00918   // time elapsed since this method was called.
00919   ACE_Time_Value mwt (max_wait_time);
00920   ACE_MT (ACE_Countdown_Time countdown (&mwt));
00921 
00922   Token_Guard guard (this->token_);
00923   int result = guard.acquire_quietly (&mwt);
00924 
00925   // If the guard is NOT the owner just return the retval
00926   if (!guard.is_owner ())
00927     return result;
00928 
00929   // Update the countdown to reflect time waiting for the mutex.
00930   ACE_MT (countdown.update ());
00931 
00932   return this->work_pending_i (&mwt);
00933 }
00934 
00935 int
00936 ACE_Dev_Poll_Reactor::work_pending_i (ACE_Time_Value * max_wait_time)
00937 {
00938   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i");
00939 
00940   if (this->deactivated_)
00941     return 0;
00942 
00943 #if defined (ACE_HAS_EVENT_POLL)
00944   if (this->start_pevents_ != this->end_pevents_)
00945 #else
00946   if (this->start_pfds_ != this->end_pfds_)
00947 #endif /* ACE_HAS_EVENT_POLL */
00948     return 1;  // We still have work_pending (). Do not poll for
00949                // additional events.
00950 
00951   ACE_Time_Value timer_buf (0);
00952   ACE_Time_Value *this_timeout = 0;
00953 
00954   this_timeout = this->timer_queue_->calculate_timeout (max_wait_time,
00955                                                         &timer_buf);
00956 
00957   // Check if we have timers to fire.
00958   const int timers_pending =
00959  ((this_timeout != 0 && max_wait_time == 0)
00960      || (this_timeout != 0 && max_wait_time != 0
00961          && *this_timeout != *max_wait_time) ? 1 : 0);
00962 
00963   const long timeout =
00964  (this_timeout == 0
00965      ? -1 /* Infinity */
00966      : static_cast<long> (this_timeout->msec ()));
00967 
00968 #if defined (ACE_HAS_EVENT_POLL)
00969 
00970    // Wait for events.
00971    const int nfds = ::epoll_wait (this->poll_fd_,
00972                                   this->events_,
00973                                   this->size_,
00974                                   static_cast<int> (timeout));
00975 
00976   if (nfds > 0)
00977     {
00978       this->start_pevents_ = this->events_;
00979       this->end_pevents_ = this->start_pevents_ + nfds;
00980     }
00981 
00982 #else
00983 
00984   struct dvpoll dvp;
00985 
00986   dvp.dp_fds = this->dp_fds_;
00987   dvp.dp_nfds = this->size_;
00988   dvp.dp_timeout = timeout;  // Milliseconds
00989 
00990   // Poll for events
00991   const int nfds = ACE_OS::ioctl (this->poll_fd_, DP_POLL, &dvp);
00992 
00993   // Retrieve the results from the pollfd array.
00994   this->start_pfds_ = dvp.dp_fds;
00995 
00996   // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is
00997   // no work pending.  If nfds > 0 then there is work pending.
00998   // Otherwise an error occurred.
00999   if (nfds > -1)
01000     this->end_pfds_ = this->start_pfds_ + nfds;
01001 #endif  /* ACE_HAS_EVENT_POLL */
01002 
01003   // If timers are pending, override any timeout from the poll.
01004   return (nfds == 0 && timers_pending != 0 ? 1 : nfds);
01005 }
01006 
01007 
01008 int
01009 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value *max_wait_time)
01010 {
01011   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01012 
01013   // Stash the current time
01014   //
01015   // The destructor of this object will automatically compute how much
01016   // time elapsed since this method was called.
01017   ACE_MT (ACE_Countdown_Time countdown (max_wait_time));
01018 
01019   Token_Guard guard (this->token_);
01020   int result = guard.acquire_quietly (max_wait_time);
01021 
01022   // If the guard is NOT the owner just return the retval
01023   if (!guard.is_owner ())
01024     return result;
01025 
01026   if (this->deactivated_)
01027     return -1;
01028 
01029   // Update the countdown to reflect time waiting for the mutex.
01030   ACE_MT (countdown.update ());
01031 
01032   return this->handle_events_i (max_wait_time, guard);
01033 }
01034 
01035 int
01036 ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time,
01037                                        Token_Guard &guard)
01038 {
01039   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events_i");
01040 
01041   int result = 0;
01042   // int active_handle_count = 0;
01043 
01044   // Poll for events
01045   //
01046   // If the underlying ioctl () call was interrupted via the interrupt
01047   // signal (i.e. returned -1 with errno == EINTR) then the loop will
01048   // be restarted if so desired.
01049   do
01050     {
01051       result = this->work_pending_i (max_wait_time);
01052       if (result == -1)
01053         ACE_ERROR ((LM_ERROR, "%t: %p\n", "work_pending_i"));
01054     }
01055   while (result == -1 && this->restart_ != 0 && errno == EINTR);
01056 
01057   if (result == 0 || (result == -1 && errno == ETIME))
01058     return 0;
01059   else if (result == -1)
01060     {
01061       if (errno != EINTR)
01062         return -1;
01063 
01064       // Bail out -- we got here since the poll was interrupted.
01065       // If it was due to a signal registered through our ACE_Sig_Handler,
01066       // then it was dispatched, so we count it in the number of events
01067       // handled rather than cause an error return.
01068       if (ACE_Sig_Handler::sig_pending () != 0)
01069         {
01070           ACE_Sig_Handler::sig_pending (0);
01071           return 1;
01072         }
01073       return -1;
01074     }
01075 
01076   // Dispatch an event.
01077   return this->dispatch (guard);
01078 }
01079 
01080 // Dispatch an event. On entry, the token is held by the caller. If an
01081 // event is found to dispatch, the token is released before dispatching it.
01082 int
01083 ACE_Dev_Poll_Reactor::dispatch (Token_Guard &guard)
01084 {
01085   ACE_TRACE ("ACE_Dev_Poll_Reactor::dispatch");
01086 
01087   // Perform the Template Method for dispatching the first located event.
01088   // We dispatch only one to effectively dispatch events concurrently.
01089   // As soon as an event is located, the token is released, allowing the
01090   // next waiter to begin getting an event while we dispatch one here.
01091   int result = 0;
01092 
01093   // Handle timers early since they may have higher latency
01094   // constraints than I/O handlers.  Ideally, the order of
01095   // dispatching should be a strategy...
01096   if ((result = this->dispatch_timer_handler (guard)) != 0)
01097     return result;
01098 
01099   // Check to see if there are no more I/O handles left to
01100   // dispatch AFTER we've handled the timers.
01101 
01102   // Finally, dispatch the I/O handlers.
01103   result = this->dispatch_io_event (guard);
01104 
01105   return result;
01106 }
01107 
01108 int
01109 ACE_Dev_Poll_Reactor::dispatch_timer_handler (Token_Guard &guard)
01110 {
01111   if (this->timer_queue_->is_empty ())
01112     return 0;       // Empty timer queue so cannot have any expired timers.
01113 
01114   // Get the current time
01115   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
01116                            this->timer_queue_->timer_skew ());
01117 
01118   // Look for a node in the timer queue whose timer <= the present
01119   // time.
01120   ACE_Timer_Node_Dispatch_Info info;
01121   if (this->timer_queue_->dispatch_info (cur_time, info))
01122     {
01123       const void *upcall_act = 0;
01124 
01125       // Preinvoke (handles refcount if needed, etc.)
01126       this->timer_queue_->preinvoke (info, cur_time, upcall_act);
01127 
01128       // Release the token before expiration upcall.
01129       guard.release_token ();
01130 
01131       // call the functor
01132       this->timer_queue_->upcall (info, cur_time);
01133 
01134       // Postinvoke (undo refcount if needed, etc.)
01135       this->timer_queue_->postinvoke (info, cur_time, upcall_act);
01136 
01137       // We have dispatched a timer
01138       return 1;
01139     }
01140 
01141   return 0;
01142 }
01143 
01144 #if 0
01145 int
01146 ACE_Dev_Poll_Reactor::dispatch_notification_handlers (
01147   ACE_Select_Reactor_Handle_Set &dispatch_set,
01148   int &number_of_active_handles,
01149   int &number_of_handlers_dispatched)
01150 {
01151   // Check to see if the ACE_HANDLE associated with the
01152   // Dev_Poll_Reactor's notify hook is enabled.  If so, it means that
01153   // one or more other threads are trying to update the
01154   // ACE_Dev_Poll_Reactor's internal tables or the notify pipe is
01155   // enabled.  We'll handle all these threads and notifications, and
01156   // then break out to continue the event loop.
01157 
01158   const int n =
01159     this->notify_handler_->dispatch_notifications (number_of_active_handles,
01160                                                    dispatch_set.rd_mask_);
01161 
01162   if (n == -1)
01163     return -1;
01164   else
01165     number_of_handlers_dispatched += n;
01166 
01167   return /* this->state_changed_ ? -1 : */ 0;
01168 }
01169 #endif  /* 0 */
01170 
01171 int
01172 ACE_Dev_Poll_Reactor::dispatch_io_event (Token_Guard &guard)
01173 {
01174 
01175   // Define bits to check for while dispatching.
01176 #if defined (ACE_HAS_EVENT_POLL)
01177   const __uint32_t out_event = EPOLLOUT;
01178   const __uint32_t exc_event = EPOLLPRI;
01179   const __uint32_t in_event  = EPOLLIN;
01180   const __uint32_t err_event = EPOLLHUP | EPOLLERR;
01181 #else
01182   const short out_event = POLLOUT;
01183   const short exc_event = POLLPRI;
01184   const short in_event  = POLLIN;
01185   const short err_event = 0;              // No known bits for this
01186 #endif /* ACE_HAS_EVENT_POLL */
01187 
01188   // Since the underlying event demultiplexing mechansim (`/dev/poll'
01189   // or '/dev/epoll') is stateful, and since only one result buffer is
01190   // used, all pending events (i.e. those retrieved from a previous
01191   // poll) must be dispatched before any additional event can be
01192   // polled.  As such, the Dev_Poll_Reactor keeps track of the
01193   // progress of events that have been dispatched.
01194 
01195   // Dispatch the events.
01196   //
01197   // Select the first available handle with event (s) pending. Check for
01198   // event type in defined order of dispatch: output, exception, input.
01199   // When an event is located, clear its bit in the dispatch set. If there
01200   // are no more events for the handle, also increment the pfds pointer
01201   // to move to the next handle ready.
01202   //
01203   // Notice that pfds only contains file descriptors that have
01204   // received events.
01205 #if defined (ACE_HAS_EVENT_POLL)
01206   struct epoll_event *& pfds = this->start_pevents_;
01207   if (pfds < this->end_pevents_)
01208 #else
01209   struct pollfd *& pfds = this->start_pfds_;
01210   if (pfds < this->end_pfds_)
01211 #endif /* ACE_HAS_EVENT_POLL */
01212     {
01213 #if defined (ACE_HAS_EVENT_POLL)
01214       const ACE_HANDLE handle   = pfds->data.fd;
01215       __uint32_t &revents       = pfds->events;
01216 #else
01217       const ACE_HANDLE handle = pfds->fd;
01218       short &revents          = pfds->revents;
01219 #endif /* ACE_HAS_EVENT_POLL */
01220 
01221       // Figure out what to do first in order to make it easier to manage
01222       // the bit twiddling and possible pfds increment before releasing
01223       // the token for dispatch.
01224       // Note that if there's an error (such as the handle was closed
01225       // without being removed from the event set) the EPOLLHUP and/or
01226       // EPOLLERR bits will be set in revents.
01227       bool disp_out = false;
01228       bool disp_exc = false;
01229       bool disp_in  = false;
01230       if (ACE_BIT_ENABLED (revents, out_event))
01231         {
01232           disp_out = true;
01233           ACE_CLR_BITS (revents, out_event);
01234         }
01235       else if (ACE_BIT_ENABLED (revents, exc_event))
01236         {
01237           disp_exc = true;
01238           ACE_CLR_BITS (revents, exc_event);
01239         }
01240       else if (ACE_BIT_ENABLED (revents, in_event))
01241         {
01242           disp_in = true;
01243           ACE_CLR_BITS (revents, in_event);
01244         }
01245       else if (ACE_BIT_ENABLED (revents, err_event))
01246         {
01247           this->remove_handler_i (handle, ACE_Event_Handler::ALL_EVENTS_MASK);
01248           ++pfds;
01249           return 1;
01250         }
01251       else
01252         {
01253           ACE_ERROR ((LM_ERROR, ACE_TEXT (" (%t) dispatch_io h %d unknown events 0x%x\n"), handle, revents));
01254           // ACE_ASSERT (0);
01255         }
01256 
01257       // Increment the pointer to the next element before we
01258       // release the token.  Otherwise event handlers end up being
01259       // dispatched multiple times for the same poll.
01260       if (revents == 0)
01261         ++pfds;
01262 
01263       /* When using sys_epoll, we can attach arbitrary user
01264          data to the descriptor, so it can be delivered when
01265          activity is detected. Perhaps we should store event
01266          handler together with descriptor, instead of looking
01267          it up in a repository ? Could it boost performance ?
01268       */
01269       ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01270 
01271       if (eh)
01272         {
01273           // Modify the reference count in an exception-safe way.
01274           // Note that eh could be the notify handler. It's not strictly
01275           // necessary to manage its refcount, but since we don't enable
01276           // the counting policy, it won't do much. Management of the
01277           // notified handlers themselves is done in the notify handler.
01278           ACE_Dev_Poll_Handler_Guard eh_guard (eh);
01279 
01280           // Release the reactor token before upcall.
01281           guard.release_token ();
01282 
01283           // Dispatch the detected event
01284           if (disp_out)
01285             {
01286               const int status =
01287                 this->upcall (eh, &ACE_Event_Handler::handle_output, handle);
01288 
01289               if (status < 0)
01290                 // Note that the token is reacquired in remove_handler ().
01291                 this->remove_handler (handle, ACE_Event_Handler::WRITE_MASK);
01292               return 1;
01293             }
01294 
01295           if (disp_exc)
01296             {
01297               const int status =
01298                 this->upcall (eh, &ACE_Event_Handler::handle_exception, handle);
01299 
01300               if (status < 0)
01301                 // Note that the token is reacquired in remove_handler ().
01302                 this->remove_handler (handle, ACE_Event_Handler::EXCEPT_MASK);
01303               return 1;
01304             }
01305 
01306           if (disp_in)
01307             {
01308               const int status =
01309                 this->upcall (eh, &ACE_Event_Handler::handle_input, handle);
01310 
01311               if (status < 0)
01312                 // Note that the token is reacquired in remove_handler ().
01313                 this->remove_handler (handle, ACE_Event_Handler::READ_MASK);
01314               return 1;
01315             }
01316         } // The reactor token is reacquired upon leaving this scope.
01317     }
01318 
01319   return 0;
01320 }
01321 
01322 int
01323 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value *max_wait_time)
01324 {
01325   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01326 
01327   return this->handle_events (max_wait_time);
01328 }
01329 
01330 int
01331 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value &max_wait_time)
01332 {
01333   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01334 
01335   return this->handle_events (&max_wait_time);
01336 }
01337 
01338 int
01339 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value &max_wait_time)
01340 {
01341   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01342 
01343   return this->handle_events (max_wait_time);
01344 }
01345 
01346 int
01347 ACE_Dev_Poll_Reactor::deactivated (void)
01348 {
01349   return this->deactivated_;
01350 }
01351 
01352 void
01353 ACE_Dev_Poll_Reactor::deactivate (int do_stop)
01354 {
01355   this->deactivated_ = do_stop;
01356   this->wakeup_all_threads ();
01357 }
01358 
01359 int
01360 ACE_Dev_Poll_Reactor::register_handler (ACE_Event_Handler *handler,
01361                                         ACE_Reactor_Mask mask)
01362 {
01363   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01364 
01365   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01366 
01367   return this->register_handler_i (handler->get_handle (),
01368                                    handler,
01369                                    mask);
01370 }
01371 
01372 int
01373 ACE_Dev_Poll_Reactor::register_handler (ACE_HANDLE handle,
01374                                         ACE_Event_Handler *event_handler,
01375                                         ACE_Reactor_Mask mask)
01376 {
01377   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01378 
01379   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01380 
01381   return this->register_handler_i (handle,
01382                                    event_handler,
01383                                    mask);
01384 }
01385 
01386 int
01387 ACE_Dev_Poll_Reactor::register_handler_i (ACE_HANDLE handle,
01388                                           ACE_Event_Handler *event_handler,
01389                                           ACE_Reactor_Mask mask)
01390 {
01391   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler_i");
01392 
01393   if (handle == ACE_INVALID_HANDLE
01394       || mask == ACE_Event_Handler::NULL_MASK)
01395     {
01396       errno = EINVAL;
01397       return -1;
01398     }
01399 
01400  if (this->handler_rep_.find (handle) == 0)
01401    {
01402      // Handler not present in the repository.  Bind it.
01403      if (this->handler_rep_.bind (handle, event_handler, mask) != 0)
01404        return -1;
01405 
01406 #if defined (ACE_HAS_EVENT_POLL)
01407 
01408      struct epoll_event epev;
01409      ACE_OS::memset (&epev, 0, sizeof (epev));
01410      static const int op = EPOLL_CTL_ADD;
01411 
01412      epev.events  = this->reactor_mask_to_poll_event (mask);
01413      epev.data.fd = handle;
01414 
01415      if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01416        {
01417          ACE_ERROR ((LM_ERROR, "%p\n", "epoll_ctl"));
01418  (void) this->handler_rep_.unbind (handle);
01419          return -1;
01420        }
01421 
01422 #endif /* ACE_HAS_EVENT_POLL */
01423    }
01424  else
01425    {
01426      // Handler is already present in the repository, so register it
01427      // again, possibly for different event.  Add new mask to the
01428      // current one.
01429      if (this->mask_ops_i (handle, mask, ACE_Reactor::ADD_MASK) == -1)
01430        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "mask_ops_i"), -1);
01431    }
01432 
01433 #ifndef  ACE_HAS_EVENT_POLL
01434 
01435   struct pollfd pfd;
01436 
01437   pfd.fd      = handle;
01438   pfd.events  = this->reactor_mask_to_poll_event (mask);
01439   pfd.revents = 0;
01440 
01441   // Add file descriptor to the "interest set."
01442   if (ACE_OS::write (this->poll_fd_, &pfd, sizeof (pfd)) != sizeof (pfd))
01443     {
01444  (void) this->handler_rep_.unbind (handle);
01445       return -1;
01446     }
01447 #endif /*ACE_HAS_EVENT_POLL*/
01448 
01449   // Note the fact that we've changed the state of the wait_set_,
01450   // which is used by the dispatching loop to determine whether it can
01451   // keep going or if it needs to reconsult select ().
01452   // this->state_changed_ = 1;
01453 
01454   return 0;
01455 }
01456 
01457 int
01458 ACE_Dev_Poll_Reactor::register_handler (
01459   ACE_HANDLE /* event_handle */,
01460   ACE_HANDLE /* io_handle */,
01461   ACE_Event_Handler * /* event_handler */,
01462   ACE_Reactor_Mask /* mask */)
01463 {
01464   ACE_NOTSUP_RETURN (-1);
01465 }
01466 
01467 int
01468 ACE_Dev_Poll_Reactor::register_handler (const ACE_Handle_Set &handle_set,
01469                                         ACE_Event_Handler *event_handler,
01470                                         ACE_Reactor_Mask mask)
01471 {
01472   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01473 
01474   ACE_Handle_Set_Iterator handle_iter (handle_set);
01475 
01476   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01477 
01478   // @@ It might be more efficient to construct a pollfd array and
01479   //    pass it to the write () call in register_handler_i () only once,
01480   //    instead of calling write () (a system call) once for each file
01481   //    descriptor.
01482 
01483   for (ACE_HANDLE h = handle_iter ();
01484        h != ACE_INVALID_HANDLE;
01485        h = handle_iter ())
01486     if (this->register_handler_i (h, event_handler, mask) == -1)
01487       return -1;
01488 
01489   return 0;
01490 }
01491 
01492 int
01493 ACE_Dev_Poll_Reactor::register_handler (int signum,
01494                                         ACE_Event_Handler *new_sh,
01495                                         ACE_Sig_Action *new_disp,
01496                                         ACE_Event_Handler **old_sh,
01497                                         ACE_Sig_Action *old_disp)
01498 {
01499   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01500 
01501   return this->signal_handler_->register_handler (signum,
01502                                                   new_sh,
01503                                                   new_disp,
01504                                                   old_sh,
01505                                                   old_disp);
01506 }
01507 
01508 int
01509 ACE_Dev_Poll_Reactor::register_handler (const ACE_Sig_Set &sigset,
01510                                         ACE_Event_Handler *new_sh,
01511                                         ACE_Sig_Action *new_disp)
01512 {
01513   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01514 
01515   int result = 0;
01516 
01517 #if (ACE_NSIG > 0)
01518 
01519   for (int s = 1; s < ACE_NSIG; ++s)
01520     if ((sigset.is_member (s) == 1)
01521         && this->signal_handler_->register_handler (s,
01522                                                     new_sh,
01523                                                     new_disp) == -1)
01524       result = -1;
01525 
01526 #else  /* ACE_NSIG <= 0 */
01527 
01528   ACE_UNUSED_ARG (sigset);
01529   ACE_UNUSED_ARG (new_sh);
01530   ACE_UNUSED_ARG (new_disp);
01531 
01532 #endif /* ACE_NSIG <= 0  */
01533 
01534   return result;
01535 }
01536 
01537 int
01538 ACE_Dev_Poll_Reactor::remove_handler (ACE_Event_Handler *handler,
01539                                       ACE_Reactor_Mask mask)
01540 {
01541   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01542 
01543   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01544 
01545   return this->remove_handler_i (handler->get_handle (), mask);
01546 }
01547 
01548 int
01549 ACE_Dev_Poll_Reactor::remove_handler (ACE_HANDLE handle,
01550                                       ACE_Reactor_Mask mask)
01551 {
01552   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01553 
01554   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01555 
01556   return this->remove_handler_i (handle, mask);
01557 }
01558 
01559 int
01560 ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle,
01561                                         ACE_Reactor_Mask mask)
01562 {
01563   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler_i");
01564 
01565   ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01566 
01567   if (eh == 0 ||
01568       this->mask_ops_i (handle, mask, ACE_Reactor::CLR_MASK) == -1)
01569     return -1;
01570 
01571   // Check for ref counting now - handle_close () may delete eh.
01572   bool const requires_reference_counting =
01573     eh->reference_counting_policy ().value () ==
01574     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
01575 
01576   if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL))
01577  (void) eh->handle_close (handle, mask);
01578 
01579   // If there are no longer any outstanding events on the given handle
01580   // then remove it from the handler repository.
01581   if (this->handler_rep_.mask (handle) == ACE_Event_Handler::NULL_MASK)
01582     this->handler_rep_.unbind (handle, requires_reference_counting);
01583 
01584   // Note the fact that we've changed the state of the wait_set,
01585   // i.e. the "interest set," which is used by the dispatching loop to
01586   // determine whether it can keep going or if it needs to reconsult
01587   // /dev/poll or /dev/epoll.
01588   // this->state_changed_ = 1;
01589 
01590   return 0;
01591 }
01592 
01593 int
01594 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Handle_Set &handle_set,
01595                                       ACE_Reactor_Mask mask)
01596 {
01597   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01598 
01599   ACE_Handle_Set_Iterator handle_iter (handle_set);
01600 
01601   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01602 
01603   // @@ It might be more efficient to construct a pollfd array and
01604   //    pass it to the write () call in register_handler_i () only once,
01605   //    instead of calling write () (a system call) once for each file
01606   //    descriptor.
01607 
01608   for (ACE_HANDLE h = handle_iter ();
01609        h != ACE_INVALID_HANDLE;
01610        h = handle_iter ())
01611     if (this->remove_handler_i (h, mask) == -1)
01612       return -1;
01613 
01614   return 0;
01615 }
01616 
01617 int
01618 ACE_Dev_Poll_Reactor::remove_handler (int signum,
01619                                       ACE_Sig_Action *new_disp,
01620                                       ACE_Sig_Action *old_disp,
01621                                       int sigkey)
01622 {
01623   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01624 
01625   return this->signal_handler_->remove_handler (signum,
01626                                                 new_disp,
01627                                                 old_disp,
01628                                                 sigkey);
01629 }
01630 
01631 int
01632 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Sig_Set &sigset)
01633 {
01634   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01635 
01636   int result = 0;
01637 
01638 #if (ACE_NSIG > 0)
01639 
01640   for (int s = 1; s < ACE_NSIG; ++s)
01641     if ((sigset.is_member (s) == 1)
01642         && this->signal_handler_->remove_handler (s) == -1)
01643       result = -1;
01644 
01645 #else  /* ACE_NSIG <= 0 */
01646 
01647   ACE_UNUSED_ARG (sigset);
01648 
01649 #endif /* ACE_NSIG <= 0 */
01650 
01651   return result;
01652 }
01653 
01654 int
01655 ACE_Dev_Poll_Reactor::suspend_handler (ACE_Event_Handler *event_handler)
01656 {
01657   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01658 
01659   if (event_handler == 0)
01660     {
01661       errno = EINVAL;
01662       return -1;
01663     }
01664 
01665   ACE_HANDLE handle = event_handler->get_handle ();
01666 
01667   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01668 
01669   return this->suspend_handler_i (handle);
01670 }
01671 
01672 int
01673 ACE_Dev_Poll_Reactor::suspend_handler (ACE_HANDLE handle)
01674 {
01675   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01676 
01677   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01678 
01679   return this->suspend_handler_i (handle);
01680 }
01681 
01682 int
01683 ACE_Dev_Poll_Reactor::suspend_handler (const ACE_Handle_Set &handles)
01684 {
01685   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01686 
01687   ACE_Handle_Set_Iterator handle_iter (handles);
01688   ACE_HANDLE h;
01689 
01690   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01691 
01692   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01693     if (this->suspend_handler_i (h) == -1)
01694       return -1;
01695 
01696   return 0;
01697 }
01698 
01699 int
01700 ACE_Dev_Poll_Reactor::suspend_handlers (void)
01701 {
01702   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handlers");
01703 
01704   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01705 
01706   const size_t len = this->handler_rep_.size ();
01707 
01708   for (size_t i = 0; i < len; ++i)
01709     if (this->handler_rep_.suspended (i) == 0
01710         && this->suspend_handler_i (i) != 0)
01711       return -1;
01712 
01713   return 0;
01714 }
01715 
01716 int
01717 ACE_Dev_Poll_Reactor::suspend_handler_i (ACE_HANDLE handle)
01718 {
01719   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler_i");
01720 
01721   if (this->handler_rep_.find (handle) == 0)
01722     return -1;
01723 
01724   if (this->handler_rep_.suspended (handle))
01725     return 0;  // Already suspended.  @@ Should this be an error?
01726 
01727   // Remove the handle from the "interest set."
01728   //
01729   // Note that the associated event handler is still in the handler
01730   // repository, but no events will be polled on the given handle thus
01731   // no event will be dispatched to the event handler.
01732 
01733 #if defined (ACE_HAS_EVENT_POLL)
01734 
01735   struct epoll_event epev;
01736   ACE_OS::memset (&epev, 0, sizeof (epev));
01737   static const int op = EPOLL_CTL_DEL;
01738 
01739   epev.events  = 0;
01740   epev.data.fd = handle;
01741 
01742   if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01743     return -1;
01744 
01745 #else
01746 
01747   struct pollfd pfd[1];
01748 
01749   pfd[0].fd      = handle;
01750   pfd[0].events  = POLLREMOVE;
01751   pfd[0].revents = 0;
01752 
01753   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01754     return -1;
01755 
01756 #endif  /* ACE_HAS_EVENT_POLL */
01757 
01758   this->handler_rep_.suspend (handle);
01759 
01760   return 0;
01761 }
01762 
01763 int
01764 ACE_Dev_Poll_Reactor::resume_handler (ACE_Event_Handler *event_handler)
01765 {
01766   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01767 
01768   if (event_handler == 0)
01769     {
01770       errno = EINVAL;
01771       return -1;
01772     }
01773 
01774   ACE_HANDLE handle = event_handler->get_handle ();
01775 
01776   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01777 
01778   return this->resume_handler_i (handle);
01779 }
01780 
01781 int
01782 ACE_Dev_Poll_Reactor::resume_handler (ACE_HANDLE handle)
01783 {
01784   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01785 
01786   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01787 
01788   return this->resume_handler_i (handle);
01789 }
01790 
01791 int
01792 ACE_Dev_Poll_Reactor::resume_handler (const ACE_Handle_Set &handles)
01793 {
01794   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01795 
01796   ACE_Handle_Set_Iterator handle_iter (handles);
01797   ACE_HANDLE h;
01798 
01799   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01800 
01801   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01802     if (this->resume_handler_i (h) == -1)
01803       return -1;
01804 
01805   return 0;
01806 }
01807 
01808 int
01809 ACE_Dev_Poll_Reactor::resume_handlers (void)
01810 {
01811   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handlers");
01812 
01813   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01814 
01815   const size_t len = this->handler_rep_.size ();
01816 
01817   for (size_t i = 0; i < len; ++i)
01818     if (this->handler_rep_.suspended (i)
01819         && this->resume_handler_i (i) != 0)
01820       return -1;
01821 
01822   return 0;
01823 }
01824 
01825 int
01826 ACE_Dev_Poll_Reactor::resume_handler_i (ACE_HANDLE handle)
01827 {
01828   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler_i");
01829 
01830   if (this->handler_rep_.find (handle) == 0
01831       && this->handler_rep_.suspended (handle) == 0)
01832     return -1;
01833 
01834   ACE_Reactor_Mask mask = this->handler_rep_.mask (handle);
01835 
01836   if (mask == ACE_Event_Handler::NULL_MASK)
01837     return -1;
01838 
01839   // Place the handle back in to the "interest set."
01840   //
01841   // Events for the given handle will once again be polled.
01842 
01843 #if defined (ACE_HAS_EVENT_POLL)
01844 
01845   struct epoll_event epev;
01846   ACE_OS::memset (&epev, 0, sizeof (epev));
01847   static const int op = EPOLL_CTL_ADD;
01848 
01849   epev.events  = this->reactor_mask_to_poll_event (mask);
01850   epev.data.fd = handle;
01851 
01852   if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
01853     return -1;
01854 
01855 #else
01856 
01857   struct pollfd pfd[1];
01858 
01859   pfd[0].fd      = handle;
01860   pfd[0].events  = this->reactor_mask_to_poll_event (mask);
01861   pfd[0].revents = 0;
01862 
01863   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01864     return -1;
01865 
01866 #endif  /* ACE_HAS_EVENT_POLL */
01867 
01868   this->handler_rep_.resume (handle);
01869 
01870   return 0;
01871 }
01872 
01873 int
01874 ACE_Dev_Poll_Reactor::resumable_handler (void)
01875 {
01876   // @@ Is this correct?
01877 
01878   return 0;
01879 }
01880 
01881 int
01882 ACE_Dev_Poll_Reactor::uses_event_associations (void)
01883 {
01884   // Since the Dev_Poll_Reactor does not do any event associations,
01885   // this method always return zero.
01886   return 0;
01887 }
01888 
01889 long
01890 ACE_Dev_Poll_Reactor::schedule_timer (ACE_Event_Handler *event_handler,
01891                                       const void *arg,
01892                                       const ACE_Time_Value &delay,
01893                                       const ACE_Time_Value &interval)
01894 {
01895   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_timer");
01896 
01897   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01898 
01899   if (0 != this->timer_queue_)
01900     return this->timer_queue_->schedule
01901  (event_handler,
01902        arg,
01903        this->timer_queue_->gettimeofday () + delay,
01904        interval);
01905 
01906   errno = ESHUTDOWN;
01907   return -1;
01908 }
01909 
01910 int
01911 ACE_Dev_Poll_Reactor::reset_timer_interval (long timer_id,
01912                                             const ACE_Time_Value &interval)
01913 {
01914   ACE_TRACE ("ACE_Dev_Poll_Reactor::reset_timer_interval");
01915 
01916   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01917 
01918   if (0 != this->timer_queue_)
01919     return this->timer_queue_->reset_interval (timer_id, interval);
01920 
01921   errno = ESHUTDOWN;
01922   return -1;
01923 }
01924 
01925 int
01926 ACE_Dev_Poll_Reactor::cancel_timer (ACE_Event_Handler *event_handler,
01927                                     int dont_call_handle_close)
01928 {
01929   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01930 
01931   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01932 
01933   return (this->timer_queue_ == 0
01934           ? 0
01935           : this->timer_queue_->cancel (event_handler,
01936                                         dont_call_handle_close));
01937 }
01938 
01939 int
01940 ACE_Dev_Poll_Reactor::cancel_timer (long timer_id,
01941                                     const void **arg,
01942                                     int dont_call_handle_close)
01943 {
01944   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01945 
01946   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
01947 
01948   return (this->timer_queue_ == 0
01949           ? 0
01950           : this->timer_queue_->cancel (timer_id,
01951                                         arg,
01952                                         dont_call_handle_close));
01953 }
01954 
01955 int
01956 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_Event_Handler *eh,
01957                                        ACE_Reactor_Mask mask)
01958 {
01959   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
01960 
01961   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK);
01962 }
01963 
01964 int
01965 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_HANDLE handle,
01966                                        ACE_Reactor_Mask mask)
01967 {
01968   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
01969 
01970   return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK);
01971 }
01972 
01973 int
01974 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_Event_Handler *eh,
01975                                      ACE_Reactor_Mask mask)
01976 {
01977   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
01978 
01979   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK);
01980 }
01981 
01982 int
01983 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_HANDLE handle,
01984                                      ACE_Reactor_Mask mask)
01985 {
01986   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
01987 
01988   return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK);
01989 }
01990 
01991 int
01992 ACE_Dev_Poll_Reactor::notify (ACE_Event_Handler *eh,
01993                               ACE_Reactor_Mask mask,
01994                               ACE_Time_Value *timeout)
01995 {
01996   ACE_TRACE ("ACE_Dev_Poll_Reactor::notify");
01997 
01998   ssize_t n = 0;
01999 
02000   // Pass over both the Event_Handler *and* the mask to allow the
02001   // caller to dictate which Event_Handler method the receiver
02002   // invokes.  Note that this call can timeout.
02003 
02004   n = this->notify_handler_->notify (eh, mask, timeout);
02005 
02006   return n == -1 ? -1 : 0;
02007 }
02008 
02009 void
02010 ACE_Dev_Poll_Reactor::max_notify_iterations (int iterations)
02011 {
02012   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02013 
02014   ACE_MT (ACE_GUARD (ACE_Dev_Poll_Reactor_Token, mon, this->token_));
02015 
02016   this->notify_handler_->max_notify_iterations (iterations);
02017 }
02018 
02019 int
02020 ACE_Dev_Poll_Reactor::max_notify_iterations (void)
02021 {
02022   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02023 
02024   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02025 
02026   return this->notify_handler_->max_notify_iterations ();
02027 }
02028 
02029 int
02030 ACE_Dev_Poll_Reactor::purge_pending_notifications (ACE_Event_Handler * eh,
02031                                                    ACE_Reactor_Mask mask)
02032 {
02033   if (this->notify_handler_ == 0)
02034     return 0;
02035 
02036   return this->notify_handler_->purge_pending_notifications (eh, mask);
02037 }
02038 
02039 ACE_Event_Handler *
02040 ACE_Dev_Poll_Reactor::find_handler (ACE_HANDLE handle)
02041 {
02042   ACE_MT (ACE_READ_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, 0));
02043 
02044   ACE_Event_Handler *event_handler = this->handler_rep_.find (handle);
02045   if (event_handler)
02046     event_handler->add_reference ();
02047   return event_handler;
02048 }
02049 
02050 int
02051 ACE_Dev_Poll_Reactor::handler (ACE_HANDLE handle,
02052                                ACE_Reactor_Mask mask,
02053                                ACE_Event_Handler **event_handler)
02054 {
02055   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02056 
02057   ACE_MT (ACE_READ_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02058 
02059   ACE_Event_Handler *h = this->handler_rep_.find (handle);
02060 
02061   if (h != 0
02062       && ACE_BIT_CMP_MASK (this->handler_rep_.mask (handle),
02063                            mask,  // Compare all bits in the mask
02064                            mask))
02065     {
02066       if (event_handler != 0)
02067         *event_handler = h;
02068 
02069       return 0;
02070     }
02071 
02072   return -1;
02073 }
02074 
02075 int
02076 ACE_Dev_Poll_Reactor::handler (int signum,
02077                                ACE_Event_Handler **eh)
02078 {
02079   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02080 
02081   ACE_Event_Handler *handler = this->signal_handler_->handler (signum);
02082 
02083   if (handler == 0)
02084     return -1;
02085   else if (eh != 0)
02086     *eh = handler;
02087 
02088   return 0;
02089 }
02090 
02091 bool
02092 ACE_Dev_Poll_Reactor::initialized (void)
02093 {
02094   ACE_TRACE ("ACE_Dev_Poll_Reactor::initialized");
02095 
02096   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, false));
02097 
02098   return this->initialized_;
02099 }
02100 
02101 size_t
02102 ACE_Dev_Poll_Reactor::size (void) const
02103 {
02104   return this->size_;
02105 }
02106 
02107 ACE_Lock &
02108 ACE_Dev_Poll_Reactor::lock (void)
02109 {
02110   ACE_TRACE ("ACE_Dev_Poll_Reactor::lock");
02111 
02112   return this->lock_adapter_;
02113 }
02114 
02115 void
02116 ACE_Dev_Poll_Reactor::wakeup_all_threads (void)
02117 {
02118   ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads");
02119 
02120   // Send a notification, but don't block if there's no one to receive
02121   // it.
02122   this->notify (0,
02123                 ACE_Event_Handler::NULL_MASK,
02124  (ACE_Time_Value *) &ACE_Time_Value::zero);
02125 }
02126 
02127 int
02128 ACE_Dev_Poll_Reactor::owner (ACE_thread_t /* new_owner */,
02129                              ACE_thread_t * /* old_owner */)
02130 {
02131   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02132 
02133   // There is no need to set the owner of the event loop.  Multiple
02134   // threads may invoke the event loop simulataneously.
02135 
02136   return 0;
02137 }
02138 
02139 int
02140 ACE_Dev_Poll_Reactor::owner (ACE_thread_t * /* owner */)
02141 {
02142   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02143 
02144   // There is no need to set the owner of the event loop.  Multiple
02145   // threads may invoke the event loop simulataneously.
02146 
02147   return 0;
02148 }
02149 
02150 int
02151 ACE_Dev_Poll_Reactor::restart (void)
02152 {
02153   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02154 
02155   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02156 
02157   return this->restart_;
02158 }
02159 
02160 int
02161 ACE_Dev_Poll_Reactor::restart (int r)
02162 {
02163   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02164 
02165   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02166 
02167   int current_value = this->restart_;
02168   this->restart_ = r;
02169   return current_value;
02170 }
02171 
02172 void
02173 ACE_Dev_Poll_Reactor::requeue_position (int)
02174 {
02175   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02176 }
02177 
02178 int
02179 ACE_Dev_Poll_Reactor::requeue_position (void)
02180 {
02181   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02182 
02183   ACE_NOTSUP_RETURN (-1);
02184 }
02185 
02186 int
02187 ACE_Dev_Poll_Reactor::mask_ops (ACE_Event_Handler *event_handler,
02188                                 ACE_Reactor_Mask mask,
02189                                 int ops)
02190 {
02191   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02192 
02193   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02194 
02195   return this->mask_ops_i (event_handler->get_handle (), mask, ops);
02196 }
02197 
02198 int
02199 ACE_Dev_Poll_Reactor::mask_ops (ACE_HANDLE handle,
02200                                 ACE_Reactor_Mask mask,
02201                                 int ops)
02202 {
02203   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02204 
02205   ACE_MT (ACE_GUARD_RETURN (ACE_Dev_Poll_Reactor_Token, mon, this->token_, -1));
02206 
02207   return this->mask_ops_i (handle, mask, ops);
02208 }
02209 
02210 int
02211 ACE_Dev_Poll_Reactor::mask_ops_i (ACE_HANDLE handle,
02212                                   ACE_Reactor_Mask mask,
02213                                   int ops)
02214 {
02215   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops_i");
02216 
02217   if (this->handler_rep_.handle_in_range (handle) == 0)
02218     return -1;
02219 
02220   // Block out all signals until method returns.
02221   ACE_Sig_Guard sb;
02222 
02223   const ACE_Reactor_Mask old_mask = this->handler_rep_.mask (handle);
02224   ACE_Reactor_Mask new_mask = old_mask;
02225 
02226   // Perform GET, CLR, SET, and ADD operations on the interest/wait
02227   // set and the suspend set (if necessary).
02228   //
02229   // GET = 1, Retrieve current value
02230   // SET = 2, Set value of bits to new mask (changes the entire mask)
02231   // ADD = 3, Bitwise "or" the value into the mask (only changes
02232   //          enabled bits)
02233   // CLR = 4  Bitwise "and" the negation of the value out of the mask
02234   // (only changes enabled bits)
02235   //
02236   // Returns the original mask.
02237 
02238   switch (ops)
02239     {
02240     case ACE_Reactor::GET_MASK:
02241       // The work for this operation is done in all cases at the
02242       // begining of the function.
02243       return old_mask;
02244 
02245     case ACE_Reactor::CLR_MASK:
02246       ACE_CLR_BITS (new_mask, mask);
02247       break;
02248 
02249     case ACE_Reactor::SET_MASK:
02250       new_mask = mask;
02251       break;
02252 
02253     case ACE_Reactor::ADD_MASK:
02254       ACE_SET_BITS (new_mask, mask);
02255       break;
02256 
02257     default:
02258       return -1;
02259     }
02260 
02261   /// Reset the mask for the given handle.
02262   this->handler_rep_.mask (handle, new_mask);
02263 
02264   if (this->handler_rep_.suspended (handle) == 0)
02265     {
02266       // Only attempt to alter events for the handle from the
02267       // "interest set" if it hasn't been suspended.
02268 
02269       const short events = this->reactor_mask_to_poll_event (new_mask);
02270 
02271 #if defined (sun)
02272       // Apparently events cannot be updated on-the-fly on Solaris so
02273       // remove the existing events, and then add the new ones.
02274       struct pollfd pfd[2];
02275 
02276       pfd[0].fd      = handle;
02277       pfd[0].events  = POLLREMOVE;
02278       pfd[0].revents = 0;
02279       pfd[1].fd      = (events == POLLREMOVE ? ACE_INVALID_HANDLE : handle);
02280       pfd[1].events  = events;
02281       pfd[1].revents = 0;
02282 
02283       // Change the events associated with the given file descriptor.
02284       if (ACE_OS::write (this->poll_fd_,
02285                          pfd,
02286                          sizeof (pfd)) != sizeof (pfd))
02287         return -1;
02288 #elif defined (ACE_HAS_EVENT_POLL)
02289 
02290       struct epoll_event epev;
02291       ACE_OS::memset (&epev, 0, sizeof (epev));
02292       int op;
02293 
02294       // ACE_Event_Handler::NULL_MASK ???
02295       if (new_mask == 0)
02296         {
02297           op          = EPOLL_CTL_DEL;
02298           epev.events = 0;
02299         }
02300       else
02301         {
02302           op          = EPOLL_CTL_MOD;
02303           epev.events = events;
02304         }
02305 
02306       epev.data.fd = handle;
02307 
02308       if (::epoll_ctl (this->poll_fd_, op, handle, &epev) == -1)
02309         {
02310           // If a handle is closed, epoll removes it from the poll set
02311           // automatically - we may not know about it yet. If that's the
02312           // case, a mod operation will fail with ENOENT. Retry it as
02313           // an add.
02314           if (op == EPOLL_CTL_MOD && errno == ENOENT &&
02315               ::epoll_ctl (this->poll_fd_, EPOLL_CTL_ADD, handle, &epev) == -1)
02316             return -1;
02317         }
02318 
02319 #else
02320       pollfd pfd[1];
02321 
02322       pfd[0].fd      = handle;
02323       pfd[0].events  = events;
02324       pfd[0].revents = 0;
02325 
02326       // Change the events associated with the given file descriptor.
02327       if (ACE_OS::write (this->poll_fd_,
02328                          pfd,
02329                          sizeof (pfd)) != sizeof (pfd))
02330         return -1;
02331 #endif /*ACE_HAS_EVENT_POLL  */
02332     }
02333 
02334   return old_mask;
02335 }
02336 
02337 int
02338 ACE_Dev_Poll_Reactor::ready_ops (ACE_Event_Handler * /* event_handler */,
02339                                  ACE_Reactor_Mask /* mask */,
02340                                  int /* ops */)
02341 {
02342   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02343 
02344   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02345   // ready_set cannot be directly manipulated outside of the event
02346   // loop.
02347   ACE_NOTSUP_RETURN (-1);
02348 }
02349 
02350 int
02351 ACE_Dev_Poll_Reactor::ready_ops (ACE_HANDLE /* handle */,
02352                                  ACE_Reactor_Mask /* mask */,
02353                                  int /* ops */)
02354 {
02355   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02356 
02357   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02358   // ready_set cannot be directly manipulated outside of the event
02359   // loop.
02360   ACE_NOTSUP_RETURN (-1);
02361 }
02362 
02363 void
02364 ACE_Dev_Poll_Reactor::dump (void) const
02365 {
02366 #if defined (ACE_HAS_DUMP)
02367   ACE_TRACE ("ACE_Dev_Poll_Reactor::dump");
02368 
02369   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02370   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("restart_ = %d\n"), this->restart_));
02371   ACE_DEBUG ((LM_DEBUG,
02372               ACE_TEXT ("initialized_ = %d"),
02373               this->initialized_));
02374   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("poll_fd_ = %d"), this->poll_fd_));
02375   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("size_ = %u"), this->size_));
02376   ACE_DEBUG ((LM_DEBUG,
02377               ACE_TEXT ("deactivated_ = %d"),
02378               this->deactivated_));
02379   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02380 #endif /* ACE_HAS_DUMP */
02381 }
02382 
02383 short
02384 ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event (ACE_Reactor_Mask mask)
02385 {
02386   ACE_TRACE ("ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event");
02387 
02388   if (mask == ACE_Event_Handler::NULL_MASK)
02389     // No event.  Remove from interest set.
02390 #if defined (ACE_HAS_EVENT_POLL)
02391     return EPOLL_CTL_DEL;
02392 #else
02393     return POLLREMOVE;
02394 #endif /* ACE_HAS_EVENT_POLL */
02395 
02396   short events = 0;
02397 
02398   // READ, ACCEPT, and CONNECT flag will place the handle in the
02399   // read set.
02400   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
02401       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
02402       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02403     {
02404 #if defined (ACE_HAS_EVENT_POLL)
02405       ACE_SET_BITS (events, EPOLLIN);
02406 #else
02407       ACE_SET_BITS (events, POLLIN);
02408 #endif /*ACE_HAS_EVENT_POLL*/
02409     }
02410 
02411   // WRITE and CONNECT flag will place the handle in the write set.
02412   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)
02413       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02414     {
02415 #if defined (ACE_HAS_EVENT_POLL)
02416       ACE_SET_BITS (events, EPOLLOUT);
02417 #else
02418       ACE_SET_BITS (events, POLLOUT);
02419 #endif /*ACE_HAS_EVENT_POLL*/
02420     }
02421 
02422   // EXCEPT flag will place the handle in the except set.
02423   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
02424     {
02425 #if defined (ACE_HAS_EVENT_POLL)
02426       ACE_SET_BITS (events, EPOLLPRI);
02427 #else
02428       ACE_SET_BITS (events, POLLPRI);
02429 #endif /*ACE_HAS_EVENT_POLL*/
02430     }
02431 
02432   return events;
02433 }
02434 
02435 namespace {
02436   void polite_sleep_hook (void *) { }
02437 }
02438 
02439 int
02440 ACE_Dev_Poll_Reactor::Token_Guard::acquire_quietly (ACE_Time_Value *max_wait)
02441 {
02442   ACE_TRACE ("ACE_Dev_Poll_Reactor::Token_Guard::acquire_quietly");
02443 
02444   // Acquire the token but don't ping any waiters; just queue up politely.
02445   int result = 0;
02446   if (max_wait)
02447     {
02448       ACE_Time_Value tv = ACE_OS::gettimeofday ();
02449       tv += *max_wait;
02450 
02451       ACE_MT (result = this->token_.acquire_read (&polite_sleep_hook,
02452                                                   0,
02453                                                   &tv));
02454     }
02455   else
02456     {
02457       ACE_MT (result = this->token_.acquire_read (&polite_sleep_hook));
02458     }
02459 
02460   // Check for timeouts and errors.
02461   if (result == -1)
02462     {
02463       if (errno == ETIME)
02464         return 0;
02465       else
02466         {
02467           ACE_ERROR ((LM_ERROR, "%t: %p\n", "token acquire_read"));
02468           return -1;
02469         }
02470     }
02471 
02472   // We got the token and so let us mark ourselves as owner
02473   this->owner_ = 1;
02474 
02475   return result;
02476 }
02477 
02478 int
02479 ACE_Dev_Poll_Reactor::Token_Guard::acquire (ACE_Time_Value *max_wait)
02480 {
02481   ACE_TRACE ("ACE_Dev_Poll_Reactor::Token_Guard::acquire");
02482 
02483   // Try to grab the token.  If someone if already there, don't wake
02484   // them up, just queue up in the thread pool.
02485   int result = 0;
02486   if (max_wait)
02487     {
02488       ACE_Time_Value tv = ACE_OS::gettimeofday ();
02489       tv += *max_wait;
02490 
02491       ACE_MT (result = this->token_.acquire (0, 0, &tv));
02492     }
02493   else
02494     {
02495       ACE_MT (result = this->token_.acquire ());
02496     }
02497 
02498   // Check for timeouts and errors.
02499   if (result == -1)
02500     {
02501       if (errno == ETIME)
02502         return 0;
02503       else
02504         return -1;
02505     }
02506 
02507   // We got the token and so let us mark ourseleves as owner
02508   this->owner_ = 1;
02509 
02510   return result;
02511 }
02512 
02513 ACE_END_VERSIONED_NAMESPACE_DECL
02514 
02515 #endif  /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */

Generated on Sun Jan 27 12:05:24 2008 for ACE by doxygen 1.3.6