00001 
00002 
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif 
00016 
00017 ACE_RCSID (ace,
00018            Select_Reactor_Base,
00019            "Select_Reactor_Base.cpp,v 4.75 2006/04/19 19:13:09 jwillemsen Exp")
00020 
00021 
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023 
// Slot accessors for the event_handlers_ table; the element layout differs
// between the two build flavours below.
00024 #if defined (ACE_WIN32)
// Here each table element exposes handle_ and event_handler_ members.
// Note that ACE_SELECT_REACTOR_HANDLE expands to an expression on `this',
// so it can only be used inside member functions of the class that owns
// event_handlers_; ACE_SELECT_REACTOR_EVENT_HANDLER takes the object explicitly.
00025 #define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
00026 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
00027 #else
// Here the handle value itself is the table index and each element is the
// registered handler.
00028 #define ACE_SELECT_REACTOR_HANDLE(H) (H)
00029 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
00030 #endif /* ACE_WIN32 */
00031 
00032 
00033 
00034 int
00035 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00036 {
00037   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00038 #if defined (ACE_WIN32)
00039   
00040   
00041   if (handle == ACE_INVALID_HANDLE)
00042 #else 
00043     if (handle < 0 || handle >= this->max_size_)
00044 #endif 
00045       {
00046         errno = EINVAL;
00047         return 1;
00048       }
00049     else
00050       return 0;
00051 }
00052 
00053 
00054 
00055 int
00056 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00057 {
00058   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00059 #if defined (ACE_WIN32)
00060   
00061   
00062   if (handle != ACE_INVALID_HANDLE)
00063 #else 
00064     if (handle >= 0 && handle < this->max_handlep1_)
00065 #endif 
00066       return 1;
00067     else
00068       {
00069         errno = EINVAL;
00070         return 0;
00071       }
00072 }
00073 
00074 size_t
00075 ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
00076 {
00077   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
00078 
00079   return this->max_handlep1_;
00080 }
00081 
00082 int
00083 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00084 {
00085   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00086   this->max_size_ = size;
00087   this->max_handlep1_ = 0;
00088 
00089 #if defined (ACE_WIN32)
00090   
00091   ACE_NEW_RETURN (this->event_handlers_,
00092                   ACE_Event_Tuple[size],
00093                   -1);
00094 
00095   
00096   for (size_t h = 0; h < size; ++h)
00097     {
00098       ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
00099       ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00100     }
00101 #else
00102   
00103   ACE_NEW_RETURN (this->event_handlers_,
00104                   ACE_Event_Handler *[size],
00105                   -1);
00106 
00107   
00108   for (size_t h = 0; h < size; ++h)
00109     ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00110 #endif 
00111 
00112   
00113   
00114   return ACE::set_handle_limit (static_cast<int> (size), 1);
00115 }
00116 
00117 
00118 
00119 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00120   : select_reactor_ (select_reactor),
00121     max_size_ (0),
00122     max_handlep1_ (0),
00123     event_handlers_ (0)
00124 {
00125   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00126 }
00127 
00128 int
00129 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00130 {
00131   
00132   for (int slot = 0;
00133        slot < this->max_handlep1_;
00134        ++slot)
00135     this->unbind (ACE_SELECT_REACTOR_HANDLE (slot),
00136                   ACE_Event_Handler::ALL_EVENTS_MASK);
00137 
00138   return 0;
00139 }
00140 
00141 int
00142 ACE_Select_Reactor_Handler_Repository::close (void)
00143 {
00144   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00145 
00146   if (this->event_handlers_ != 0)
00147     {
00148       this->unbind_all ();
00149 
00150       delete [] this->event_handlers_;
00151       this->event_handlers_ = 0;
00152     }
00153   return 0;
00154 }
00155 
00156 
00157 
00158 ACE_Event_Handler *
00159 ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00160                                              size_t *index_p)
00161 {
00162   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
00163 
00164   ACE_Event_Handler *eh = 0;
00165   ssize_t i;
00166 
00167   
00168   if (this->handle_in_range (handle))
00169     {
00170 #if defined (ACE_WIN32)
00171       i = 0;
00172 
00173       for (; i < this->max_handlep1_; ++i)
00174         if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00175           {
00176             eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
00177             break;
00178           }
00179 #else
00180       i = handle;
00181 
00182       eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00183 #endif 
00184     }
00185   else
00186     
00187     
00188     i = 0;
00189 
00190   if (eh != 0)
00191     {
00192       if (index_p != 0)
00193         *index_p = i;
00194     }
00195   else
00196     errno = ENOENT;
00197 
00198   return eh;
00199 }
00200 
00201 
00202 
00203 int
00204 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00205                                              ACE_Event_Handler *event_handler,
00206                                              ACE_Reactor_Mask mask)
00207 {
00208   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00209 
00210   if (handle == ACE_INVALID_HANDLE)
00211     handle = event_handler->get_handle ();
00212 
00213   if (this->invalid_handle (handle))
00214     return -1;
00215 
00216   
00217   int existing_handle = 0;
00218 
00219 #if defined (ACE_WIN32)
00220 
00221   ssize_t assigned_slot = -1;
00222 
00223   for (ssize_t i = 0; i < this->max_handlep1_; ++i)
00224     {
00225       
00226       if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00227         {
00228           
00229           if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) !=
00230               event_handler)
00231             return -1;
00232 
00233           
00234           assigned_slot = i;
00235 
00236           
00237           
00238           existing_handle = 1;
00239 
00240           
00241           break;
00242         }
00243       else
00244         
00245         if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE &&
00246             assigned_slot == -1)
00247           {
00248             assigned_slot = i;
00249           }
00250     }
00251 
00252   if (assigned_slot > -1)
00253     
00254     {
00255       ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
00256       ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
00257     }
00258   else if (this->max_handlep1_ < this->max_size_)
00259     {
00260       
00261       ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
00262       ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
00263       ++this->max_handlep1_;
00264     }
00265   else
00266     {
00267       
00268       errno = ENOMEM;
00269       return -1;
00270     }
00271 
00272 #else
00273 
00274   
00275   ACE_Event_Handler *current_handler =
00276     ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00277 
00278   if (current_handler)
00279     {
00280       
00281       if (current_handler != event_handler)
00282         return -1;
00283 
00284       
00285       
00286       existing_handle = 1;
00287     }
00288 
00289   ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
00290 
00291   if (this->max_handlep1_ < handle + 1)
00292     this->max_handlep1_ = handle + 1;
00293 
00294 #endif 
00295 
00296   if (this->select_reactor_.is_suspended_i (handle))
00297     {
00298       this->select_reactor_.bit_ops (handle,
00299                                      mask,
00300                                      this->select_reactor_.suspend_set_,
00301                                      ACE_Reactor::ADD_MASK);
00302     }
00303   else
00304     {
00305       this->select_reactor_.bit_ops (handle,
00306                                      mask,
00307                                      this->select_reactor_.wait_set_,
00308                                      ACE_Reactor::ADD_MASK);
00309 
00310       
00311       
00312       
00313       
00314     }
00315 
00316   
00317   if (!existing_handle)
00318     event_handler->add_reference ();
00319 
00320   return 0;
00321 }
00322 
00323 
00324 
00325 int
00326 ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00327                                                ACE_Reactor_Mask mask)
00328 {
00329   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00330 
00331   size_t slot = 0;
00332   ACE_Event_Handler *event_handler = this->find (handle, &slot);
00333 
00334   if (event_handler == 0)
00335     return -1;
00336 
00337   
00338   this->select_reactor_.bit_ops (handle,
00339                                  mask,
00340                                  this->select_reactor_.wait_set_,
00341                                  ACE_Reactor::CLR_MASK);
00342 
00343   
00344   this->select_reactor_.bit_ops (handle,
00345                                  mask,
00346                                  this->select_reactor_.suspend_set_,
00347                                  ACE_Reactor::CLR_MASK);
00348 
00349   
00350   
00351   
00352   
00353 
00354   
00355   
00356 
00357   int has_any_wait_mask =
00358     (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00359      || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00360      || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00361   int has_any_suspend_mask =
00362     (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00363      || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00364      || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00365 
00366   int complete_removal = 0;
00367 
00368   if (!has_any_wait_mask && !has_any_suspend_mask)
00369     {
00370       
00371       complete_removal = 1;
00372 
00373       ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
00374 
00375 #if defined (ACE_WIN32)
00376 
00377       ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
00378 
00379       if (this->max_handlep1_ == (int) slot + 1)
00380         {
00381           
00382           
00383           
00384           
00385 
00386           int i;
00387 
00388           for (i = this->max_handlep1_ - 1;
00389                i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
00390                --i)
00391             continue;
00392 
00393           this->max_handlep1_ = i + 1;
00394         }
00395 
00396 #else
00397 
00398       if (this->max_handlep1_ == handle + 1)
00399         {
00400           
00401           
00402           
00403           ACE_HANDLE wait_rd_max =
00404             this->select_reactor_.wait_set_.rd_mask_.max_set ();
00405           ACE_HANDLE wait_wr_max =
00406             this->select_reactor_.wait_set_.wr_mask_.max_set ();
00407           ACE_HANDLE wait_ex_max =
00408             this->select_reactor_.wait_set_.ex_mask_.max_set ();
00409 
00410           ACE_HANDLE suspend_rd_max =
00411             this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00412           ACE_HANDLE suspend_wr_max =
00413             this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00414           ACE_HANDLE suspend_ex_max =
00415             this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00416 
00417           
00418           this->max_handlep1_ = wait_rd_max;
00419           if (this->max_handlep1_ < wait_wr_max)
00420             this->max_handlep1_ = wait_wr_max;
00421           if (this->max_handlep1_ < wait_ex_max)
00422             this->max_handlep1_ = wait_ex_max;
00423 
00424           if (this->max_handlep1_ < suspend_rd_max)
00425             this->max_handlep1_ = suspend_rd_max;
00426           if (this->max_handlep1_ < suspend_wr_max)
00427             this->max_handlep1_ = suspend_wr_max;
00428           if (this->max_handlep1_ < suspend_ex_max)
00429             this->max_handlep1_ = suspend_ex_max;
00430 
00431           ++this->max_handlep1_;
00432         }
00433 
00434 #endif 
00435 
00436     }
00437 
00438   int requires_reference_counting =
00439     event_handler->reference_counting_policy ().value () ==
00440     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00441 
00442   
00443   
00444   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00445     event_handler->handle_close (handle, mask);
00446 
00447   
00448   
00449   if (complete_removal && requires_reference_counting)
00450     {
00451       event_handler->remove_reference ();
00452     }
00453 
00454   return 0;
00455 }
00456 
00457 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00458   (const ACE_Select_Reactor_Handler_Repository *s)
00459     : rep_ (s),
00460       current_ (-1)
00461 {
00462   this->advance ();
00463 }
00464 
00465 
00466 
00467 
00468 int
00469 ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
00470 {
00471   int result = 1;
00472 
00473   if (this->current_ >= this->rep_->max_handlep1_)
00474     result = 0;
00475   else
00476     next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
00477                                                   this->current_);
00478   return result;
00479 }
00480 
00481 int
00482 ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
00483 {
00484   return this->current_ >= this->rep_->max_handlep1_;
00485 }
00486 
00487 
00488 
00489 int
00490 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00491 {
00492   if (this->current_ < this->rep_->max_handlep1_)
00493     ++this->current_;
00494 
00495   while (this->current_ < this->rep_->max_handlep1_)
00496     if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
00497       return 1;
00498     else
00499       ++this->current_;
00500 
00501   return this->current_ < this->rep_->max_handlep1_;
00502 }
00503 
00504 
00505 
00506 void
00507 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00508 {
00509 #if defined (ACE_HAS_DUMP)
00510   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00511 
00512   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00513   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
00514   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
00515   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00516 #endif 
00517 }
00518 
00519 void
00520 ACE_Select_Reactor_Handler_Repository::dump (void) const
00521 {
00522 #if defined (ACE_HAS_DUMP)
00523   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00524 
00525   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00526   ACE_DEBUG ((LM_DEBUG,
00527               ACE_LIB_TEXT ("max_handlep1_ = %d, max_size_ = %d\n"),
00528               this->max_handlep1_, this->max_size_));
00529   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("[")));
00530 
00531   ACE_Event_Handler *event_handler = 0;
00532 
00533   for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00534        iter.next (event_handler) != 0;
00535        iter.advance ())
00536     ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (event_handler = %x, event_handler->handle_ = %d)\n"),
00537                 event_handler, event_handler->get_handle ()));
00538 
00539   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]\n")));
00540   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00541 #endif 
00542 }
00543 
00544 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00545 
00546 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00547   : max_notify_iterations_ (-1)
00548 {
00549 }
00550 
00551 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00552 {
00553 }
00554 
00555 void
00556 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00557 {
00558   
00559   if (iterations == 0)
00560     iterations = 1;
00561 
00562   this->max_notify_iterations_ = iterations;
00563 }
00564 
00565 int
00566 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00567 {
00568   return this->max_notify_iterations_;
00569 }
00570 
00571 
00572 
00573 
00574 
00575 
00576 
00577 int
00578 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00579                                                         ACE_Reactor_Mask  mask )
00580 {
00581   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00582 
00583 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00584 
00585   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00586 
00587   if (this->notify_queue_.is_empty ())
00588     return 0;
00589 
00590   ACE_Notification_Buffer *temp = 0;
00591   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00592 
00593   size_t queue_size = this->notify_queue_.size ();
00594   int number_purged = 0;
00595   size_t i;
00596   for (i = 0; i < queue_size; ++i)
00597     {
00598       if (-1 == this->notify_queue_.dequeue_head (temp))
00599         ACE_ERROR_RETURN ((LM_ERROR,
00600                            ACE_LIB_TEXT ("%p\n"),
00601                            ACE_LIB_TEXT ("dequeue_head")),
00602                           -1);
00603 
00604       
00605       
00606       
00607       
00608       if ((0 != temp->eh_) &&
00609           (0 == eh || eh == temp->eh_) &&
00610           ACE_BIT_DISABLED (temp->mask_, ~mask)) 
00611                                                  
00612                                                  
00613       {
00614         if (-1 == this->free_queue_.enqueue_head (temp))
00615           ACE_ERROR_RETURN ((LM_ERROR,
00616                              ACE_LIB_TEXT ("%p\n"),
00617                              ACE_LIB_TEXT ("enqueue_head")),
00618                             -1);
00619 
00620         ACE_Event_Handler *event_handler = temp->eh_;
00621         event_handler->remove_reference ();
00622 
00623         ++number_purged;
00624       }
00625       else
00626       {
00627         
00628         
00629         
00630         
00631         if ((0 != temp->eh_) &&
00632             (0 == eh || eh == temp->eh_))
00633           ACE_CLR_BITS(temp->mask_, mask);
00634         if (-1 == local_queue.enqueue_head (temp))
00635           return -1;
00636       }
00637     }
00638 
00639   if (this->notify_queue_.size ())
00640     { 
00641       ACE_ASSERT (0);
00642       return -1;
00643     }
00644 
00645   
00646   queue_size = local_queue.size ();
00647   for (i = 0; i < queue_size; ++i)
00648     {
00649       if (-1 == local_queue.dequeue_head (temp))
00650         ACE_ERROR_RETURN ((LM_ERROR,
00651                            ACE_LIB_TEXT ("%p\n"),
00652                            ACE_LIB_TEXT ("dequeue_head")),
00653                           -1);
00654 
00655       if (-1 == this->notify_queue_.enqueue_head (temp))
00656         ACE_ERROR_RETURN ((LM_ERROR,
00657                            ACE_LIB_TEXT ("%p\n"),
00658                            ACE_LIB_TEXT ("enqueue_head")),
00659                           -1);
00660     }
00661 
00662   return number_purged;
00663 
00664 #else 
00665   ACE_UNUSED_ARG (eh);
00666   ACE_UNUSED_ARG (mask);
00667   ACE_NOTSUP_RETURN (-1);
00668 #endif  
00669 }
00670 
00671 void
00672 ACE_Select_Reactor_Notify::dump (void) const
00673 {
00674 #if defined (ACE_HAS_DUMP)
00675   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00676 
00677   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00678   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00679   this->notification_pipe_.dump ();
00680   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00681 #endif 
00682 }
00683 
00684 int
00685 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00686                                  ACE_Timer_Queue *,
00687                                  int disable_notify_pipe)
00688 {
00689   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00690 
00691   if (disable_notify_pipe == 0)
00692     {
00693       this->select_reactor_ =
00694         dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00695 
00696       if (select_reactor_ == 0)
00697         {
00698           errno = EINVAL;
00699           return -1;
00700         }
00701 
00702       if (this->notification_pipe_.open () == -1)
00703         return -1;
00704 #if defined (F_SETFD)
00705       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00706       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00707 #endif 
00708 
00709 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00710       ACE_Notification_Buffer *temp = 0;
00711 
00712       ACE_NEW_RETURN (temp,
00713                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00714                       -1);
00715 
00716       if (this->alloc_queue_.enqueue_head (temp) == -1)
00717         {
00718           delete [] temp;
00719           return -1;
00720         }
00721 
00722       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00723         if (free_queue_.enqueue_head (temp + i) == -1)
00724           return -1;
00725 
00726 #endif 
00727 
00728       
00729       
00730       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00731                           ACE_NONBLOCK) == -1)
00732         return -1;
00733       else
00734         return this->select_reactor_->register_handler
00735           (this->notification_pipe_.read_handle (),
00736            this,
00737            ACE_Event_Handler::READ_MASK);
00738     }
00739   else
00740     {
00741       this->select_reactor_ = 0;
00742       return 0;
00743     }
00744 }
00745 
00746 int
00747 ACE_Select_Reactor_Notify::close (void)
00748 {
00749   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00750 
00751 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00752   
00753   ACE_Notification_Buffer **b = 0;
00754 
00755   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00756        alloc_iter.next (b) != 0;
00757        alloc_iter.advance ())
00758     {
00759       delete [] *b;
00760       *b = 0;
00761     }
00762 
00763   this->alloc_queue_.reset ();
00764   this->notify_queue_.reset ();
00765   this->free_queue_.reset ();
00766 #endif 
00767 
00768   return this->notification_pipe_.close ();
00769 }
00770 
00771 int
00772 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00773                                    ACE_Reactor_Mask mask,
00774                                    ACE_Time_Value *timeout)
00775 {
00776   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00777 
00778   
00779   
00780   if (this->select_reactor_ == 0)
00781     return 0;
00782 
00783   ACE_Event_Handler_var safe_handler (event_handler);
00784 
00785   if (event_handler)
00786     event_handler->add_reference ();
00787 
00788   ACE_Notification_Buffer buffer (event_handler, mask);
00789 
00790 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00791   
00792   {
00793     bool notification_required = false;
00794 
00795     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00796 
00797     
00798     if (this->notify_queue_.is_empty ())
00799       notification_required = true;
00800 
00801     ACE_Notification_Buffer *temp = 0;
00802 
00803     if (free_queue_.dequeue_head (temp) == -1)
00804       {
00805         
00806         ACE_Notification_Buffer *temp1 = 0;
00807 
00808         ACE_NEW_RETURN (temp1,
00809                         ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00810                         -1);
00811 
00812         if (this->alloc_queue_.enqueue_head (temp1) == -1)
00813           {
00814             delete [] temp1;
00815             return -1;
00816           }
00817 
00818         
00819         
00820         
00821         for (size_t i = 1;
00822              i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00823              ++i)
00824           this->free_queue_.enqueue_head (temp1 + i);
00825 
00826         temp = temp1;
00827       }
00828 
00829     ACE_ASSERT (temp != 0);
00830     *temp = buffer;
00831 
00832     if (notify_queue_.enqueue_tail (temp) == -1)
00833       return -1;
00834 
00835     if (!notification_required)
00836       {
00837         
00838         safe_handler.release ();
00839 
00840         return 0;
00841       }
00842   }
00843 #endif 
00844 
00845   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00846                          (char *) &buffer,
00847                          sizeof buffer,
00848                          timeout);
00849   if (n == -1)
00850     return -1;
00851 
00852   
00853   safe_handler.release ();
00854 
00855   return 0;
00856 }
00857 
00858 
00859 
00860 
00861 int
00862 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00863                                                    ACE_Handle_Set &rd_mask)
00864 {
00865   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00866 
00867   ACE_HANDLE read_handle =
00868     this->notification_pipe_.read_handle ();
00869 
00870   if (read_handle != ACE_INVALID_HANDLE
00871       && rd_mask.is_set (read_handle))
00872     {
00873       --number_of_active_handles;
00874       rd_mask.clr_bit (read_handle);
00875       return this->handle_input (read_handle);
00876     }
00877   else
00878     return 0;
00879 }
00880 
00881 
00882 ACE_HANDLE
00883 ACE_Select_Reactor_Notify::notify_handle (void)
00884 {
00885   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00886 
00887   return this->notification_pipe_.read_handle ();
00888 }
00889 
00890 
00891 int
00892 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00893 {
00894 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00895   ACE_UNUSED_ARG(buffer);
00896   return 1;
00897 #else
00898   
00899   
00900   
00901   
00902   
00903   if (buffer.eh_ != 0)
00904     return 1;
00905 
00906 #endif 
00907 
00908   
00909   return 0;
00910 }
00911 
00912 int
00913 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00914 {
00915   int result = 0;
00916 
00917 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00918   
00919   
00920   
00921   {
00922     
00923     
00924     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00925 
00926     ACE_Notification_Buffer *temp = 0;
00927 
00928     if (notify_queue_.is_empty ())
00929       return 0;
00930     else if (notify_queue_.dequeue_head (temp) == -1)
00931       ACE_ERROR_RETURN ((LM_ERROR,
00932                          ACE_LIB_TEXT ("%p\n"),
00933                          ACE_LIB_TEXT ("dequeue_head")),
00934                         -1);
00935     buffer = *temp;
00936     if (free_queue_.enqueue_head (temp) == -1)
00937       ACE_ERROR_RETURN ((LM_ERROR,
00938                          ACE_LIB_TEXT ("%p\n"),
00939                          ACE_LIB_TEXT ("enqueue_head")),
00940                         -1);
00941 
00942     bool write_next_buffer = false;
00943     ACE_Notification_Buffer ** next = 0;
00944 
00945     if(!this->notify_queue_.is_empty())
00946       {
00947         
00948         this->notify_queue_.get (next, 0);
00949         write_next_buffer = true;
00950       }
00951 
00952     if(write_next_buffer)
00953       {
00954         (void) ACE::send(
00955                          this->notification_pipe_.write_handle(),
00956             (char *)*next, sizeof(ACE_Notification_Buffer));
00957       }
00958   }
00959 
00960 #endif 
00961 
00962   
00963   
00964   
00965   
00966   
00967   if (buffer.eh_ != 0)
00968     {
00969       ACE_Event_Handler *event_handler =
00970         buffer.eh_;
00971 
00972       int requires_reference_counting =
00973         event_handler->reference_counting_policy ().value () ==
00974         ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00975 
00976       switch (buffer.mask_)
00977         {
00978         case ACE_Event_Handler::READ_MASK:
00979         case ACE_Event_Handler::ACCEPT_MASK:
00980           result = event_handler->handle_input (ACE_INVALID_HANDLE);
00981           break;
00982         case ACE_Event_Handler::WRITE_MASK:
00983           result = event_handler->handle_output (ACE_INVALID_HANDLE);
00984           break;
00985         case ACE_Event_Handler::EXCEPT_MASK:
00986           result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00987           break;
00988         case ACE_Event_Handler::QOS_MASK:
00989           result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00990           break;
00991         case ACE_Event_Handler::GROUP_QOS_MASK:
00992           result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00993           break;
00994         default:
00995           
00996           ACE_ERROR ((LM_ERROR,
00997                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00998                       buffer.mask_));
00999         }
01000 
01001       if (result == -1)
01002         event_handler->handle_close (ACE_INVALID_HANDLE,
01003                                      ACE_Event_Handler::EXCEPT_MASK);
01004 
01005       if (requires_reference_counting)
01006         {
01007           event_handler->remove_reference ();
01008         }
01009     }
01010 
01011   return 1;
01012 }
01013 
01014 int
01015 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
01016                                              ACE_Notification_Buffer &buffer)
01017 {
01018   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
01019 
01020   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
01021 
01022   if (n > 0)
01023     {
01024       
01025       if (n != sizeof buffer)
01026         {
01027           ssize_t remainder = sizeof buffer - n;
01028 
01029           
01030           
01031           
01032           
01033           if (ACE::recv (handle,
01034                          ((char *) &buffer) + n,
01035                          remainder) != remainder)
01036             return -1;
01037         }
01038 
01039 
01040       return 1;
01041     }
01042 
01043   
01044   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
01045     return -1;
01046 
01047   return 0;
01048 }
01049 
01050 
01051 int
01052 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
01053 {
01054   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01055   
01056   
01057 
01058   int number_dispatched = 0;
01059   int result = 0;
01060   ACE_Notification_Buffer buffer;
01061 
01062   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01063     {
01064       
01065       
01066       if (this->dispatch_notify (buffer) > 0)
01067         ++number_dispatched;
01068 
01069       
01070       
01071       
01072       if (number_dispatched == this->max_notify_iterations_)
01073         break;
01074     }
01075 
01076   
01077   
01078   if (result < 0)
01079     number_dispatched = -1;
01080 
01081   
01082   
01083   
01084   
01085   this->select_reactor_->renew ();
01086   return number_dispatched;
01087 }
01088 
01089 
01090 
01091 int
01092 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
01093                                                       ACE_Reactor_Mask mask)
01094 {
01095   if (this->notify_handler_ == 0)
01096     return 0;
01097   else
01098     return this->notify_handler_->purge_pending_notifications (eh, mask);
01099 }
01100 
01101 
01102 
01103 
01104 
01105 
01106 
01107 
01108 
01109 
01110 
01111 
01112 int
01113 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
01114                                   ACE_Reactor_Mask mask,
01115                                   ACE_Select_Reactor_Handle_Set &handle_set,
01116                                   int ops)
01117 {
01118   ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
01119   if (this->handler_rep_.handle_in_range (handle) == 0)
01120     return -1;
01121 
01122 #if !defined (ACE_WIN32)
01123   ACE_Sig_Guard sb (0,
01124                     this->mask_signals_); 
01125 #endif 
01126 
01127   ACE_FDS_PTMF ptmf  = &ACE_Handle_Set::set_bit;
01128   u_long omask = ACE_Event_Handler::NULL_MASK;
01129 
01130   
01131   
01132   if (handle_set.rd_mask_.is_set (handle))
01133     ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01134   if (handle_set.wr_mask_.is_set (handle))
01135     ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01136   if (handle_set.ex_mask_.is_set (handle))
01137     ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01138 
01139   switch (ops)
01140     {
01141     case ACE_Reactor::GET_MASK:
01142       
01143       
01144       break;
01145     case ACE_Reactor::CLR_MASK:
01146       ptmf = &ACE_Handle_Set::clr_bit;
01147       
01148       
01149       
01150       
01151       this->clear_dispatch_mask (handle, mask);
01152       
01153     case ACE_Reactor::SET_MASK:
01154       
01155     case ACE_Reactor::ADD_MASK:
01156 
01157       
01158       
01159       
01160       
01161       
01162       
01163 
01164       
01165       
01166       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01167           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01168           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01169         {
01170           (handle_set.rd_mask_.*ptmf) (handle);
01171         }
01172       else if (ops == ACE_Reactor::SET_MASK)
01173         handle_set.rd_mask_.clr_bit (handle);
01174 
01175       
01176       if (ACE_BIT_ENABLED (mask,
01177                            ACE_Event_Handler::WRITE_MASK)
01178           || ACE_BIT_ENABLED (mask,
01179                               ACE_Event_Handler::CONNECT_MASK))
01180         {
01181           (handle_set.wr_mask_.*ptmf) (handle);
01182         }
01183       else if (ops == ACE_Reactor::SET_MASK)
01184         handle_set.wr_mask_.clr_bit (handle);
01185 
01186       
01187       
01188       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01189 #if defined (ACE_WIN32)
01190           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01191 #endif 
01192           )
01193         {
01194           (handle_set.ex_mask_.*ptmf) (handle);
01195         }
01196       else if (ops == ACE_Reactor::SET_MASK)
01197         handle_set.ex_mask_.clr_bit (handle);
01198       break;
01199     default:
01200       return -1;
01201     }
01202   return omask;
01203 }
01204 
01205 void
01206 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01207                                               ACE_Reactor_Mask mask)
01208 {
01209   ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01210 
01211   
01212   
01213   
01214   
01215   
01216   
01217   
01218   
01219   
01220   
01221   
01222 
01223   
01224   
01225   
01226 
01227   
01228   
01229   
01230   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01231       ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01232     {
01233       this->dispatch_set_.rd_mask_.clr_bit (handle);
01234     }
01235   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01236     {
01237       this->dispatch_set_.wr_mask_.clr_bit (handle);
01238     }
01239   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01240     {
01241       this->dispatch_set_.ex_mask_.clr_bit (handle);
01242     }
01243 
01244   
01245   
01246   this->state_changed_ = true;
01247 }
01248 
01249 
01250 int
01251 ACE_Select_Reactor_Impl::resumable_handler (void)
01252 {
01253   
01254   
01255 
01256   return 0;
01257 }
01258 
01259 ACE_END_VERSIONED_NAMESPACE_DECL