Select_Reactor_Base.cpp

Go to the documentation of this file.
00001 // $Id: Select_Reactor_Base.cpp 81153 2008-03-29 08:17:58Z johnnyw $
00002 
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 #ifndef ACE_WIN32
00018 # include <algorithm>
00019 #endif  /* !ACE_WIN32 */
00020 
00021 ACE_RCSID (ace,
00022            Select_Reactor_Base,
00023            "$Id: Select_Reactor_Base.cpp 81153 2008-03-29 08:17:58Z johnnyw $")
00024 
00025 
00026 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 template<typename iterator>
00029 inline ACE_Event_Handler *
00030 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
00031 {
00032 #ifdef ACE_WIN32
00033   return (*i).item ();
00034 #else
00035   return (*i);
00036 #endif  /* ACE_WIN32 */
00037 }
00038 
00039 // Performs sanity checking on the ACE_HANDLE.
00040 
00041 bool
00042 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00043 {
00044   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00045 #if defined (ACE_WIN32)
00046   // It's too expensive to perform more exhaustive validity checks on
00047   // Win32 due to the way that they implement SOCKET HANDLEs.
00048   if (handle == ACE_INVALID_HANDLE)
00049 #else /* !ACE_WIN32 */
00050   if (handle < 0
00051       || static_cast<size_type> (handle) >= this->event_handlers_.size ())
00052 #endif /* ACE_WIN32 */
00053     {
00054       errno = EINVAL;
00055       return true;
00056     }
00057 
00058   return false;
00059 }
00060 
00061 // Performs sanity checking on the ACE_HANDLE.
00062 
00063 bool
00064 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00065 {
00066   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00067 #if defined (ACE_WIN32)
00068   // It's too expensive to perform more exhaustive validity checks on
00069   // Win32 due to the way that they implement SOCKET HANDLEs.
00070   if (handle != ACE_INVALID_HANDLE)
00071 #else /* !ACE_WIN32 */
00072   if (handle >= 0 && handle < this->max_handlep1_)
00073 #endif /* ACE_WIN32 */
00074     {
00075       return true;
00076     }
00077 
00078   // Don't bother setting errno.  It isn't used in the select()-based
00079   // reactors and incurs a TSS access.
00080   // errno = EINVAL;
00081 
00082   return false;
00083 }
00084 
00085 int
00086 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00087 {
00088   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00089 
00090 #if defined (ACE_WIN32)
00091   if (this->event_handlers_.open (size) == -1)
00092     return -1;
00093 #else
00094   if (this->event_handlers_.size (size) == -1)
00095     return -1;
00096 
00097   // Initialize the ACE_Event_Handler pointers to 0.
00098   std::fill (this->event_handlers_.begin (),
00099              this->event_handlers_.end (),
00100              static_cast<ACE_Event_Handler *> (0));
00101 
00102   this->max_handlep1_ = 0;
00103 #endif /* ACE_WIN32 */
00104 
00105   // Try to increase the number of handles if <size> is greater than
00106   // the current limit.
00107   return ACE::set_handle_limit (static_cast<int> (size), 1);
00108 }
00109 
00110 // Initialize a repository of the appropriate <size>.
00111 
00112 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00113   : select_reactor_ (select_reactor),
00114 #ifndef ACE_WIN32
00115     max_handlep1_ (0),
00116 #endif  /* !ACE_WIN32 */
00117     event_handlers_ ()
00118 {
00119   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00120 }
00121 
00122 int
00123 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00124 {
00125   // Unbind all of the <handle, ACE_Event_Handler>s.
00126 #ifdef ACE_WIN32
00127   map_type::iterator const end = this->event_handlers_.end ();
00128   for (map_type::iterator pos = this->event_handlers_.begin ();
00129        pos != end;
00130        )
00131     {
00132       // Post-increment (*not* pre-increment) before unbind()ing since
00133       // the current iterator will be invalidated during the unbind()
00134       // operation.
00135       map_type::iterator const the_pos (pos++);
00136 
00137       ACE_HANDLE const handle = (*the_pos).key ();
00138       (void) this->unbind (handle,
00139                            the_pos,
00140                            ACE_Event_Handler::ALL_EVENTS_MASK);
00141     }
00142 #else
00143   // We could use the "end()" iterator but leveraging max_handlep1_
00144   // allows us to optimize away unnecessary accesses of nil event
00145   // handler pointers.
00146   map_type::iterator pos =
00147     this->event_handlers_.begin ();  // iterator == ACE_Event_Handler*
00148 
00149   for (ACE_HANDLE handle = 0;
00150        handle < this->max_handlep1_;
00151        ++handle)
00152     {
00153       (void) this->unbind (handle,
00154                            pos,
00155                            ACE_Event_Handler::ALL_EVENTS_MASK);
00156       ++pos;
00157     }
00158 #endif  /* ACE_WIN32 */
00159 
00160   return 0;
00161 }
00162 
00163 int
00164 ACE_Select_Reactor_Handler_Repository::close (void)
00165 {
00166   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00167 
00168   return this->unbind_all ();
00169 }
00170 
00171 ACE_Select_Reactor_Handler_Repository::map_type::iterator
00172 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
00173 {
00174   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
00175 
00176   map_type::iterator pos (this->event_handlers_.end ());
00177 
00178   // this code assumes the handle is in range.
00179 #if defined (ACE_WIN32)
00180   this->event_handlers_.find (handle, pos);
00181 #else
00182   map_type::iterator const tmp = &this->event_handlers_[handle];
00183 
00184   if (*tmp != 0)
00185     pos = tmp;
00186 #endif /* ACE_WIN32 */
00187 
00188   return pos;
00189 }
00190 
00191 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
00192 int
00193 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00194                                              ACE_Event_Handler *event_handler,
00195                                              ACE_Reactor_Mask mask)
00196 {
00197   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00198 
00199   if (event_handler == 0)
00200     return -1;
00201 
00202   if (handle == ACE_INVALID_HANDLE)
00203     handle = event_handler->get_handle ();
00204 
00205   if (this->invalid_handle (handle))
00206     return -1;
00207 
00208   // Is this handle already in the Reactor?
00209   bool existing_handle = false;
00210 
00211 #if defined (ACE_WIN32)
00212 
00213   map_type::ENTRY * entry = 0;
00214 
00215   int const result =
00216     this->event_handlers_.bind (handle, event_handler, entry);
00217 
00218   if (result == -1)
00219     {
00220       return -1;
00221     }
00222   else if (result == 1)  // Entry already exists.
00223     {
00224       // Cannot use a different handler for an existing handle.
00225       if (event_handler != entry->item ())
00226         {
00227           return -1;
00228         }
00229       else
00230         {
00231           // Remember that this handle is already registered in the
00232           // Reactor.
00233           existing_handle = true;
00234         }
00235     }
00236 
00237 #else
00238 
00239   // Check if this handle is already registered.
00240   ACE_Event_Handler * const current_handler =
00241     this->event_handlers_[handle];
00242 
00243   if (current_handler)
00244     {
00245       // Cannot use a different handler for an existing handle.
00246       if (current_handler != event_handler)
00247         return -1;
00248 
00249       // Remember that this handle is already registered in the
00250       // Reactor.
00251       existing_handle = true;
00252     }
00253 
00254   this->event_handlers_[handle] = event_handler;
00255 
00256   if (this->max_handlep1_ < handle + 1)
00257     this->max_handlep1_ = handle + 1;
00258 
00259 #endif /* ACE_WIN32 */
00260 
00261   if (this->select_reactor_.is_suspended_i (handle))
00262     {
00263       this->select_reactor_.bit_ops (handle,
00264                                      mask,
00265                                      this->select_reactor_.suspend_set_,
00266                                      ACE_Reactor::ADD_MASK);
00267     }
00268   else
00269     {
00270       this->select_reactor_.bit_ops (handle,
00271                                      mask,
00272                                      this->select_reactor_.wait_set_,
00273                                      ACE_Reactor::ADD_MASK);
00274 
00275       // Note the fact that we've changed the state of the <wait_set_>,
00276       // which is used by the dispatching loop to determine whether it can
00277       // keep going or if it needs to reconsult select().
00278       // this->select_reactor_.state_changed_ = 1;
00279     }
00280 
00281   // If new entry, call add_reference() if needed.
00282   if (!existing_handle)
00283     event_handler->add_reference ();
00284 
00285   return 0;
00286 }
00287 
00288 // Remove the binding of <ACE_HANDLE>.
00289 
00290 int
00291 ACE_Select_Reactor_Handler_Repository::unbind (
00292   ACE_HANDLE handle,
00293   map_type::iterator pos,
00294   ACE_Reactor_Mask mask)
00295 {
00296   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00297 
00298   // Retrieve event handler before unbinding it from the map.  The
00299   // iterator pointing to it will no longer be valid once the handler
00300   // is unbound.
00301   ACE_Event_Handler * const event_handler =
00302     (pos == this->event_handlers_.end ()
00303      ? 0
00304      : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
00305 
00306   // Clear out the <mask> bits in the Select_Reactor's wait_set.
00307   this->select_reactor_.bit_ops (handle,
00308                                  mask,
00309                                  this->select_reactor_.wait_set_,
00310                                  ACE_Reactor::CLR_MASK);
00311 
00312   // And suspend_set.
00313   this->select_reactor_.bit_ops (handle,
00314                                  mask,
00315                                  this->select_reactor_.suspend_set_,
00316                                  ACE_Reactor::CLR_MASK);
00317 
00318   // Note the fact that we've changed the state of the <wait_set_>,
00319   // which is used by the dispatching loop to determine whether it can
00320   // keep going or if it needs to reconsult select().
00321   // this->select_reactor_.state_changed_ = 1;
00322 
00323   // If there are no longer any outstanding events on this <handle>
00324   // then we can totally shut down the Event_Handler.
00325 
00326   bool const has_any_wait_mask =
00327     (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328      || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329      || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330   bool const has_any_suspend_mask =
00331     (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332      || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333      || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334 
00335   bool complete_removal = false;
00336 
00337   if (!has_any_wait_mask && !has_any_suspend_mask)
00338     {
00339 #if defined (ACE_WIN32)
00340       if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
00341         return -1;  // Should not happen!
00342 #else
00343       this->event_handlers_[handle] = 0;
00344 
00345       if (this->max_handlep1_ == handle + 1)
00346         {
00347           // We've deleted the last entry, so we need to figure out
00348           // the last valid place in the array that is worth looking
00349           // at.
00350           ACE_HANDLE const wait_rd_max =
00351             this->select_reactor_.wait_set_.rd_mask_.max_set ();
00352           ACE_HANDLE const wait_wr_max =
00353             this->select_reactor_.wait_set_.wr_mask_.max_set ();
00354           ACE_HANDLE const wait_ex_max =
00355             this->select_reactor_.wait_set_.ex_mask_.max_set ();
00356 
00357           ACE_HANDLE const suspend_rd_max =
00358             this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00359           ACE_HANDLE const suspend_wr_max =
00360             this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00361           ACE_HANDLE const suspend_ex_max =
00362             this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00363 
00364           // Compute the maximum of six values.
00365           this->max_handlep1_ = wait_rd_max;
00366           if (this->max_handlep1_ < wait_wr_max)
00367             this->max_handlep1_ = wait_wr_max;
00368           if (this->max_handlep1_ < wait_ex_max)
00369             this->max_handlep1_ = wait_ex_max;
00370 
00371           if (this->max_handlep1_ < suspend_rd_max)
00372             this->max_handlep1_ = suspend_rd_max;
00373           if (this->max_handlep1_ < suspend_wr_max)
00374             this->max_handlep1_ = suspend_wr_max;
00375           if (this->max_handlep1_ < suspend_ex_max)
00376             this->max_handlep1_ = suspend_ex_max;
00377 
00378           ++this->max_handlep1_;
00379         }
00380 
00381 #endif /* ACE_WIN32 */
00382 
00383       // The handle has been completely removed.
00384       complete_removal = true;
00385     }
00386 
00387   if (event_handler == 0)
00388     return -1;
00389 
00390   bool const requires_reference_counting =
00391     event_handler->reference_counting_policy ().value () ==
00392     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00393 
00394   // Close down the <Event_Handler> unless we've been instructed not
00395   // to.
00396   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00397     (void) event_handler->handle_close (handle, mask);
00398 
00399   // Call remove_reference() if the removal is complete and reference
00400   // counting is needed.
00401   if (complete_removal && requires_reference_counting)
00402     {
00403       (void) event_handler->remove_reference ();
00404     }
00405 
00406   return 0;
00407 }
00408 
00409 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00410   (ACE_Select_Reactor_Handler_Repository const * s)
00411     : rep_ (s),
00412       current_ (s->event_handlers_.begin ())
00413 {
00414 #ifndef ACE_WIN32
00415   // Don't use ACE_Array_Base::end() since it may be larger than
00416   // event_handlers[max_handlep1_].
00417   const_base_iterator const end =
00418     &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00419 
00420   // Advance to the next element containing a non-zero event handler.
00421   // There's no need to do this for the Windows case since the hash
00422   // map will only contain non-zero event handlers.
00423   while (this->current_ != end && (*(this->current_) == 0))
00424     ++this->current_;
00425 #endif
00426 }
00427 
00428 // Pass back the <next_item> that hasn't been seen in the Set.
00429 // Returns 0 when all items have been seen, else 1.
00430 
00431 bool
00432 ACE_Select_Reactor_Handler_Repository_Iterator::next (
00433   ACE_Event_Handler *&next_item)
00434 {
00435   bool result = true;
00436 
00437   if (this->done ())
00438     result = false;
00439   else
00440     next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
00441 
00442   return result;
00443 }
00444 
00445 // Move forward by one element in the set.
00446 
00447 bool
00448 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00449 {
00450 #ifdef ACE_WIN32
00451   // No need to explicitly limit search to "current" to
00452   // max_handlep1_ range.
00453   const_base_iterator const end = this->rep_->event_handlers_.end ();
00454 #else
00455   // Don't use ACE_Array_Base::end() since it may be larger than
00456   // event_handlers[max_handlep1_].
00457   const_base_iterator const end =
00458     &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00459 #endif  /* ACE_WIN32 */
00460 
00461   if (this->current_ != end)
00462     ++this->current_;
00463 
00464 #ifndef ACE_WIN32
00465   // Advance to the next element containing a non-zero event handler.
00466   // There's no need to do this for the Windows case since the hash
00467   // map will only contain non-zero event handlers.
00468   while (this->current_ != end && (*(this->current_) == 0))
00469     ++this->current_;
00470 #endif  /* !ACE_WIN32 */
00471 
00472   return this->current_ != end;
00473 }
00474 
00475 // Dump the state of an object.
00476 
00477 void
00478 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00479 {
00480 #if defined (ACE_HAS_DUMP)
00481   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00482 
00483   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00484   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
00485 # ifdef ACE_WIN32
00486   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
00487   this->current_.dump ();
00488 # else
00489   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
00490 # endif  /* ACE_WIN32 */
00491   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00492 #endif /* ACE_HAS_DUMP */
00493 }
00494 
00495 void
00496 ACE_Select_Reactor_Handler_Repository::dump (void) const
00497 {
00498 #if defined (ACE_HAS_DUMP)
00499   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00500 
00501 # ifdef ACE_WIN32
00502 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
#  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
00503 # else
00504 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
00505 #  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
00506 # endif  /* ACE_WIN32 */
00507 
00508   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00509   ACE_DEBUG ((LM_DEBUG,
00510               ACE_TEXT ("max_handlep1_ = ")
00511               ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
00512               ACE_TEXT ("\n"),
00513               this->max_handlep1 ()));
00514   ACE_DEBUG ((LM_DEBUG,  ACE_TEXT ("[")));
00515 
00516   ACE_Event_Handler *event_handler = 0;
00517 
00518   for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00519        iter.next (event_handler) != 0;
00520        iter.advance ())
00521     ACE_DEBUG ((LM_DEBUG,
00522                 ACE_TEXT (" (event_handler = %@,")
00523                 ACE_TEXT (" event_handler->handle_ = ")
00524                 ACE_HANDLE_FORMAT_SPECIFIER
00525                 ACE_TEXT ("\n"),
00526                 event_handler,
00527                 event_handler->get_handle ()));
00528 
00529   ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
00530   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00531 #endif /* ACE_HAS_DUMP */
00532 }
00533 
00534 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00535 
00536 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00537   : max_notify_iterations_ (-1)
00538 {
00539 }
00540 
00541 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00542 {
00543 }
00544 
00545 void
00546 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00547 {
00548   // Must always be > 0 or < 0 to optimize the loop exit condition.
00549   if (iterations == 0)
00550     iterations = 1;
00551 
00552   this->max_notify_iterations_ = iterations;
00553 }
00554 
00555 int
00556 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00557 {
00558   return this->max_notify_iterations_;
00559 }
00560 
00561 // purge_pending_notifications
00562 // Removes all entries from the notify_queue_ and each one that
00563 // matches <eh> is put on the free_queue_. The rest are saved on a
00564 // local queue and copied back to the notify_queue_ at the end.
00565 // Returns the number of entries removed. Returns -1 on error.
00566 // ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined.
00567 int
00568 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00569                                                         ACE_Reactor_Mask  mask )
00570 {
00571   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00572 
00573 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00574 
00575   return notification_queue_.purge_pending_notifications(eh, mask);
00576 
00577 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00578   ACE_UNUSED_ARG (eh);
00579   ACE_UNUSED_ARG (mask);
00580   ACE_NOTSUP_RETURN (-1);
00581 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00582 }
00583 
00584 void
00585 ACE_Select_Reactor_Notify::dump (void) const
00586 {
00587 #if defined (ACE_HAS_DUMP)
00588   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00589 
00590   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00591   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00592   this->notification_pipe_.dump ();
00593   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00594 #endif /* ACE_HAS_DUMP */
00595 }
00596 
00597 int
00598 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00599                                  ACE_Timer_Queue *,
00600                                  int disable_notify_pipe)
00601 {
00602   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00603 
00604   if (disable_notify_pipe == 0)
00605     {
00606       this->select_reactor_ =
00607         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00608 
00609       if (select_reactor_ == 0)
00610         {
00611           errno = EINVAL;
00612           return -1;
00613         }
00614 
00615       if (this->notification_pipe_.open () == -1)
00616         return -1;
00617 #if defined (F_SETFD)
00618       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00619       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00620 #endif /* F_SETFD */
00621 
00622 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00623       if (notification_queue_.open() == -1)
00624         {
00625           return -1;
00626         }
00627 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00628 
00629       // There seems to be a Win32 bug with this...  Set this into
00630       // non-blocking mode.
00631       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00632                           ACE_NONBLOCK) == -1)
00633         return -1;
00634       else
00635         return this->select_reactor_->register_handler
00636           (this->notification_pipe_.read_handle (),
00637            this,
00638            ACE_Event_Handler::READ_MASK);
00639     }
00640   else
00641     {
00642       this->select_reactor_ = 0;
00643       return 0;
00644     }
00645 }
00646 
00647 int
00648 ACE_Select_Reactor_Notify::close (void)
00649 {
00650   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00651 
00652 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00653   notification_queue_.reset();
00654 #else
00655   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00656     {
00657       // Please see Bug 2820, if we just close the pipe then we break
00658       // the reference counting rules.  Basically, all the event
00659       // handlers "stored" in the pipe had their reference counts
00660       // increased.  We need to decrease them before closing the
00661       // pipe....
00662       ACE_Notification_Buffer b;
00663       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00664            r > 0;
00665            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00666         {
00667           if (b.eh_ != 0)
00668             {
00669               b.eh_->remove_reference();
00670             }
00671         }
00672     }
00673 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00674 
00675   return this->notification_pipe_.close ();
00676 }
00677 
00678 int
00679 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00680                                    ACE_Reactor_Mask mask,
00681                                    ACE_Time_Value *timeout)
00682 {
00683   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00684 
00685   // Just consider this method a "no-op" if there's no
00686   // <ACE_Select_Reactor> configured.
00687   if (this->select_reactor_ == 0)
00688     return 0;
00689 
00690   ACE_Event_Handler_var safe_handler (event_handler);
00691 
00692   if (event_handler)
00693     event_handler->add_reference ();
00694 
00695   ACE_Notification_Buffer buffer (event_handler, mask);
00696 
00697 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00698   int const notification_required =
00699     notification_queue_.push_new_notification(buffer);
00700 
00701   if (notification_required == -1)
00702     {
00703       return -1;
00704     }
00705 
00706   if (notification_required == 0)
00707     {
00708       // No failures, the handler is now owned by the notification queue
00709       safe_handler.release ();
00710 
00711       return 0;
00712     }
00713 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00714 
00715   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00716                                (char *) &buffer,
00717                                sizeof buffer,
00718                                timeout);
00719   if (n == -1)
00720     return -1;
00721 
00722   // No failures.
00723   safe_handler.release ();
00724 
00725   return 0;
00726 }
00727 
00728 // Handles pending threads (if any) that are waiting to unblock the
00729 // Select_Reactor.
00730 
00731 int
00732 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00733                                                    ACE_Handle_Set &rd_mask)
00734 {
00735   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00736 
00737   ACE_HANDLE const read_handle =
00738     this->notification_pipe_.read_handle ();
00739 
00740   if (read_handle != ACE_INVALID_HANDLE
00741       && rd_mask.is_set (read_handle))
00742     {
00743       --number_of_active_handles;
00744       rd_mask.clr_bit (read_handle);
00745       return this->handle_input (read_handle);
00746     }
00747   else
00748     return 0;
00749 }
00750 
00751 
00752 ACE_HANDLE
00753 ACE_Select_Reactor_Notify::notify_handle (void)
00754 {
00755   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00756 
00757   return this->notification_pipe_.read_handle ();
00758 }
00759 
00760 
00761 int
00762 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00763 {
00764 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00765   ACE_UNUSED_ARG(buffer);
00766   return 1;
00767 #else
00768   // If eh == 0 then another thread is unblocking the
00769   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00770   // internal structures.  Otherwise, we need to dispatch the
00771   // appropriate handle_* method on the <ACE_Event_Handler>
00772   // pointer we've been passed.
00773   if (buffer.eh_ != 0)
00774     {
00775       return 1;
00776     }
00777   else
00778     {
00779       // has no dispatchable buffer
00780       return 0;
00781     }
00782 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00783 }
00784 
00785 int
00786 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00787 {
00788   int result = 0;
00789 
00790 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00791   // Dispatch one message from the notify queue, and put another in
00792   // the pipe if one is available.  Remember, the idea is to keep
00793   // exactly one message in the pipe at a time.
00794 
00795   bool more_messages_queued = false;
00796   ACE_Notification_Buffer next;
00797 
00798   result = notification_queue_.pop_next_notification(buffer,
00799                                                      more_messages_queued,
00800                                                      next);
00801 
00802   if (result == 0)
00803     {
00804       return 0;
00805     }
00806 
00807   if (result == -1)
00808     {
00809       return -1;
00810     }
00811 
00812   if(more_messages_queued)
00813     {
00814       (void) ACE::send(this->notification_pipe_.write_handle(),
00815             (char *)&next, sizeof(ACE_Notification_Buffer));
00816     }
00817 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00818 
00819   // If eh == 0 then another thread is unblocking the
00820   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00821   // internal structures.  Otherwise, we need to dispatch the
00822   // appropriate handle_* method on the <ACE_Event_Handler> pointer
00823   // we've been passed.
00824   if (buffer.eh_ != 0)
00825     {
00826       ACE_Event_Handler *event_handler = buffer.eh_;
00827 
00828       bool const requires_reference_counting =
00829         event_handler->reference_counting_policy ().value () ==
00830         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00831 
00832       switch (buffer.mask_)
00833         {
00834         case ACE_Event_Handler::READ_MASK:
00835         case ACE_Event_Handler::ACCEPT_MASK:
00836           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00837           break;
00838         case ACE_Event_Handler::WRITE_MASK:
00839           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00840           break;
00841         case ACE_Event_Handler::EXCEPT_MASK:
00842           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00843           break;
00844         case ACE_Event_Handler::QOS_MASK:
00845           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00846           break;
00847         case ACE_Event_Handler::GROUP_QOS_MASK:
00848           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00849           break;
00850         default:
00851           // Should we bail out if we get an invalid mask?
00852           ACE_ERROR ((LM_ERROR,
00853                       ACE_TEXT ("invalid mask = %d\n"),
00854                       buffer.mask_));
00855         }
00856 
00857       if (result == -1)
00858         event_handler->handle_close (ACE_INVALID_HANDLE,
00859                                      ACE_Event_Handler::EXCEPT_MASK);
00860 
00861       if (requires_reference_counting)
00862         {
00863           event_handler->remove_reference ();
00864         }
00865     }
00866 
00867   return 1;
00868 }
00869 
00870 int
00871 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00872                                              ACE_Notification_Buffer &buffer)
00873 {
00874   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00875 
00876   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00877 
00878   if (n > 0)
00879     {
00880       // Check to see if we've got a short read.
00881       if (n != sizeof buffer)
00882         {
00883           ssize_t const remainder = sizeof buffer - n;
00884 
00885           // If so, try to recover by reading the remainder.  If this
00886           // doesn't work we're in big trouble since the input stream
00887           // won't be aligned correctly.  I'm not sure quite what to
00888           // do at this point.  It's probably best just to return -1.
00889           if (ACE::recv (handle,
00890                          ((char *) &buffer) + n,
00891                          remainder) != remainder)
00892             return -1;
00893         }
00894 
00895 
00896       return 1;
00897     }
00898 
00899   // Return -1 if things have gone seriously  wrong.
00900   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00901     return -1;
00902 
00903   return 0;
00904 }
00905 
00906 
00907 int
00908 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
00909 {
00910   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00911   // Precondition: this->select_reactor_.token_.current_owner () ==
00912   // ACE_Thread::self ();
00913 
00914   int number_dispatched = 0;
00915   int result = 0;
00916   ACE_Notification_Buffer buffer;
00917 
00918   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00919     {
00920       // Dispatch the buffer
00921       // NOTE: We count only if we made any dispatches ie. upcalls.
00922       if (this->dispatch_notify (buffer) > 0)
00923         ++number_dispatched;
00924 
00925       // Bail out if we've reached the <notify_threshold_>.  Note that
00926       // by default <notify_threshold_> is -1, so we'll loop until all
00927       // the notifications in the pipe have been dispatched.
00928       if (number_dispatched == this->max_notify_iterations_)
00929         break;
00930     }
00931 
00932   // Reassign number_dispatched to -1 if things have gone seriously
00933   // wrong.
00934   if (result < 0)
00935     number_dispatched = -1;
00936 
00937   // Enqueue ourselves into the list of waiting threads.  When we
00938   // reacquire the token we'll be off and running again with ownership
00939   // of the token.  The postcondition of this call is that
00940   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00941   this->select_reactor_->renew ();
00942   return number_dispatched;
00943 }
00944 
00945 // -------------------------------------------
00946 
00947 int
00948 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
00949                                                       ACE_Reactor_Mask mask)
00950 {
00951   if (this->notify_handler_ == 0)
00952     return 0;
00953   else
00954     return this->notify_handler_->purge_pending_notifications (eh, mask);
00955 }
00956 
00957 
00958 // Perform GET, CLR, SET, and ADD operations on the Handle_Sets.
00959 //
00960 // GET = 1, Retrieve current value
00961 // SET = 2, Set value of bits to new mask (changes the entire mask)
00962 // ADD = 3, Bitwise "or" the value into the mask (only changes
00963 //          enabled bits)
00964 // CLR = 4  Bitwise "and" the negation of the value out of the mask
00965 //          (only changes enabled bits)
00966 //
00967 // Returns the original mask.  Must be called with locks held.
00968 int
00969 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
00970                                   ACE_Reactor_Mask mask,
00971                                   ACE_Select_Reactor_Handle_Set &handle_set,
00972                                   int ops)
00973 {
00974   ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
00975   if (this->handler_rep_.handle_in_range (handle) == 0)
00976     return -1;
00977 
00978 #if !defined (ACE_WIN32)
00979   ACE_Sig_Guard sb (0,
00980                     this->mask_signals_); // Block out all signals until method returns.
00981 #endif /* ACE_WIN32 */
00982 
00983   ACE_FDS_PTMF ptmf  = &ACE_Handle_Set::set_bit;
00984   u_long omask = ACE_Event_Handler::NULL_MASK;
00985 
00986   // Find the old reactor masks.  This automatically does the work of
00987   // the GET_MASK operation.
00988   if (handle_set.rd_mask_.is_set (handle))
00989     ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
00990   if (handle_set.wr_mask_.is_set (handle))
00991     ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
00992   if (handle_set.ex_mask_.is_set (handle))
00993     ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
00994 
00995   switch (ops)
00996     {
00997     case ACE_Reactor::GET_MASK:
00998       // The work for this operation is done in all cases at the
00999       // begining of the function.
01000       break;
01001     case ACE_Reactor::CLR_MASK:
01002       ptmf = &ACE_Handle_Set::clr_bit;
01003       // State was changed. we need to reflect that change in the
01004       // dispatch_mask I assume that only ACE_Reactor::CLR_MASK should
01005       // be treated here  which means we need to clear the handle|mask
01006       // from the current dispatch handler
01007       this->clear_dispatch_mask (handle, mask);
01008       /* FALLTHRU */
01009     case ACE_Reactor::SET_MASK:
01010       /* FALLTHRU */
01011     case ACE_Reactor::ADD_MASK:
01012 
01013       // The following code is rather subtle...  Note that if we are
01014       // doing a ACE_Reactor::SET_MASK then if the bit is not enabled
01015       // in the mask we need to clear the bit from the ACE_Handle_Set.
01016       // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
01017       // a ACE_Reactor::ADD_MASK we just carry out the operations
01018       // specified by the mask.
01019 
01020       // READ, ACCEPT, and CONNECT flag will place the handle in the
01021       // read set.
01022       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01023           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01024           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01025         {
01026           (handle_set.rd_mask_.*ptmf) (handle);
01027         }
01028       else if (ops == ACE_Reactor::SET_MASK)
01029         handle_set.rd_mask_.clr_bit (handle);
01030 
01031       // WRITE and CONNECT flag will place the handle in the write set
01032       if (ACE_BIT_ENABLED (mask,
01033                            ACE_Event_Handler::WRITE_MASK)
01034           || ACE_BIT_ENABLED (mask,
01035                               ACE_Event_Handler::CONNECT_MASK))
01036         {
01037           (handle_set.wr_mask_.*ptmf) (handle);
01038         }
01039       else if (ops == ACE_Reactor::SET_MASK)
01040         handle_set.wr_mask_.clr_bit (handle);
01041 
01042       // EXCEPT (and CONNECT on Win32) flag will place the handle in
01043       // the except set.
01044       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01045 #if defined (ACE_WIN32)
01046           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01047 #endif /* ACE_WIN32 */
01048           )
01049         {
01050           (handle_set.ex_mask_.*ptmf) (handle);
01051         }
01052       else if (ops == ACE_Reactor::SET_MASK)
01053         handle_set.ex_mask_.clr_bit (handle);
01054       break;
01055     default:
01056       return -1;
01057     }
01058   return omask;
01059 }
01060 
01061 void
01062 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01063                                               ACE_Reactor_Mask mask)
01064 {
01065   ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01066 
01067   //  Use handle and mask in order to modify the sets
01068   // (wait/suspend/ready/dispatch), that way, the dispatch_io_set loop
01069   // will not be interrupt, and there will no reason to rescan the
01070   // wait_set and re-calling select function, which is *very*
01071   // expensive. It seems that wait/suspend/ready sets are getting
01072   // updated in register/remove bind/unbind etc functions.  The only
01073   // thing need to be updated is the dispatch_set (also can  be found
01074   // in that file code as dispatch_mask).  Because of that, we need
01075   // that dispatch_set to be member of the ACE_Select_Reactor_impl in
01076   // Select_Reactor_Base.h file  That way we will have access to that
01077   // member in that function.
01078 
01079   // We kind of invalidate the iterator in dispatch_io_set because its
01080   // an array and index built from the original dispatch-set. Take a
01081   // look at dispatch_io_set for more details.
01082 
01083   // We only need to clr_bit, because we are interested in clearing the
01084   // handles that was removed, so no dispatching to these handles will
01085   // occur.
01086   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01087       ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01088     {
01089       this->dispatch_set_.rd_mask_.clr_bit (handle);
01090     }
01091   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01092     {
01093       this->dispatch_set_.wr_mask_.clr_bit (handle);
01094     }
01095   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01096     {
01097       this->dispatch_set_.ex_mask_.clr_bit (handle);
01098     }
01099 
01100   // That will make the dispatch_io_set iterator re-start and rescan
01101   // the dispatch set.
01102   this->state_changed_ = true;
01103 }
01104 
01105 
01106 int
01107 ACE_Select_Reactor_Impl::resumable_handler (void)
01108 {
01109   // The select reactor has no handlers that can be resumed by the
01110   // application. So return 0;
01111 
01112   return 0;
01113 }
01114 
01115 ACE_END_VERSIONED_NAMESPACE_DECL
01116 

Generated on Tue Feb 2 17:18:42 2010 for ACE by  doxygen 1.4.7