Select_Reactor_Base.cpp

Go to the documentation of this file.
00001 // $Id: Select_Reactor_Base.cpp 79332 2007-08-13 20:30:44Z sowayaa $
00002 
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 #ifndef ACE_WIN32
00018 # include <algorithm>
00019 #endif  /* !ACE_WIN32 */
00020 
00021 ACE_RCSID (ace,
00022            Select_Reactor_Base,
00023            "$Id: Select_Reactor_Base.cpp 79332 2007-08-13 20:30:44Z sowayaa $")
00024 
00025 
00026 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 template<typename iterator>
00029 inline ACE_Event_Handler *
00030 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
00031 {
00032 #ifdef ACE_WIN32
00033   return (*i).item ();
00034 #else
00035   return (*i);
00036 #endif  /* ACE_WIN32 */
00037 }
00038 
00039 // Performs sanity checking on the ACE_HANDLE.
00040 
00041 bool
00042 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00043 {
00044   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00045 #if defined (ACE_WIN32)
00046   // It's too expensive to perform more exhaustive validity checks on
00047   // Win32 due to the way that they implement SOCKET HANDLEs.
00048   if (handle == ACE_INVALID_HANDLE)
00049 #else /* !ACE_WIN32 */
00050   if (handle < 0
00051       || static_cast<size_type> (handle) >= this->event_handlers_.size ())
00052 #endif /* ACE_WIN32 */
00053     {
00054       errno = EINVAL;
00055       return true;
00056     }
00057 
00058   return false;
00059 }
00060 
00061 // Performs sanity checking on the ACE_HANDLE.
00062 
00063 bool
00064 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00065 {
00066   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00067 #if defined (ACE_WIN32)
00068   // It's too expensive to perform more exhaustive validity checks on
00069   // Win32 due to the way that they implement SOCKET HANDLEs.
00070   if (handle != ACE_INVALID_HANDLE)
00071 #else /* !ACE_WIN32 */
00072   if (handle >= 0 && handle < this->max_handlep1_)
00073 #endif /* ACE_WIN32 */
00074     {
00075       return true;
00076     }
00077 
00078   // Don't bother setting errno.  It isn't used in the select()-based
00079   // reactors and incurs a TSS access.
00080   // errno = EINVAL;
00081 
00082   return false;
00083 }
00084 
00085 int
00086 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00087 {
00088   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00089 
00090 #if defined (ACE_WIN32)
00091   if (this->event_handlers_.open (size) == -1)
00092     return -1;
00093 #else
00094   if (this->event_handlers_.size (size) == -1)
00095     return -1;
00096 
00097   // Initialize the ACE_Event_Handler pointers to 0.
00098   std::fill (this->event_handlers_.begin (),
00099              this->event_handlers_.end (),
00100              static_cast<ACE_Event_Handler *> (0));
00101 
00102   this->max_handlep1_ = 0;
00103 #endif /* ACE_WIN32 */
00104 
00105   // Try to increase the number of handles if <size> is greater than
00106   // the current limit.
00107   return ACE::set_handle_limit (static_cast<int> (size), 1);
00108 }
00109 
00110 // Initialize a repository of the appropriate <size>.
00111 
00112 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00113   : select_reactor_ (select_reactor),
00114 #ifndef ACE_WIN32
00115     max_handlep1_ (0),
00116 #endif  /* !ACE_WIN32 */
00117     event_handlers_ ()
00118 {
00119   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00120 }
00121 
00122 int
00123 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00124 {
00125   // Unbind all of the <handle, ACE_Event_Handler>s.
00126 #ifdef ACE_WIN32
00127   map_type::iterator const end = this->event_handlers_.end ();
00128   for (map_type::iterator pos = this->event_handlers_.begin ();
00129        pos != end;
00130        )
00131     {
00132       // Post-increment (*not* pre-increment) before unbind()ing since
00133       // the current iterator will be invalidated during the unbind()
00134       // operation.
00135       map_type::iterator const the_pos (pos++);
00136 
00137       ACE_HANDLE const handle = (*the_pos).key ();
00138       (void) this->unbind (handle,
00139                            the_pos,
00140                            ACE_Event_Handler::ALL_EVENTS_MASK);
00141     }
00142 #else
00143   // We could use the "end()" iterator but leveraging max_handlep1_
00144   // allows us to optimize away unnecessary accesses of nil event
00145   // handler pointers.
00146   map_type::iterator pos =
00147     this->event_handlers_.begin ();  // iterator == ACE_Event_Handler*
00148 
00149   for (ACE_HANDLE handle = 0;
00150        handle < this->max_handlep1_;
00151        ++handle)
00152     {
00153       (void) this->unbind (handle,
00154                            pos,
00155                            ACE_Event_Handler::ALL_EVENTS_MASK);
00156       ++pos;
00157     }
00158 #endif  /* ACE_WIN32 */
00159 
00160   return 0;
00161 }
00162 
00163 int
00164 ACE_Select_Reactor_Handler_Repository::close (void)
00165 {
00166   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00167 
00168   return this->unbind_all ();
00169 }
00170 
00171 ACE_Select_Reactor_Handler_Repository::map_type::iterator
00172 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
00173 {
00174   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
00175 
00176   map_type::iterator pos (this->event_handlers_.end ());
00177 
00178   // this code assumes the handle is in range.
00179 #if defined (ACE_WIN32)
00180   this->event_handlers_.find (handle, pos);
00181 #else
00182   map_type::iterator const tmp = &this->event_handlers_[handle];
00183 
00184   if (*tmp != 0)
00185     pos = tmp;
00186 #endif /* ACE_WIN32 */
00187 
00188   return pos;
00189 }
00190 
00191 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
00192 int
00193 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00194                                              ACE_Event_Handler *event_handler,
00195                                              ACE_Reactor_Mask mask)
00196 {
00197   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00198 
00199   if (event_handler == 0)
00200     return -1;
00201 
00202   if (handle == ACE_INVALID_HANDLE)
00203     handle = event_handler->get_handle ();
00204 
00205   if (this->invalid_handle (handle))
00206     return -1;
00207 
00208   // Is this handle already in the Reactor?
00209   bool existing_handle = false;
00210 
00211 #if defined (ACE_WIN32)
00212 
00213   map_type::ENTRY * entry = 0;
00214 
00215   int const result =
00216     this->event_handlers_.bind (handle, event_handler, entry);
00217 
00218   if (result == -1)
00219     {
00220       return -1;
00221     }
00222   else if (result == 1)  // Entry already exists.
00223     {
00224       // Cannot use a different handler for an existing handle.
00225       if (event_handler != entry->item ())
00226         {
00227           return -1;
00228         }
00229       else
00230         {
00231           // Remember that this handle is already registered in the
00232           // Reactor.
00233           existing_handle = true;
00234         }
00235     }
00236 
00237 #else
00238 
00239   // Check if this handle is already registered.
00240   ACE_Event_Handler * const current_handler =
00241     this->event_handlers_[handle];
00242 
00243   if (current_handler)
00244     {
00245       // Cannot use a different handler for an existing handle.
00246       if (current_handler != event_handler)
00247         return -1;
00248 
00249       // Remember that this handle is already registered in the
00250       // Reactor.
00251       existing_handle = true;
00252     }
00253 
00254   this->event_handlers_[handle] = event_handler;
00255 
00256   if (this->max_handlep1_ < handle + 1)
00257     this->max_handlep1_ = handle + 1;
00258 
00259 #endif /* ACE_WIN32 */
00260 
00261   if (this->select_reactor_.is_suspended_i (handle))
00262     {
00263       this->select_reactor_.bit_ops (handle,
00264                                      mask,
00265                                      this->select_reactor_.suspend_set_,
00266                                      ACE_Reactor::ADD_MASK);
00267     }
00268   else
00269     {
00270       this->select_reactor_.bit_ops (handle,
00271                                      mask,
00272                                      this->select_reactor_.wait_set_,
00273                                      ACE_Reactor::ADD_MASK);
00274 
00275       // Note the fact that we've changed the state of the <wait_set_>,
00276       // which is used by the dispatching loop to determine whether it can
00277       // keep going or if it needs to reconsult select().
00278       // this->select_reactor_.state_changed_ = 1;
00279     }
00280 
00281   // If new entry, call add_reference() if needed.
00282   if (!existing_handle)
00283     event_handler->add_reference ();
00284 
00285   return 0;
00286 }
00287 
00288 // Remove the binding of <ACE_HANDLE>.
00289 
00290 int
00291 ACE_Select_Reactor_Handler_Repository::unbind (
00292   ACE_HANDLE handle,
00293   map_type::iterator pos,
00294   ACE_Reactor_Mask mask)
00295 {
00296   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00297 
00298   // Retrieve event handler before unbinding it from the map.  The
00299   // iterator pointing to it will no longer be valid once the handler
00300   // is unbound.
00301   ACE_Event_Handler * const event_handler =
00302     (pos == this->event_handlers_.end ()
00303      ? 0
00304      : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
00305 
00306   // Clear out the <mask> bits in the Select_Reactor's wait_set.
00307   this->select_reactor_.bit_ops (handle,
00308                                  mask,
00309                                  this->select_reactor_.wait_set_,
00310                                  ACE_Reactor::CLR_MASK);
00311 
00312   // And suspend_set.
00313   this->select_reactor_.bit_ops (handle,
00314                                  mask,
00315                                  this->select_reactor_.suspend_set_,
00316                                  ACE_Reactor::CLR_MASK);
00317 
00318   // Note the fact that we've changed the state of the <wait_set_>,
00319   // which is used by the dispatching loop to determine whether it can
00320   // keep going or if it needs to reconsult select().
00321   // this->select_reactor_.state_changed_ = 1;
00322 
00323   // If there are no longer any outstanding events on this <handle>
00324   // then we can totally shut down the Event_Handler.
00325 
00326   bool const has_any_wait_mask =
00327     (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328      || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329      || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330   bool const has_any_suspend_mask =
00331     (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332      || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333      || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334 
00335   bool complete_removal = false;
00336 
00337   if (!has_any_wait_mask && !has_any_suspend_mask)
00338     {
00339 #if defined (ACE_WIN32)
00340       if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
00341         return -1;  // Should not happen!
00342 #else
00343       this->event_handlers_[handle] = 0;
00344 
00345       if (this->max_handlep1_ == handle + 1)
00346         {
00347           // We've deleted the last entry, so we need to figure out
00348           // the last valid place in the array that is worth looking
00349           // at.
00350           ACE_HANDLE const wait_rd_max =
00351             this->select_reactor_.wait_set_.rd_mask_.max_set ();
00352           ACE_HANDLE const wait_wr_max =
00353             this->select_reactor_.wait_set_.wr_mask_.max_set ();
00354           ACE_HANDLE const wait_ex_max =
00355             this->select_reactor_.wait_set_.ex_mask_.max_set ();
00356 
00357           ACE_HANDLE const suspend_rd_max =
00358             this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00359           ACE_HANDLE const suspend_wr_max =
00360             this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00361           ACE_HANDLE const suspend_ex_max =
00362             this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00363 
00364           // Compute the maximum of six values.
00365           this->max_handlep1_ = wait_rd_max;
00366           if (this->max_handlep1_ < wait_wr_max)
00367             this->max_handlep1_ = wait_wr_max;
00368           if (this->max_handlep1_ < wait_ex_max)
00369             this->max_handlep1_ = wait_ex_max;
00370 
00371           if (this->max_handlep1_ < suspend_rd_max)
00372             this->max_handlep1_ = suspend_rd_max;
00373           if (this->max_handlep1_ < suspend_wr_max)
00374             this->max_handlep1_ = suspend_wr_max;
00375           if (this->max_handlep1_ < suspend_ex_max)
00376             this->max_handlep1_ = suspend_ex_max;
00377 
00378           ++this->max_handlep1_;
00379         }
00380 
00381 #endif /* ACE_WIN32 */
00382 
00383       // The handle has been completely removed.
00384       complete_removal = true;
00385     }
00386 
00387   if (event_handler == 0)
00388     return -1;
00389 
00390   bool const requires_reference_counting =
00391     event_handler->reference_counting_policy ().value () ==
00392     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00393 
00394   // Close down the <Event_Handler> unless we've been instructed not
00395   // to.
00396   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00397     (void) event_handler->handle_close (handle, mask);
00398 
00399   // Call remove_reference() if the removal is complete and reference
00400   // counting is needed.
00401   if (complete_removal && requires_reference_counting)
00402     {
00403       (void) event_handler->remove_reference ();
00404     }
00405 
00406   return 0;
00407 }
00408 
00409 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00410   (ACE_Select_Reactor_Handler_Repository const * s)
00411     : rep_ (s),
00412       current_ (s->event_handlers_.begin ())
00413 {
00414 }
00415 
00416 // Pass back the <next_item> that hasn't been seen in the Set.
00417 // Returns 0 when all items have been seen, else 1.
00418 
00419 bool
00420 ACE_Select_Reactor_Handler_Repository_Iterator::next (
00421   ACE_Event_Handler *&next_item)
00422 {
00423   bool result = true;
00424 
00425   if (this->done ())
00426     result = false;
00427   else
00428     next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
00429 
00430   return result;
00431 }
00432 
00433 // Move forward by one element in the set.
00434 
00435 bool
00436 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00437 {
00438 #ifdef ACE_WIN32
00439   // No need to explicitly limit search to "current" to
00440   // max_handlep1_ range.
00441   const_base_iterator const end = this->rep_->event_handlers_.end ();
00442 #else
00443   // Don't use ACE_Array_Base::end() since it may be larger than
00444   // event_handlers[max_handlep1_].
00445   const_base_iterator const end =
00446     &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00447 #endif  /* ACE_WIN32 */
00448 
00449   if (this->current_ != end)
00450     ++this->current_;
00451 
00452 #ifndef ACE_WIN32
00453   // Advance to the next element containing a non-zero event handler.
00454   // There's no need to do this for the Windows case since the hash
00455   // map will only contain non-zero event handlers.
00456   while (this->current_ != end && (*(this->current_) == 0))
00457     ++this->current_;
00458 #endif  /* !ACE_WIN32 */
00459 
00460   return this->current_ != end;
00461 }
00462 
00463 // Dump the state of an object.
00464 
00465 void
00466 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00467 {
00468 #if defined (ACE_HAS_DUMP)
00469   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00470 
00471   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00472   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
00473 # ifdef ACE_WIN32
00474   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
00475   this->current_.dump ();
00476 # else
00477   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
00478 # endif  /* ACE_WIN32 */
00479   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00480 #endif /* ACE_HAS_DUMP */
00481 }
00482 
00483 void
00484 ACE_Select_Reactor_Handler_Repository::dump (void) const
00485 {
00486 #if defined (ACE_HAS_DUMP)
00487   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00488 
00489 # ifdef ACE_WIN32
00490 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
#  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
00491 # else
00492 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
00493 #  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
00494 # endif  /* ACE_WIN32 */
00495 
00496   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00497   ACE_DEBUG ((LM_DEBUG,
00498               ACE_TEXT ("max_handlep1_ = ")
00499               ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
00500               ACE_TEXT ("\n"),
00501               this->max_handlep1 ()));
00502   ACE_DEBUG ((LM_DEBUG,  ACE_TEXT ("[")));
00503 
00504   ACE_Event_Handler *event_handler = 0;
00505 
00506   for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00507        iter.next (event_handler) != 0;
00508        iter.advance ())
00509     ACE_DEBUG ((LM_DEBUG,
00510                 ACE_TEXT (" (event_handler = %@,")
00511                 ACE_TEXT (" event_handler->handle_ = ")
00512                 ACE_HANDLE_FORMAT_SPECIFIER
00513                 ACE_TEXT ("\n"),
00514                 event_handler,
00515                 event_handler->get_handle ()));
00516 
00517   ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
00518   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00519 #endif /* ACE_HAS_DUMP */
00520 }
00521 
00522 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00523 
00524 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00525   : max_notify_iterations_ (-1)
00526 {
00527 }
00528 
00529 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00530 {
00531 }
00532 
00533 void
00534 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00535 {
00536   // Must always be > 0 or < 0 to optimize the loop exit condition.
00537   if (iterations == 0)
00538     iterations = 1;
00539 
00540   this->max_notify_iterations_ = iterations;
00541 }
00542 
00543 int
00544 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00545 {
00546   return this->max_notify_iterations_;
00547 }
00548 
00549 // purge_pending_notifications
00550 // Removes all entries from the notify_queue_ and each one that
00551 // matches <eh> is put on the free_queue_. The rest are saved on a
00552 // local queue and copied back to the notify_queue_ at the end.
00553 // Returns the number of entries removed. Returns -1 on error.
00554 // ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined.
00555 int
00556 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00557                                                         ACE_Reactor_Mask  mask )
00558 {
00559   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00560 
00561 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00562 
00563   return notification_queue_.purge_pending_notifications(eh, mask);
00564 
00565 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00566   ACE_UNUSED_ARG (eh);
00567   ACE_UNUSED_ARG (mask);
00568   ACE_NOTSUP_RETURN (-1);
00569 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00570 }
00571 
00572 void
00573 ACE_Select_Reactor_Notify::dump (void) const
00574 {
00575 #if defined (ACE_HAS_DUMP)
00576   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00577 
00578   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00579   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00580   this->notification_pipe_.dump ();
00581   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00582 #endif /* ACE_HAS_DUMP */
00583 }
00584 
00585 int
00586 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00587                                  ACE_Timer_Queue *,
00588                                  int disable_notify_pipe)
00589 {
00590   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00591 
00592   if (disable_notify_pipe == 0)
00593     {
00594       this->select_reactor_ =
00595         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00596 
00597       if (select_reactor_ == 0)
00598         {
00599           errno = EINVAL;
00600           return -1;
00601         }
00602 
00603       if (this->notification_pipe_.open () == -1)
00604         return -1;
00605 #if defined (F_SETFD)
00606       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00607       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00608 #endif /* F_SETFD */
00609 
00610 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00611       if (notification_queue_.open() == -1)
00612         {
00613           return -1;
00614         }
00615 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00616 
00617       // There seems to be a Win32 bug with this...  Set this into
00618       // non-blocking mode.
00619       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00620                           ACE_NONBLOCK) == -1)
00621         return -1;
00622       else
00623         return this->select_reactor_->register_handler
00624           (this->notification_pipe_.read_handle (),
00625            this,
00626            ACE_Event_Handler::READ_MASK);
00627     }
00628   else
00629     {
00630       this->select_reactor_ = 0;
00631       return 0;
00632     }
00633 }
00634 
00635 int
00636 ACE_Select_Reactor_Notify::close (void)
00637 {
00638   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00639 
00640 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00641   notification_queue_.reset();
00642 #else
00643   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00644     {
00645       // Please see Bug 2820, if we just close the pipe then we break
00646       // the reference counting rules.  Basically, all the event
00647       // handlers "stored" in the pipe had their reference counts
00648       // increased.  We need to decrease them before closing the
00649       // pipe....
00650       ACE_Notification_Buffer b;
00651       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00652            r > 0;
00653            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00654         {
00655           if (b.eh_ == 0) continue;
00656           b.eh_->remove_reference();
00657         }
00658     }
00659 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00660 
00661   return this->notification_pipe_.close ();
00662 }
00663 
00664 int
00665 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00666                                    ACE_Reactor_Mask mask,
00667                                    ACE_Time_Value *timeout)
00668 {
00669   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00670 
00671   // Just consider this method a "no-op" if there's no
00672   // <ACE_Select_Reactor> configured.
00673   if (this->select_reactor_ == 0)
00674     return 0;
00675 
00676   ACE_Event_Handler_var safe_handler (event_handler);
00677 
00678   if (event_handler)
00679     event_handler->add_reference ();
00680 
00681   ACE_Notification_Buffer buffer (event_handler, mask);
00682 
00683 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00684   int notification_required =
00685     notification_queue_.push_new_notification(buffer);
00686 
00687   if (notification_required == -1)
00688   {
00689     return -1;
00690   }
00691 
00692   if (notification_required == 0)
00693   {
00694     // No failures, the handler is now owned by the notification queue
00695     safe_handler.release ();
00696 
00697     return 0;
00698   }
00699 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00700 
00701   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00702                                (char *) &buffer,
00703                                sizeof buffer,
00704                                timeout);
00705   if (n == -1)
00706     return -1;
00707 
00708   // No failures.
00709   safe_handler.release ();
00710 
00711   return 0;
00712 }
00713 
00714 // Handles pending threads (if any) that are waiting to unblock the
00715 // Select_Reactor.
00716 
00717 int
00718 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00719                                                    ACE_Handle_Set &rd_mask)
00720 {
00721   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00722 
00723   ACE_HANDLE const read_handle =
00724     this->notification_pipe_.read_handle ();
00725 
00726   if (read_handle != ACE_INVALID_HANDLE
00727       && rd_mask.is_set (read_handle))
00728     {
00729       --number_of_active_handles;
00730       rd_mask.clr_bit (read_handle);
00731       return this->handle_input (read_handle);
00732     }
00733   else
00734     return 0;
00735 }
00736 
00737 
00738 ACE_HANDLE
00739 ACE_Select_Reactor_Notify::notify_handle (void)
00740 {
00741   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00742 
00743   return this->notification_pipe_.read_handle ();
00744 }
00745 
00746 
00747 int
00748 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00749 {
00750 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00751   ACE_UNUSED_ARG(buffer);
00752   return 1;
00753 #else
00754   // If eh == 0 then another thread is unblocking the
00755   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00756   // internal structures.  Otherwise, we need to dispatch the
00757   // appropriate handle_* method on the <ACE_Event_Handler>
00758   // pointer we've been passed.
00759   if (buffer.eh_ != 0)
00760     {
00761       return 1;
00762     }
00763   else
00764     {
00765       // has no dispatchable buffer
00766       return 0;
00767     }
00768 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00769 }
00770 
00771 int
00772 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00773 {
00774   int result = 0;
00775 
00776 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00777   // Dispatch one message from the notify queue, and put another in
00778   // the pipe if one is available.  Remember, the idea is to keep
00779   // exactly one message in the pipe at a time.
00780 
00781   bool more_messages_queued = false;
00782   ACE_Notification_Buffer next;
00783 
00784   result = notification_queue_.pop_next_notification(buffer, 
00785                                                      more_messages_queued, 
00786                                                      next);
00787 
00788   if (result == 0)
00789     {
00790       return 0;
00791     }
00792 
00793   if (result == -1)
00794     {
00795       return -1;
00796     }
00797 
00798   if(more_messages_queued)
00799     {
00800       (void) ACE::send(this->notification_pipe_.write_handle(),
00801             (char *)&next, sizeof(ACE_Notification_Buffer));
00802     }
00803 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00804 
00805   // If eh == 0 then another thread is unblocking the
00806   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00807   // internal structures.  Otherwise, we need to dispatch the
00808   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00809   // we've been passed.
00810   if (buffer.eh_ != 0)
00811     {
00812       ACE_Event_Handler *event_handler =
00813         buffer.eh_;
00814 
00815       bool const requires_reference_counting =
00816         event_handler->reference_counting_policy ().value () ==
00817         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00818 
00819       switch (buffer.mask_)
00820         {
00821         case ACE_Event_Handler::READ_MASK:
00822         case ACE_Event_Handler::ACCEPT_MASK:
00823           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00824           break;
00825         case ACE_Event_Handler::WRITE_MASK:
00826           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00827           break;
00828         case ACE_Event_Handler::EXCEPT_MASK:
00829           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00830           break;
00831         case ACE_Event_Handler::QOS_MASK:
00832           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00833           break;
00834         case ACE_Event_Handler::GROUP_QOS_MASK:
00835           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00836           break;
00837         default:
00838           // Should we bail out if we get an invalid mask?
00839           ACE_ERROR ((LM_ERROR,
00840                       ACE_TEXT ("invalid mask = %d\n"),
00841                       buffer.mask_));
00842         }
00843 
00844       if (result == -1)
00845         event_handler->handle_close (ACE_INVALID_HANDLE,
00846                                      ACE_Event_Handler::EXCEPT_MASK);
00847 
00848       if (requires_reference_counting)
00849         {
00850           event_handler->remove_reference ();
00851         }
00852     }
00853 
00854   return 1;
00855 }
00856 
00857 int
00858 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00859                                              ACE_Notification_Buffer &buffer)
00860 {
00861   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00862 
00863   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00864 
00865   if (n > 0)
00866     {
00867       // Check to see if we've got a short read.
00868       if (n != sizeof buffer)
00869         {
00870           ssize_t const remainder = sizeof buffer - n;
00871 
00872           // If so, try to recover by reading the remainder.  If this
00873           // doesn't work we're in big trouble since the input stream
00874           // won't be aligned correctly.  I'm not sure quite what to
00875           // do at this point.  It's probably best just to return -1.
00876           if (ACE::recv (handle,
00877                          ((char *) &buffer) + n,
00878                          remainder) != remainder)
00879             return -1;
00880         }
00881 
00882 
00883       return 1;
00884     }
00885 
00886   // Return -1 if things have gone seriously  wrong.
00887   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00888     return -1;
00889 
00890   return 0;
00891 }
00892 
00893 
00894 int
00895 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
00896 {
00897   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00898   // Precondition: this->select_reactor_.token_.current_owner () ==
00899   // ACE_Thread::self ();
00900 
00901   int number_dispatched = 0;
00902   int result = 0;
00903   ACE_Notification_Buffer buffer;
00904 
00905   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00906     {
00907       // Dispatch the buffer
00908       // NOTE: We count only if we made any dispatches ie. upcalls.
00909       if (this->dispatch_notify (buffer) > 0)
00910         ++number_dispatched;
00911 
00912       // Bail out if we've reached the <notify_threshold_>.  Note that
00913       // by default <notify_threshold_> is -1, so we'll loop until all
00914       // the notifications in the pipe have been dispatched.
00915       if (number_dispatched == this->max_notify_iterations_)
00916         break;
00917     }
00918 
00919   // Reassign number_dispatched to -1 if things have gone seriously
00920   // wrong.
00921   if (result < 0)
00922     number_dispatched = -1;
00923 
00924   // Enqueue ourselves into the list of waiting threads.  When we
00925   // reacquire the token we'll be off and running again with ownership
00926   // of the token.  The postcondition of this call is that
00927   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00928   this->select_reactor_->renew ();
00929   return number_dispatched;
00930 }
00931 
00932 // -------------------------------------------
00933 
00934 int
00935 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
00936                                                       ACE_Reactor_Mask mask)
00937 {
00938   if (this->notify_handler_ == 0)
00939     return 0;
00940   else
00941     return this->notify_handler_->purge_pending_notifications (eh, mask);
00942 }
00943 
00944 
00945 // Perform GET, CLR, SET, and ADD operations on the Handle_Sets.
00946 //
00947 // GET = 1, Retrieve current value
00948 // SET = 2, Set value of bits to new mask (changes the entire mask)
00949 // ADD = 3, Bitwise "or" the value into the mask (only changes
00950 //          enabled bits)
00951 // CLR = 4  Bitwise "and" the negation of the value out of the mask
00952 //          (only changes enabled bits)
00953 //
00954 // Returns the original mask.  Must be called with locks held.
00955 int
00956 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
00957                                   ACE_Reactor_Mask mask,
00958                                   ACE_Select_Reactor_Handle_Set &handle_set,
00959                                   int ops)
00960 {
00961   ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
00962   if (this->handler_rep_.handle_in_range (handle) == 0)
00963     return -1;
00964 
00965 #if !defined (ACE_WIN32)
00966   ACE_Sig_Guard sb (0,
00967                     this->mask_signals_); // Block out all signals until method returns.
00968 #endif /* ACE_WIN32 */
00969 
00970   ACE_FDS_PTMF ptmf  = &ACE_Handle_Set::set_bit;
00971   u_long omask = ACE_Event_Handler::NULL_MASK;
00972 
00973   // Find the old reactor masks.  This automatically does the work of
00974   // the GET_MASK operation.
00975   if (handle_set.rd_mask_.is_set (handle))
00976     ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
00977   if (handle_set.wr_mask_.is_set (handle))
00978     ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
00979   if (handle_set.ex_mask_.is_set (handle))
00980     ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
00981 
00982   switch (ops)
00983     {
00984     case ACE_Reactor::GET_MASK:
00985       // The work for this operation is done in all cases at the
00986       // begining of the function.
00987       break;
00988     case ACE_Reactor::CLR_MASK:
00989       ptmf = &ACE_Handle_Set::clr_bit;
00990       // State was changed. we need to reflect that change in the
00991       // dispatch_mask I assume that only ACE_Reactor::CLR_MASK should
00992       // be treated here  which means we need to clear the handle|mask
00993       // from the current dispatch handler
00994       this->clear_dispatch_mask (handle, mask);
00995       /* FALLTHRU */
00996     case ACE_Reactor::SET_MASK:
00997       /* FALLTHRU */
00998     case ACE_Reactor::ADD_MASK:
00999 
01000       // The following code is rather subtle...  Note that if we are
01001       // doing a ACE_Reactor::SET_MASK then if the bit is not enabled
01002       // in the mask we need to clear the bit from the ACE_Handle_Set.
01003       // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
01004       // a ACE_Reactor::ADD_MASK we just carry out the operations
01005       // specified by the mask.
01006 
01007       // READ, ACCEPT, and CONNECT flag will place the handle in the
01008       // read set.
01009       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01010           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01011           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01012         {
01013           (handle_set.rd_mask_.*ptmf) (handle);
01014         }
01015       else if (ops == ACE_Reactor::SET_MASK)
01016         handle_set.rd_mask_.clr_bit (handle);
01017 
01018       // WRITE and CONNECT flag will place the handle in the write set
01019       if (ACE_BIT_ENABLED (mask,
01020                            ACE_Event_Handler::WRITE_MASK)
01021           || ACE_BIT_ENABLED (mask,
01022                               ACE_Event_Handler::CONNECT_MASK))
01023         {
01024           (handle_set.wr_mask_.*ptmf) (handle);
01025         }
01026       else if (ops == ACE_Reactor::SET_MASK)
01027         handle_set.wr_mask_.clr_bit (handle);
01028 
01029       // EXCEPT (and CONNECT on Win32) flag will place the handle in
01030       // the except set.
01031       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01032 #if defined (ACE_WIN32)
01033           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01034 #endif /* ACE_WIN32 */
01035           )
01036         {
01037           (handle_set.ex_mask_.*ptmf) (handle);
01038         }
01039       else if (ops == ACE_Reactor::SET_MASK)
01040         handle_set.ex_mask_.clr_bit (handle);
01041       break;
01042     default:
01043       return -1;
01044     }
01045   return omask;
01046 }
01047 
01048 void
01049 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01050                                               ACE_Reactor_Mask mask)
01051 {
01052   ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01053 
01054   //  Use handle and mask in order to modify the sets
01055   // (wait/suspend/ready/dispatch), that way, the dispatch_io_set loop
01056   // will not be interrupt, and there will no reason to rescan the
01057   // wait_set and re-calling select function, which is *very*
01058   // expensive. It seems that wait/suspend/ready sets are getting
01059   // updated in register/remove bind/unbind etc functions.  The only
01060   // thing need to be updated is the dispatch_set (also can  be found
01061   // in that file code as dispatch_mask).  Because of that, we need
01062   // that dispatch_set to be member of the ACE_Select_Reactor_impl in
01063   // Select_Reactor_Base.h file  That way we will have access to that
01064   // member in that function.
01065 
01066   // We kind of invalidate the iterator in dispatch_io_set because its
01067   // an array and index built from the original dispatch-set. Take a
01068   // look at dispatch_io_set for more details.
01069 
01070   // We only need to clr_bit, because we are interested in clearing the
01071   // handles that was removed, so no dispatching to these handles will
01072   // occur.
01073   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01074       ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01075     {
01076       this->dispatch_set_.rd_mask_.clr_bit (handle);
01077     }
01078   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01079     {
01080       this->dispatch_set_.wr_mask_.clr_bit (handle);
01081     }
01082   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01083     {
01084       this->dispatch_set_.ex_mask_.clr_bit (handle);
01085     }
01086 
01087   // That will make the dispatch_io_set iterator re-start and rescan
01088   // the dispatch set.
01089   this->state_changed_ = true;
01090 }
01091 
01092 
01093 int
01094 ACE_Select_Reactor_Impl::resumable_handler (void)
01095 {
01096   // The select reactor has no handlers that can be resumed by the
01097   // application. So return 0;
01098 
01099   return 0;
01100 }
01101 
01102 ACE_END_VERSIONED_NAMESPACE_DECL
01103 

Generated on Sun Jan 27 12:05:36 2008 for ACE by doxygen 1.3.6