WFMO_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: WFMO_Reactor.cpp 81138 2008-03-28 09:18:15Z johnnyw $
00002 
00003 #include "ace/WFMO_Reactor.h"
00004 
00005 #if defined (ACE_WIN32)
00006 
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp 81138 2008-03-28 09:18:15Z johnnyw $")
00018 
00019 #include "ace/Auto_Ptr.h"
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024   : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027 
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031   if (size > MAXIMUM_WAIT_OBJECTS)
00032     ACE_ERROR_RETURN ((LM_ERROR,
00033                        ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034                        size,
00035                        MAXIMUM_WAIT_OBJECTS),
00036                       -1);
00037 
00038   // Dynamic allocation
00039   ACE_NEW_RETURN (this->current_handles_,
00040                   ACE_HANDLE[size],
00041                   -1);
00042   ACE_NEW_RETURN (this->current_info_,
00043                   Current_Info[size],
00044                   -1);
00045   ACE_NEW_RETURN (this->current_suspended_info_,
00046                   Suspended_Info[size],
00047                   -1);
00048   ACE_NEW_RETURN (this->to_be_added_info_,
00049                   To_Be_Added_Info[size],
00050                   -1);
00051 
00052   // Initialization
00053   this->max_size_ = size;
00054   this->max_handlep1_ = 0;
00055   this->suspended_handles_ = 0;
00056   this->handles_to_be_added_ = 0;
00057   this->handles_to_be_deleted_ = 0;
00058   this->handles_to_be_suspended_ = 0;
00059   this->handles_to_be_resumed_ = 0;
00060 
00061   for (size_t i = 0; i < size; ++i)
00062     this->current_handles_[i] = ACE_INVALID_HANDLE;
00063 
00064   return 0;
00065 }
00066 
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069   // Free up dynamically allocated space
00070   delete [] this->current_handles_;
00071   delete [] this->current_info_;
00072   delete [] this->current_suspended_info_;
00073   delete [] this->to_be_added_info_;
00074 }
00075 
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078                                               ACE_Reactor_Mask change_masks,
00079                                               int operation)
00080 {
00081   // Find the old reactor masks.  This automatically does the work of
00082   // the GET_MASK operation.
00083 
00084   ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085 
00086   if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087       || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088     ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089 
00090   if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091     ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092 
00093   if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094     ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095 
00096   if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097     ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098 
00099   if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100     ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101 
00102   if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103     ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104 
00105   if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106     ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107 
00108   switch (operation)
00109     {
00110     case ACE_Reactor::CLR_MASK:
00111       // For the CLR_MASK operation, clear only the specific masks.
00112 
00113       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114         {
00115           ACE_CLR_BITS (existing_masks, FD_READ);
00116           ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117         }
00118 
00119       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120         ACE_CLR_BITS (existing_masks, FD_WRITE);
00121 
00122       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123         ACE_CLR_BITS (existing_masks, FD_OOB);
00124 
00125       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126         ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127 
00128       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129         ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130 
00131       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132         ACE_CLR_BITS (existing_masks, FD_QOS);
00133 
00134       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135         ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136 
00137       break;
00138 
00139     case ACE_Reactor::SET_MASK:
00140       // If the operation is a set, first reset any existing masks
00141 
00142       existing_masks = 0;
00143       /* FALLTHRU */
00144 
00145     case ACE_Reactor::ADD_MASK:
00146       // For the ADD_MASK and the SET_MASK operation, add only the
00147       // specific masks.
00148 
00149       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150         {
00151           ACE_SET_BITS (existing_masks, FD_READ);
00152           ACE_SET_BITS (existing_masks, FD_CLOSE);
00153         }
00154 
00155       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156         ACE_SET_BITS (existing_masks, FD_WRITE);
00157 
00158       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159         ACE_SET_BITS (existing_masks, FD_OOB);
00160 
00161       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162         ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163 
00164       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165         ACE_SET_BITS (existing_masks, FD_CONNECT);
00166 
00167       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168         ACE_SET_BITS (existing_masks, FD_QOS);
00169 
00170       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171         ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172 
00173       break;
00174 
00175     case ACE_Reactor::GET_MASK:
00176 
00177       // The work for this operation is done in all cases at the
00178       // begining of the function.
00179 
00180       ACE_UNUSED_ARG (change_masks);
00181 
00182       break;
00183     }
00184 
00185   return old_masks;
00186 }
00187 
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190                                                ACE_Reactor_Mask mask,
00191                                                bool &changes_required)
00192 {
00193   int error = 0;
00194 
00195   // Remember this value; only if it changes do we need to wakeup
00196   // the other threads
00197   size_t const original_handle_count = this->handles_to_be_deleted_;
00198   size_t i;
00199 
00200   // Go through all the handles looking for <handle>.  Even if we find
00201   // it, we continue through the rest of the list since <handle> could
00202   // appear multiple times. All handles are checked.
00203 
00204   // First check the current entries
00205   for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00206     // Since the handle can either be the event or the I/O handle,
00207     // we have to check both
00208     if ((this->current_handles_[i] == handle
00209          || this->current_info_[i].io_handle_ == handle)
00210         && // Make sure that it is not already marked for deleted
00211         !this->current_info_[i].delete_entry_)
00212       {
00213         if (this->remove_handler_i (i, mask) == -1)
00214           error = 1;
00215       }
00216 
00217   // Then check the suspended entries
00218   for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00219     // Since the handle can either be the event or the I/O handle, we
00220     // have to check both
00221     if ((this->current_suspended_info_[i].io_handle_ == handle
00222          || this->current_suspended_info_[i].event_handle_ == handle)
00223         &&
00224         // Make sure that it is not already marked for deleted
00225         !this->current_suspended_info_[i].delete_entry_)
00226       {
00227         if (this->remove_suspended_handler_i (i, mask) == -1)
00228           error = 1;
00229       }
00230 
00231   // Then check the to_be_added entries
00232   for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00233     // Since the handle can either be the event or the I/O handle,
00234     // we have to check both
00235     if ((this->to_be_added_info_[i].io_handle_ == handle
00236          || this->to_be_added_info_[i].event_handle_ == handle)
00237         &&
00238         // Make sure that it is not already marked for deleted
00239         !this->to_be_added_info_[i].delete_entry_)
00240       {
00241         if (this->remove_to_be_added_handler_i (i, mask) == -1)
00242           error = 1;
00243       }
00244 
00245   // Only if the number of handlers to be deleted changes do we need
00246   // to wakeup the other threads
00247   if (original_handle_count < this->handles_to_be_deleted_)
00248     changes_required = true;
00249 
00250   return error ? -1 : 0;
00251 }
00252 
00253 int
00254 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00255                                                        ACE_Reactor_Mask to_be_removed_masks)
00256 {
00257   // I/O entries
00258   if (this->current_info_[slot].io_entry_)
00259     {
00260       // See if there are other events that the <Event_Handler> is
00261       // interested in
00262       this->bit_ops (this->current_info_[slot].network_events_,
00263                      to_be_removed_masks,
00264                      ACE_Reactor::CLR_MASK);
00265 
00266       // Disassociate/Reassociate the event from/with the I/O handle.
00267       // This will depend on the value of remaining set of network
00268       // events that the <event_handler> is interested in. I don't
00269       // think we can do anything about errors here, so I will not
00270       // check this.
00271       ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00272                         this->current_handles_[slot],
00273                         this->current_info_[slot].network_events_);
00274     }
00275   // Normal event entries.
00276   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00277     // Preserve DONT_CALL
00278     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00279   else
00280     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00281     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00282 
00283   // If this event was marked for suspension, undo the suspension flag
00284   // and reduce the to be suspended count.
00285   if (this->current_info_[slot].suspend_entry_)
00286     {
00287       // Undo suspension
00288       this->current_info_[slot].suspend_entry_ = false;
00289       // Decrement the handle count
00290       --this->handles_to_be_suspended_;
00291     }
00292 
00293   // If there are no more events that the <Event_Handler> is
00294   // interested in, or this is a non-I/O entry, schedule the
00295   // <Event_Handler> for removal
00296   if (this->current_info_[slot].network_events_ == 0)
00297     {
00298       // Mark to be deleted
00299       this->current_info_[slot].delete_entry_ = true;
00300       // Remember the mask
00301       this->current_info_[slot].close_masks_ = to_be_removed_masks;
00302       // Increment the handle count
00303       ++this->handles_to_be_deleted_;
00304     }
00305 
00306   // Since it is not a complete removal, we'll call handle_close
00307   // for all the masks that were removed.  This does not change
00308   // the internal state of the reactor.
00309   //
00310   // Note: this condition only applies to I/O entries
00311   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00312     {
00313       ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00314       this->current_info_[slot].event_handler_->handle_close (handle,
00315                                                               to_be_removed_masks);
00316     }
00317 
00318   return 0;
00319 }
00320 
00321 int
00322 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00323                                                                  ACE_Reactor_Mask to_be_removed_masks)
00324 {
00325   // I/O entries
00326   if (this->current_suspended_info_[slot].io_entry_)
00327     {
00328       // See if there are other events that the <Event_Handler> is
00329       // interested in
00330       this->bit_ops (this->current_suspended_info_[slot].network_events_,
00331                      to_be_removed_masks,
00332                      ACE_Reactor::CLR_MASK);
00333 
00334       // Disassociate/Reassociate the event from/with the I/O handle.
00335       // This will depend on the value of remaining set of network
00336       // events that the <event_handler> is interested in. I don't
00337       // think we can do anything about errors here, so I will not
00338       // check this.
00339       ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00340                         this->current_suspended_info_[slot].event_handle_,
00341                         this->current_suspended_info_[slot].network_events_);
00342     }
00343   // Normal event entries.
00344   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00345     // Preserve DONT_CALL
00346     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00347   else
00348     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00349     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00350 
00351   // If this event was marked for resumption, undo the resumption flag
00352   // and reduce the to be resumed count.
00353   if (this->current_suspended_info_[slot].resume_entry_)
00354     {
00355       // Undo resumption
00356       this->current_suspended_info_[slot].resume_entry_ = false;
00357       // Decrement the handle count
00358       --this->handles_to_be_resumed_;
00359     }
00360 
00361   // If there are no more events that the <Event_Handler> is
00362   // interested in, or this is a non-I/O entry, schedule the
00363   // <Event_Handler> for removal
00364   if (this->current_suspended_info_[slot].network_events_ == 0)
00365     {
00366       // Mark to be deleted
00367       this->current_suspended_info_[slot].delete_entry_ = true;
00368       // Remember the mask
00369       this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00370       // Increment the handle count
00371       ++this->handles_to_be_deleted_;
00372     }
00373   // Since it is not a complete removal, we'll call handle_close for
00374   // all the masks that were removed.  This does not change the
00375   // internal state of the reactor.
00376   //
00377   // Note: this condition only applies to I/O entries
00378   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00379     {
00380       ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00381       this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00382                                                                         to_be_removed_masks);
00383     }
00384 
00385   return 0;
00386 }
00387 
00388 int
00389 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00390                                                                    ACE_Reactor_Mask to_be_removed_masks)
00391 {
00392   // I/O entries
00393   if (this->to_be_added_info_[slot].io_entry_)
00394     {
00395       // See if there are other events that the <Event_Handler> is
00396       // interested in
00397       this->bit_ops (this->to_be_added_info_[slot].network_events_,
00398                      to_be_removed_masks,
00399                      ACE_Reactor::CLR_MASK);
00400 
00401       // Disassociate/Reassociate the event from/with the I/O handle.
00402       // This will depend on the value of remaining set of network
00403       // events that the <event_handler> is interested in. I don't
00404       // think we can do anything about errors here, so I will not
00405       // check this.
00406       ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00407                         this->to_be_added_info_[slot].event_handle_,
00408                         this->to_be_added_info_[slot].network_events_);
00409     }
00410   // Normal event entries.
00411   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00412     // Preserve DONT_CALL
00413     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00414   else
00415     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00416     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00417 
00418   // If this event was marked for suspension, undo the suspension flag
00419   // and reduce the to be suspended count.
00420   if (this->to_be_added_info_[slot].suspend_entry_)
00421     {
00422       // Undo suspension
00423       this->to_be_added_info_[slot].suspend_entry_ = false;
00424       // Decrement the handle count
00425       --this->handles_to_be_suspended_;
00426     }
00427 
00428   // If there are no more events that the <Event_Handler> is
00429   // interested in, or this is a non-I/O entry, schedule the
00430   // <Event_Handler> for removal
00431   if (this->to_be_added_info_[slot].network_events_ == 0)
00432     {
00433       // Mark to be deleted
00434       this->to_be_added_info_[slot].delete_entry_ = true;
00435       // Remember the mask
00436       this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00437       // Increment the handle count
00438       ++this->handles_to_be_deleted_;
00439     }
00440   // Since it is not a complete removal, we'll call handle_close
00441   // for all the masks that were removed.  This does not change
00442   // the internal state of the reactor.
00443   //
00444   // Note: this condition only applies to I/O entries
00445   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00446     {
00447       ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00448       this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00449                                                                   to_be_removed_masks);
00450     }
00451 
00452   return 0;
00453 }
00454 
00455 int
00456 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00457                                                         bool &changes_required)
00458 {
00459   size_t i = 0;
00460 
00461   // Go through all the handles looking for <handle>.  Even if we find
00462   // it, we continue through the rest of the list since <handle> could
00463   // appear multiple times. All handles are checked.
00464 
00465   // Check the current entries first.
00466   for (i = 0; i < this->max_handlep1_; ++i)
00467     // Since the handle can either be the event or the I/O handle,
00468     // we have to check both
00469     if ((this->current_handles_[i] == handle ||
00470          this->current_info_[i].io_handle_ == handle) &&
00471         // Make sure that it is not already marked for suspension
00472         !this->current_info_[i].suspend_entry_)
00473       {
00474         // Mark to be suspended
00475         this->current_info_[i].suspend_entry_ = true;
00476         // Increment the handle count
00477         ++this->handles_to_be_suspended_;
00478         // Changes will be required
00479         changes_required = true;
00480       }
00481 
00482   // Then check the suspended entries.
00483   for (i = 0; i < this->suspended_handles_; ++i)
00484     // Since the handle can either be the event or the I/O handle,
00485     // we have to check both
00486     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00487          this->current_suspended_info_[i].io_handle_ == handle) &&
00488         // Make sure that the resumption is not already undone
00489         this->current_suspended_info_[i].resume_entry_)
00490       {
00491         // Undo resumption
00492         this->current_suspended_info_[i].resume_entry_ = false;
00493         // Decrement the handle count
00494         --this->handles_to_be_resumed_;
00495         // Changes will be required
00496         changes_required = true;
00497       }
00498 
00499   // Then check the to_be_added entries.
00500   for (i = 0; i < this->handles_to_be_added_; ++i)
00501     // Since the handle can either be the event or the I/O handle,
00502     // we have to check both
00503     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00504          this->to_be_added_info_[i].event_handle_ == handle) &&
00505         // Make sure that it is not already marked for suspension
00506         !this->to_be_added_info_[i].suspend_entry_)
00507       {
00508         // Mark to be suspended
00509         this->to_be_added_info_[i].suspend_entry_ = true;
00510         // Increment the handle count
00511         ++this->handles_to_be_suspended_;
00512         // Changes will be required
00513         changes_required = true;
00514       }
00515 
00516   return 0;
00517 }
00518 
00519 int
00520 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00521                                                        bool &changes_required)
00522 {
00523   size_t i = 0;
00524 
00525   // Go through all the handles looking for <handle>.  Even if we find
00526   // it, we continue through the rest of the list since <handle> could
00527   // appear multiple times. All handles are checked.
00528 
00529   // Check the current entries first.
00530   for (i = 0; i < this->max_handlep1_; ++i)
00531     // Since the handle can either be the event or the I/O handle,
00532     // we have to check both
00533     if ((this->current_handles_[i] == handle ||
00534          this->current_info_[i].io_handle_ == handle) &&
00535         // Make sure that the suspension is not already undone
00536         this->current_info_[i].suspend_entry_)
00537       {
00538         // Undo suspension
00539         this->current_info_[i].suspend_entry_ = false;
00540         // Decrement the handle count
00541         --this->handles_to_be_suspended_;
00542         // Changes will be required
00543         changes_required = true;
00544       }
00545 
00546   // Then check the suspended entries.
00547   for (i = 0; i < this->suspended_handles_; ++i)
00548     // Since the handle can either be the event or the I/O handle,
00549     // we have to check both
00550     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00551          this->current_suspended_info_[i].io_handle_ == handle) &&
00552         // Make sure that it is not already marked for resumption
00553         !this->current_suspended_info_[i].resume_entry_)
00554       {
00555         // Mark to be resumed
00556         this->current_suspended_info_[i].resume_entry_ = true;
00557         // Increment the handle count
00558         ++this->handles_to_be_resumed_;
00559         // Changes will be required
00560         changes_required = true;
00561       }
00562 
00563   // Then check the to_be_added entries.
00564   for (i = 0; i < this->handles_to_be_added_; ++i)
00565     // Since the handle can either be the event or the I/O handle,
00566     // we have to check both
00567     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00568          this->to_be_added_info_[i].event_handle_ == handle) &&
00569         // Make sure that the suspension is not already undone
00570         this->to_be_added_info_[i].suspend_entry_)
00571       {
00572         // Undo suspension
00573         this->to_be_added_info_[i].suspend_entry_ = false;
00574         // Decrement the handle count
00575         --this->handles_to_be_suspended_;
00576         // Changes will be required
00577         changes_required = true;
00578       }
00579 
00580   return 0;
00581 }
00582 
00583 void
00584 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00585 {
00586   {
00587     ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00588 
00589     bool dummy;
00590     size_t i;
00591 
00592     // Remove all the current handlers
00593     for (i = 0; i < this->max_handlep1_; ++i)
00594       this->unbind_i (this->current_handles_[i],
00595                       ACE_Event_Handler::ALL_EVENTS_MASK,
00596                       dummy);
00597 
00598     // Remove all the suspended handlers
00599     for (i = 0; i < this->suspended_handles_; ++i)
00600       this->unbind_i (this->current_suspended_info_[i].event_handle_,
00601                       ACE_Event_Handler::ALL_EVENTS_MASK,
00602                       dummy);
00603 
00604     // Remove all the to_be_added handlers
00605     for (i = 0; i < this->handles_to_be_added_; ++i)
00606       this->unbind_i (this->to_be_added_info_[i].event_handle_,
00607                       ACE_Event_Handler::ALL_EVENTS_MASK,
00608                       dummy);
00609   }
00610 
00611   // The guard is released here
00612 
00613   // Wake up all threads in WaitForMultipleObjects so that they can
00614   // reconsult the handle set
00615   this->wfmo_reactor_.wakeup_all_threads ();
00616 }
00617 
00618 int
00619 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry,
00620                                              ACE_Event_Handler *event_handler,
00621                                              long network_events,
00622                                              ACE_HANDLE io_handle,
00623                                              ACE_HANDLE event_handle,
00624                                              bool delete_event)
00625 {
00626   if (event_handler == 0)
00627     return -1;
00628 
00629   // Make sure that the <handle> is valid
00630   if (event_handle == ACE_INVALID_HANDLE)
00631     event_handle = event_handler->get_handle ();
00632   if (this->invalid_handle (event_handle))
00633     return -1;
00634 
00635   size_t current_size = this->max_handlep1_ +
00636     this->handles_to_be_added_ -
00637     this->handles_to_be_deleted_ +
00638     this->suspended_handles_;
00639 
00640   // Make sure that there's room in the table and that total pending
00641   // additions should not exceed what the <to_be_added_info_> array
00642   // can hold.
00643   if (current_size < this->max_size_ &&
00644       this->handles_to_be_added_ < this->max_size_)
00645     {
00646       // Cache this set into the <to_be_added_info_>, till we come
00647       // around to actually adding this to the <current_info_>
00648       this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00649                                                                io_entry,
00650                                                                event_handler,
00651                                                                io_handle,
00652                                                                network_events,
00653                                                                delete_event);
00654 
00655       ++this->handles_to_be_added_;
00656 
00657       event_handler->add_reference ();
00658 
00659       // Wake up all threads in WaitForMultipleObjects so that they can
00660       // reconsult the handle set
00661       this->wfmo_reactor_.wakeup_all_threads ();
00662     }
00663   else
00664     {
00665       errno = EMFILE;   // File descriptor table is full (better than nothing)
00666       return -1;
00667     }
00668 
00669   return 0;
00670 }
00671 
00672 int
00673 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00674 {
00675   // Go through the entire valid array and check for all handles that
00676   // have been schedule for deletion
00677   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00678     {
00679       size_t i = 0;
00680       while (i < this->max_handlep1_)
00681         {
00682           // This stuff is necessary here, since we should not make
00683           // the upcall until all the internal data structures have
00684           // been updated.  This is to protect against upcalls that
00685           // try to deregister again.
00686           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00687           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00688           ACE_Event_Handler *event_handler = 0;
00689 
00690           // See if this entry is scheduled for deletion
00691           if (this->current_info_[i].delete_entry_)
00692             {
00693               // Calling the <handle_close> method here will ensure that we
00694               // will only call it once per deregistering <Event_Handler>.
00695               // This is essential in the case when the <Event_Handler> will
00696               // do something like delete itself and we have multiple
00697               // threads in WFMO_Reactor.
00698               //
00699               // Make sure that the DONT_CALL mask is not set
00700               masks = this->current_info_[i].close_masks_;
00701               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00702                 {
00703                   // Grab the correct handle depending on the type entry
00704                   if (this->current_info_[i].io_entry_)
00705                     handle = this->current_info_[i].io_handle_;
00706                   else
00707                     handle = this->current_handles_[i];
00708 
00709                   // Event handler
00710                   event_handler = this->current_info_[i].event_handler_;
00711                 }
00712 
00713               // If <WFMO_Reactor> created the event, we need to clean it up
00714               if (this->current_info_[i].delete_event_)
00715                 ACE_OS::event_destroy (&this->current_handles_[i]);
00716 
00717               // Reduce count by one
00718               --this->handles_to_be_deleted_;
00719             }
00720 
00721           // See if this entry is scheduled for suspension
00722           else if (this->current_info_[i].suspend_entry_)
00723             {
00724               this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00725                                                                             this->current_info_[i]);
00726               // Increase number of suspended handles
00727               ++this->suspended_handles_;
00728 
00729               // Reduce count by one
00730               --this->handles_to_be_suspended_;
00731             }
00732 
00733           // See if this entry is scheduled for deletion or suspension
00734           // If so we need to clean up
00735           if (this->current_info_[i].delete_entry_ ||
00736               this->current_info_[i].suspend_entry_  )
00737             {
00738               size_t last_valid_slot = this->max_handlep1_ - 1;
00739               // If this is the last handle in the set, no need to swap
00740               // places. Simply remove it.
00741               if (i < last_valid_slot)
00742                 // Swap this handle with the last valid handle
00743                 {
00744                   // Struct copy
00745                   this->current_info_[i] =
00746                     this->current_info_[last_valid_slot];
00747                   this->current_handles_[i] =
00748                     this->current_handles_[last_valid_slot];
00749                 }
00750               // Reset the info in this slot
00751               this->current_info_[last_valid_slot].reset ();
00752               this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00753               --this->max_handlep1_;
00754             }
00755           else
00756             {
00757               // This current entry is not up for deletion or
00758               // suspension.  Proceed to the next entry in the current
00759               // handles.
00760               ++i;
00761             }
00762 
00763           // Now that all internal structures have been updated, make
00764           // the upcall.
00765           if (event_handler != 0)
00766             {
00767               bool const requires_reference_counting =
00768                 event_handler->reference_counting_policy ().value () ==
00769                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00770 
00771               event_handler->handle_close (handle, masks);
00772 
00773               if (requires_reference_counting)
00774                 {
00775                   event_handler->remove_reference ();
00776                 }
00777             }
00778         }
00779     }
00780 
00781   return 0;
00782 }
00783 
00784 int
00785 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00786 {
00787   // Go through the <suspended_handle> array
00788   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00789     {
00790       size_t i = 0;
00791       while (i < this->suspended_handles_)
00792         {
00793           // This stuff is necessary here, since we should not make
00794           // the upcall until all the internal data structures have
00795           // been updated.  This is to protect against upcalls that
00796           // try to deregister again.
00797           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00798           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00799           ACE_Event_Handler *event_handler = 0;
00800 
00801           // See if this entry is scheduled for deletion
00802           if (this->current_suspended_info_[i].delete_entry_)
00803             {
00804               // Calling the <handle_close> method here will ensure that we
00805               // will only call it once per deregistering <Event_Handler>.
00806               // This is essential in the case when the <Event_Handler> will
00807               // do something like delete itself and we have multiple
00808               // threads in WFMO_Reactor.
00809               //
00810               // Make sure that the DONT_CALL mask is not set
00811               masks = this->current_suspended_info_[i].close_masks_;
00812               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00813                 {
00814                   // Grab the correct handle depending on the type entry
00815                   if (this->current_suspended_info_[i].io_entry_)
00816                     handle = this->current_suspended_info_[i].io_handle_;
00817                   else
00818                     handle = this->current_suspended_info_[i].event_handle_;
00819 
00820                   // Upcall
00821                   event_handler = this->current_suspended_info_[i].event_handler_;
00822                 }
00823 
00824               // If <WFMO_Reactor> created the event, we need to clean it up
00825               if (this->current_suspended_info_[i].delete_event_)
00826                 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00827 
00828               // Reduce count by one
00829               --this->handles_to_be_deleted_;
00830             }
00831 
00832           else if (this->current_suspended_info_[i].resume_entry_)
00833             {
00834               // Add to the end of the current handles set
00835               this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00836               // Struct copy
00837               this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00838               ++this->max_handlep1_;
00839 
00840               // Reduce count by one
00841               --this->handles_to_be_resumed_;
00842             }
00843 
00844           // If an entry needs to be removed, either because it
00845           // was deleted or resumed, remove it now before doing
00846           // the upcall.
00847           if (this->current_suspended_info_[i].resume_entry_ ||
00848               this->current_suspended_info_[i].delete_entry_)
00849             {
00850               size_t last_valid_slot = this->suspended_handles_ - 1;
00851               // Net effect is that we're removing an entry and
00852               // compressing the list from the end.  So, if removing
00853               // an entry from the middle, copy the last valid one to the
00854               // removed slot.  Reset the end and decrement the number
00855               // of suspended handles.
00856               if (i < last_valid_slot)
00857                 // Struct copy
00858                 this->current_suspended_info_[i] =
00859                   this->current_suspended_info_[last_valid_slot];
00860               this->current_suspended_info_[last_valid_slot].reset ();
00861               --this->suspended_handles_;
00862             }
00863           else
00864             {
00865               // This current entry is not up for deletion or
00866               // resumption.  Proceed to the next entry in the
00867               // suspended handles.
00868               ++i;
00869             }
00870 
00871           // Now that all internal structures have been updated, make
00872           // the upcall.
00873           if (event_handler != 0)
00874             {
00875               int requires_reference_counting =
00876                 event_handler->reference_counting_policy ().value () ==
00877                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00878 
00879               event_handler->handle_close (handle, masks);
00880 
00881               if (requires_reference_counting)
00882                 {
00883                   event_handler->remove_reference ();
00884                 }
00885             }
00886         }
00887     }
00888 
00889   return 0;
00890 }
00891 
00892 int
00893 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00894 {
00895   // Go through the <to_be_added_*> arrays
00896   for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00897     {
00898       // This stuff is necessary here, since we should not make
00899       // the upcall until all the internal data structures have
00900       // been updated.  This is to protect against upcalls that
00901       // try to deregister again.
00902       ACE_HANDLE handle = ACE_INVALID_HANDLE;
00903       ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00904       ACE_Event_Handler *event_handler = 0;
00905 
00906       // See if this entry is scheduled for deletion
00907       if (this->to_be_added_info_[i].delete_entry_)
00908         {
00909           // Calling the <handle_close> method here will ensure that we
00910           // will only call it once per deregistering <Event_Handler>.
00911           // This is essential in the case when the <Event_Handler> will
00912           // do something like delete itself and we have multiple
00913           // threads in WFMO_Reactor.
00914           //
00915           // Make sure that the DONT_CALL mask is not set
00916           masks = this->to_be_added_info_[i].close_masks_;
00917           if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00918             {
00919               // Grab the correct handle depending on the type entry
00920               if (this->to_be_added_info_[i].io_entry_)
00921                 handle = this->to_be_added_info_[i].io_handle_;
00922               else
00923                 handle = this->to_be_added_info_[i].event_handle_;
00924 
00925               // Upcall
00926               event_handler = this->to_be_added_info_[i].event_handler_;
00927             }
00928 
00929           // If <WFMO_Reactor> created the event, we need to clean it up
00930           if (this->to_be_added_info_[i].delete_event_)
00931             ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00932 
00933           // Reduce count by one
00934           --this->handles_to_be_deleted_;
00935         }
00936 
00937       // See if this entry is scheduled for suspension
00938       else if (this->to_be_added_info_[i].suspend_entry_)
00939         {
00940           this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00941                                                                         this->to_be_added_info_[i]);
00942           // Increase number of suspended handles
00943           ++this->suspended_handles_;
00944 
00945           // Reduce count by one
00946           --this->handles_to_be_suspended_;
00947         }
00948 
00949       // If neither of the two flags are on, add to current
00950       else
00951         {
00952           // Add to the end of the current handles set
00953           this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00954           // Struct copy
00955           this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00956           ++this->max_handlep1_;
00957         }
00958 
00959       // Reset the <to_be_added_info_>
00960       this->to_be_added_info_[i].reset ();
00961 
00962       // Now that all internal structures have been updated, make the
00963       // upcall.
00964       if (event_handler != 0)
00965         {
00966           int requires_reference_counting =
00967             event_handler->reference_counting_policy ().value () ==
00968             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00969 
00970           event_handler->handle_close (handle, masks);
00971 
00972           if (requires_reference_counting)
00973             {
00974               event_handler->remove_reference ();
00975             }
00976         }
00977     }
00978 
00979   // Since all to be added handles have been taken care of, reset the
00980   // counter
00981   this->handles_to_be_added_ = 0;
00982 
00983   return 0;
00984 }
00985 
00986 void
00987 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00988 {
00989 #if defined (ACE_HAS_DUMP)
00990   size_t i = 0;
00991 
00992   ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00993 
00994   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00995 
00996   ACE_DEBUG ((LM_DEBUG,
00997               ACE_TEXT ("Max size = %d\n"),
00998               this->max_size_));
00999 
01000   ACE_DEBUG ((LM_DEBUG,
01001               ACE_TEXT ("Current info table\n\n")));
01002   ACE_DEBUG ((LM_DEBUG,
01003               ACE_TEXT ("\tSize = %d\n"),
01004               this->max_handlep1_));
01005   ACE_DEBUG ((LM_DEBUG,
01006               ACE_TEXT ("\tHandles to be suspended = %d\n"),
01007               this->handles_to_be_suspended_));
01008 
01009   for (i = 0; i < this->max_handlep1_; ++i)
01010     this->current_info_[i].dump (this->current_handles_[i]);
01011 
01012   ACE_DEBUG ((LM_DEBUG,
01013               ACE_TEXT ("\n")));
01014 
01015   ACE_DEBUG ((LM_DEBUG,
01016               ACE_TEXT ("To-be-added info table\n\n")));
01017   ACE_DEBUG ((LM_DEBUG,
01018               ACE_TEXT ("\tSize = %d\n"),
01019               this->handles_to_be_added_));
01020 
01021   for (i = 0; i < this->handles_to_be_added_; ++i)
01022     this->to_be_added_info_[i].dump ();
01023 
01024   ACE_DEBUG ((LM_DEBUG,
01025               ACE_TEXT ("\n")));
01026 
01027   ACE_DEBUG ((LM_DEBUG,
01028               ACE_TEXT ("Suspended info table\n\n")));
01029   ACE_DEBUG ((LM_DEBUG,
01030               ACE_TEXT ("\tSize = %d\n"),
01031               this->suspended_handles_));
01032   ACE_DEBUG ((LM_DEBUG,
01033               ACE_TEXT ("\tHandles to be resumed = %d\n"),
01034               this->handles_to_be_resumed_));
01035 
01036   for (i = 0; i < this->suspended_handles_; ++i)
01037     this->current_suspended_info_[i].dump ();
01038 
01039   ACE_DEBUG ((LM_DEBUG,
01040               ACE_TEXT ("\n")));
01041 
01042   ACE_DEBUG ((LM_DEBUG,
01043               ACE_TEXT ("Total handles to be deleted = %d\n"),
01044               this->handles_to_be_deleted_));
01045 
01046   ACE_DEBUG ((LM_DEBUG,
01047               ACE_END_DUMP));
01048 #endif /* ACE_HAS_DUMP */
01049 }
01050 
01051 /************************************************************/
01052 
01053 int
01054 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01055 {
01056   ACE_NOTSUP_RETURN (-1);
01057 }
01058 
01059 #if defined (ACE_WIN32_VC8)
01060 #  pragma warning (push)
01061 #  pragma warning (disable:4355)  /* Use of 'this' in initializer list */
01062 #  endif
01063 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01064                                     ACE_Timer_Queue *tq,
01065                                     ACE_Reactor_Notify *notify)
01066   : signal_handler_ (0),
01067     delete_signal_handler_ (false),
01068     timer_queue_ (0),
01069     delete_timer_queue_ (false),
01070     delete_handler_rep_ (false),
01071     notify_handler_ (0),
01072     delete_notify_handler_ (false),
01073     lock_adapter_ (lock_),
01074     handler_rep_ (*this),
01075     // this event is initially signaled
01076     ok_to_wait_ (1),
01077     // this event is initially unsignaled
01078     wakeup_all_threads_ (0),
01079     // this event is initially unsignaled
01080     waiting_to_change_state_ (0),
01081     active_threads_ (0),
01082     owner_ (ACE_Thread::self ()),
01083     new_owner_ (0),
01084     change_state_thread_ (0),
01085     open_for_business_ (false),
01086     deactivated_ (0)
01087 {
01088   if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01089     ACE_ERROR ((LM_ERROR,
01090                 ACE_TEXT ("%p\n"),
01091                 ACE_TEXT ("WFMO_Reactor")));
01092 }
01093 
01094 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01095                                     int unused,
01096                                     ACE_Sig_Handler *sh,
01097                                     ACE_Timer_Queue *tq,
01098                                     ACE_Reactor_Notify *notify)
01099   : signal_handler_ (0),
01100     delete_signal_handler_ (false),
01101     timer_queue_ (0),
01102     delete_timer_queue_ (false),
01103     delete_handler_rep_ (false),
01104     notify_handler_ (0),
01105     delete_notify_handler_ (false),
01106     lock_adapter_ (lock_),
01107     handler_rep_ (*this),
01108     // this event is initially signaled
01109     ok_to_wait_ (1),
01110     // this event is initially unsignaled
01111     wakeup_all_threads_ (0),
01112     // this event is initially unsignaled
01113     waiting_to_change_state_ (0),
01114     active_threads_ (0),
01115     owner_ (ACE_Thread::self ()),
01116     new_owner_ (0),
01117     change_state_thread_ (0),
01118     open_for_business_ (false),
01119     deactivated_ (0)
01120 {
01121   ACE_UNUSED_ARG (unused);
01122 
01123   if (this->open (size, 0, sh, tq, 0, notify) == -1)
01124     ACE_ERROR ((LM_ERROR,
01125                 ACE_TEXT ("%p\n"),
01126                 ACE_TEXT ("WFMO_Reactor")));
01127 }
01128 #if defined (ACE_WIN32_VC8)
01129 #  pragma warning (pop)
01130 #endif
01131 
01132 int
01133 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01134 {
01135   return -1;
01136 }
01137 
01138 int
01139 ACE_WFMO_Reactor::open (size_t size,
01140                         int unused,
01141                         ACE_Sig_Handler *sh,
01142                         ACE_Timer_Queue *tq,
01143                         int disable_notify_pipe,
01144                         ACE_Reactor_Notify *notify)
01145 {
01146   ACE_UNUSED_ARG (unused);
01147   ACE_UNUSED_ARG (disable_notify_pipe);
01148 
01149   // This GUARD is necessary since we are updating shared state.
01150   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01151 
01152   // If we are already open, return -1
01153   if (this->open_for_business_)
01154     return -1;
01155 
01156   // Timer Queue
01157   if (this->delete_timer_queue_)
01158     delete this->timer_queue_;
01159 
01160   if (tq == 0)
01161     {
01162       ACE_NEW_RETURN (this->timer_queue_,
01163                       ACE_Timer_Heap,
01164                       -1);
01165       this->delete_timer_queue_ = true;
01166     }
01167   else
01168     {
01169       this->timer_queue_ = tq;
01170       this->delete_timer_queue_ = false;
01171     }
01172 
01173   // Signal Handler
01174   if (this->delete_signal_handler_)
01175     delete this->signal_handler_;
01176 
01177   if (sh == 0)
01178     {
01179       ACE_NEW_RETURN (this->signal_handler_,
01180                       ACE_Sig_Handler,
01181                       -1);
01182       this->delete_signal_handler_ = true;
01183     }
01184   else
01185     {
01186       this->signal_handler_ = sh;
01187       this->delete_signal_handler_ = false;
01188     }
01189 
01190   // Setup the atomic wait array (used later in <handle_events>)
01191   this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01192   this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01193 
01194   // Prevent memory leaks when the ACE_WFMO_Reactor is reopened.
01195   if (this->delete_handler_rep_)
01196     {
01197       if (this->handler_rep_.changes_required ())
01198         {
01199           // Make necessary changes to the handler repository
01200           this->handler_rep_.make_changes ();
01201           // Turn off <wakeup_all_threads_> since all necessary changes
01202           // have completed
01203           this->wakeup_all_threads_.reset ();
01204         }
01205 
01206       this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01207     }
01208 
01209   // Open the handle repository.  Two additional handles for internal
01210   // purposes
01211   if (this->handler_rep_.open (size + 2) == -1)
01212     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
01213                        ACE_TEXT ("opening handler repository")),
01214                       -1);
01215   else
01216     this->delete_handler_rep_ = true;
01217 
01218   if (this->notify_handler_ != 0 && this->delete_notify_handler_)
01219     delete this->notify_handler_;
01220 
01221   this->notify_handler_ = notify;
01222 
01223   if (this->notify_handler_ == 0)
01224     {
01225       ACE_NEW_RETURN (this->notify_handler_,
01226                       ACE_WFMO_Reactor_Notify,
01227                       -1);
01228 
01229       if (this->notify_handler_ == 0)
01230         return -1;
01231       else
01232         this->delete_notify_handler_ = true;
01233     }
01234 
01235   /* NOTE */
01236   // The order of the following two registrations is very important
01237 
01238   // Open the notification handler
01239   if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01240     ACE_ERROR_RETURN ((LM_ERROR,
01241                        ACE_TEXT ("%p\n"),
01242                        ACE_TEXT ("opening notify handler ")),
01243                       -1);
01244 
01245   // Register for <wakeup_all_threads> event
01246   if (this->register_handler (&this->wakeup_all_threads_handler_,
01247                               this->wakeup_all_threads_.handle ()) == -1)
01248     ACE_ERROR_RETURN ((LM_ERROR,
01249                        ACE_TEXT ("%p\n"),
01250                        ACE_TEXT ("registering thread wakeup handler")),
01251                       -1);
01252 
01253   // Since we have added two handles into the handler repository,
01254   // update the <handler_repository_>
01255   if (this->handler_rep_.changes_required ())
01256     {
01257       // Make necessary changes to the handler repository
01258       this->handler_rep_.make_changes ();
01259       // Turn off <wakeup_all_threads_> since all necessary changes
01260       // have completed
01261       this->wakeup_all_threads_.reset ();
01262     }
01263 
01264   // We are open for business
01265   this->open_for_business_ = true;
01266 
01267   return 0;
01268 }
01269 
01270 int
01271 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01272 {
01273   if (this->signal_handler_ != 0 && this->delete_signal_handler_)
01274     delete this->signal_handler_;
01275   this->signal_handler_ = signal_handler;
01276   this->delete_signal_handler_ = false;
01277   return 0;
01278 }
01279 
01280 ACE_Timer_Queue *
01281 ACE_WFMO_Reactor::timer_queue (void) const
01282 {
01283   return this->timer_queue_;
01284 }
01285 
01286 int
01287 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01288 {
01289   if (this->timer_queue_ != 0 && this->delete_timer_queue_)
01290     delete this->timer_queue_;
01291   this->timer_queue_ = tq;
01292   this->delete_timer_queue_ = false;
01293   return 0;
01294 }
01295 
01296 int
01297 ACE_WFMO_Reactor::close (void)
01298 {
01299   // This GUARD is necessary since we are updating shared state.
01300   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01301 
01302   // If we are already closed, return error
01303   if (!this->open_for_business_)
01304     return -1;
01305 
01306   // We are now closed
01307   this->open_for_business_ = false;
01308   // This will unregister all handles
01309   this->handler_rep_.close ();
01310 
01311   return 0;
01312 }
01313 
01314 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01315 {
01316   // Assumption: No threads are left in the Reactor when this method
01317   // is called (i.e., active_threads_ == 0)
01318 
01319   // Close down
01320   this->close ();
01321 
01322   // Make necessary changes to the handler repository that we caused
01323   // by <close>.
01324   this->handler_rep_.make_changes ();
01325 
01326   if (this->delete_timer_queue_)
01327     {
01328       delete this->timer_queue_;
01329       this->timer_queue_ = 0;
01330       this->delete_timer_queue_ = false;
01331     }
01332 
01333   if (this->delete_signal_handler_)
01334     {
01335       delete this->signal_handler_;
01336       this->signal_handler_ = 0;
01337       this->delete_signal_handler_ = false;
01338     }
01339 
01340   if (this->delete_notify_handler_)
01341     {
01342       delete this->notify_handler_;
01343       this->notify_handler_ = 0;
01344       this->delete_notify_handler_ = false;
01345     }
01346 }
01347 
01348 int
01349 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01350                                       ACE_HANDLE io_handle,
01351                                       ACE_Event_Handler *event_handler,
01352                                       ACE_Reactor_Mask new_masks)
01353 {
01354   // If this is a Winsock 1 system, the underlying event assignment will
01355   // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
01356   // reacting to socket activity.
01357 
01358 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01359 
01360   ACE_UNUSED_ARG (event_handle);
01361   ACE_UNUSED_ARG (io_handle);
01362   ACE_UNUSED_ARG (event_handler);
01363   ACE_UNUSED_ARG (new_masks);
01364   ACE_NOTSUP_RETURN (-1);
01365 
01366 #else
01367 
01368   // Make sure that the <handle> is valid
01369   if (io_handle == ACE_INVALID_HANDLE)
01370     io_handle = event_handler->get_handle ();
01371 
01372   if (this->handler_rep_.invalid_handle (io_handle))
01373     {
01374       errno = ERROR_INVALID_HANDLE;
01375       return -1;
01376     }
01377 
01378   long new_network_events = 0;
01379   bool delete_event = false;
01380   auto_ptr <ACE_Auto_Event> event;
01381 
01382   // Look up the repository to see if the <event_handler> is already
01383   // there.
01384   ACE_Reactor_Mask old_masks;
01385   int found = this->handler_rep_.modify_network_events_i (io_handle,
01386                                                           new_masks,
01387                                                           old_masks,
01388                                                           new_network_events,
01389                                                           event_handle,
01390                                                           delete_event,
01391                                                           ACE_Reactor::ADD_MASK);
01392 
01393   // Check to see if the user passed us a valid event; If not then we
01394   // need to create one
01395   if (event_handle == ACE_INVALID_HANDLE)
01396     {
01397       // Note: don't change this since some C++ compilers have
01398       // <auto_ptr>s that don't work properly...
01399       auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01400       event = tmp;
01401       event_handle = event->handle ();
01402       delete_event = true;
01403     }
01404 
01405   int result = ::WSAEventSelect ((SOCKET) io_handle,
01406                                  event_handle,
01407                                  new_network_events);
01408   // If we had found the <Event_Handler> there is nothing more to do
01409   if (found)
01410     return result;
01411   else if (result != SOCKET_ERROR &&
01412            this->handler_rep_.bind_i (1,
01413                                       event_handler,
01414                                       new_network_events,
01415                                       io_handle,
01416                                       event_handle,
01417                                       delete_event) != -1)
01418     {
01419       // The <event_handler> was not found in the repository, add to
01420       // the repository.
01421       if (delete_event)
01422         {
01423           // Clear out the handle in the ACE_Auto_Event so that when
01424           // it is destroyed, the handle isn't closed out from under
01425           // the reactor. After setting it, running down the event
01426           // (via auto_ptr<> event, above) at function return will
01427           // cause an error because it'll try to close an invalid handle.
01428           // To avoid that smashing the errno value, save the errno
01429           // here, explicitly remove the event so the dtor won't do it
01430           // again, then restore errno.
01431           ACE_Errno_Guard guard (errno);
01432           event->handle (ACE_INVALID_HANDLE);
01433           event->remove ();
01434         }
01435       return 0;
01436     }
01437   else
01438     return -1;
01439 
01440 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
01441 
01442 }
01443 
01444 int
01445 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01446                               ACE_Reactor_Mask new_masks,
01447                               int operation)
01448 {
01449   // Make sure that the <handle> is valid
01450   if (this->handler_rep_.invalid_handle (io_handle))
01451     return -1;
01452 
01453   long new_network_events = 0;
01454   bool delete_event = false;
01455   ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01456 
01457   // Look up the repository to see if the <Event_Handler> is already
01458   // there.
01459   ACE_Reactor_Mask old_masks;
01460   int found = this->handler_rep_.modify_network_events_i (io_handle,
01461                                                           new_masks,
01462                                                           old_masks,
01463                                                           new_network_events,
01464                                                           event_handle,
01465                                                           delete_event,
01466                                                           operation);
01467   if (found)
01468     {
01469       int result = ::WSAEventSelect ((SOCKET) io_handle,
01470                                      event_handle,
01471                                      new_network_events);
01472       if (result == 0)
01473         return old_masks;
01474       else
01475         return result;
01476     }
01477   else
01478     return -1;
01479 }
01480 
01481 
01482 
01483 int
01484 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01485                                                               ACE_Reactor_Mask new_masks,
01486                                                               ACE_Reactor_Mask &old_masks,
01487                                                               long &new_network_events,
01488                                                               ACE_HANDLE &event_handle,
01489                                                               bool &delete_event,
01490                                                               int operation)
01491 {
01492   long *modified_network_events = &new_network_events;
01493   int found = 0;
01494   size_t i;
01495 
01496   // First go through the current entries
01497   //
01498   // Look for all entries in the current handles for matching handle
01499   // (except those that have been scheduled for deletion)
01500   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01501     if (io_handle == this->current_info_[i].io_handle_ &&
01502         !this->current_info_[i].delete_entry_)
01503       {
01504         found = 1;
01505         modified_network_events = &this->current_info_[i].network_events_;
01506         delete_event = this->current_info_[i].delete_event_;
01507         event_handle = this->current_handles_[i];
01508       }
01509 
01510   // Then pass through the suspended handles
01511   //
01512   // Look for all entries in the suspended handles for matching handle
01513   // (except those that have been scheduled for deletion)
01514   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01515     if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01516         !this->current_suspended_info_[i].delete_entry_)
01517       {
01518         found = 1;
01519         modified_network_events = &this->current_suspended_info_[i].network_events_;
01520         delete_event = this->current_suspended_info_[i].delete_event_;
01521         event_handle = this->current_suspended_info_[i].event_handle_;
01522       }
01523 
01524   // Then check the to_be_added handles
01525   //
01526   // Look for all entries in the to_be_added handles for matching
01527   // handle (except those that have been scheduled for deletion)
01528   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01529     if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01530         !this->to_be_added_info_[i].delete_entry_)
01531       {
01532         found = 1;
01533         modified_network_events = &this->to_be_added_info_[i].network_events_;
01534         delete_event = this->to_be_added_info_[i].delete_event_;
01535         event_handle = this->to_be_added_info_[i].event_handle_;
01536       }
01537 
01538   old_masks = this->bit_ops (*modified_network_events,
01539                              new_masks,
01540                              operation);
01541 
01542   new_network_events = *modified_network_events;
01543 
01544   return found;
01545 }
01546 
01547 ACE_Event_Handler *
01548 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01549 {
01550   long existing_masks_ignored = 0;
01551   return this->handler (handle, existing_masks_ignored);
01552 }
01553 
01554 ACE_Event_Handler *
01555 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01556                                               long &existing_masks)
01557 {
01558   int found = 0;
01559   size_t i = 0;
01560   ACE_Event_Handler *event_handler = 0;
01561   existing_masks = 0;
01562 
01563   // Look for the handle first
01564 
01565   // First go through the current entries
01566   //
01567   // Look for all entries in the current handles for matching handle
01568   // (except those that have been scheduled for deletion)
01569   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01570     if ((handle == this->current_info_[i].io_handle_ ||
01571          handle == this->current_handles_[i]) &&
01572         !this->current_info_[i].delete_entry_)
01573       {
01574         found = 1;
01575         event_handler = this->current_info_[i].event_handler_;
01576         existing_masks = this->current_info_[i].network_events_;
01577       }
01578 
01579   // Then pass through the suspended handles
01580   //
01581   // Look for all entries in the suspended handles for matching handle
01582   // (except those that have been scheduled for deletion)
01583   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01584     if ((handle == this->current_suspended_info_[i].io_handle_ ||
01585          handle == this->current_suspended_info_[i].event_handle_) &&
01586         !this->current_suspended_info_[i].delete_entry_)
01587       {
01588         found = 1;
01589         event_handler = this->current_suspended_info_[i].event_handler_;
01590         existing_masks = this->current_suspended_info_[i].network_events_;
01591       }
01592 
01593   // Then check the to_be_added handles
01594   //
01595   // Look for all entries in the to_be_added handles for matching
01596   // handle (except those that have been scheduled for deletion)
01597   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01598     if ((handle == this->to_be_added_info_[i].io_handle_ ||
01599          handle == this->to_be_added_info_[i].event_handle_) &&
01600         !this->to_be_added_info_[i].delete_entry_)
01601       {
01602         found = 1;
01603         event_handler = this->to_be_added_info_[i].event_handler_;
01604         existing_masks = this->to_be_added_info_[i].network_events_;
01605       }
01606 
01607   if (event_handler)
01608     event_handler->add_reference ();
01609 
01610   return event_handler;
01611 }
01612 
01613 int
01614 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01615                                               ACE_Reactor_Mask user_masks,
01616                                               ACE_Event_Handler **user_event_handler)
01617 {
01618   long existing_masks = 0;
01619   int found = 0;
01620 
01621   ACE_Event_Handler_var safe_event_handler =
01622     this->handler (handle,
01623                    existing_masks);
01624 
01625   if (safe_event_handler.handler ())
01626     found = 1;
01627 
01628   if (!found)
01629     return -1;
01630 
01631   // Otherwise, make sure that the masks that the user is looking for
01632   // are on.
01633   if (found &&
01634       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01635     if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01636         !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01637       found = 0;
01638 
01639   if (found &&
01640       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01641     if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01642       found = 0;
01643 
01644   if (found &&
01645       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01646     if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01647       found = 0;
01648 
01649   if (found &&
01650       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01651     if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01652       found = 0;
01653 
01654   if (found &&
01655       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01656     if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01657       found = 0;
01658 
01659   if (found &&
01660       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01661     if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01662       found = 0;
01663 
01664   if (found &&
01665       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01666     if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01667       found = 0;
01668 
01669   if (found &&
01670       user_event_handler)
01671     *user_event_handler = safe_event_handler.release ();
01672 
01673   if (found)
01674     return 0;
01675   else
01676     return -1;
01677 }
01678 
01679 // Waits for and dispatches all events.  Returns -1 on error, 0 if
01680 // max_wait_time expired, or the number of events that were dispatched.
01681 int
01682 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01683                                   int alertable)
01684 {
01685   ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01686 
01687   // Make sure we are not closed
01688   if (!this->open_for_business_ || this->deactivated_)
01689     return -1;
01690 
01691   // Stash the current time -- the destructor of this object will
01692   // automatically compute how much time elapsed since this method was
01693   // called.
01694   ACE_Countdown_Time countdown (max_wait_time);
01695 
01696   int result;
01697   do
01698     {
01699       // Check to see if it is ok to enter ::WaitForMultipleObjects
01700       // This will acquire <this->lock_> on success On failure, the
01701       // lock will not be acquired
01702       result = this->ok_to_wait (max_wait_time, alertable);
01703       if (result != 1)
01704         return result;
01705 
01706       // Increment the number of active threads
01707       ++this->active_threads_;
01708 
01709       // Release the <lock_>
01710       this->lock_.release ();
01711 
01712       // Update the countdown to reflect time waiting to play with the
01713       // mut and event.
01714       countdown.update ();
01715 
01716       // Calculate timeout
01717       int timeout = this->calculate_timeout (max_wait_time);
01718 
01719       // Wait for event to happen
01720       DWORD wait_status = this->wait_for_multiple_events (timeout,
01721                                                           alertable);
01722 
01723       // Upcall
01724       result = this->safe_dispatch (wait_status);
01725       if (0 == result)
01726         {
01727           // wait_for_multiple_events timed out without dispatching
01728           // anything.  Because of rounding and conversion errors and
01729           // such, it could be that the wait loop timed out, but
01730           // the timer queue said it wasn't quite ready to expire a
01731           // timer. In this case, max_wait_time won't have quite been
01732           // reduced to 0, and we need to go around again. If max_wait_time
01733           // is all the way to 0, just return, as the entire time the
01734           // caller wanted to wait has been used up.
01735           countdown.update ();     // Reflect time waiting for events
01736           if (0 == max_wait_time || max_wait_time->usec () == 0)
01737             break;
01738         }
01739     }
01740   while (result == 0);
01741 
01742   return result;
01743 }
01744 
01745 int
01746 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01747                               int alertable)
01748 {
01749   // Calculate the max time we should spend here
01750   //
01751   // Note: There is really no need to involve the <timer_queue_> here
01752   // because even if a timeout in the <timer_queue_> does expire we
01753   // will not be able to dispatch it
01754 
01755   // We need to wait for both the <lock_> and <ok_to_wait_> event.
01756   // If not on WinCE, use WaitForMultipleObjects() to wait for both atomically.
01757   // On WinCE, the waitAll arg to WFMO must be false, so wait for the
01758   // ok_to_wait_ event first (since that's likely to take the longest) then
01759   // grab the lock and recheck the ok_to_wait_ event. When we can get them
01760   // both, or there's an error/timeout, return.
01761 #if defined (ACE_HAS_WINCE)
01762   ACE_Time_Value timeout = ACE_OS::gettimeofday ();
01763   if (max_wait_time != 0)
01764     timeout += *max_wait_time;
01765   while (1)
01766     {
01767       int status;
01768       if (max_wait_time == 0)
01769         status = this->ok_to_wait_.wait ();
01770       else
01771         status = this->ok_to_wait_.wait (&timeout);
01772       if (status == -1)
01773         return -1;
01774       // The event is signaled, so it's ok to wait; grab the lock and
01775       // recheck the event. If something has changed, restart the wait.
01776       if (max_wait_time == 0)
01777         status = this->lock_.acquire ();
01778       else
01779         status = this->lock_.acquire (timeout);
01780       if (status == -1)
01781         return -1;
01782 
01783       // Have the lock_, now re-check the event. If it's not signaled,
01784       // another thread changed something so go back and wait again.
01785       ACE_Time_Value poll_it = ACE_OS::gettimeofday ();
01786       if (this->ok_to_wait_.wait (&poll_it) == 0)
01787         break;
01788       this->lock_.release ();
01789     }
01790   return 1;
01791 
01792 #else
01793   int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01794   DWORD result = 0;
01795   while (1)
01796     {
01797 #  if defined (ACE_HAS_PHARLAP)
01798       // PharLap doesn't implement WaitForMultipleObjectsEx, and doesn't
01799       // do async I/O, so it's not needed in this case anyway.
01800       result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01801                                          this->atomic_wait_array_,
01802                                          TRUE,
01803                                          timeout);
01804 
01805       if (result != WAIT_IO_COMPLETION)
01806         break;
01807 
01808 #  else
01809       result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01810                                            this->atomic_wait_array_,
01811                                            TRUE,
01812                                            timeout,
01813                                            alertable);
01814 
01815       if (result != WAIT_IO_COMPLETION)
01816         break;
01817 
01818 #  endif /* ACE_HAS_PHARLAP */
01819     }
01820 
01821   switch (result)
01822     {
01823     case WAIT_TIMEOUT:
01824       errno = ETIME;
01825       return 0;
01826     case WAIT_FAILED:
01827     case WAIT_ABANDONED_0:
01828       ACE_OS::set_errno_to_last_error ();
01829       return -1;
01830     default:
01831       break;
01832     }
01833 
01834   // It is ok to enter ::WaitForMultipleObjects
01835   return 1;
01836 #endif /* ACE_HAS_WINCE */
01837 }
01838 
01839 DWORD
01840 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01841                                             int alertable)
01842 {
01843   // Wait for any of handles_ to be active, or until timeout expires.
01844   // If <alertable> is enabled allow asynchronous completion of
01845   // ReadFile and WriteFile operations.
01846 
01847 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01848   // PharLap doesn't do async I/O and doesn't implement
01849   // WaitForMultipleObjectsEx, so use WaitForMultipleObjects.
01850   ACE_UNUSED_ARG (alertable);
01851   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01852                                    this->handler_rep_.handles (),
01853                                    FALSE,
01854                                    timeout);
01855 #else
01856   return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01857                                      this->handler_rep_.handles (),
01858                                      FALSE,
01859                                      timeout,
01860                                      alertable);
01861 #endif /* ACE_HAS_PHARLAP */
01862 }
01863 
01864 DWORD
01865 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01866 {
01867   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01868                                    this->handler_rep_.handles () + slot,
01869                                    FALSE,
01870                                    0);
01871 }
01872 
01873 int
01874 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01875 {
01876   ACE_Time_Value *time = 0;
01877   if (this->owner_ == ACE_Thread::self ())
01878     time = this->timer_queue_->calculate_timeout (max_wait_time);
01879   else
01880     time = max_wait_time;
01881 
01882   if (time == 0)
01883     return INFINITE;
01884   else
01885     return time->msec ();
01886 }
01887 
01888 
01889 int
01890 ACE_WFMO_Reactor::expire_timers (void)
01891 {
01892   // If "owner" thread
01893   if (ACE_Thread::self () == this->owner_)
01894     // expire all pending timers.
01895     return this->timer_queue_->expire ();
01896 
01897   else
01898     // Nothing to expire
01899     return 0;
01900 }
01901 
01902 int
01903 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01904 {
01905   int handlers_dispatched = 0;
01906 
01907   // Expire timers
01908   handlers_dispatched += this->expire_timers ();
01909 
01910   switch (wait_status)
01911     {
01912     case WAIT_FAILED: // Failure.
01913       ACE_OS::set_errno_to_last_error ();
01914       return -1;
01915 
01916     case WAIT_TIMEOUT: // Timeout.
01917       errno = ETIME;
01918       return handlers_dispatched;
01919 
01920 #ifndef ACE_HAS_WINCE
01921     case WAIT_IO_COMPLETION: // APC.
01922       return handlers_dispatched;
01923 #endif  // ACE_HAS_WINCE
01924 
01925     default:  // Dispatch.
01926       // We'll let dispatch worry about abandoned mutes.
01927       handlers_dispatched += this->dispatch_handles (wait_status);
01928       return handlers_dispatched;
01929     }
01930 }
01931 
01932 // Dispatches any active handles from <handles_[slot]> to
01933 // <handles_[max_handlep1_]>, polling through our handle set looking
01934 // for active handles.
01935 int
01936 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01937 {
01938   // dispatch_slot is the absolute slot.  Only += is used to
01939   // increment it.
01940   DWORD dispatch_slot = 0;
01941 
01942   // Cache this value, this is the absolute value.
01943   DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01944 
01945   // nCount starts off at <max_handlep1>, this is a transient count of
01946   // handles last waited on.
01947   DWORD nCount = max_handlep1;
01948 
01949   for (int number_of_handlers_dispatched = 1;
01950        ;
01951        ++number_of_handlers_dispatched)
01952     {
01953       const bool ok = (
01954 #if ! defined(__BORLANDC__) \
01955     && !defined (ghs) \
01956     && !defined (__MINGW32__) \
01957     && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01958                  // wait_status is unsigned in Borland, Green Hills,
01959                  // mingw32 and MSVC++ >= 7.1.
01960                  // This >= is always true, with a warning.
01961                  wait_status >= WAIT_OBJECT_0 &&
01962 #endif
01963                  wait_status <= (WAIT_OBJECT_0 + nCount));
01964 
01965       if (ok)
01966         dispatch_slot += wait_status - WAIT_OBJECT_0;
01967       else
01968         // Otherwise, a handle was abandoned.
01969         dispatch_slot += wait_status - WAIT_ABANDONED_0;
01970 
01971       // Dispatch handler
01972       if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01973         return -1;
01974 
01975       // Increment slot
01976       ++dispatch_slot;
01977 
01978       // We're done.
01979       if (dispatch_slot >= max_handlep1)
01980         return number_of_handlers_dispatched;
01981 
01982       // Readjust nCount
01983       nCount = max_handlep1 - dispatch_slot;
01984 
01985       // Check the remaining handles
01986       wait_status = this->poll_remaining_handles (dispatch_slot);
01987       switch (wait_status)
01988         {
01989         case WAIT_FAILED: // Failure.
01990           ACE_OS::set_errno_to_last_error ();
01991           /* FALLTHRU */
01992         case WAIT_TIMEOUT:
01993           // There are no more handles ready, we can return.
01994           return number_of_handlers_dispatched;
01995         }
01996     }
01997 }
01998 
01999 int
02000 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
02001                                     DWORD max_handlep1)
02002 {
02003   // Check if there are window messages that need to be dispatched
02004   if (slot == max_handlep1)
02005     return this->dispatch_window_messages ();
02006 
02007   // Dispatch the handler if it has not been scheduled for deletion.
02008   // Note that this is a very week test if there are multiple threads
02009   // dispatching this slot as no locks are held here. Generally, you
02010   // do not want to do something like deleting the this pointer in
02011   // handle_close() if you have registered multiple times and there is
02012   // more than one thread in WFMO_Reactor->handle_events().
02013   else if (!this->handler_rep_.scheduled_for_deletion (slot))
02014     {
02015       ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
02016 
02017       if (this->handler_rep_.current_info ()[slot].io_entry_)
02018         return this->complex_dispatch_handler (slot,
02019                                                event_handle);
02020       else
02021         return this->simple_dispatch_handler (slot,
02022                                               event_handle);
02023     }
02024   else
02025     // The handle was scheduled for deletion, so we will skip it.
02026     return 0;
02027 }
02028 
02029 int
02030 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
02031                                            ACE_HANDLE event_handle)
02032 {
02033   // This dispatch is used for non-I/O entires
02034 
02035   // Assign the ``signaled'' HANDLE so that callers can get it.
02036   // siginfo_t is an ACE - specific fabrication. Constructor exists.
02037   siginfo_t sig (event_handle);
02038 
02039   ACE_Event_Handler *event_handler =
02040     this->handler_rep_.current_info ()[slot].event_handler_;
02041 
02042   int requires_reference_counting =
02043     event_handler->reference_counting_policy ().value () ==
02044     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02045 
02046   if (requires_reference_counting)
02047     {
02048       event_handler->add_reference ();
02049     }
02050 
02051   // Upcall
02052   if (event_handler->handle_signal (0, &sig) == -1)
02053     this->handler_rep_.unbind (event_handle,
02054                                ACE_Event_Handler::NULL_MASK);
02055 
02056   // Call remove_reference() if needed.
02057   if (requires_reference_counting)
02058     {
02059       event_handler->remove_reference ();
02060     }
02061 
02062   return 0;
02063 }
02064 
02065 int
02066 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02067                                             ACE_HANDLE event_handle)
02068 {
02069   // This dispatch is used for I/O entires.
02070 
02071   ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
02072     this->handler_rep_.current_info ()[slot];
02073 
02074   WSANETWORKEVENTS events;
02075   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02076   if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02077                               event_handle,
02078                               &events) == SOCKET_ERROR)
02079     problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02080   else
02081     {
02082       // Prepare for upcalls. Clear the bits from <events> representing
02083       // events the handler is not interested in. If there are any left,
02084       // do the upcall(s). upcall will replace events.lNetworkEvents
02085       // with bits representing any functions that requested a repeat
02086       // callback before checking handles again. In this case, continue
02087       // to call back unless the handler is unregistered as a result of
02088       // one of the upcalls. The way this is written, the upcalls will
02089       // keep being done even if one or more upcalls reported problems.
02090       // In practice this may turn out not so good, but let's see. If any
02091       // problems, please notify Steve Huston <shuston@riverace.com>
02092       // before or after you change this code.
02093       events.lNetworkEvents &= current_info.network_events_;
02094       while (events.lNetworkEvents != 0)
02095         {
02096           ACE_Event_Handler *event_handler =
02097             current_info.event_handler_;
02098 
02099           int reference_counting_required =
02100             event_handler->reference_counting_policy ().value () ==
02101             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02102 
02103           // Call add_reference() if needed.
02104           if (reference_counting_required)
02105             {
02106               event_handler->add_reference ();
02107             }
02108 
02109           // Upcall
02110           problems |= this->upcall (current_info.event_handler_,
02111                                     current_info.io_handle_,
02112                                     events);
02113 
02114           // Call remove_reference() if needed.
02115           if (reference_counting_required)
02116             {
02117               event_handler->remove_reference ();
02118             }
02119 
02120           if (this->handler_rep_.scheduled_for_deletion (slot))
02121             break;
02122         }
02123     }
02124 
02125   if (problems != ACE_Event_Handler::NULL_MASK
02126       && !this->handler_rep_.scheduled_for_deletion (slot)  )
02127     this->handler_rep_.unbind (event_handle, problems);
02128 
02129   return 0;
02130 }
02131 
02132 ACE_Reactor_Mask
02133 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02134                           ACE_HANDLE io_handle,
02135                           WSANETWORKEVENTS &events)
02136 {
02137   // This method figures out what exactly has happened to the socket
02138   // and then calls appropriate methods.
02139   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02140 
02141   // Go through the events and do the indicated upcalls. If the handler
02142   // doesn't want to be called back, clear the bit for that event.
02143   // At the end, set the bits back to <events> to request a repeat call.
02144 
02145   long actual_events = events.lNetworkEvents;
02146   int action;
02147 
02148   if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02149     {
02150       action = event_handler->handle_output (io_handle);
02151       if (action <= 0)
02152         {
02153           ACE_CLR_BITS (actual_events, FD_WRITE);
02154           if (action == -1)
02155             ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02156         }
02157     }
02158 
02159   if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02160     {
02161       if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02162         {
02163           // Successful connect
02164           action = event_handler->handle_output (io_handle);
02165           if (action <= 0)
02166             {
02167               ACE_CLR_BITS (actual_events, FD_CONNECT);
02168               if (action == -1)
02169                 ACE_SET_BITS (problems,
02170                               ACE_Event_Handler::CONNECT_MASK);
02171             }
02172         }
02173       // Unsuccessful connect
02174       else
02175         {
02176           action = event_handler->handle_input (io_handle);
02177           if (action <= 0)
02178             {
02179               ACE_CLR_BITS (actual_events, FD_CONNECT);
02180               if (action == -1)
02181                 ACE_SET_BITS (problems,
02182                               ACE_Event_Handler::CONNECT_MASK);
02183             }
02184         }
02185     }
02186 
02187   if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02188     {
02189       action = event_handler->handle_exception (io_handle);
02190       if (action <= 0)
02191         {
02192           ACE_CLR_BITS (actual_events, FD_OOB);
02193           if (action == -1)
02194             ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02195         }
02196     }
02197 
02198   if (ACE_BIT_ENABLED (actual_events, FD_READ))
02199     {
02200       action = event_handler->handle_input (io_handle);
02201       if (action <= 0)
02202         {
02203           ACE_CLR_BITS (actual_events, FD_READ);
02204           if (action == -1)
02205             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02206         }
02207     }
02208 
02209   if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02210       && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02211     {
02212       action = event_handler->handle_input (io_handle);
02213       if (action <= 0)
02214         {
02215           ACE_CLR_BITS (actual_events, FD_CLOSE);
02216           if (action == -1)
02217             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02218         }
02219     }
02220 
02221           if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02222     {
02223       action = event_handler->handle_input (io_handle);
02224       if (action <= 0)
02225         {
02226           ACE_CLR_BITS (actual_events, FD_ACCEPT);
02227           if (action == -1)
02228             ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02229         }
02230     }
02231 
02232   if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02233     {
02234       action = event_handler->handle_qos (io_handle);
02235       if (action <= 0)
02236         {
02237           ACE_CLR_BITS (actual_events, FD_QOS);
02238           if (action == -1)
02239             ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02240         }
02241     }
02242 
02243   if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02244     {
02245       action = event_handler->handle_group_qos (io_handle);
02246       if (action <= 0)
02247         {
02248           ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02249           if (action == -1)
02250             ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02251         }
02252     }
02253 
02254   events.lNetworkEvents = actual_events;
02255   return problems;
02256 }
02257 
02258 
02259 int
02260 ACE_WFMO_Reactor::update_state (void)
02261 {
02262   // This GUARD is necessary since we are updating shared state.
02263   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02264 
02265   // Decrement active threads
02266   --this->active_threads_;
02267 
02268   // Check if the state of the handler repository has changed or new
02269   // owner has to be set
02270   if (this->handler_rep_.changes_required () || this->new_owner ())
02271     {
02272       if (this->change_state_thread_ == 0)
02273         // Try to become the thread which will be responsible for the
02274         // changes
02275         {
02276           this->change_state_thread_ = ACE_Thread::self ();
02277           // Make sure no new threads are allowed to enter
02278           this->ok_to_wait_.reset ();
02279 
02280           if (this->active_threads_ > 0)
02281             // Check for other active threads
02282             {
02283               // Wake up all other threads
02284               this->wakeup_all_threads_.signal ();
02285               // Release <lock_>
02286               monitor.release ();
02287               // Go to sleep waiting for all other threads to get done
02288               this->waiting_to_change_state_.wait ();
02289               // Re-acquire <lock_> again
02290               monitor.acquire ();
02291             }
02292 
02293           // Note that make_changes() calls into user code which can
02294           // request other changes.  So keep looping until all
02295           // requested changes are completed.
02296           while (this->handler_rep_.changes_required ())
02297             // Make necessary changes to the handler repository
02298             this->handler_rep_.make_changes ();
02299           if (this->new_owner ())
02300             // Update the owner
02301             this->change_owner ();
02302           // Turn off <wakeup_all_threads_>
02303           this->wakeup_all_threads_.reset ();
02304           // Let everyone know that it is ok to go ahead
02305           this->ok_to_wait_.signal ();
02306           // Reset this flag
02307           this->change_state_thread_ = 0;
02308         }
02309       else if (this->active_threads_ == 0)
02310         // This thread did not get a chance to become the change
02311         // thread. If it is the last one out, it will wakeup the
02312         // change thread
02313         this->waiting_to_change_state_.signal ();
02314     }
02315   // This is if we were woken up explicitily by the user and there are
02316   // no state changes required.
02317   else if (this->active_threads_ == 0)
02318     // Turn off <wakeup_all_threads_>
02319     this->wakeup_all_threads_.reset ();
02320 
02321   return 0;
02322 }
02323 
02324 void
02325 ACE_WFMO_Reactor::dump (void) const
02326 {
02327 #if defined (ACE_HAS_DUMP)
02328   ACE_TRACE ("ACE_WFMO_Reactor::dump");
02329 
02330   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02331 
02332   ACE_DEBUG ((LM_DEBUG,
02333               ACE_TEXT ("Count of currently active threads = %d\n"),
02334               this->active_threads_));
02335 
02336   ACE_DEBUG ((LM_DEBUG,
02337               ACE_TEXT ("ID of owner thread = %d\n"),
02338               this->owner_));
02339 
02340   this->handler_rep_.dump ();
02341   this->signal_handler_->dump ();
02342   this->timer_queue_->dump ();
02343 
02344   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02345 #endif /* ACE_HAS_DUMP */
02346 }
02347 
02348 int
02349 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
02350                                                  ACE_Handle_Set & /*rd_mask*/)
02351 {
02352   return -1;
02353 }
02354 
02355 int
02356 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & /*buffer*/)
02357 {
02358   return 0;
02359 }
02360 
02361 ACE_HANDLE
02362 ACE_WFMO_Reactor_Notify::notify_handle (void)
02363 {
02364   return ACE_INVALID_HANDLE;
02365 }
02366 
02367 int
02368 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02369                                            ACE_Notification_Buffer &)
02370 {
02371   return 0;
02372 }
02373 
02374 int
02375 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02376 {
02377   return 0;
02378 }
02379 
02380 int
02381 ACE_WFMO_Reactor_Notify::close (void)
02382 {
02383   return -1;
02384 }
02385 
02386 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02387   : timer_queue_ (0),
02388     message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02389                     max_notifies * sizeof (ACE_Notification_Buffer)),
02390     max_notify_iterations_ (-1)
02391 {
02392 }
02393 
02394 int
02395 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02396                                ACE_Timer_Queue *timer_queue,
02397                                int ignore_notify)
02398 {
02399   ACE_UNUSED_ARG (ignore_notify);
02400   timer_queue_ = timer_queue;
02401   return wfmo_reactor->register_handler (this);
02402 }
02403 
02404 ACE_HANDLE
02405 ACE_WFMO_Reactor_Notify::get_handle (void) const
02406 {
02407   return this->wakeup_one_thread_.handle ();
02408 }
02409 
02410 // Handle all pending notifications.
02411 
02412 int
02413 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02414                                         siginfo_t *siginfo,
02415                                         ucontext_t *)
02416 {
02417   ACE_UNUSED_ARG (signum);
02418 
02419   // Just check for sanity...
02420   if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02421     return -1;
02422 
02423   // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
02424   // is signaled.
02425   //  ACE_DEBUG ((LM_DEBUG,
02426   //             ACE_TEXT ("(%t) waking up to handle internal notifications\n")));
02427 
02428   for (int i = 1; ; ++i)
02429     {
02430       ACE_Message_Block *mb = 0;
02431       // Copy ACE_Time_Value::zero since dequeue_head will modify it.
02432       ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02433       if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02434         {
02435           if (errno == EWOULDBLOCK)
02436             // We've reached the end of the processing, return
02437             // normally.
02438             return 0;
02439           else
02440             return -1; // Something weird happened...
02441         }
02442       else
02443         {
02444           ACE_Notification_Buffer *buffer =
02445             reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02446 
02447           // If eh == 0 then we've got major problems!  Otherwise, we
02448           // need to dispatch the appropriate handle_* method on the
02449           // ACE_Event_Handler pointer we've been passed.
02450 
02451           if (buffer->eh_ != 0)
02452             {
02453               ACE_Event_Handler *event_handler =
02454                 buffer->eh_;
02455 
02456               bool const requires_reference_counting =
02457                 event_handler->reference_counting_policy ().value () ==
02458                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02459 
02460               int result = 0;
02461 
02462               switch (buffer->mask_)
02463                 {
02464                 case ACE_Event_Handler::READ_MASK:
02465                 case ACE_Event_Handler::ACCEPT_MASK:
02466                   result = event_handler->handle_input (ACE_INVALID_HANDLE);
02467                   break;
02468                 case ACE_Event_Handler::WRITE_MASK:
02469                   result = event_handler->handle_output (ACE_INVALID_HANDLE);
02470                   break;
02471                 case ACE_Event_Handler::EXCEPT_MASK:
02472                   result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02473                   break;
02474                 case ACE_Event_Handler::QOS_MASK:
02475                   result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02476                   break;
02477                 case ACE_Event_Handler::GROUP_QOS_MASK:
02478                   result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02479                   break;
02480                 default:
02481                   ACE_ERROR ((LM_ERROR,
02482                               ACE_TEXT ("invalid mask = %d\n"),
02483                               buffer->mask_));
02484                   break;
02485                 }
02486 
02487               if (result == -1)
02488                 event_handler->handle_close (ACE_INVALID_HANDLE,
02489                                              ACE_Event_Handler::EXCEPT_MASK);
02490 
02491               if (requires_reference_counting)
02492                 {
02493                   event_handler->remove_reference ();
02494                 }
02495             }
02496 
02497           // Make sure to delete the memory regardless of success or
02498           // failure!
02499           mb->release ();
02500 
02501           // Bail out if we've reached the <max_notify_iterations_>.
02502           // Note that by default <max_notify_iterations_> is -1, so
02503           // we'll loop until we're done.
02504           if (i == this->max_notify_iterations_)
02505             {
02506               // If there are still notification in the queue, we need
02507               // to wake up again
02508               if (!this->message_queue_.is_empty ())
02509                 this->wakeup_one_thread_.signal ();
02510 
02511               // Break the loop as we have reached max_notify_iterations_
02512               return 0;
02513             }
02514         }
02515     }
02516 }
02517 
02518 // Notify the WFMO_Reactor, potentially enqueueing the
02519 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
02520 // thread of control.
02521 
02522 int
02523 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02524                                  ACE_Reactor_Mask mask,
02525                                  ACE_Time_Value *timeout)
02526 {
02527   if (event_handler != 0)
02528     {
02529       ACE_Message_Block *mb = 0;
02530       ACE_NEW_RETURN (mb,
02531                       ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02532                       -1);
02533 
02534       ACE_Notification_Buffer *buffer =
02535         (ACE_Notification_Buffer *) mb->base ();
02536       buffer->eh_ = event_handler;
02537       buffer->mask_ = mask;
02538 
02539       // Convert from relative time to absolute time by adding the
02540       // current time of day.  This is what <ACE_Message_Queue>
02541       // expects.
02542       if (timeout != 0)
02543         *timeout += timer_queue_->gettimeofday ();
02544 
02545       if (this->message_queue_.enqueue_tail
02546           (mb, timeout) == -1)
02547         {
02548           mb->release ();
02549           return -1;
02550         }
02551 
02552       event_handler->add_reference ();
02553     }
02554 
02555   return this->wakeup_one_thread_.signal ();
02556 }
02557 
02558 void
02559 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02560 {
02561   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02562   // Must always be > 0 or < 0 to optimize the loop exit condition.
02563   if (iterations == 0)
02564     iterations = 1;
02565 
02566   this->max_notify_iterations_ = iterations;
02567 }
02568 
02569 int
02570 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02571 {
02572   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02573   return this->max_notify_iterations_;
02574 }
02575 
02576 int
02577 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02578                                                       ACE_Reactor_Mask mask)
02579 {
02580   ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02581 
02582   // Go over message queue and take out all the matching event
02583   // handlers.  If eh == 0, purge all. Note that reactor notifies (no
02584   // handler specified) are never purged, as this may lose a needed
02585   // notify the reactor queued for itself.
02586 
02587   if (this->message_queue_.is_empty ())
02588     return 0;
02589 
02590   // Guard against new and/or delivered notifications while purging.
02591   // WARNING!!! The use of the notification queue's lock object for
02592   // this guard makes use of the knowledge that on Win32, the mutex
02593   // protecting the queue is really a CriticalSection, which is
02594   // recursive. This is how we can get away with locking it down here
02595   // and still calling member functions on the queue object.
02596   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02597 
02598   // first, copy all to our own local queue. Since we've locked everyone out
02599   // of here, there's no need to use any synchronization on this queue.
02600   ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02601 
02602   size_t queue_size  = this->message_queue_.message_count ();
02603   int number_purged = 0;
02604 
02605   size_t index;
02606 
02607   for (index = 0; index < queue_size; ++index)
02608     {
02609       ACE_Message_Block *mb = 0;
02610       if (-1 == this->message_queue_.dequeue_head (mb))
02611         return -1;        // This shouldn't happen...
02612 
02613       ACE_Notification_Buffer *buffer =
02614         reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02615 
02616       // If this is not a Reactor notify (it is for a particular handler),
02617       // and it matches the specified handler (or purging all),
02618       // and applying the mask would totally eliminate the notification, then
02619       // release it and count the number purged.
02620       if ((0 != buffer->eh_) &&
02621           (0 == eh || eh == buffer->eh_) &&
02622           ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
02623                                                    // is left with nothing when
02624                                                    // applying the mask
02625         {
02626           ACE_Event_Handler *event_handler = buffer->eh_;
02627 
02628           event_handler->remove_reference ();
02629 
02630           mb->release ();
02631           ++number_purged;
02632         }
02633       else
02634         {
02635           // To preserve it, move it to the local_queue.  But first, if
02636           // this is not a Reactor notify (it is for a
02637           // particularhandler), and it matches the specified handler
02638           // (or purging all), then apply the mask
02639           if ((0 != buffer->eh_) &&
02640               (0 == eh || eh == buffer->eh_))
02641             ACE_CLR_BITS(buffer->mask_, mask);
02642           if (-1 == local_queue.enqueue_head (mb))
02643             return -1;
02644         }
02645     }
02646 
02647   if (this->message_queue_.message_count ())
02648     { // Should be empty!
02649       ACE_ASSERT (0);
02650       return -1;
02651     }
02652 
02653   // Now copy back from the local queue to the class queue, taking
02654   // care to preserve the original order...
02655   queue_size  = local_queue.message_count ();
02656   for (index = 0; index < queue_size; ++index)
02657     {
02658       ACE_Message_Block  *mb = 0;
02659       if (-1 == local_queue.dequeue_head (mb))
02660         {
02661           ACE_ASSERT (0);
02662           return -1;
02663         }
02664 
02665       if (-1 == this->message_queue_.enqueue_head (mb))
02666         {
02667           ACE_ASSERT (0);
02668           return -1;
02669         }
02670     }
02671 
02672   return number_purged;
02673 }
02674 
02675 void
02676 ACE_WFMO_Reactor_Notify::dump (void) const
02677 {
02678 #if defined (ACE_HAS_DUMP)
02679   ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02680   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02681   this->timer_queue_->dump ();
02682   ACE_DEBUG ((LM_DEBUG,
02683               ACE_TEXT ("Max. iteration: %d\n"),
02684               this->max_notify_iterations_));
02685   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02686 #endif /* ACE_HAS_DUMP */
02687 }
02688 
02689 void
02690 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02691 {
02692   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02693   ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02694 
02695   // Must always be > 0 or < 0 to optimize the loop exit condition.
02696   this->notify_handler_->max_notify_iterations (iterations);
02697 }
02698 
02699 int
02700 ACE_WFMO_Reactor::max_notify_iterations (void)
02701 {
02702   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02703   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02704 
02705   return this->notify_handler_->max_notify_iterations ();
02706 }
02707 
02708 int
02709 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02710                                                ACE_Reactor_Mask mask)
02711 {
02712   ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02713   if (this->notify_handler_ == 0)
02714     return 0;
02715   else
02716     return this->notify_handler_->purge_pending_notifications (eh, mask);
02717 }
02718 
02719 int
02720 ACE_WFMO_Reactor::resumable_handler (void)
02721 {
02722   ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02723   return 0;
02724 }
02725 
02726 
02727 // No-op WinSOCK2 methods to help WFMO_Reactor compile
02728 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02729 int
02730 WSAEventSelect (SOCKET /* s */,
02731                 WSAEVENT /* hEventObject */,
02732                 long /* lNetworkEvents */)
02733 {
02734   return -1;
02735 }
02736 
02737 int
02738 WSAEnumNetworkEvents (SOCKET /* s */,
02739                       WSAEVENT /* hEventObject */,
02740                       LPWSANETWORKEVENTS /* lpNetworkEvents */)
02741 {
02742   return -1;
02743 }
02744 #endif /* !defined ACE_HAS_WINSOCK2 */
02745 
02746 ACE_END_VERSIONED_NAMESPACE_DECL
02747 
02748 #endif /* ACE_WIN32 */

Generated on Tue Feb 2 17:18:44 2010 for ACE by  doxygen 1.4.7