00001 
00002 
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif 
00016 
00017 #ifndef ACE_WIN32
00018 # include <algorithm>
00019 #endif  
00020 
00021 ACE_RCSID (ace,
00022            Select_Reactor_Base,
00023            "$Id: Select_Reactor_Base.cpp 79332 2007-08-13 20:30:44Z sowayaa $")
00024 
00025 
00026 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 template<typename iterator>
00029 inline ACE_Event_Handler *
00030 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
00031 {
00032 #ifdef ACE_WIN32
00033   return (*i).item ();
00034 #else
00035   return (*i);
00036 #endif  
00037 }
00038 
00039 
00040 
00041 bool
00042 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00043 {
00044   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00045 #if defined (ACE_WIN32)
00046   
00047   
00048   if (handle == ACE_INVALID_HANDLE)
00049 #else 
00050   if (handle < 0
00051       || static_cast<size_type> (handle) >= this->event_handlers_.size ())
00052 #endif 
00053     {
00054       errno = EINVAL;
00055       return true;
00056     }
00057 
00058   return false;
00059 }
00060 
00061 
00062 
00063 bool
00064 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00065 {
00066   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00067 #if defined (ACE_WIN32)
00068   
00069   
00070   if (handle != ACE_INVALID_HANDLE)
00071 #else 
00072   if (handle >= 0 && handle < this->max_handlep1_)
00073 #endif 
00074     {
00075       return true;
00076     }
00077 
00078   
00079   
00080   
00081 
00082   return false;
00083 }
00084 
00085 int
00086 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00087 {
00088   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00089 
00090 #if defined (ACE_WIN32)
00091   if (this->event_handlers_.open (size) == -1)
00092     return -1;
00093 #else
00094   if (this->event_handlers_.size (size) == -1)
00095     return -1;
00096 
00097   
00098   std::fill (this->event_handlers_.begin (),
00099              this->event_handlers_.end (),
00100              static_cast<ACE_Event_Handler *> (0));
00101 
00102   this->max_handlep1_ = 0;
00103 #endif 
00104 
00105   
00106   
00107   return ACE::set_handle_limit (static_cast<int> (size), 1);
00108 }
00109 
00110 
00111 
00112 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00113   : select_reactor_ (select_reactor),
00114 #ifndef ACE_WIN32
00115     max_handlep1_ (0),
00116 #endif  
00117     event_handlers_ ()
00118 {
00119   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00120 }
00121 
00122 int
00123 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00124 {
00125   
00126 #ifdef ACE_WIN32
00127   map_type::iterator const end = this->event_handlers_.end ();
00128   for (map_type::iterator pos = this->event_handlers_.begin ();
00129        pos != end;
00130        )
00131     {
00132       
00133       
00134       
00135       map_type::iterator const the_pos (pos++);
00136 
00137       ACE_HANDLE const handle = (*the_pos).key ();
00138       (void) this->unbind (handle,
00139                            the_pos,
00140                            ACE_Event_Handler::ALL_EVENTS_MASK);
00141     }
00142 #else
00143   
00144   
00145   
00146   map_type::iterator pos =
00147     this->event_handlers_.begin ();  
00148 
00149   for (ACE_HANDLE handle = 0;
00150        handle < this->max_handlep1_;
00151        ++handle)
00152     {
00153       (void) this->unbind (handle,
00154                            pos,
00155                            ACE_Event_Handler::ALL_EVENTS_MASK);
00156       ++pos;
00157     }
00158 #endif  
00159 
00160   return 0;
00161 }
00162 
00163 int
00164 ACE_Select_Reactor_Handler_Repository::close (void)
00165 {
00166   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00167 
00168   return this->unbind_all ();
00169 }
00170 
00171 ACE_Select_Reactor_Handler_Repository::map_type::iterator
00172 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
00173 {
00174   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
00175 
00176   map_type::iterator pos (this->event_handlers_.end ());
00177 
00178   
00179 #if defined (ACE_WIN32)
00180   this->event_handlers_.find (handle, pos);
00181 #else
00182   map_type::iterator const tmp = &this->event_handlers_[handle];
00183 
00184   if (*tmp != 0)
00185     pos = tmp;
00186 #endif 
00187 
00188   return pos;
00189 }
00190 
00191 
00192 int
00193 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00194                                              ACE_Event_Handler *event_handler,
00195                                              ACE_Reactor_Mask mask)
00196 {
00197   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00198 
00199   if (event_handler == 0)
00200     return -1;
00201 
00202   if (handle == ACE_INVALID_HANDLE)
00203     handle = event_handler->get_handle ();
00204 
00205   if (this->invalid_handle (handle))
00206     return -1;
00207 
00208   
00209   bool existing_handle = false;
00210 
00211 #if defined (ACE_WIN32)
00212 
00213   map_type::ENTRY * entry = 0;
00214 
00215   int const result =
00216     this->event_handlers_.bind (handle, event_handler, entry);
00217 
00218   if (result == -1)
00219     {
00220       return -1;
00221     }
00222   else if (result == 1)  
00223     {
00224       
00225       if (event_handler != entry->item ())
00226         {
00227           return -1;
00228         }
00229       else
00230         {
00231           
00232           
00233           existing_handle = true;
00234         }
00235     }
00236 
00237 #else
00238 
00239   
00240   ACE_Event_Handler * const current_handler =
00241     this->event_handlers_[handle];
00242 
00243   if (current_handler)
00244     {
00245       
00246       if (current_handler != event_handler)
00247         return -1;
00248 
00249       
00250       
00251       existing_handle = true;
00252     }
00253 
00254   this->event_handlers_[handle] = event_handler;
00255 
00256   if (this->max_handlep1_ < handle + 1)
00257     this->max_handlep1_ = handle + 1;
00258 
00259 #endif 
00260 
00261   if (this->select_reactor_.is_suspended_i (handle))
00262     {
00263       this->select_reactor_.bit_ops (handle,
00264                                      mask,
00265                                      this->select_reactor_.suspend_set_,
00266                                      ACE_Reactor::ADD_MASK);
00267     }
00268   else
00269     {
00270       this->select_reactor_.bit_ops (handle,
00271                                      mask,
00272                                      this->select_reactor_.wait_set_,
00273                                      ACE_Reactor::ADD_MASK);
00274 
00275       
00276       
00277       
00278       
00279     }
00280 
00281   
00282   if (!existing_handle)
00283     event_handler->add_reference ();
00284 
00285   return 0;
00286 }
00287 
00288 
00289 
00290 int
00291 ACE_Select_Reactor_Handler_Repository::unbind (
00292   ACE_HANDLE handle,
00293   map_type::iterator pos,
00294   ACE_Reactor_Mask mask)
00295 {
00296   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00297 
00298   
00299   
00300   
00301   ACE_Event_Handler * const event_handler =
00302     (pos == this->event_handlers_.end ()
00303      ? 0
00304      : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
00305 
00306   
00307   this->select_reactor_.bit_ops (handle,
00308                                  mask,
00309                                  this->select_reactor_.wait_set_,
00310                                  ACE_Reactor::CLR_MASK);
00311 
00312   
00313   this->select_reactor_.bit_ops (handle,
00314                                  mask,
00315                                  this->select_reactor_.suspend_set_,
00316                                  ACE_Reactor::CLR_MASK);
00317 
00318   
00319   
00320   
00321   
00322 
00323   
00324   
00325 
00326   bool const has_any_wait_mask =
00327     (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328      || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329      || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330   bool const has_any_suspend_mask =
00331     (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332      || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333      || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334 
00335   bool complete_removal = false;
00336 
00337   if (!has_any_wait_mask && !has_any_suspend_mask)
00338     {
00339 #if defined (ACE_WIN32)
00340       if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
00341         return -1;  
00342 #else
00343       this->event_handlers_[handle] = 0;
00344 
00345       if (this->max_handlep1_ == handle + 1)
00346         {
00347           
00348           
00349           
00350           ACE_HANDLE const wait_rd_max =
00351             this->select_reactor_.wait_set_.rd_mask_.max_set ();
00352           ACE_HANDLE const wait_wr_max =
00353             this->select_reactor_.wait_set_.wr_mask_.max_set ();
00354           ACE_HANDLE const wait_ex_max =
00355             this->select_reactor_.wait_set_.ex_mask_.max_set ();
00356 
00357           ACE_HANDLE const suspend_rd_max =
00358             this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00359           ACE_HANDLE const suspend_wr_max =
00360             this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00361           ACE_HANDLE const suspend_ex_max =
00362             this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00363 
00364           
00365           this->max_handlep1_ = wait_rd_max;
00366           if (this->max_handlep1_ < wait_wr_max)
00367             this->max_handlep1_ = wait_wr_max;
00368           if (this->max_handlep1_ < wait_ex_max)
00369             this->max_handlep1_ = wait_ex_max;
00370 
00371           if (this->max_handlep1_ < suspend_rd_max)
00372             this->max_handlep1_ = suspend_rd_max;
00373           if (this->max_handlep1_ < suspend_wr_max)
00374             this->max_handlep1_ = suspend_wr_max;
00375           if (this->max_handlep1_ < suspend_ex_max)
00376             this->max_handlep1_ = suspend_ex_max;
00377 
00378           ++this->max_handlep1_;
00379         }
00380 
00381 #endif 
00382 
00383       
00384       complete_removal = true;
00385     }
00386 
00387   if (event_handler == 0)
00388     return -1;
00389 
00390   bool const requires_reference_counting =
00391     event_handler->reference_counting_policy ().value () ==
00392     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00393 
00394   
00395   
00396   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00397     (void) event_handler->handle_close (handle, mask);
00398 
00399   
00400   
00401   if (complete_removal && requires_reference_counting)
00402     {
00403       (void) event_handler->remove_reference ();
00404     }
00405 
00406   return 0;
00407 }
00408 
00409 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00410   (ACE_Select_Reactor_Handler_Repository const * s)
00411     : rep_ (s),
00412       current_ (s->event_handlers_.begin ())
00413 {
00414 }
00415 
00416 
00417 
00418 
00419 bool
00420 ACE_Select_Reactor_Handler_Repository_Iterator::next (
00421   ACE_Event_Handler *&next_item)
00422 {
00423   bool result = true;
00424 
00425   if (this->done ())
00426     result = false;
00427   else
00428     next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
00429 
00430   return result;
00431 }
00432 
00433 
00434 
00435 bool
00436 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00437 {
00438 #ifdef ACE_WIN32
00439   
00440   
00441   const_base_iterator const end = this->rep_->event_handlers_.end ();
00442 #else
00443   
00444   
00445   const_base_iterator const end =
00446     &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00447 #endif  
00448 
00449   if (this->current_ != end)
00450     ++this->current_;
00451 
00452 #ifndef ACE_WIN32
00453   
00454   
00455   
00456   while (this->current_ != end && (*(this->current_) == 0))
00457     ++this->current_;
00458 #endif  
00459 
00460   return this->current_ != end;
00461 }
00462 
00463 
00464 
00465 void
00466 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00467 {
00468 #if defined (ACE_HAS_DUMP)
00469   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00470 
00471   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00472   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
00473 # ifdef ACE_WIN32
00474   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
00475   this->current_.dump ();
00476 # else
00477   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
00478 # endif  
00479   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00480 #endif 
00481 }
00482 
00483 void
00484 ACE_Select_Reactor_Handler_Repository::dump (void) const
00485 {
00486 #if defined (ACE_HAS_DUMP)
00487   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00488 
00489 # ifdef ACE_WIN32
00490 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
#  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
00491 # else
00492 #  define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
00493 #  define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
00494 # endif  
00495 
00496   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00497   ACE_DEBUG ((LM_DEBUG,
00498               ACE_TEXT ("max_handlep1_ = ")
00499               ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
00500               ACE_TEXT ("\n"),
00501               this->max_handlep1 ()));
00502   ACE_DEBUG ((LM_DEBUG,  ACE_TEXT ("[")));
00503 
00504   ACE_Event_Handler *event_handler = 0;
00505 
00506   for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00507        iter.next (event_handler) != 0;
00508        iter.advance ())
00509     ACE_DEBUG ((LM_DEBUG,
00510                 ACE_TEXT (" (event_handler = %@,")
00511                 ACE_TEXT (" event_handler->handle_ = ")
00512                 ACE_HANDLE_FORMAT_SPECIFIER
00513                 ACE_TEXT ("\n"),
00514                 event_handler,
00515                 event_handler->get_handle ()));
00516 
00517   ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
00518   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00519 #endif 
00520 }
00521 
00522 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00523 
00524 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00525   : max_notify_iterations_ (-1)
00526 {
00527 }
00528 
00529 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00530 {
00531 }
00532 
00533 void
00534 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00535 {
00536   
00537   if (iterations == 0)
00538     iterations = 1;
00539 
00540   this->max_notify_iterations_ = iterations;
00541 }
00542 
00543 int
00544 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00545 {
00546   return this->max_notify_iterations_;
00547 }
00548 
00549 
00550 
00551 
00552 
00553 
00554 
00555 int
00556 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00557                                                         ACE_Reactor_Mask  mask )
00558 {
00559   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00560 
00561 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00562 
00563   return notification_queue_.purge_pending_notifications(eh, mask);
00564 
00565 #else 
00566   ACE_UNUSED_ARG (eh);
00567   ACE_UNUSED_ARG (mask);
00568   ACE_NOTSUP_RETURN (-1);
00569 #endif  
00570 }
00571 
00572 void
00573 ACE_Select_Reactor_Notify::dump (void) const
00574 {
00575 #if defined (ACE_HAS_DUMP)
00576   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00577 
00578   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00579   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00580   this->notification_pipe_.dump ();
00581   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00582 #endif 
00583 }
00584 
00585 int
00586 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00587                                  ACE_Timer_Queue *,
00588                                  int disable_notify_pipe)
00589 {
00590   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00591 
00592   if (disable_notify_pipe == 0)
00593     {
00594       this->select_reactor_ =
00595         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00596 
00597       if (select_reactor_ == 0)
00598         {
00599           errno = EINVAL;
00600           return -1;
00601         }
00602 
00603       if (this->notification_pipe_.open () == -1)
00604         return -1;
00605 #if defined (F_SETFD)
00606       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00607       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00608 #endif 
00609 
00610 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00611       if (notification_queue_.open() == -1)
00612         {
00613           return -1;
00614         }
00615 #endif 
00616 
00617       
00618       
00619       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00620                           ACE_NONBLOCK) == -1)
00621         return -1;
00622       else
00623         return this->select_reactor_->register_handler
00624           (this->notification_pipe_.read_handle (),
00625            this,
00626            ACE_Event_Handler::READ_MASK);
00627     }
00628   else
00629     {
00630       this->select_reactor_ = 0;
00631       return 0;
00632     }
00633 }
00634 
00635 int
00636 ACE_Select_Reactor_Notify::close (void)
00637 {
00638   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00639 
00640 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00641   notification_queue_.reset();
00642 #else
00643   if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00644     {
00645       
00646       
00647       
00648       
00649       
00650       ACE_Notification_Buffer b;
00651       for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00652            r > 0;
00653            r = read_notify_pipe(notification_pipe_.read_handle(), b))
00654         {
00655           if (b.eh_ == 0) continue;
00656           b.eh_->remove_reference();
00657         }
00658     }
00659 #endif 
00660 
00661   return this->notification_pipe_.close ();
00662 }
00663 
00664 int
00665 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00666                                    ACE_Reactor_Mask mask,
00667                                    ACE_Time_Value *timeout)
00668 {
00669   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00670 
00671   
00672   
00673   if (this->select_reactor_ == 0)
00674     return 0;
00675 
00676   ACE_Event_Handler_var safe_handler (event_handler);
00677 
00678   if (event_handler)
00679     event_handler->add_reference ();
00680 
00681   ACE_Notification_Buffer buffer (event_handler, mask);
00682 
00683 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00684   int notification_required =
00685     notification_queue_.push_new_notification(buffer);
00686 
00687   if (notification_required == -1)
00688   {
00689     return -1;
00690   }
00691 
00692   if (notification_required == 0)
00693   {
00694     
00695     safe_handler.release ();
00696 
00697     return 0;
00698   }
00699 #endif 
00700 
00701   ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00702                                (char *) &buffer,
00703                                sizeof buffer,
00704                                timeout);
00705   if (n == -1)
00706     return -1;
00707 
00708   
00709   safe_handler.release ();
00710 
00711   return 0;
00712 }
00713 
00714 
00715 
00716 
00717 int
00718 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00719                                                    ACE_Handle_Set &rd_mask)
00720 {
00721   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00722 
00723   ACE_HANDLE const read_handle =
00724     this->notification_pipe_.read_handle ();
00725 
00726   if (read_handle != ACE_INVALID_HANDLE
00727       && rd_mask.is_set (read_handle))
00728     {
00729       --number_of_active_handles;
00730       rd_mask.clr_bit (read_handle);
00731       return this->handle_input (read_handle);
00732     }
00733   else
00734     return 0;
00735 }
00736 
00737 
00738 ACE_HANDLE
00739 ACE_Select_Reactor_Notify::notify_handle (void)
00740 {
00741   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00742 
00743   return this->notification_pipe_.read_handle ();
00744 }
00745 
00746 
00747 int
00748 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00749 {
00750 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00751   ACE_UNUSED_ARG(buffer);
00752   return 1;
00753 #else
00754   
00755   
00756   
00757   
00758   
00759   if (buffer.eh_ != 0)
00760     {
00761       return 1;
00762     }
00763   else
00764     {
00765       
00766       return 0;
00767     }
00768 #endif 
00769 }
00770 
00771 int
00772 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00773 {
00774   int result = 0;
00775 
00776 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00777   
00778   
00779   
00780 
00781   bool more_messages_queued = false;
00782   ACE_Notification_Buffer next;
00783 
00784   result = notification_queue_.pop_next_notification(buffer, 
00785                                                      more_messages_queued, 
00786                                                      next);
00787 
00788   if (result == 0)
00789     {
00790       return 0;
00791     }
00792 
00793   if (result == -1)
00794     {
00795       return -1;
00796     }
00797 
00798   if(more_messages_queued)
00799     {
00800       (void) ACE::send(this->notification_pipe_.write_handle(),
00801             (char *)&next, sizeof(ACE_Notification_Buffer));
00802     }
00803 #endif 
00804 
00805   
00806   
00807   
00808   
00809   
00810   if (buffer.eh_ != 0)
00811     {
00812       ACE_Event_Handler *event_handler =
00813         buffer.eh_;
00814 
00815       bool const requires_reference_counting =
00816         event_handler->reference_counting_policy ().value () ==
00817         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00818 
00819       switch (buffer.mask_)
00820         {
00821         case ACE_Event_Handler::READ_MASK:
00822         case ACE_Event_Handler::ACCEPT_MASK:
00823           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00824           break;
00825         case ACE_Event_Handler::WRITE_MASK:
00826           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00827           break;
00828         case ACE_Event_Handler::EXCEPT_MASK:
00829           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00830           break;
00831         case ACE_Event_Handler::QOS_MASK:
00832           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00833           break;
00834         case ACE_Event_Handler::GROUP_QOS_MASK:
00835           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00836           break;
00837         default:
00838           
00839           ACE_ERROR ((LM_ERROR,
00840                       ACE_TEXT ("invalid mask = %d\n"),
00841                       buffer.mask_));
00842         }
00843 
00844       if (result == -1)
00845         event_handler->handle_close (ACE_INVALID_HANDLE,
00846                                      ACE_Event_Handler::EXCEPT_MASK);
00847 
00848       if (requires_reference_counting)
00849         {
00850           event_handler->remove_reference ();
00851         }
00852     }
00853 
00854   return 1;
00855 }
00856 
00857 int
00858 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00859                                              ACE_Notification_Buffer &buffer)
00860 {
00861   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00862 
00863   ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00864 
00865   if (n > 0)
00866     {
00867       
00868       if (n != sizeof buffer)
00869         {
00870           ssize_t const remainder = sizeof buffer - n;
00871 
00872           
00873           
00874           
00875           
00876           if (ACE::recv (handle,
00877                          ((char *) &buffer) + n,
00878                          remainder) != remainder)
00879             return -1;
00880         }
00881 
00882 
00883       return 1;
00884     }
00885 
00886   
00887   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00888     return -1;
00889 
00890   return 0;
00891 }
00892 
00893 
00894 int
00895 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
00896 {
00897   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00898   
00899   
00900 
00901   int number_dispatched = 0;
00902   int result = 0;
00903   ACE_Notification_Buffer buffer;
00904 
00905   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00906     {
00907       
00908       
00909       if (this->dispatch_notify (buffer) > 0)
00910         ++number_dispatched;
00911 
00912       
00913       
00914       
00915       if (number_dispatched == this->max_notify_iterations_)
00916         break;
00917     }
00918 
00919   
00920   
00921   if (result < 0)
00922     number_dispatched = -1;
00923 
00924   
00925   
00926   
00927   
00928   this->select_reactor_->renew ();
00929   return number_dispatched;
00930 }
00931 
00932 
00933 
00934 int
00935 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
00936                                                       ACE_Reactor_Mask mask)
00937 {
00938   if (this->notify_handler_ == 0)
00939     return 0;
00940   else
00941     return this->notify_handler_->purge_pending_notifications (eh, mask);
00942 }
00943 
00944 
00945 
00946 
00947 
00948 
00949 
00950 
00951 
00952 
00953 
00954 
00955 int
00956 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
00957                                   ACE_Reactor_Mask mask,
00958                                   ACE_Select_Reactor_Handle_Set &handle_set,
00959                                   int ops)
00960 {
00961   ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
00962   if (this->handler_rep_.handle_in_range (handle) == 0)
00963     return -1;
00964 
00965 #if !defined (ACE_WIN32)
00966   ACE_Sig_Guard sb (0,
00967                     this->mask_signals_); 
00968 #endif 
00969 
00970   ACE_FDS_PTMF ptmf  = &ACE_Handle_Set::set_bit;
00971   u_long omask = ACE_Event_Handler::NULL_MASK;
00972 
00973   
00974   
00975   if (handle_set.rd_mask_.is_set (handle))
00976     ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
00977   if (handle_set.wr_mask_.is_set (handle))
00978     ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
00979   if (handle_set.ex_mask_.is_set (handle))
00980     ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
00981 
00982   switch (ops)
00983     {
00984     case ACE_Reactor::GET_MASK:
00985       
00986       
00987       break;
00988     case ACE_Reactor::CLR_MASK:
00989       ptmf = &ACE_Handle_Set::clr_bit;
00990       
00991       
00992       
00993       
00994       this->clear_dispatch_mask (handle, mask);
00995       
00996     case ACE_Reactor::SET_MASK:
00997       
00998     case ACE_Reactor::ADD_MASK:
00999 
01000       
01001       
01002       
01003       
01004       
01005       
01006 
01007       
01008       
01009       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01010           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01011           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01012         {
01013           (handle_set.rd_mask_.*ptmf) (handle);
01014         }
01015       else if (ops == ACE_Reactor::SET_MASK)
01016         handle_set.rd_mask_.clr_bit (handle);
01017 
01018       
01019       if (ACE_BIT_ENABLED (mask,
01020                            ACE_Event_Handler::WRITE_MASK)
01021           || ACE_BIT_ENABLED (mask,
01022                               ACE_Event_Handler::CONNECT_MASK))
01023         {
01024           (handle_set.wr_mask_.*ptmf) (handle);
01025         }
01026       else if (ops == ACE_Reactor::SET_MASK)
01027         handle_set.wr_mask_.clr_bit (handle);
01028 
01029       
01030       
01031       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01032 #if defined (ACE_WIN32)
01033           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01034 #endif 
01035           )
01036         {
01037           (handle_set.ex_mask_.*ptmf) (handle);
01038         }
01039       else if (ops == ACE_Reactor::SET_MASK)
01040         handle_set.ex_mask_.clr_bit (handle);
01041       break;
01042     default:
01043       return -1;
01044     }
01045   return omask;
01046 }
01047 
01048 void
01049 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01050                                               ACE_Reactor_Mask mask)
01051 {
01052   ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01053 
01054   
01055   
01056   
01057   
01058   
01059   
01060   
01061   
01062   
01063   
01064   
01065 
01066   
01067   
01068   
01069 
01070   
01071   
01072   
01073   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01074       ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01075     {
01076       this->dispatch_set_.rd_mask_.clr_bit (handle);
01077     }
01078   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01079     {
01080       this->dispatch_set_.wr_mask_.clr_bit (handle);
01081     }
01082   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01083     {
01084       this->dispatch_set_.ex_mask_.clr_bit (handle);
01085     }
01086 
01087   
01088   
01089   this->state_changed_ = true;
01090 }
01091 
01092 
01093 int
01094 ACE_Select_Reactor_Impl::resumable_handler (void)
01095 {
01096   
01097   
01098 
01099   return 0;
01100 }
01101 
01102 ACE_END_VERSIONED_NAMESPACE_DECL
01103