WFMO_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: WFMO_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #include "ace/WFMO_Reactor.h"
00004 
00005 #if defined (ACE_WIN32)
00006 
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00018 
00019 #include "ace/Auto_Ptr.h"
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024   : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027 
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031   if (size > MAXIMUM_WAIT_OBJECTS)
00032     ACE_ERROR_RETURN ((LM_ERROR,
00033                        ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034                        size,
00035                        MAXIMUM_WAIT_OBJECTS),
00036                       -1);
00037 
00038   // Dynamic allocation
00039   ACE_NEW_RETURN (this->current_handles_,
00040                   ACE_HANDLE[size],
00041                   -1);
00042   ACE_NEW_RETURN (this->current_info_,
00043                   Current_Info[size],
00044                   -1);
00045   ACE_NEW_RETURN (this->current_suspended_info_,
00046                   Suspended_Info[size],
00047                   -1);
00048   ACE_NEW_RETURN (this->to_be_added_info_,
00049                   To_Be_Added_Info[size],
00050                   -1);
00051 
00052   // Initialization
00053   this->max_size_ = size;
00054   this->max_handlep1_ = 0;
00055   this->suspended_handles_ = 0;
00056   this->handles_to_be_added_ = 0;
00057   this->handles_to_be_deleted_ = 0;
00058   this->handles_to_be_suspended_ = 0;
00059   this->handles_to_be_resumed_ = 0;
00060 
00061   for (size_t i = 0; i < size; ++i)
00062     this->current_handles_[i] = ACE_INVALID_HANDLE;
00063 
00064   return 0;
00065 }
00066 
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069   // Free up dynamically allocated space
00070   delete [] this->current_handles_;
00071   delete [] this->current_info_;
00072   delete [] this->current_suspended_info_;
00073   delete [] this->to_be_added_info_;
00074 }
00075 
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078                                               ACE_Reactor_Mask change_masks,
00079                                               int operation)
00080 {
00081   // Find the old reactor masks.  This automatically does the work of
00082   // the GET_MASK operation.
00083 
00084   ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085 
00086   if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087       || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088     ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089 
00090   if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091     ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092 
00093   if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094     ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095 
00096   if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097     ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098 
00099   if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100     ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101 
00102   if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103     ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104 
00105   if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106     ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107 
00108   switch (operation)
00109     {
00110     case ACE_Reactor::CLR_MASK:
00111       // For the CLR_MASK operation, clear only the specific masks.
00112 
00113       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114         {
00115           ACE_CLR_BITS (existing_masks, FD_READ);
00116           ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117         }
00118 
00119       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120         ACE_CLR_BITS (existing_masks, FD_WRITE);
00121 
00122       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123         ACE_CLR_BITS (existing_masks, FD_OOB);
00124 
00125       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126         ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127 
00128       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129         ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130 
00131       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132         ACE_CLR_BITS (existing_masks, FD_QOS);
00133 
00134       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135         ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136 
00137       break;
00138 
00139     case ACE_Reactor::SET_MASK:
00140       // If the operation is a set, first reset any existing masks
00141 
00142       existing_masks = 0;
00143       /* FALLTHRU */
00144 
00145     case ACE_Reactor::ADD_MASK:
00146       // For the ADD_MASK and the SET_MASK operation, add only the
00147       // specific masks.
00148 
00149       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150         {
00151           ACE_SET_BITS (existing_masks, FD_READ);
00152           ACE_SET_BITS (existing_masks, FD_CLOSE);
00153         }
00154 
00155       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156         ACE_SET_BITS (existing_masks, FD_WRITE);
00157 
00158       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159         ACE_SET_BITS (existing_masks, FD_OOB);
00160 
00161       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162         ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163 
00164       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165         ACE_SET_BITS (existing_masks, FD_CONNECT);
00166 
00167       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168         ACE_SET_BITS (existing_masks, FD_QOS);
00169 
00170       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171         ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172 
00173       break;
00174 
00175     case ACE_Reactor::GET_MASK:
00176 
00177       // The work for this operation is done in all cases at the
00178       // begining of the function.
00179 
00180       ACE_UNUSED_ARG (change_masks);
00181 
00182       break;
00183     }
00184 
00185   return old_masks;
00186 }
00187 
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190                                                ACE_Reactor_Mask mask,
00191                                                int &changes_required)
00192 {
00193   int error = 0;
00194 
00195   // Remember this value; only if it changes do we need to wakeup
00196   // the other threads
00197   size_t const original_handle_count = this->handles_to_be_deleted_;
00198   int result = 0;
00199   size_t i;
00200 
00201   // Go through all the handles looking for <handle>.  Even if we find
00202   // it, we continue through the rest of the list since <handle> could
00203   // appear multiple times. All handles are checked.
00204 
00205   // First check the current entries
00206   for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00207     // Since the handle can either be the event or the I/O handle,
00208     // we have to check both
00209     if ((this->current_handles_[i] == handle
00210          || this->current_info_[i].io_handle_ == handle)
00211         && // Make sure that it is not already marked for deleted
00212         !this->current_info_[i].delete_entry_)
00213       {
00214         result = this->remove_handler_i (i,
00215                                          mask);
00216         if (result == -1)
00217           error = 1;
00218       }
00219 
00220   // Then check the suspended entries
00221   for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00222     // Since the handle can either be the event or the I/O handle, we
00223     // have to check both
00224     if ((this->current_suspended_info_[i].io_handle_ == handle
00225          || this->current_suspended_info_[i].event_handle_ == handle)
00226         &&
00227         // Make sure that it is not already marked for deleted
00228         !this->current_suspended_info_[i].delete_entry_)
00229       {
00230         result = this->remove_suspended_handler_i (i,
00231                                                    mask);
00232         if (result == -1)
00233           error = 1;
00234       }
00235 
00236   // Then check the to_be_added entries
00237   for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00238     // Since the handle can either be the event or the I/O handle,
00239     // we have to check both
00240     if ((this->to_be_added_info_[i].io_handle_ == handle
00241          || this->to_be_added_info_[i].event_handle_ == handle)
00242         &&
00243         // Make sure that it is not already marked for deleted
00244         !this->to_be_added_info_[i].delete_entry_)
00245       {
00246         result = this->remove_to_be_added_handler_i (i,
00247                                                      mask);
00248         if (result == -1)
00249           error = 1;
00250       }
00251 
00252   // Only if the number of handlers to be deleted changes do we need
00253   // to wakeup the other threads
00254   if (original_handle_count < this->handles_to_be_deleted_)
00255     changes_required = 1;
00256 
00257   return error ? -1 : 0;
00258 }
00259 
00260 int
00261 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00262                                                        ACE_Reactor_Mask to_be_removed_masks)
00263 {
00264   // I/O entries
00265   if (this->current_info_[slot].io_entry_)
00266     {
00267       // See if there are other events that the <Event_Handler> is
00268       // interested in
00269       this->bit_ops (this->current_info_[slot].network_events_,
00270                      to_be_removed_masks,
00271                      ACE_Reactor::CLR_MASK);
00272 
00273       // Disassociate/Reassociate the event from/with the I/O handle.
00274       // This will depend on the value of remaining set of network
00275       // events that the <event_handler> is interested in. I don't
00276       // think we can do anything about errors here, so I will not
00277       // check this.
00278       ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00279                         this->current_handles_[slot],
00280                         this->current_info_[slot].network_events_);
00281     }
00282   // Normal event entries.
00283   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00284     // Preserve DONT_CALL
00285     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00286   else
00287     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00288     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00289 
00290   // If this event was marked for suspension, undo the suspension flag
00291   // and reduce the to be suspended count.
00292   if (this->current_info_[slot].suspend_entry_)
00293     {
00294       // Undo suspension
00295       this->current_info_[slot].suspend_entry_ = 0;
00296       // Decrement the handle count
00297       --this->handles_to_be_suspended_;
00298     }
00299 
00300   // If there are no more events that the <Event_Handler> is
00301   // interested in, or this is a non-I/O entry, schedule the
00302   // <Event_Handler> for removal
00303   if (this->current_info_[slot].network_events_ == 0)
00304     {
00305       // Mark to be deleted
00306       this->current_info_[slot].delete_entry_ = 1;
00307       // Remember the mask
00308       this->current_info_[slot].close_masks_ = to_be_removed_masks;
00309       // Increment the handle count
00310       ++this->handles_to_be_deleted_;
00311     }
00312 
00313   // Since it is not a complete removal, we'll call handle_close
00314   // for all the masks that were removed.  This does not change
00315   // the internal state of the reactor.
00316   //
00317   // Note: this condition only applies to I/O entries
00318   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00319     {
00320       ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00321       this->current_info_[slot].event_handler_->handle_close (handle,
00322                                                               to_be_removed_masks);
00323     }
00324 
00325   return 0;
00326 }
00327 
00328 int
00329 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00330                                                                  ACE_Reactor_Mask to_be_removed_masks)
00331 {
00332   // I/O entries
00333   if (this->current_suspended_info_[slot].io_entry_)
00334     {
00335       // See if there are other events that the <Event_Handler> is
00336       // interested in
00337       this->bit_ops (this->current_suspended_info_[slot].network_events_,
00338                      to_be_removed_masks,
00339                      ACE_Reactor::CLR_MASK);
00340 
00341       // Disassociate/Reassociate the event from/with the I/O handle.
00342       // This will depend on the value of remaining set of network
00343       // events that the <event_handler> is interested in. I don't
00344       // think we can do anything about errors here, so I will not
00345       // check this.
00346       ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00347                         this->current_suspended_info_[slot].event_handle_,
00348                         this->current_suspended_info_[slot].network_events_);
00349     }
00350   // Normal event entries.
00351   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00352     // Preserve DONT_CALL
00353     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00354   else
00355     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00356     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00357 
00358   // If this event was marked for resumption, undo the resumption flag
00359   // and reduce the to be resumed count.
00360   if (this->current_suspended_info_[slot].resume_entry_)
00361     {
00362       // Undo resumption
00363       this->current_suspended_info_[slot].resume_entry_ = 0;
00364       // Decrement the handle count
00365       --this->handles_to_be_resumed_;
00366     }
00367 
00368   // If there are no more events that the <Event_Handler> is
00369   // interested in, or this is a non-I/O entry, schedule the
00370   // <Event_Handler> for removal
00371   if (this->current_suspended_info_[slot].network_events_ == 0)
00372     {
00373       // Mark to be deleted
00374       this->current_suspended_info_[slot].delete_entry_ = 1;
00375       // Remember the mask
00376       this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00377       // Increment the handle count
00378       ++this->handles_to_be_deleted_;
00379     }
00380   // Since it is not a complete removal, we'll call handle_close for
00381   // all the masks that were removed.  This does not change the
00382   // internal state of the reactor.
00383   //
00384   // Note: this condition only applies to I/O entries
00385   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00386     {
00387       ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00388       this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00389                                                                         to_be_removed_masks);
00390     }
00391 
00392   return 0;
00393 }
00394 
00395 int
00396 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00397                                                                    ACE_Reactor_Mask to_be_removed_masks)
00398 {
00399   // I/O entries
00400   if (this->to_be_added_info_[slot].io_entry_)
00401     {
00402       // See if there are other events that the <Event_Handler> is
00403       // interested in
00404       this->bit_ops (this->to_be_added_info_[slot].network_events_,
00405                      to_be_removed_masks,
00406                      ACE_Reactor::CLR_MASK);
00407 
00408       // Disassociate/Reassociate the event from/with the I/O handle.
00409       // This will depend on the value of remaining set of network
00410       // events that the <event_handler> is interested in. I don't
00411       // think we can do anything about errors here, so I will not
00412       // check this.
00413       ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00414                         this->to_be_added_info_[slot].event_handle_,
00415                         this->to_be_added_info_[slot].network_events_);
00416     }
00417   // Normal event entries.
00418   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00419     // Preserve DONT_CALL
00420     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00421   else
00422     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00423     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00424 
00425   // If this event was marked for suspension, undo the suspension flag
00426   // and reduce the to be suspended count.
00427   if (this->to_be_added_info_[slot].suspend_entry_)
00428     {
00429       // Undo suspension
00430       this->to_be_added_info_[slot].suspend_entry_ = 0;
00431       // Decrement the handle count
00432       --this->handles_to_be_suspended_;
00433     }
00434 
00435   // If there are no more events that the <Event_Handler> is
00436   // interested in, or this is a non-I/O entry, schedule the
00437   // <Event_Handler> for removal
00438   if (this->to_be_added_info_[slot].network_events_ == 0)
00439     {
00440       // Mark to be deleted
00441       this->to_be_added_info_[slot].delete_entry_ = 1;
00442       // Remember the mask
00443       this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00444       // Increment the handle count
00445       ++this->handles_to_be_deleted_;
00446     }
00447   // Since it is not a complete removal, we'll call handle_close
00448   // for all the masks that were removed.  This does not change
00449   // the internal state of the reactor.
00450   //
00451   // Note: this condition only applies to I/O entries
00452   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00453     {
00454       ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00455       this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00456                                                                   to_be_removed_masks);
00457     }
00458 
00459   return 0;
00460 }
00461 
00462 int
00463 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00464                                                         int &changes_required)
00465 {
00466   size_t i = 0;
00467 
00468   // Go through all the handles looking for <handle>.  Even if we find
00469   // it, we continue through the rest of the list since <handle> could
00470   // appear multiple times. All handles are checked.
00471 
00472   // Check the current entries first.
00473   for (i = 0; i < this->max_handlep1_; ++i)
00474     // Since the handle can either be the event or the I/O handle,
00475     // we have to check both
00476     if ((this->current_handles_[i] == handle ||
00477          this->current_info_[i].io_handle_ == handle) &&
00478         // Make sure that it is not already marked for suspension
00479         !this->current_info_[i].suspend_entry_)
00480       {
00481         // Mark to be suspended
00482         this->current_info_[i].suspend_entry_ = 1;
00483         // Increment the handle count
00484         ++this->handles_to_be_suspended_;
00485         // Changes will be required
00486         changes_required = 1;
00487       }
00488 
00489   // Then check the suspended entries.
00490   for (i = 0; i < this->suspended_handles_; ++i)
00491     // Since the handle can either be the event or the I/O handle,
00492     // we have to check both
00493     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00494          this->current_suspended_info_[i].io_handle_ == handle) &&
00495         // Make sure that the resumption is not already undone
00496         this->current_suspended_info_[i].resume_entry_)
00497       {
00498         // Undo resumption
00499         this->current_suspended_info_[i].resume_entry_ = 0;
00500         // Decrement the handle count
00501         --this->handles_to_be_resumed_;
00502         // Changes will be required
00503         changes_required = 1;
00504       }
00505 
00506   // Then check the to_be_added entries.
00507   for (i = 0; i < this->handles_to_be_added_; ++i)
00508     // Since the handle can either be the event or the I/O handle,
00509     // we have to check both
00510     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00511          this->to_be_added_info_[i].event_handle_ == handle) &&
00512         // Make sure that it is not already marked for suspension
00513         !this->to_be_added_info_[i].suspend_entry_)
00514       {
00515         // Mark to be suspended
00516         this->to_be_added_info_[i].suspend_entry_ = 1;
00517         // Increment the handle count
00518         ++this->handles_to_be_suspended_;
00519         // Changes will be required
00520         changes_required = 1;
00521       }
00522 
00523   return 0;
00524 }
00525 
00526 int
00527 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00528                                                        int &changes_required)
00529 {
00530   size_t i = 0;
00531 
00532   // Go through all the handles looking for <handle>.  Even if we find
00533   // it, we continue through the rest of the list since <handle> could
00534   // appear multiple times. All handles are checked.
00535 
00536   // Check the current entries first.
00537   for (i = 0; i < this->max_handlep1_; ++i)
00538     // Since the handle can either be the event or the I/O handle,
00539     // we have to check both
00540     if ((this->current_handles_[i] == handle ||
00541          this->current_info_[i].io_handle_ == handle) &&
00542         // Make sure that the suspension is not already undone
00543         this->current_info_[i].suspend_entry_)
00544       {
00545         // Undo suspension
00546         this->current_info_[i].suspend_entry_ = 0;
00547         // Decrement the handle count
00548         --this->handles_to_be_suspended_;
00549         // Changes will be required
00550         changes_required = 1;
00551       }
00552 
00553   // Then check the suspended entries.
00554   for (i = 0; i < this->suspended_handles_; ++i)
00555     // Since the handle can either be the event or the I/O handle,
00556     // we have to check both
00557     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00558          this->current_suspended_info_[i].io_handle_ == handle) &&
00559         // Make sure that it is not already marked for resumption
00560         !this->current_suspended_info_[i].resume_entry_)
00561       {
00562         // Mark to be resumed
00563         this->current_suspended_info_[i].resume_entry_ = 1;
00564         // Increment the handle count
00565         ++this->handles_to_be_resumed_;
00566         // Changes will be required
00567         changes_required = 1;
00568       }
00569 
00570   // Then check the to_be_added entries.
00571   for (i = 0; i < this->handles_to_be_added_; ++i)
00572     // Since the handle can either be the event or the I/O handle,
00573     // we have to check both
00574     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00575          this->to_be_added_info_[i].event_handle_ == handle) &&
00576         // Make sure that the suspension is not already undone
00577         this->to_be_added_info_[i].suspend_entry_)
00578       {
00579         // Undo suspension
00580         this->to_be_added_info_[i].suspend_entry_ = 0;
00581         // Decrement the handle count
00582         --this->handles_to_be_suspended_;
00583         // Changes will be required
00584         changes_required = 1;
00585       }
00586 
00587   return 0;
00588 }
00589 
00590 void
00591 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00592 {
00593   {
00594     ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00595 
00596     int dummy;
00597     size_t i;
00598 
00599     // Remove all the current handlers
00600     for (i = 0; i < this->max_handlep1_; ++i)
00601       this->unbind_i (this->current_handles_[i],
00602                       ACE_Event_Handler::ALL_EVENTS_MASK,
00603                       dummy);
00604 
00605     // Remove all the suspended handlers
00606     for (i = 0; i < this->suspended_handles_; ++i)
00607       this->unbind_i (this->current_suspended_info_[i].event_handle_,
00608                       ACE_Event_Handler::ALL_EVENTS_MASK,
00609                       dummy);
00610 
00611     // Remove all the to_be_added handlers
00612     for (i = 0; i < this->handles_to_be_added_; ++i)
00613       this->unbind_i (this->to_be_added_info_[i].event_handle_,
00614                       ACE_Event_Handler::ALL_EVENTS_MASK,
00615                       dummy);
00616 
00617   }
00618 
00619   // The guard is released here
00620 
00621   // Wake up all threads in WaitForMultipleObjects so that they can
00622   // reconsult the handle set
00623   this->wfmo_reactor_.wakeup_all_threads ();
00624 }
00625 
00626 int
00627 ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
00628                                              ACE_Event_Handler *event_handler,
00629                                              long network_events,
00630                                              ACE_HANDLE io_handle,
00631                                              ACE_HANDLE event_handle,
00632                                              int delete_event)
00633 {
00634   if (event_handler == 0)
00635     return -1;
00636 
00637   // Make sure that the <handle> is valid
00638   if (event_handle == ACE_INVALID_HANDLE)
00639     event_handle = event_handler->get_handle ();
00640   if (this->invalid_handle (event_handle))
00641     return -1;
00642 
00643   size_t current_size = this->max_handlep1_ +
00644     this->handles_to_be_added_ -
00645     this->handles_to_be_deleted_ +
00646     this->suspended_handles_;
00647 
00648   // Make sure that there's room in the table and that total pending
00649   // additions should not exceed what the <to_be_added_info_> array
00650   // can hold.
00651   if (current_size < this->max_size_ &&
00652       this->handles_to_be_added_ < this->max_size_)
00653     {
00654       // Cache this set into the <to_be_added_info_>, till we come
00655       // around to actually adding this to the <current_info_>
00656       this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00657                                                                io_entry,
00658                                                                event_handler,
00659                                                                io_handle,
00660                                                                network_events,
00661                                                                delete_event);
00662 
00663       ++this->handles_to_be_added_;
00664 
00665       event_handler->add_reference ();
00666 
00667       // Wake up all threads in WaitForMultipleObjects so that they can
00668       // reconsult the handle set
00669       this->wfmo_reactor_.wakeup_all_threads ();
00670     }
00671   else
00672     {
00673       errno = EMFILE;   // File descriptor table is full (better than nothing)
00674       return -1;
00675     }
00676 
00677   return 0;
00678 }
00679 
00680 int
00681 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00682 {
00683   // Go through the entire valid array and check for all handles that
00684   // have been schedule for deletion
00685   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00686     {
00687       size_t i = 0;
00688       while (i < this->max_handlep1_)
00689         {
00690           // This stuff is necessary here, since we should not make
00691           // the upcall until all the internal data structures have
00692           // been updated.  This is to protect against upcalls that
00693           // try to deregister again.
00694           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00695           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00696           ACE_Event_Handler *event_handler = 0;
00697 
00698           // See if this entry is scheduled for deletion
00699           if (this->current_info_[i].delete_entry_)
00700             {
00701               // Calling the <handle_close> method here will ensure that we
00702               // will only call it once per deregistering <Event_Handler>.
00703               // This is essential in the case when the <Event_Handler> will
00704               // do something like delete itself and we have multiple
00705               // threads in WFMO_Reactor.
00706               //
00707               // Make sure that the DONT_CALL mask is not set
00708               masks = this->current_info_[i].close_masks_;
00709               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00710                 {
00711                   // Grab the correct handle depending on the type entry
00712                   if (this->current_info_[i].io_entry_)
00713                     handle = this->current_info_[i].io_handle_;
00714                   else
00715                     handle = this->current_handles_[i];
00716 
00717                   // Event handler
00718                   event_handler = this->current_info_[i].event_handler_;
00719                 }
00720 
00721               // If <WFMO_Reactor> created the event, we need to clean it up
00722               if (this->current_info_[i].delete_event_)
00723                 ACE_OS::event_destroy (&this->current_handles_[i]);
00724 
00725               // Reduce count by one
00726               --this->handles_to_be_deleted_;
00727             }
00728 
00729           // See if this entry is scheduled for suspension
00730           else if (this->current_info_[i].suspend_entry_)
00731             {
00732               this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00733                                                                             this->current_info_[i]);
00734               // Increase number of suspended handles
00735               ++this->suspended_handles_;
00736 
00737               // Reduce count by one
00738               --this->handles_to_be_suspended_;
00739             }
00740 
00741           // See if this entry is scheduled for deletion or suspension
00742           // If so we need to clean up
00743           if (this->current_info_[i].delete_entry_ ||
00744               this->current_info_[i].suspend_entry_  )
00745             {
00746               size_t last_valid_slot = this->max_handlep1_ - 1;
00747               // If this is the last handle in the set, no need to swap
00748               // places. Simply remove it.
00749               if (i < last_valid_slot)
00750                 // Swap this handle with the last valid handle
00751                 {
00752                   // Struct copy
00753                   this->current_info_[i] =
00754                     this->current_info_[last_valid_slot];
00755                   this->current_handles_[i] =
00756                     this->current_handles_[last_valid_slot];
00757                 }
00758               // Reset the info in this slot
00759               this->current_info_[last_valid_slot].reset ();
00760               this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00761               --this->max_handlep1_;
00762             }
00763           else
00764             {
00765               // This current entry is not up for deletion or
00766               // suspension.  Proceed to the next entry in the current
00767               // handles.
00768               ++i;
00769             }
00770 
00771           // Now that all internal structures have been updated, make
00772           // the upcall.
00773           if (event_handler != 0)
00774             {
00775               bool const requires_reference_counting =
00776                 event_handler->reference_counting_policy ().value () ==
00777                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00778 
00779               event_handler->handle_close (handle, masks);
00780 
00781               if (requires_reference_counting)
00782                 {
00783                   event_handler->remove_reference ();
00784                 }
00785             }
00786         }
00787     }
00788 
00789   return 0;
00790 }
00791 
00792 int
00793 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00794 {
00795   // Go through the <suspended_handle> array
00796   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00797     {
00798       size_t i = 0;
00799       while (i < this->suspended_handles_)
00800         {
00801           // This stuff is necessary here, since we should not make
00802           // the upcall until all the internal data structures have
00803           // been updated.  This is to protect against upcalls that
00804           // try to deregister again.
00805           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00806           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00807           ACE_Event_Handler *event_handler = 0;
00808 
00809           // See if this entry is scheduled for deletion
00810           if (this->current_suspended_info_[i].delete_entry_)
00811             {
00812               // Calling the <handle_close> method here will ensure that we
00813               // will only call it once per deregistering <Event_Handler>.
00814               // This is essential in the case when the <Event_Handler> will
00815               // do something like delete itself and we have multiple
00816               // threads in WFMO_Reactor.
00817               //
00818               // Make sure that the DONT_CALL mask is not set
00819               masks = this->current_suspended_info_[i].close_masks_;
00820               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00821                 {
00822                   // Grab the correct handle depending on the type entry
00823                   if (this->current_suspended_info_[i].io_entry_)
00824                     handle = this->current_suspended_info_[i].io_handle_;
00825                   else
00826                     handle = this->current_suspended_info_[i].event_handle_;
00827 
00828                   // Upcall
00829                   event_handler = this->current_suspended_info_[i].event_handler_;
00830                 }
00831 
00832               // If <WFMO_Reactor> created the event, we need to clean it up
00833               if (this->current_suspended_info_[i].delete_event_)
00834                 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00835 
00836               // Reduce count by one
00837               --this->handles_to_be_deleted_;
00838             }
00839 
00840           else if (this->current_suspended_info_[i].resume_entry_)
00841             {
00842               // Add to the end of the current handles set
00843               this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00844               // Struct copy
00845               this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00846               ++this->max_handlep1_;
00847 
00848               // Reduce count by one
00849               --this->handles_to_be_resumed_;
00850             }
00851 
00852           // If an entry needs to be removed, either because it
00853           // was deleted or resumed, remove it now before doing
00854           // the upcall.
00855           if (this->current_suspended_info_[i].resume_entry_ ||
00856               this->current_suspended_info_[i].delete_entry_)
00857             {
00858               size_t last_valid_slot = this->suspended_handles_ - 1;
00859               // Net effect is that we're removing an entry and
00860               // compressing the list from the end.  So, if removing
00861               // an entry from the middle, copy the last valid one to the
00862               // removed slot.  Reset the end and decrement the number
00863               // of suspended handles.
00864               if (i < last_valid_slot)
00865                 // Struct copy
00866                 this->current_suspended_info_[i] =
00867                   this->current_suspended_info_[last_valid_slot];
00868               this->current_suspended_info_[last_valid_slot].reset ();
00869               --this->suspended_handles_;
00870             }
00871           else
00872             {
00873               // This current entry is not up for deletion or
00874               // resumption.  Proceed to the next entry in the
00875               // suspended handles.
00876               ++i;
00877             }
00878 
00879           // Now that all internal structures have been updated, make
00880           // the upcall.
00881           if (event_handler != 0)
00882             {
00883               int requires_reference_counting =
00884                 event_handler->reference_counting_policy ().value () ==
00885                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00886 
00887               event_handler->handle_close (handle, masks);
00888 
00889               if (requires_reference_counting)
00890                 {
00891                   event_handler->remove_reference ();
00892                 }
00893             }
00894         }
00895     }
00896 
00897   return 0;
00898 }
00899 
00900 int
00901 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00902 {
00903   // Go through the <to_be_added_*> arrays
00904   for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00905     {
00906       // This stuff is necessary here, since we should not make
00907       // the upcall until all the internal data structures have
00908       // been updated.  This is to protect against upcalls that
00909       // try to deregister again.
00910       ACE_HANDLE handle = ACE_INVALID_HANDLE;
00911       ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00912       ACE_Event_Handler *event_handler = 0;
00913 
00914       // See if this entry is scheduled for deletion
00915       if (this->to_be_added_info_[i].delete_entry_)
00916         {
00917           // Calling the <handle_close> method here will ensure that we
00918           // will only call it once per deregistering <Event_Handler>.
00919           // This is essential in the case when the <Event_Handler> will
00920           // do something like delete itself and we have multiple
00921           // threads in WFMO_Reactor.
00922           //
00923           // Make sure that the DONT_CALL mask is not set
00924           masks = this->to_be_added_info_[i].close_masks_;
00925           if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00926             {
00927               // Grab the correct handle depending on the type entry
00928               if (this->to_be_added_info_[i].io_entry_)
00929                 handle = this->to_be_added_info_[i].io_handle_;
00930               else
00931                 handle = this->to_be_added_info_[i].event_handle_;
00932 
00933               // Upcall
00934               event_handler = this->to_be_added_info_[i].event_handler_;
00935             }
00936 
00937           // If <WFMO_Reactor> created the event, we need to clean it up
00938           if (this->to_be_added_info_[i].delete_event_)
00939             ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00940 
00941           // Reduce count by one
00942           --this->handles_to_be_deleted_;
00943         }
00944 
00945       // See if this entry is scheduled for suspension
00946       else if (this->to_be_added_info_[i].suspend_entry_)
00947         {
00948           this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00949                                                                         this->to_be_added_info_[i]);
00950           // Increase number of suspended handles
00951           ++this->suspended_handles_;
00952 
00953           // Reduce count by one
00954           --this->handles_to_be_suspended_;
00955         }
00956 
00957       // If neither of the two flags are on, add to current
00958       else
00959         {
00960           // Add to the end of the current handles set
00961           this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00962           // Struct copy
00963           this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00964           ++this->max_handlep1_;
00965         }
00966 
00967       // Reset the <to_be_added_info_>
00968       this->to_be_added_info_[i].reset ();
00969 
00970       // Now that all internal structures have been updated, make the
00971       // upcall.
00972       if (event_handler != 0)
00973         {
00974           int requires_reference_counting =
00975             event_handler->reference_counting_policy ().value () ==
00976             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00977 
00978           event_handler->handle_close (handle, masks);
00979 
00980           if (requires_reference_counting)
00981             {
00982               event_handler->remove_reference ();
00983             }
00984         }
00985     }
00986 
00987   // Since all to be added handles have been taken care of, reset the
00988   // counter
00989   this->handles_to_be_added_ = 0;
00990 
00991   return 0;
00992 }
00993 
00994 void
00995 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00996 {
00997 #if defined (ACE_HAS_DUMP)
00998   size_t i = 0;
00999 
01000   ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
01001 
01002   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01003 
01004   ACE_DEBUG ((LM_DEBUG,
01005               ACE_TEXT ("Max size = %d\n"),
01006               this->max_size_));
01007 
01008   ACE_DEBUG ((LM_DEBUG,
01009               ACE_TEXT ("Current info table\n\n")));
01010   ACE_DEBUG ((LM_DEBUG,
01011               ACE_TEXT ("\tSize = %d\n"),
01012               this->max_handlep1_));
01013   ACE_DEBUG ((LM_DEBUG,
01014               ACE_TEXT ("\tHandles to be suspended = %d\n"),
01015               this->handles_to_be_suspended_));
01016 
01017   for (i = 0; i < this->max_handlep1_; ++i)
01018     this->current_info_[i].dump (this->current_handles_[i]);
01019 
01020   ACE_DEBUG ((LM_DEBUG,
01021               ACE_TEXT ("\n")));
01022 
01023   ACE_DEBUG ((LM_DEBUG,
01024               ACE_TEXT ("To-be-added info table\n\n")));
01025   ACE_DEBUG ((LM_DEBUG,
01026               ACE_TEXT ("\tSize = %d\n"),
01027               this->handles_to_be_added_));
01028 
01029   for (i = 0; i < this->handles_to_be_added_; ++i)
01030     this->to_be_added_info_[i].dump ();
01031 
01032   ACE_DEBUG ((LM_DEBUG,
01033               ACE_TEXT ("\n")));
01034 
01035   ACE_DEBUG ((LM_DEBUG,
01036               ACE_TEXT ("Suspended info table\n\n")));
01037   ACE_DEBUG ((LM_DEBUG,
01038               ACE_TEXT ("\tSize = %d\n"),
01039               this->suspended_handles_));
01040   ACE_DEBUG ((LM_DEBUG,
01041               ACE_TEXT ("\tHandles to be resumed = %d\n"),
01042               this->handles_to_be_resumed_));
01043 
01044   for (i = 0; i < this->suspended_handles_; ++i)
01045     this->current_suspended_info_[i].dump ();
01046 
01047   ACE_DEBUG ((LM_DEBUG,
01048               ACE_TEXT ("\n")));
01049 
01050   ACE_DEBUG ((LM_DEBUG,
01051               ACE_TEXT ("Total handles to be deleted = %d\n"),
01052               this->handles_to_be_deleted_));
01053 
01054   ACE_DEBUG ((LM_DEBUG,
01055               ACE_END_DUMP));
01056 #endif /* ACE_HAS_DUMP */
01057 }
01058 
01059 /************************************************************/
01060 
01061 int
01062 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01063 {
01064   ACE_NOTSUP_RETURN (-1);
01065 }
01066 
01067 #if defined (ACE_WIN32_VC8)
01068 #  pragma warning (push)
01069 #  pragma warning (disable:4355)  /* Use of 'this' in initializer list */
01070 #  endif
01071 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01072                                     ACE_Timer_Queue *tq,
01073                                     ACE_Reactor_Notify *notify)
01074   : signal_handler_ (0),
01075     delete_signal_handler_ (false),
01076     timer_queue_ (0),
01077     delete_timer_queue_ (false),
01078     delete_handler_rep_ (false),
01079     notify_handler_ (0),
01080     delete_notify_handler_ (false),
01081     lock_adapter_ (lock_),
01082     handler_rep_ (*this),
01083     // this event is initially signaled
01084     ok_to_wait_ (1),
01085     // this event is initially unsignaled
01086     wakeup_all_threads_ (0),
01087     // this event is initially unsignaled
01088     waiting_to_change_state_ (0),
01089     active_threads_ (0),
01090     owner_ (ACE_Thread::self ()),
01091     new_owner_ (0),
01092     change_state_thread_ (0),
01093     open_for_business_ (false),
01094     deactivated_ (0)
01095 {
01096   if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01097     ACE_ERROR ((LM_ERROR,
01098                 ACE_TEXT ("%p\n"),
01099                 ACE_TEXT ("WFMO_Reactor")));
01100 }
01101 
01102 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01103                                     int unused,
01104                                     ACE_Sig_Handler *sh,
01105                                     ACE_Timer_Queue *tq,
01106                                     ACE_Reactor_Notify *notify)
01107   : signal_handler_ (0),
01108     delete_signal_handler_ (false),
01109     timer_queue_ (0),
01110     delete_timer_queue_ (false),
01111     delete_handler_rep_ (false),
01112     notify_handler_ (0),
01113     delete_notify_handler_ (false),
01114     lock_adapter_ (lock_),
01115     handler_rep_ (*this),
01116     // this event is initially signaled
01117     ok_to_wait_ (1),
01118     // this event is initially unsignaled
01119     wakeup_all_threads_ (0),
01120     // this event is initially unsignaled
01121     waiting_to_change_state_ (0),
01122     active_threads_ (0),
01123     owner_ (ACE_Thread::self ()),
01124     new_owner_ (0),
01125     change_state_thread_ (0),
01126     open_for_business_ (false),
01127     deactivated_ (0)
01128 {
01129   ACE_UNUSED_ARG (unused);
01130 
01131   if (this->open (size, 0, sh, tq, 0, notify) == -1)
01132     ACE_ERROR ((LM_ERROR,
01133                 ACE_TEXT ("%p\n"),
01134                 ACE_TEXT ("WFMO_Reactor")));
01135 }
01136 #if defined (ACE_WIN32_VC8)
01137 #  pragma warning (pop)
01138 #endif
01139 
01140 int
01141 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01142 {
01143   return -1;
01144 }
01145 
01146 int
01147 ACE_WFMO_Reactor::open (size_t size,
01148                         int unused,
01149                         ACE_Sig_Handler *sh,
01150                         ACE_Timer_Queue *tq,
01151                         int disable_notify_pipe,
01152                         ACE_Reactor_Notify *notify)
01153 {
01154   ACE_UNUSED_ARG (unused);
01155   ACE_UNUSED_ARG (disable_notify_pipe);
01156 
01157   // This GUARD is necessary since we are updating shared state.
01158   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01159 
01160   // If we are already open, return -1
01161   if (this->open_for_business_)
01162     return -1;
01163 
01164   // Timer Queue
01165   if (this->delete_timer_queue_)
01166     delete this->timer_queue_;
01167 
01168   if (tq == 0)
01169     {
01170       ACE_NEW_RETURN (this->timer_queue_,
01171                       ACE_Timer_Heap,
01172                       -1);
01173       this->delete_timer_queue_ = true;
01174     }
01175   else
01176     {
01177       this->timer_queue_ = tq;
01178       this->delete_timer_queue_ = false;
01179     }
01180 
01181   // Signal Handler
01182   if (this->delete_signal_handler_)
01183     delete this->signal_handler_;
01184 
01185   if (sh == 0)
01186     {
01187       ACE_NEW_RETURN (this->signal_handler_,
01188                       ACE_Sig_Handler,
01189                       -1);
01190       this->delete_signal_handler_ = true;
01191     }
01192   else
01193     {
01194       this->signal_handler_ = sh;
01195       this->delete_signal_handler_ = false;
01196     }
01197 
01198   // Setup the atomic wait array (used later in <handle_events>)
01199   this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01200   this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01201 
01202   // Prevent memory leaks when the ACE_WFMO_Reactor is reopened.
01203   if (this->delete_handler_rep_)
01204     {
01205       if (this->handler_rep_.changes_required ())
01206         {
01207           // Make necessary changes to the handler repository
01208           this->handler_rep_.make_changes ();
01209           // Turn off <wakeup_all_threads_> since all necessary changes
01210           // have completed
01211           this->wakeup_all_threads_.reset ();
01212         }
01213 
01214       this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01215     }
01216 
01217   // Open the handle repository.  Two additional handles for internal
01218   // purposes
01219   if (this->handler_rep_.open (size + 2) == -1)
01220     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
01221                        ACE_TEXT ("opening handler repository")),
01222                       -1);
01223   else
01224     this->delete_handler_rep_ = true;
01225 
01226   if (this->notify_handler_ != 0 && this->delete_notify_handler_)
01227     delete this->notify_handler_;
01228 
01229   this->notify_handler_ = notify;
01230 
01231   if (this->notify_handler_ == 0)
01232     {
01233       ACE_NEW_RETURN (this->notify_handler_,
01234                       ACE_WFMO_Reactor_Notify,
01235                       -1);
01236 
01237       if (this->notify_handler_ == 0)
01238         return -1;
01239       else
01240         this->delete_notify_handler_ = true;
01241     }
01242 
01243   /* NOTE */
01244   // The order of the following two registrations is very important
01245 
01246   // Open the notification handler
01247   if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01248     ACE_ERROR_RETURN ((LM_ERROR,
01249                        ACE_TEXT ("%p\n"),
01250                        ACE_TEXT ("opening notify handler ")),
01251                       -1);
01252 
01253   // Register for <wakeup_all_threads> event
01254   if (this->register_handler (&this->wakeup_all_threads_handler_,
01255                               this->wakeup_all_threads_.handle ()) == -1)
01256     ACE_ERROR_RETURN ((LM_ERROR,
01257                        ACE_TEXT ("%p\n"),
01258                        ACE_TEXT ("registering thread wakeup handler")),
01259                       -1);
01260 
01261   // Since we have added two handles into the handler repository,
01262   // update the <handler_repository_>
01263   if (this->handler_rep_.changes_required ())
01264     {
01265       // Make necessary changes to the handler repository
01266       this->handler_rep_.make_changes ();
01267       // Turn off <wakeup_all_threads_> since all necessary changes
01268       // have completed
01269       this->wakeup_all_threads_.reset ();
01270     }
01271 
01272   // We are open for business
01273   this->open_for_business_ = true;
01274 
01275   return 0;
01276 }
01277 
01278 int
01279 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01280 {
01281   if (this->signal_handler_ != 0 && this->delete_signal_handler_)
01282     delete this->signal_handler_;
01283   this->signal_handler_ = signal_handler;
01284   this->delete_signal_handler_ = false;
01285   return 0;
01286 }
01287 
01288 ACE_Timer_Queue *
01289 ACE_WFMO_Reactor::timer_queue (void) const
01290 {
01291   return this->timer_queue_;
01292 }
01293 
01294 int
01295 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01296 {
01297   if (this->timer_queue_ != 0 && this->delete_timer_queue_)
01298     delete this->timer_queue_;
01299   this->timer_queue_ = tq;
01300   this->delete_timer_queue_ = false;
01301   return 0;
01302 }
01303 
01304 int
01305 ACE_WFMO_Reactor::close (void)
01306 {
01307   // This GUARD is necessary since we are updating shared state.
01308   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01309 
01310   // If we are already closed, return error
01311   if (!this->open_for_business_)
01312     return -1;
01313 
01314   // We are now closed
01315   this->open_for_business_ = false;
01316   // This will unregister all handles
01317   this->handler_rep_.close ();
01318 
01319   return 0;
01320 }
01321 
01322 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01323 {
01324   // Assumption: No threads are left in the Reactor when this method
01325   // is called (i.e., active_threads_ == 0)
01326 
01327   // Close down
01328   this->close ();
01329 
01330   // Make necessary changes to the handler repository that we caused
01331   // by <close>.
01332   this->handler_rep_.make_changes ();
01333 
01334   if (this->delete_timer_queue_)
01335     {
01336       delete this->timer_queue_;
01337       this->timer_queue_ = 0;
01338       this->delete_timer_queue_ = false;
01339     }
01340 
01341   if (this->delete_signal_handler_)
01342     {
01343       delete this->signal_handler_;
01344       this->signal_handler_ = 0;
01345       this->delete_signal_handler_ = false;
01346     }
01347 
01348   if (this->delete_notify_handler_)
01349     {
01350       delete this->notify_handler_;
01351       this->notify_handler_ = 0;
01352       this->delete_notify_handler_ = false;
01353     }
01354 }
01355 
01356 int
01357 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01358                                       ACE_HANDLE io_handle,
01359                                       ACE_Event_Handler *event_handler,
01360                                       ACE_Reactor_Mask new_masks)
01361 {
01362   // If this is a Winsock 1 system, the underlying event assignment will
01363   // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
01364   // reacting to socket activity.
01365 
01366 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01367 
01368   ACE_UNUSED_ARG (event_handle);
01369   ACE_UNUSED_ARG (io_handle);
01370   ACE_UNUSED_ARG (event_handler);
01371   ACE_UNUSED_ARG (new_masks);
01372   ACE_NOTSUP_RETURN (-1);
01373 
01374 #else
01375 
01376   // Make sure that the <handle> is valid
01377   if (io_handle == ACE_INVALID_HANDLE)
01378     io_handle = event_handler->get_handle ();
01379 
01380   if (this->handler_rep_.invalid_handle (io_handle))
01381     {
01382       errno = ERROR_INVALID_HANDLE;
01383       return -1;
01384     }
01385 
01386   long new_network_events = 0;
01387   int delete_event = 0;
01388   auto_ptr <ACE_Auto_Event> event;
01389 
01390   // Look up the repository to see if the <event_handler> is already
01391   // there.
01392   ACE_Reactor_Mask old_masks;
01393   int found = this->handler_rep_.modify_network_events_i (io_handle,
01394                                                           new_masks,
01395                                                           old_masks,
01396                                                           new_network_events,
01397                                                           event_handle,
01398                                                           delete_event,
01399                                                           ACE_Reactor::ADD_MASK);
01400 
01401   // Check to see if the user passed us a valid event; If not then we
01402   // need to create one
01403   if (event_handle == ACE_INVALID_HANDLE)
01404     {
01405       // Note: don't change this since some C++ compilers have
01406       // <auto_ptr>s that don't work properly...
01407       auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01408       event = tmp;
01409       event_handle = event->handle ();
01410       delete_event = 1;
01411     }
01412 
01413   int result = ::WSAEventSelect ((SOCKET) io_handle,
01414                                  event_handle,
01415                                  new_network_events);
01416   // If we had found the <Event_Handler> there is nothing more to do
01417   if (found)
01418     return result;
01419   else if (result != SOCKET_ERROR &&
01420            this->handler_rep_.bind_i (1,
01421                                       event_handler,
01422                                       new_network_events,
01423                                       io_handle,
01424                                       event_handle,
01425                                       delete_event) != -1)
01426     {
01427       // The <event_handler> was not found in the repository, add to
01428       // the repository.
01429       if (delete_event)
01430         {
01431           // Clear out the handle in the ACE_Auto_Event so that when
01432           // it is destroyed, the handle isn't closed out from under
01433           // the reactor. After setting it, running down the event
01434           // (via auto_ptr<> event, above) at function return will
01435           // cause an error because it'll try to close an invalid handle.
01436           // To avoid that smashing the errno value, save the errno
01437           // here, explicitly remove the event so the dtor won't do it
01438           // again, then restore errno.
01439           ACE_Errno_Guard guard (errno);
01440           event->handle (ACE_INVALID_HANDLE);
01441           event->remove ();
01442         }
01443       return 0;
01444     }
01445   else
01446     return -1;
01447 
01448 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
01449 
01450 }
01451 
01452 int
01453 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01454                               ACE_Reactor_Mask new_masks,
01455                               int operation)
01456 {
01457   // Make sure that the <handle> is valid
01458   if (this->handler_rep_.invalid_handle (io_handle))
01459     return -1;
01460 
01461   long new_network_events = 0;
01462   int delete_event = 0;
01463   ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01464 
01465   // Look up the repository to see if the <Event_Handler> is already
01466   // there.
01467   ACE_Reactor_Mask old_masks;
01468   int found = this->handler_rep_.modify_network_events_i (io_handle,
01469                                                           new_masks,
01470                                                           old_masks,
01471                                                           new_network_events,
01472                                                           event_handle,
01473                                                           delete_event,
01474                                                           operation);
01475   if (found)
01476     {
01477       int result = ::WSAEventSelect ((SOCKET) io_handle,
01478                                      event_handle,
01479                                      new_network_events);
01480       if (result == 0)
01481         return old_masks;
01482       else
01483         return result;
01484     }
01485   else
01486     return -1;
01487 }
01488 
01489 
01490 
01491 int
01492 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01493                                                               ACE_Reactor_Mask new_masks,
01494                                                               ACE_Reactor_Mask &old_masks,
01495                                                               long &new_network_events,
01496                                                               ACE_HANDLE &event_handle,
01497                                                               int &delete_event,
01498                                                               int operation)
01499 {
01500   long *modified_network_events = &new_network_events;
01501   int found = 0;
01502   size_t i;
01503 
01504   // First go through the current entries
01505   //
01506   // Look for all entries in the current handles for matching handle
01507   // (except those that have been scheduled for deletion)
01508   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01509     if (io_handle == this->current_info_[i].io_handle_ &&
01510         !this->current_info_[i].delete_entry_)
01511       {
01512         found = 1;
01513         modified_network_events = &this->current_info_[i].network_events_;
01514         delete_event = this->current_info_[i].delete_event_;
01515         event_handle = this->current_handles_[i];
01516       }
01517 
01518   // Then pass through the suspended handles
01519   //
01520   // Look for all entries in the suspended handles for matching handle
01521   // (except those that have been scheduled for deletion)
01522   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01523     if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01524         !this->current_suspended_info_[i].delete_entry_)
01525       {
01526         found = 1;
01527         modified_network_events = &this->current_suspended_info_[i].network_events_;
01528         delete_event = this->current_suspended_info_[i].delete_event_;
01529         event_handle = this->current_suspended_info_[i].event_handle_;
01530       }
01531 
01532   // Then check the to_be_added handles
01533   //
01534   // Look for all entries in the to_be_added handles for matching
01535   // handle (except those that have been scheduled for deletion)
01536   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01537     if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01538         !this->to_be_added_info_[i].delete_entry_)
01539       {
01540         found = 1;
01541         modified_network_events = &this->to_be_added_info_[i].network_events_;
01542         delete_event = this->to_be_added_info_[i].delete_event_;
01543         event_handle = this->to_be_added_info_[i].event_handle_;
01544       }
01545 
01546   old_masks = this->bit_ops (*modified_network_events,
01547                              new_masks,
01548                              operation);
01549 
01550   new_network_events = *modified_network_events;
01551 
01552   return found;
01553 }
01554 
01555 ACE_Event_Handler *
01556 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01557 {
01558   long existing_masks_ignored = 0;
01559   return this->handler (handle,
01560                         existing_masks_ignored);
01561 }
01562 
01563 ACE_Event_Handler *
01564 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01565                                               long &existing_masks)
01566 {
01567   int found = 0;
01568   size_t i = 0;
01569   ACE_Event_Handler *event_handler = 0;
01570   existing_masks = 0;
01571 
01572   // Look for the handle first
01573 
01574   // First go through the current entries
01575   //
01576   // Look for all entries in the current handles for matching handle
01577   // (except those that have been scheduled for deletion)
01578   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01579     if ((handle == this->current_info_[i].io_handle_ ||
01580          handle == this->current_handles_[i]) &&
01581         !this->current_info_[i].delete_entry_)
01582       {
01583         found = 1;
01584         event_handler = this->current_info_[i].event_handler_;
01585         existing_masks = this->current_info_[i].network_events_;
01586       }
01587 
01588   // Then pass through the suspended handles
01589   //
01590   // Look for all entries in the suspended handles for matching handle
01591   // (except those that have been scheduled for deletion)
01592   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01593     if ((handle == this->current_suspended_info_[i].io_handle_ ||
01594          handle == this->current_suspended_info_[i].event_handle_) &&
01595         !this->current_suspended_info_[i].delete_entry_)
01596       {
01597         found = 1;
01598         event_handler = this->current_suspended_info_[i].event_handler_;
01599         existing_masks = this->current_suspended_info_[i].network_events_;
01600       }
01601 
01602   // Then check the to_be_added handles
01603   //
01604   // Look for all entries in the to_be_added handles for matching
01605   // handle (except those that have been scheduled for deletion)
01606   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01607     if ((handle == this->to_be_added_info_[i].io_handle_ ||
01608          handle == this->to_be_added_info_[i].event_handle_) &&
01609         !this->to_be_added_info_[i].delete_entry_)
01610       {
01611         found = 1;
01612         event_handler = this->to_be_added_info_[i].event_handler_;
01613         existing_masks = this->to_be_added_info_[i].network_events_;
01614       }
01615 
01616   if (event_handler)
01617     event_handler->add_reference ();
01618 
01619   return event_handler;
01620 }
01621 
01622 int
01623 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01624                                               ACE_Reactor_Mask user_masks,
01625                                               ACE_Event_Handler **user_event_handler)
01626 {
01627   long existing_masks = 0;
01628   int found = 0;
01629 
01630   ACE_Event_Handler_var safe_event_handler =
01631     this->handler (handle,
01632                    existing_masks);
01633 
01634   if (safe_event_handler.handler ())
01635     found = 1;
01636 
01637   if (!found)
01638     return -1;
01639 
01640   // Otherwise, make sure that the masks that the user is looking for
01641   // are on.
01642   if (found &&
01643       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01644     if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01645         !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01646       found = 0;
01647 
01648   if (found &&
01649       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01650     if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01651       found = 0;
01652 
01653   if (found &&
01654       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01655     if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01656       found = 0;
01657 
01658   if (found &&
01659       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01660     if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01661       found = 0;
01662 
01663   if (found &&
01664       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01665     if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01666       found = 0;
01667 
01668   if (found &&
01669       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01670     if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01671       found = 0;
01672 
01673   if (found &&
01674       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01675     if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01676       found = 0;
01677 
01678   if (found &&
01679       user_event_handler)
01680     *user_event_handler = safe_event_handler.release ();
01681 
01682   if (found)
01683     return 0;
01684   else
01685     return -1;
01686 }
01687 
01688 // Waits for and dispatches all events.  Returns -1 on error, 0 if
01689 // max_wait_time expired, or the number of events that were dispatched.
01690 int
01691 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01692                                   int alertable)
01693 {
01694   ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01695 
01696   // Make sure we are not closed
01697   if (!this->open_for_business_ || this->deactivated_)
01698     return -1;
01699 
01700   // Stash the current time -- the destructor of this object will
01701   // automatically compute how much time elapsed since this method was
01702   // called.
01703   ACE_Countdown_Time countdown (max_wait_time);
01704 
01705   int result;
01706   do
01707     {
01708       // Check to see if it is ok to enter ::WaitForMultipleObjects
01709       // This will acquire <this->lock_> on success On failure, the
01710       // lock will not be acquired
01711       result = this->ok_to_wait (max_wait_time, alertable);
01712       if (result != 1)
01713         return result;
01714 
01715       // Increment the number of active threads
01716       ++this->active_threads_;
01717 
01718       // Release the <lock_>
01719       this->lock_.release ();
01720 
01721       // Update the countdown to reflect time waiting to play with the
01722       // mut and event.
01723       countdown.update ();
01724 
01725       // Calculate timeout
01726       int timeout = this->calculate_timeout (max_wait_time);
01727 
01728       // Wait for event to happen
01729       DWORD wait_status = this->wait_for_multiple_events (timeout,
01730                                                           alertable);
01731 
01732       // Upcall
01733       result = this->safe_dispatch (wait_status);
01734       if (0 == result)
01735         {
01736           // wait_for_multiple_events timed out without dispatching
01737           // anything.  Because of rounding and conversion errors and
01738           // such, it could be that the wait loop timed out, but
01739           // the timer queue said it wasn't quite ready to expire a
01740           // timer. In this case, max_wait_time won't have quite been
01741           // reduced to 0, and we need to go around again. If max_wait_time
01742           // is all the way to 0, just return, as the entire time the
01743           // caller wanted to wait has been used up.
01744           countdown.update ();     // Reflect time waiting for events
01745           if (0 == max_wait_time || max_wait_time->usec () == 0)
01746             break;
01747         }
01748     }
01749   while (result == 0);
01750 
01751   return result;
01752 }
01753 
01754 int
01755 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01756                               int alertable)
01757 {
01758   // Calculate the max time we should spend here
01759   //
01760   // Note: There is really no need to involve the <timer_queue_> here
01761   // because even if a timeout in the <timer_queue_> does expire we
01762   // will not be able to dispatch it
01763 
01764   // We need to wait for both the <lock_> and <ok_to_wait_> event.
01765   // If not on WinCE, use WaitForMultipleObjects() to wait for both atomically.
01766   // On WinCE, the waitAll arg to WFMO must be false, so wait for the
01767   // ok_to_wait_ event first (since that's likely to take the longest) then
01768   // grab the lock and recheck the ok_to_wait_ event. When we can get them
01769   // both, or there's an error/timeout, return.
01770 #if defined (ACE_HAS_WINCE)
01771   ACE_Time_Value timeout = ACE_OS::gettimeofday ();
01772   if (max_wait_time != 0)
01773     timeout += *max_wait_time;
01774   while (1)
01775     {
01776       int status;
01777       if (max_wait_time == 0)
01778         status = this->ok_to_wait_.wait ();
01779       else
01780         status = this->ok_to_wait_.wait (&timeout);
01781       if (status == -1)
01782         return -1;
01783       // The event is signaled, so it's ok to wait; grab the lock and
01784       // recheck the event. If something has changed, restart the wait.
01785       if (max_wait_time == 0)
01786         status = this->lock_.acquire ();
01787       else
01788         status = this->lock_.acquire (timeout);
01789       if (status == -1)
01790         return -1;
01791 
01792       // Have the lock_, now re-check the event. If it's not signaled,
01793       // another thread changed something so go back and wait again.
01794       ACE_Time_Value poll_it = ACE_OS::gettimeofday ();
01795       if (this->ok_to_wait_.wait (&poll_it) == 0)
01796         break;
01797       this->lock_.release ();
01798     }
01799   return 1;
01800 
01801 #else
01802   int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01803   DWORD result = 0;
01804   while (1)
01805     {
01806 #  if defined (ACE_HAS_PHARLAP)
01807       // PharLap doesn't implement WaitForMultipleObjectsEx, and doesn't
01808       // do async I/O, so it's not needed in this case anyway.
01809       result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01810                                          this->atomic_wait_array_,
01811                                          TRUE,
01812                                          timeout);
01813 
01814       if (result != WAIT_IO_COMPLETION)
01815         break;
01816 
01817 #  else
01818       result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01819                                            this->atomic_wait_array_,
01820                                            TRUE,
01821                                            timeout,
01822                                            alertable);
01823 
01824       if (result != WAIT_IO_COMPLETION)
01825         break;
01826 
01827 #  endif /* ACE_HAS_PHARLAP */
01828     }
01829 
01830   switch (result)
01831     {
01832     case WAIT_TIMEOUT:
01833       errno = ETIME;
01834       return 0;
01835     case WAIT_FAILED:
01836     case WAIT_ABANDONED_0:
01837       ACE_OS::set_errno_to_last_error ();
01838       return -1;
01839     default:
01840       break;
01841     }
01842 
01843   // It is ok to enter ::WaitForMultipleObjects
01844   return 1;
01845 #endif /* ACE_HAS_WINCE */
01846 }
01847 
01848 DWORD
01849 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01850                                             int alertable)
01851 {
01852   // Wait for any of handles_ to be active, or until timeout expires.
01853   // If <alertable> is enabled allow asynchronous completion of
01854   // ReadFile and WriteFile operations.
01855 
01856 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01857   // PharLap doesn't do async I/O and doesn't implement
01858   // WaitForMultipleObjectsEx, so use WaitForMultipleObjects.
01859   ACE_UNUSED_ARG (alertable);
01860   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01861                                    this->handler_rep_.handles (),
01862                                    FALSE,
01863                                    timeout);
01864 #else
01865   return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01866                                      this->handler_rep_.handles (),
01867                                      FALSE,
01868                                      timeout,
01869                                      alertable);
01870 #endif /* ACE_HAS_PHARLAP */
01871 }
01872 
01873 DWORD
01874 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01875 {
01876   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01877                                    this->handler_rep_.handles () + slot,
01878                                    FALSE,
01879                                    0);
01880 }
01881 
01882 int
01883 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01884 {
01885   ACE_Time_Value *time = 0;
01886   if (this->owner_ == ACE_Thread::self ())
01887     time = this->timer_queue_->calculate_timeout (max_wait_time);
01888   else
01889     time = max_wait_time;
01890 
01891   if (time == 0)
01892     return INFINITE;
01893   else
01894     return time->msec ();
01895 }
01896 
01897 
01898 int
01899 ACE_WFMO_Reactor::expire_timers (void)
01900 {
01901   // If "owner" thread
01902   if (ACE_Thread::self () == this->owner_)
01903     // expire all pending timers.
01904     return this->timer_queue_->expire ();
01905 
01906   else
01907     // Nothing to expire
01908     return 0;
01909 }
01910 
01911 int
01912 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01913 {
01914   int handlers_dispatched = 0;
01915 
01916   // Expire timers
01917   handlers_dispatched += this->expire_timers ();
01918 
01919   switch (wait_status)
01920     {
01921     case WAIT_FAILED: // Failure.
01922       ACE_OS::set_errno_to_last_error ();
01923       return -1;
01924 
01925     case WAIT_TIMEOUT: // Timeout.
01926       errno = ETIME;
01927       return handlers_dispatched;
01928 
01929 #ifndef ACE_HAS_WINCE
01930     case WAIT_IO_COMPLETION: // APC.
01931       return handlers_dispatched;
01932 #endif  // ACE_HAS_WINCE
01933 
01934     default:  // Dispatch.
01935       // We'll let dispatch worry about abandoned mutes.
01936       handlers_dispatched += this->dispatch_handles (wait_status);
01937       return handlers_dispatched;
01938     }
01939 }
01940 
01941 // Dispatches any active handles from <handles_[slot]> to
01942 // <handles_[max_handlep1_]>, polling through our handle set looking
01943 // for active handles.
01944 int
01945 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01946 {
01947   // dispatch_slot is the absolute slot.  Only += is used to
01948   // increment it.
01949   DWORD dispatch_slot = 0;
01950 
01951   // Cache this value, this is the absolute value.
01952   DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01953 
01954   // nCount starts off at <max_handlep1>, this is a transient count of
01955   // handles last waited on.
01956   DWORD nCount = max_handlep1;
01957 
01958   for (int number_of_handlers_dispatched = 1;
01959        ;
01960        ++number_of_handlers_dispatched)
01961     {
01962       const bool ok = (
01963 #if ! defined(__BORLANDC__) \
01964     && !defined (ghs) \
01965     && !defined (__MINGW32__) \
01966     && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01967                  // wait_status is unsigned in Borland, Green Hills,
01968                  // mingw32 and MSVC++ >= 7.1.
01969                  // This >= is always true, with a warning.
01970                  wait_status >= WAIT_OBJECT_0 &&
01971 #endif
01972                  wait_status <= (WAIT_OBJECT_0 + nCount));
01973 
01974       if (ok)
01975         dispatch_slot += wait_status - WAIT_OBJECT_0;
01976       else
01977         // Otherwise, a handle was abandoned.
01978         dispatch_slot += wait_status - WAIT_ABANDONED_0;
01979 
01980       // Dispatch handler
01981       if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01982         return -1;
01983 
01984       // Increment slot
01985       ++dispatch_slot;
01986 
01987       // We're done.
01988       if (dispatch_slot >= max_handlep1)
01989         return number_of_handlers_dispatched;
01990 
01991       // Readjust nCount
01992       nCount = max_handlep1 - dispatch_slot;
01993 
01994       // Check the remaining handles
01995       wait_status = this->poll_remaining_handles (dispatch_slot);
01996       switch (wait_status)
01997         {
01998         case WAIT_FAILED: // Failure.
01999           ACE_OS::set_errno_to_last_error ();
02000           /* FALLTHRU */
02001         case WAIT_TIMEOUT:
02002           // There are no more handles ready, we can return.
02003           return number_of_handlers_dispatched;
02004         }
02005     }
02006 }
02007 
02008 int
02009 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
02010                                     DWORD max_handlep1)
02011 {
02012   // Check if there are window messages that need to be dispatched
02013   if (slot == max_handlep1)
02014     return this->dispatch_window_messages ();
02015 
02016   // Dispatch the handler if it has not been scheduled for deletion.
02017   // Note that this is a very week test if there are multiple threads
02018   // dispatching this slot as no locks are held here. Generally, you
02019   // do not want to do something like deleting the this pointer in
02020   // handle_close() if you have registered multiple times and there is
02021   // more than one thread in WFMO_Reactor->handle_events().
02022   else if (!this->handler_rep_.scheduled_for_deletion (slot))
02023     {
02024       ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
02025 
02026       if (this->handler_rep_.current_info ()[slot].io_entry_)
02027         return this->complex_dispatch_handler (slot,
02028                                                event_handle);
02029       else
02030         return this->simple_dispatch_handler (slot,
02031                                               event_handle);
02032     }
02033   else
02034     // The handle was scheduled for deletion, so we will skip it.
02035     return 0;
02036 }
02037 
02038 int
02039 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
02040                                            ACE_HANDLE event_handle)
02041 {
02042   // This dispatch is used for non-I/O entires
02043 
02044   // Assign the ``signaled'' HANDLE so that callers can get it.
02045   // siginfo_t is an ACE - specific fabrication. Constructor exists.
02046   siginfo_t sig (event_handle);
02047 
02048   ACE_Event_Handler *event_handler =
02049     this->handler_rep_.current_info ()[slot].event_handler_;
02050 
02051   int requires_reference_counting =
02052     event_handler->reference_counting_policy ().value () ==
02053     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02054 
02055   if (requires_reference_counting)
02056     {
02057       event_handler->add_reference ();
02058     }
02059 
02060   // Upcall
02061   if (event_handler->handle_signal (0, &sig) == -1)
02062     this->handler_rep_.unbind (event_handle,
02063                                ACE_Event_Handler::NULL_MASK);
02064 
02065   // Call remove_reference() if needed.
02066   if (requires_reference_counting)
02067     {
02068       event_handler->remove_reference ();
02069     }
02070 
02071   return 0;
02072 }
02073 
02074 int
02075 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02076                                             ACE_HANDLE event_handle)
02077 {
02078   // This dispatch is used for I/O entires.
02079 
02080   ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
02081     this->handler_rep_.current_info ()[slot];
02082 
02083   WSANETWORKEVENTS events;
02084   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02085   if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02086                               event_handle,
02087                               &events) == SOCKET_ERROR)
02088     problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02089   else
02090     {
02091       // Prepare for upcalls. Clear the bits from <events> representing
02092       // events the handler is not interested in. If there are any left,
02093       // do the upcall(s). upcall will replace events.lNetworkEvents
02094       // with bits representing any functions that requested a repeat
02095       // callback before checking handles again. In this case, continue
02096       // to call back unless the handler is unregistered as a result of
02097       // one of the upcalls. The way this is written, the upcalls will
02098       // keep being done even if one or more upcalls reported problems.
02099       // In practice this may turn out not so good, but let's see. If any
02100       // problems, please notify Steve Huston <shuston@riverace.com>
02101       // before or after you change this code.
02102       events.lNetworkEvents &= current_info.network_events_;
02103       while (events.lNetworkEvents != 0)
02104         {
02105           ACE_Event_Handler *event_handler =
02106             current_info.event_handler_;
02107 
02108           int reference_counting_required =
02109             event_handler->reference_counting_policy ().value () ==
02110             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02111 
02112           // Call add_reference() if needed.
02113           if (reference_counting_required)
02114             {
02115               event_handler->add_reference ();
02116             }
02117 
02118           // Upcall
02119           problems |= this->upcall (current_info.event_handler_,
02120                                     current_info.io_handle_,
02121                                     events);
02122 
02123           // Call remove_reference() if needed.
02124           if (reference_counting_required)
02125             {
02126               event_handler->remove_reference ();
02127             }
02128 
02129           if (this->handler_rep_.scheduled_for_deletion (slot))
02130             break;
02131         }
02132     }
02133 
02134   if (problems != ACE_Event_Handler::NULL_MASK
02135       && !this->handler_rep_.scheduled_for_deletion (slot)  )
02136     this->handler_rep_.unbind (event_handle, problems);
02137 
02138   return 0;
02139 }
02140 
02141 ACE_Reactor_Mask
02142 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02143                           ACE_HANDLE io_handle,
02144                           WSANETWORKEVENTS &events)
02145 {
02146   // This method figures out what exactly has happened to the socket
02147   // and then calls appropriate methods.
02148   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02149 
02150   // Go through the events and do the indicated upcalls. If the handler
02151   // doesn't want to be called back, clear the bit for that event.
02152   // At the end, set the bits back to <events> to request a repeat call.
02153 
02154   long actual_events = events.lNetworkEvents;
02155   int action;
02156 
02157   if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02158     {
02159       action = event_handler->handle_output (io_handle);
02160       if (action <= 0)
02161         {
02162           ACE_CLR_BITS (actual_events, FD_WRITE);
02163           if (action == -1)
02164             ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02165         }
02166     }
02167 
02168   if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02169     {
02170       if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02171         {
02172           // Successful connect
02173           action = event_handler->handle_output (io_handle);
02174           if (action <= 0)
02175             {
02176               ACE_CLR_BITS (actual_events, FD_CONNECT);
02177               if (action == -1)
02178                 ACE_SET_BITS (problems,
02179                               ACE_Event_Handler::CONNECT_MASK);
02180             }
02181         }
02182       // Unsuccessful connect
02183       else
02184         {
02185           action = event_handler->handle_input (io_handle);
02186           if (action <= 0)
02187             {
02188               ACE_CLR_BITS (actual_events, FD_CONNECT);
02189               if (action == -1)
02190                 ACE_SET_BITS (problems,
02191                               ACE_Event_Handler::CONNECT_MASK);
02192             }
02193         }
02194     }
02195 
02196   if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02197     {
02198       action = event_handler->handle_exception (io_handle);
02199       if (action <= 0)
02200         {
02201           ACE_CLR_BITS (actual_events, FD_OOB);
02202           if (action == -1)
02203             ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02204         }
02205     }
02206 
02207   if (ACE_BIT_ENABLED (actual_events, FD_READ))
02208     {
02209       action = event_handler->handle_input (io_handle);
02210       if (action <= 0)
02211         {
02212           ACE_CLR_BITS (actual_events, FD_READ);
02213           if (action == -1)
02214             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02215         }
02216     }
02217 
02218   if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02219       && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02220     {
02221       action = event_handler->handle_input (io_handle);
02222       if (action <= 0)
02223         {
02224           ACE_CLR_BITS (actual_events, FD_CLOSE);
02225           if (action == -1)
02226             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02227         }
02228     }
02229 
02230           if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02231     {
02232       action = event_handler->handle_input (io_handle);
02233       if (action <= 0)
02234         {
02235           ACE_CLR_BITS (actual_events, FD_ACCEPT);
02236           if (action == -1)
02237             ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02238         }
02239     }
02240 
02241   if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02242     {
02243       action = event_handler->handle_qos (io_handle);
02244       if (action <= 0)
02245         {
02246           ACE_CLR_BITS (actual_events, FD_QOS);
02247           if (action == -1)
02248             ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02249         }
02250     }
02251 
02252   if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02253     {
02254       action = event_handler->handle_group_qos (io_handle);
02255       if (action <= 0)
02256         {
02257           ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02258           if (action == -1)
02259             ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02260         }
02261     }
02262 
02263   events.lNetworkEvents = actual_events;
02264   return problems;
02265 }
02266 
02267 
02268 int
02269 ACE_WFMO_Reactor::update_state (void)
02270 {
02271   // This GUARD is necessary since we are updating shared state.
02272   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02273 
02274   // Decrement active threads
02275   --this->active_threads_;
02276 
02277   // Check if the state of the handler repository has changed or new
02278   // owner has to be set
02279   if (this->handler_rep_.changes_required () || this->new_owner ())
02280     {
02281       if (this->change_state_thread_ == 0)
02282         // Try to become the thread which will be responsible for the
02283         // changes
02284         {
02285           this->change_state_thread_ = ACE_Thread::self ();
02286           // Make sure no new threads are allowed to enter
02287           this->ok_to_wait_.reset ();
02288 
02289           if (this->active_threads_ > 0)
02290             // Check for other active threads
02291             {
02292               // Wake up all other threads
02293               this->wakeup_all_threads_.signal ();
02294               // Release <lock_>
02295               monitor.release ();
02296               // Go to sleep waiting for all other threads to get done
02297               this->waiting_to_change_state_.wait ();
02298               // Re-acquire <lock_> again
02299               monitor.acquire ();
02300             }
02301 
02302           // Note that make_changes() calls into user code which can
02303           // request other changes.  So keep looping until all
02304           // requested changes are completed.
02305           while (this->handler_rep_.changes_required ())
02306             // Make necessary changes to the handler repository
02307             this->handler_rep_.make_changes ();
02308           if (this->new_owner ())
02309             // Update the owner
02310             this->change_owner ();
02311           // Turn off <wakeup_all_threads_>
02312           this->wakeup_all_threads_.reset ();
02313           // Let everyone know that it is ok to go ahead
02314           this->ok_to_wait_.signal ();
02315           // Reset this flag
02316           this->change_state_thread_ = 0;
02317         }
02318       else if (this->active_threads_ == 0)
02319         // This thread did not get a chance to become the change
02320         // thread. If it is the last one out, it will wakeup the
02321         // change thread
02322         this->waiting_to_change_state_.signal ();
02323     }
02324   // This is if we were woken up explicitily by the user and there are
02325   // no state changes required.
02326   else if (this->active_threads_ == 0)
02327     // Turn off <wakeup_all_threads_>
02328     this->wakeup_all_threads_.reset ();
02329 
02330   return 0;
02331 }
02332 
02333 void
02334 ACE_WFMO_Reactor::dump (void) const
02335 {
02336 #if defined (ACE_HAS_DUMP)
02337   ACE_TRACE ("ACE_WFMO_Reactor::dump");
02338 
02339   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02340 
02341   ACE_DEBUG ((LM_DEBUG,
02342               ACE_TEXT ("Count of currently active threads = %d\n"),
02343               this->active_threads_));
02344 
02345   ACE_DEBUG ((LM_DEBUG,
02346               ACE_TEXT ("ID of owner thread = %d\n"),
02347               this->owner_));
02348 
02349   this->handler_rep_.dump ();
02350   this->signal_handler_->dump ();
02351   this->timer_queue_->dump ();
02352 
02353   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02354 #endif /* ACE_HAS_DUMP */
02355 }
02356 
02357 int
02358 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
02359                                                  ACE_Handle_Set & /*rd_mask*/)
02360 {
02361   return -1;
02362 }
02363 
02364 int
02365 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & /*buffer*/)
02366 {
02367   return 0;
02368 }
02369 
02370 ACE_HANDLE
02371 ACE_WFMO_Reactor_Notify::notify_handle (void)
02372 {
02373   return ACE_INVALID_HANDLE;
02374 }
02375 
02376 int
02377 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02378                                            ACE_Notification_Buffer &)
02379 {
02380   return 0;
02381 }
02382 
02383 int
02384 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02385 {
02386   return 0;
02387 }
02388 
02389 int
02390 ACE_WFMO_Reactor_Notify::close (void)
02391 {
02392   return -1;
02393 }
02394 
02395 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02396   : timer_queue_ (0),
02397     message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02398                     max_notifies * sizeof (ACE_Notification_Buffer)),
02399     max_notify_iterations_ (-1)
02400 {
02401 }
02402 
02403 int
02404 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02405                                ACE_Timer_Queue *timer_queue,
02406                                int ignore_notify)
02407 {
02408   ACE_UNUSED_ARG (ignore_notify);
02409   timer_queue_ = timer_queue;
02410   return wfmo_reactor->register_handler (this);
02411 }
02412 
02413 ACE_HANDLE
02414 ACE_WFMO_Reactor_Notify::get_handle (void) const
02415 {
02416   return this->wakeup_one_thread_.handle ();
02417 }
02418 
02419 // Handle all pending notifications.
02420 
02421 int
02422 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02423                                         siginfo_t *siginfo,
02424                                         ucontext_t *)
02425 {
02426   ACE_UNUSED_ARG (signum);
02427 
02428   // Just check for sanity...
02429   if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02430     return -1;
02431 
02432   // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
02433   // is signaled.
02434   //  ACE_DEBUG ((LM_DEBUG,
02435   //             ACE_TEXT ("(%t) waking up to handle internal notifications\n")));
02436 
02437   for (int i = 1; ; ++i)
02438     {
02439       ACE_Message_Block *mb = 0;
02440       // Copy ACE_Time_Value::zero since dequeue_head will modify it.
02441       ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02442       if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02443         {
02444           if (errno == EWOULDBLOCK)
02445             // We've reached the end of the processing, return
02446             // normally.
02447             return 0;
02448           else
02449             return -1; // Something weird happened...
02450         }
02451       else
02452         {
02453           ACE_Notification_Buffer *buffer =
02454             reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02455 
02456           // If eh == 0 then we've got major problems!  Otherwise, we
02457           // need to dispatch the appropriate handle_* method on the
02458           // ACE_Event_Handler pointer we've been passed.
02459 
02460           if (buffer->eh_ != 0)
02461             {
02462               ACE_Event_Handler *event_handler =
02463                 buffer->eh_;
02464 
02465               bool const requires_reference_counting =
02466                 event_handler->reference_counting_policy ().value () ==
02467                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02468 
02469               int result = 0;
02470 
02471               switch (buffer->mask_)
02472                 {
02473                 case ACE_Event_Handler::READ_MASK:
02474                 case ACE_Event_Handler::ACCEPT_MASK:
02475                   result = event_handler->handle_input (ACE_INVALID_HANDLE);
02476                   break;
02477                 case ACE_Event_Handler::WRITE_MASK:
02478                   result = event_handler->handle_output (ACE_INVALID_HANDLE);
02479                   break;
02480                 case ACE_Event_Handler::EXCEPT_MASK:
02481                   result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02482                   break;
02483                 case ACE_Event_Handler::QOS_MASK:
02484                   result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02485                   break;
02486                 case ACE_Event_Handler::GROUP_QOS_MASK:
02487                   result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02488                   break;
02489                 default:
02490                   ACE_ERROR ((LM_ERROR,
02491                               ACE_TEXT ("invalid mask = %d\n"),
02492                               buffer->mask_));
02493                   break;
02494                 }
02495 
02496               if (result == -1)
02497                 event_handler->handle_close (ACE_INVALID_HANDLE,
02498                                              ACE_Event_Handler::EXCEPT_MASK);
02499 
02500               if (requires_reference_counting)
02501                 {
02502                   event_handler->remove_reference ();
02503                 }
02504             }
02505 
02506           // Make sure to delete the memory regardless of success or
02507           // failure!
02508           mb->release ();
02509 
02510           // Bail out if we've reached the <max_notify_iterations_>.
02511           // Note that by default <max_notify_iterations_> is -1, so
02512           // we'll loop until we're done.
02513           if (i == this->max_notify_iterations_)
02514             {
02515               // If there are still notification in the queue, we need
02516               // to wake up again
02517               if (!this->message_queue_.is_empty ())
02518                 this->wakeup_one_thread_.signal ();
02519 
02520               // Break the loop as we have reached max_notify_iterations_
02521               return 0;
02522             }
02523         }
02524     }
02525 }
02526 
02527 // Notify the WFMO_Reactor, potentially enqueueing the
02528 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
02529 // thread of control.
02530 
02531 int
02532 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02533                                  ACE_Reactor_Mask mask,
02534                                  ACE_Time_Value *timeout)
02535 {
02536   if (event_handler != 0)
02537     {
02538       ACE_Message_Block *mb = 0;
02539       ACE_NEW_RETURN (mb,
02540                       ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02541                       -1);
02542 
02543       ACE_Notification_Buffer *buffer =
02544         (ACE_Notification_Buffer *) mb->base ();
02545       buffer->eh_ = event_handler;
02546       buffer->mask_ = mask;
02547 
02548       // Convert from relative time to absolute time by adding the
02549       // current time of day.  This is what <ACE_Message_Queue>
02550       // expects.
02551       if (timeout != 0)
02552         *timeout += timer_queue_->gettimeofday ();
02553 
02554       if (this->message_queue_.enqueue_tail
02555           (mb, timeout) == -1)
02556         {
02557           mb->release ();
02558           return -1;
02559         }
02560 
02561       event_handler->add_reference ();
02562     }
02563 
02564   return this->wakeup_one_thread_.signal ();
02565 }
02566 
02567 void
02568 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02569 {
02570   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02571   // Must always be > 0 or < 0 to optimize the loop exit condition.
02572   if (iterations == 0)
02573     iterations = 1;
02574 
02575   this->max_notify_iterations_ = iterations;
02576 }
02577 
02578 int
02579 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02580 {
02581   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02582   return this->max_notify_iterations_;
02583 }
02584 
02585 int
02586 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02587                                                       ACE_Reactor_Mask mask)
02588 {
02589   ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02590 
02591   // Go over message queue and take out all the matching event
02592   // handlers.  If eh == 0, purge all. Note that reactor notifies (no
02593   // handler specified) are never purged, as this may lose a needed
02594   // notify the reactor queued for itself.
02595 
02596   if (this->message_queue_.is_empty ())
02597     return 0;
02598 
02599   // Guard against new and/or delivered notifications while purging.
02600   // WARNING!!! The use of the notification queue's lock object for
02601   // this guard makes use of the knowledge that on Win32, the mutex
02602   // protecting the queue is really a CriticalSection, which is
02603   // recursive. This is how we can get away with locking it down here
02604   // and still calling member functions on the queue object.
02605   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02606 
02607   // first, copy all to our own local queue. Since we've locked everyone out
02608   // of here, there's no need to use any synchronization on this queue.
02609   ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02610 
02611   size_t queue_size  = this->message_queue_.message_count ();
02612   int number_purged = 0;
02613 
02614   size_t index;
02615 
02616   for (index = 0; index < queue_size; ++index)
02617     {
02618       ACE_Message_Block *mb = 0;
02619       if (-1 == this->message_queue_.dequeue_head (mb))
02620         return -1;        // This shouldn't happen...
02621 
02622       ACE_Notification_Buffer *buffer =
02623         reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02624 
02625       // If this is not a Reactor notify (it is for a particular handler),
02626       // and it matches the specified handler (or purging all),
02627       // and applying the mask would totally eliminate the notification, then
02628       // release it and count the number purged.
02629       if ((0 != buffer->eh_) &&
02630           (0 == eh || eh == buffer->eh_) &&
02631           ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
02632                                                    // is left with nothing when
02633                                                    // applying the mask
02634         {
02635           ACE_Event_Handler *event_handler = buffer->eh_;
02636 
02637           event_handler->remove_reference ();
02638 
02639           mb->release ();
02640           ++number_purged;
02641         }
02642       else
02643         {
02644           // To preserve it, move it to the local_queue.  But first, if
02645           // this is not a Reactor notify (it is for a
02646           // particularhandler), and it matches the specified handler
02647           // (or purging all), then apply the mask
02648           if ((0 != buffer->eh_) &&
02649               (0 == eh || eh == buffer->eh_))
02650             ACE_CLR_BITS(buffer->mask_, mask);
02651           if (-1 == local_queue.enqueue_head (mb))
02652             return -1;
02653         }
02654     }
02655 
02656   if (this->message_queue_.message_count ())
02657     { // Should be empty!
02658       ACE_ASSERT (0);
02659       return -1;
02660     }
02661 
02662   // Now copy back from the local queue to the class queue, taking
02663   // care to preserve the original order...
02664   queue_size  = local_queue.message_count ();
02665   for (index = 0; index < queue_size; ++index)
02666     {
02667       ACE_Message_Block  *mb = 0;
02668       if (-1 == local_queue.dequeue_head (mb))
02669         {
02670           ACE_ASSERT (0);
02671           return -1;
02672         }
02673 
02674       if (-1 == this->message_queue_.enqueue_head (mb))
02675         {
02676           ACE_ASSERT (0);
02677           return -1;
02678         }
02679     }
02680 
02681   return number_purged;
02682 }
02683 
02684 void
02685 ACE_WFMO_Reactor_Notify::dump (void) const
02686 {
02687 #if defined (ACE_HAS_DUMP)
02688   ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02689   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02690   this->timer_queue_->dump ();
02691   ACE_DEBUG ((LM_DEBUG,
02692               ACE_TEXT ("Max. iteration: %d\n"),
02693               this->max_notify_iterations_));
02694   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02695 #endif /* ACE_HAS_DUMP */
02696 }
02697 
02698 void
02699 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02700 {
02701   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02702   ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02703 
02704   // Must always be > 0 or < 0 to optimize the loop exit condition.
02705   this->notify_handler_->max_notify_iterations (iterations);
02706 }
02707 
02708 int
02709 ACE_WFMO_Reactor::max_notify_iterations (void)
02710 {
02711   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02712   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02713 
02714   return this->notify_handler_->max_notify_iterations ();
02715 }
02716 
02717 int
02718 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02719                                                ACE_Reactor_Mask mask)
02720 {
02721   ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02722   if (this->notify_handler_ == 0)
02723     return 0;
02724   else
02725     return this->notify_handler_->purge_pending_notifications (eh, mask);
02726 }
02727 
02728 int
02729 ACE_WFMO_Reactor::resumable_handler (void)
02730 {
02731   ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02732   return 0;
02733 }
02734 
02735 
02736 // No-op WinSOCK2 methods to help WFMO_Reactor compile
02737 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02738 int
02739 WSAEventSelect (SOCKET /* s */,
02740                 WSAEVENT /* hEventObject */,
02741                 long /* lNetworkEvents */)
02742 {
02743   return -1;
02744 }
02745 
02746 int
02747 WSAEnumNetworkEvents (SOCKET /* s */,
02748                       WSAEVENT /* hEventObject */,
02749                       LPWSANETWORKEVENTS /* lpNetworkEvents */)
02750 {
02751   return -1;
02752 }
02753 #endif /* !defined ACE_HAS_WINSOCK2 */
02754 
02755 ACE_END_VERSIONED_NAMESPACE_DECL
02756 
02757 #endif /* ACE_WIN32 */

Generated on Sun Jan 27 12:05:43 2008 for ACE by doxygen 1.3.6