WFMO_Reactor.cpp

Go to the documentation of this file.
00001 // WFMO_Reactor.cpp,v 4.110 2006/06/07 03:03:54 schmidt Exp
00002 
00003 #include "ace/WFMO_Reactor.h"
00004 
00005 #if defined (ACE_WIN32)
00006 
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 ACE_RCSID(ace, WFMO_Reactor, "WFMO_Reactor.cpp,v 4.110 2006/06/07 03:03:54 schmidt Exp")
00018 
00019 #include "ace/Auto_Ptr.h"
00020 
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024   : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027 
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031   if (size > MAXIMUM_WAIT_OBJECTS)
00032     ACE_ERROR_RETURN ((LM_ERROR,
00033                        ACE_LIB_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034                        size,
00035                        MAXIMUM_WAIT_OBJECTS),
00036                       -1);
00037 
00038   // Dynamic allocation
00039   ACE_NEW_RETURN (this->current_handles_,
00040                   ACE_HANDLE[size],
00041                   -1);
00042   ACE_NEW_RETURN (this->current_info_,
00043                   Current_Info[size],
00044                   -1);
00045   ACE_NEW_RETURN (this->current_suspended_info_,
00046                   Suspended_Info[size],
00047                   -1);
00048   ACE_NEW_RETURN (this->to_be_added_info_,
00049                   To_Be_Added_Info[size],
00050                   -1);
00051 
00052   // Initialization
00053   this->max_size_ = size;
00054   this->max_handlep1_ = 0;
00055   this->suspended_handles_ = 0;
00056   this->handles_to_be_added_ = 0;
00057   this->handles_to_be_deleted_ = 0;
00058   this->handles_to_be_suspended_ = 0;
00059   this->handles_to_be_resumed_ = 0;
00060 
00061   for (size_t i = 0; i < size; ++i)
00062     this->current_handles_[i] = ACE_INVALID_HANDLE;
00063 
00064   return 0;
00065 }
00066 
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069   // Free up dynamically allocated space
00070   delete [] this->current_handles_;
00071   delete [] this->current_info_;
00072   delete [] this->current_suspended_info_;
00073   delete [] this->to_be_added_info_;
00074 }
00075 
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078                                               ACE_Reactor_Mask change_masks,
00079                                               int operation)
00080 {
00081   // Find the old reactor masks.  This automatically does the work of
00082   // the GET_MASK operation.
00083 
00084   ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085 
00086   if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087       || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088     ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089 
00090   if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091     ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092 
00093   if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094     ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095 
00096   if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097     ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098 
00099   if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100     ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101 
00102   if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103     ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104 
00105   if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106     ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107 
00108   switch (operation)
00109     {
00110     case ACE_Reactor::CLR_MASK:
00111       // For the CLR_MASK operation, clear only the specific masks.
00112 
00113       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114         {
00115           ACE_CLR_BITS (existing_masks, FD_READ);
00116           ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117         }
00118 
00119       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120         ACE_CLR_BITS (existing_masks, FD_WRITE);
00121 
00122       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123         ACE_CLR_BITS (existing_masks, FD_OOB);
00124 
00125       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126         ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127 
00128       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129         ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130 
00131       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132         ACE_CLR_BITS (existing_masks, FD_QOS);
00133 
00134       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135         ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136 
00137       break;
00138 
00139     case ACE_Reactor::SET_MASK:
00140       // If the operation is a set, first reset any existing masks
00141 
00142       existing_masks = 0;
00143       /* FALLTHRU */
00144 
00145     case ACE_Reactor::ADD_MASK:
00146       // For the ADD_MASK and the SET_MASK operation, add only the
00147       // specific masks.
00148 
00149       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150         {
00151           ACE_SET_BITS (existing_masks, FD_READ);
00152           ACE_SET_BITS (existing_masks, FD_CLOSE);
00153         }
00154 
00155       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156         ACE_SET_BITS (existing_masks, FD_WRITE);
00157 
00158       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159         ACE_SET_BITS (existing_masks, FD_OOB);
00160 
00161       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162         ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163 
00164       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165         ACE_SET_BITS (existing_masks, FD_CONNECT);
00166 
00167       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168         ACE_SET_BITS (existing_masks, FD_QOS);
00169 
00170       if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171         ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172 
00173       break;
00174 
00175     case ACE_Reactor::GET_MASK:
00176 
00177       // The work for this operation is done in all cases at the
00178       // begining of the function.
00179 
00180       ACE_UNUSED_ARG (change_masks);
00181 
00182       break;
00183     }
00184 
00185   return old_masks;
00186 }
00187 
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190                                                ACE_Reactor_Mask mask,
00191                                                int &changes_required)
00192 {
00193   int error = 0;
00194 
00195   // Remember this value; only if it changes do we need to wakeup
00196   // the other threads
00197   size_t const original_handle_count = this->handles_to_be_deleted_;
00198   int result = 0;
00199   size_t i;
00200 
00201   // Go through all the handles looking for <handle>.  Even if we find
00202   // it, we continue through the rest of the list since <handle> could
00203   // appear multiple times. All handles are checked.
00204 
00205   // First check the current entries
00206   for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00207     // Since the handle can either be the event or the I/O handle,
00208     // we have to check both
00209     if ((this->current_handles_[i] == handle
00210          || this->current_info_[i].io_handle_ == handle)
00211         && // Make sure that it is not already marked for deleted
00212         !this->current_info_[i].delete_entry_)
00213       {
00214         result = this->remove_handler_i (i,
00215                                          mask);
00216         if (result == -1)
00217           error = 1;
00218       }
00219 
00220   // Then check the suspended entries
00221   for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00222     // Since the handle can either be the event or the I/O handle, we
00223     // have to check both
00224     if ((this->current_suspended_info_[i].io_handle_ == handle
00225          || this->current_suspended_info_[i].event_handle_ == handle)
00226         &&
00227         // Make sure that it is not already marked for deleted
00228         !this->current_suspended_info_[i].delete_entry_)
00229       {
00230         result = this->remove_suspended_handler_i (i,
00231                                                    mask);
00232         if (result == -1)
00233           error = 1;
00234       }
00235 
00236   // Then check the to_be_added entries
00237   for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00238     // Since the handle can either be the event or the I/O handle,
00239     // we have to check both
00240     if ((this->to_be_added_info_[i].io_handle_ == handle
00241          || this->to_be_added_info_[i].event_handle_ == handle)
00242         &&
00243         // Make sure that it is not already marked for deleted
00244         !this->to_be_added_info_[i].delete_entry_)
00245       {
00246         result = this->remove_to_be_added_handler_i (i,
00247                                                      mask);
00248         if (result == -1)
00249           error = 1;
00250       }
00251 
00252   // Only if the number of handlers to be deleted changes do we need
00253   // to wakeup the other threads
00254   if (original_handle_count < this->handles_to_be_deleted_)
00255     changes_required = 1;
00256 
00257   return error ? -1 : 0;
00258 }
00259 
00260 int
00261 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00262                                                        ACE_Reactor_Mask to_be_removed_masks)
00263 {
00264   // I/O entries
00265   if (this->current_info_[slot].io_entry_)
00266     {
00267       // See if there are other events that the <Event_Handler> is
00268       // interested in
00269       this->bit_ops (this->current_info_[slot].network_events_,
00270                      to_be_removed_masks,
00271                      ACE_Reactor::CLR_MASK);
00272 
00273       // Disassociate/Reassociate the event from/with the I/O handle.
00274       // This will depend on the value of remaining set of network
00275       // events that the <event_handler> is interested in. I don't
00276       // think we can do anything about errors here, so I will not
00277       // check this.
00278       ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00279                         this->current_handles_[slot],
00280                         this->current_info_[slot].network_events_);
00281     }
00282   // Normal event entries.
00283   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00284     // Preserve DONT_CALL
00285     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00286   else
00287     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00288     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00289 
00290   // If this event was marked for suspension, undo the suspension flag
00291   // and reduce the to be suspended count.
00292   if (this->current_info_[slot].suspend_entry_)
00293     {
00294       // Undo suspension
00295       this->current_info_[slot].suspend_entry_ = 0;
00296       // Decrement the handle count
00297       --this->handles_to_be_suspended_;
00298     }
00299 
00300   // If there are no more events that the <Event_Handler> is
00301   // interested in, or this is a non-I/O entry, schedule the
00302   // <Event_Handler> for removal
00303   if (this->current_info_[slot].network_events_ == 0)
00304     {
00305       // Mark to be deleted
00306       this->current_info_[slot].delete_entry_ = 1;
00307       // Remember the mask
00308       this->current_info_[slot].close_masks_ = to_be_removed_masks;
00309       // Increment the handle count
00310       ++this->handles_to_be_deleted_;
00311     }
00312 
00313   // Since it is not a complete removal, we'll call handle_close
00314   // for all the masks that were removed.  This does not change
00315   // the internal state of the reactor.
00316   //
00317   // Note: this condition only applies to I/O entries
00318   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00319     {
00320       ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00321       this->current_info_[slot].event_handler_->handle_close (handle,
00322                                                               to_be_removed_masks);
00323     }
00324 
00325   return 0;
00326 }
00327 
00328 int
00329 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00330                                                                  ACE_Reactor_Mask to_be_removed_masks)
00331 {
00332   // I/O entries
00333   if (this->current_suspended_info_[slot].io_entry_)
00334     {
00335       // See if there are other events that the <Event_Handler> is
00336       // interested in
00337       this->bit_ops (this->current_suspended_info_[slot].network_events_,
00338                      to_be_removed_masks,
00339                      ACE_Reactor::CLR_MASK);
00340 
00341       // Disassociate/Reassociate the event from/with the I/O handle.
00342       // This will depend on the value of remaining set of network
00343       // events that the <event_handler> is interested in. I don't
00344       // think we can do anything about errors here, so I will not
00345       // check this.
00346       ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00347                         this->current_suspended_info_[slot].event_handle_,
00348                         this->current_suspended_info_[slot].network_events_);
00349     }
00350   // Normal event entries.
00351   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00352     // Preserve DONT_CALL
00353     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00354   else
00355     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00356     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00357 
00358   // If this event was marked for resumption, undo the resumption flag
00359   // and reduce the to be resumed count.
00360   if (this->current_suspended_info_[slot].resume_entry_)
00361     {
00362       // Undo resumption
00363       this->current_suspended_info_[slot].resume_entry_ = 0;
00364       // Decrement the handle count
00365       --this->handles_to_be_resumed_;
00366     }
00367 
00368   // If there are no more events that the <Event_Handler> is
00369   // interested in, or this is a non-I/O entry, schedule the
00370   // <Event_Handler> for removal
00371   if (this->current_suspended_info_[slot].network_events_ == 0)
00372     {
00373       // Mark to be deleted
00374       this->current_suspended_info_[slot].delete_entry_ = 1;
00375       // Remember the mask
00376       this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00377       // Increment the handle count
00378       ++this->handles_to_be_deleted_;
00379     }
00380   // Since it is not a complete removal, we'll call handle_close for
00381   // all the masks that were removed.  This does not change the
00382   // internal state of the reactor.
00383   //
00384   // Note: this condition only applies to I/O entries
00385   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00386     {
00387       ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00388       this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00389                                                                         to_be_removed_masks);
00390     }
00391 
00392   return 0;
00393 }
00394 
00395 int
00396 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00397                                                                    ACE_Reactor_Mask to_be_removed_masks)
00398 {
00399   // I/O entries
00400   if (this->to_be_added_info_[slot].io_entry_)
00401     {
00402       // See if there are other events that the <Event_Handler> is
00403       // interested in
00404       this->bit_ops (this->to_be_added_info_[slot].network_events_,
00405                      to_be_removed_masks,
00406                      ACE_Reactor::CLR_MASK);
00407 
00408       // Disassociate/Reassociate the event from/with the I/O handle.
00409       // This will depend on the value of remaining set of network
00410       // events that the <event_handler> is interested in. I don't
00411       // think we can do anything about errors here, so I will not
00412       // check this.
00413       ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00414                         this->to_be_added_info_[slot].event_handle_,
00415                         this->to_be_added_info_[slot].network_events_);
00416     }
00417   // Normal event entries.
00418   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00419     // Preserve DONT_CALL
00420     to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00421   else
00422     // Make sure that the <to_be_removed_masks> is the NULL_MASK
00423     to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00424 
00425   // If this event was marked for suspension, undo the suspension flag
00426   // and reduce the to be suspended count.
00427   if (this->to_be_added_info_[slot].suspend_entry_)
00428     {
00429       // Undo suspension
00430       this->to_be_added_info_[slot].suspend_entry_ = 0;
00431       // Decrement the handle count
00432       --this->handles_to_be_suspended_;
00433     }
00434 
00435   // If there are no more events that the <Event_Handler> is
00436   // interested in, or this is a non-I/O entry, schedule the
00437   // <Event_Handler> for removal
00438   if (this->to_be_added_info_[slot].network_events_ == 0)
00439     {
00440       // Mark to be deleted
00441       this->to_be_added_info_[slot].delete_entry_ = 1;
00442       // Remember the mask
00443       this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00444       // Increment the handle count
00445       ++this->handles_to_be_deleted_;
00446     }
00447   // Since it is not a complete removal, we'll call handle_close
00448   // for all the masks that were removed.  This does not change
00449   // the internal state of the reactor.
00450   //
00451   // Note: this condition only applies to I/O entries
00452   else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00453     {
00454       ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00455       this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00456                                                                   to_be_removed_masks);
00457     }
00458 
00459   return 0;
00460 }
00461 
00462 int
00463 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00464                                                         int &changes_required)
00465 {
00466   size_t i = 0;
00467 
00468   // Go through all the handles looking for <handle>.  Even if we find
00469   // it, we continue through the rest of the list since <handle> could
00470   // appear multiple times. All handles are checked.
00471 
00472   // Check the current entries first.
00473   for (i = 0; i < this->max_handlep1_; ++i)
00474     // Since the handle can either be the event or the I/O handle,
00475     // we have to check both
00476     if ((this->current_handles_[i] == handle ||
00477          this->current_info_[i].io_handle_ == handle) &&
00478         // Make sure that it is not already marked for suspension
00479         !this->current_info_[i].suspend_entry_)
00480       {
00481         // Mark to be suspended
00482         this->current_info_[i].suspend_entry_ = 1;
00483         // Increment the handle count
00484         ++this->handles_to_be_suspended_;
00485         // Changes will be required
00486         changes_required = 1;
00487       }
00488 
00489   // Then check the suspended entries.
00490   for (i = 0; i < this->suspended_handles_; ++i)
00491     // Since the handle can either be the event or the I/O handle,
00492     // we have to check both
00493     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00494          this->current_suspended_info_[i].io_handle_ == handle) &&
00495         // Make sure that the resumption is not already undone
00496         this->current_suspended_info_[i].resume_entry_)
00497       {
00498         // Undo resumption
00499         this->current_suspended_info_[i].resume_entry_ = 0;
00500         // Decrement the handle count
00501         --this->handles_to_be_resumed_;
00502         // Changes will be required
00503         changes_required = 1;
00504       }
00505 
00506   // Then check the to_be_added entries.
00507   for (i = 0; i < this->handles_to_be_added_; ++i)
00508     // Since the handle can either be the event or the I/O handle,
00509     // we have to check both
00510     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00511          this->to_be_added_info_[i].event_handle_ == handle) &&
00512         // Make sure that it is not already marked for suspension
00513         !this->to_be_added_info_[i].suspend_entry_)
00514       {
00515         // Mark to be suspended
00516         this->to_be_added_info_[i].suspend_entry_ = 1;
00517         // Increment the handle count
00518         ++this->handles_to_be_suspended_;
00519         // Changes will be required
00520         changes_required = 1;
00521       }
00522 
00523   return 0;
00524 }
00525 
00526 int
00527 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00528                                                        int &changes_required)
00529 {
00530   size_t i = 0;
00531 
00532   // Go through all the handles looking for <handle>.  Even if we find
00533   // it, we continue through the rest of the list since <handle> could
00534   // appear multiple times. All handles are checked.
00535 
00536   // Check the current entries first.
00537   for (i = 0; i < this->max_handlep1_; ++i)
00538     // Since the handle can either be the event or the I/O handle,
00539     // we have to check both
00540     if ((this->current_handles_[i] == handle ||
00541          this->current_info_[i].io_handle_ == handle) &&
00542         // Make sure that the suspension is not already undone
00543         this->current_info_[i].suspend_entry_)
00544       {
00545         // Undo suspension
00546         this->current_info_[i].suspend_entry_ = 0;
00547         // Decrement the handle count
00548         --this->handles_to_be_suspended_;
00549         // Changes will be required
00550         changes_required = 1;
00551       }
00552 
00553   // Then check the suspended entries.
00554   for (i = 0; i < this->suspended_handles_; ++i)
00555     // Since the handle can either be the event or the I/O handle,
00556     // we have to check both
00557     if ((this->current_suspended_info_[i].event_handle_ == handle ||
00558          this->current_suspended_info_[i].io_handle_ == handle) &&
00559         // Make sure that it is not already marked for resumption
00560         !this->current_suspended_info_[i].resume_entry_)
00561       {
00562         // Mark to be resumed
00563         this->current_suspended_info_[i].resume_entry_ = 1;
00564         // Increment the handle count
00565         ++this->handles_to_be_resumed_;
00566         // Changes will be required
00567         changes_required = 1;
00568       }
00569 
00570   // Then check the to_be_added entries.
00571   for (i = 0; i < this->handles_to_be_added_; ++i)
00572     // Since the handle can either be the event or the I/O handle,
00573     // we have to check both
00574     if ((this->to_be_added_info_[i].io_handle_ == handle ||
00575          this->to_be_added_info_[i].event_handle_ == handle) &&
00576         // Make sure that the suspension is not already undone
00577         this->to_be_added_info_[i].suspend_entry_)
00578       {
00579         // Undo suspension
00580         this->to_be_added_info_[i].suspend_entry_ = 0;
00581         // Decrement the handle count
00582         --this->handles_to_be_suspended_;
00583         // Changes will be required
00584         changes_required = 1;
00585       }
00586 
00587   return 0;
00588 }
00589 
00590 void
00591 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00592 {
00593   {
00594     ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00595 
00596     int dummy;
00597     size_t i;
00598 
00599     // Remove all the current handlers
00600     for (i = 0; i < this->max_handlep1_; ++i)
00601       this->unbind_i (this->current_handles_[i],
00602                       ACE_Event_Handler::ALL_EVENTS_MASK,
00603                       dummy);
00604 
00605     // Remove all the suspended handlers
00606     for (i = 0; i < this->suspended_handles_; ++i)
00607       this->unbind_i (this->current_suspended_info_[i].event_handle_,
00608                       ACE_Event_Handler::ALL_EVENTS_MASK,
00609                       dummy);
00610 
00611     // Remove all the to_be_added handlers
00612     for (i = 0; i < this->handles_to_be_added_; ++i)
00613       this->unbind_i (this->to_be_added_info_[i].event_handle_,
00614                       ACE_Event_Handler::ALL_EVENTS_MASK,
00615                       dummy);
00616 
00617   }
00618 
00619   // The guard is released here
00620 
00621   // Wake up all threads in WaitForMultipleObjects so that they can
00622   // reconsult the handle set
00623   this->wfmo_reactor_.wakeup_all_threads ();
00624 }
00625 
00626 int
00627 ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
00628                                              ACE_Event_Handler *event_handler,
00629                                              long network_events,
00630                                              ACE_HANDLE io_handle,
00631                                              ACE_HANDLE event_handle,
00632                                              int delete_event)
00633 {
00634   // Make sure that the <handle> is valid
00635   if (event_handle == ACE_INVALID_HANDLE)
00636     event_handle = event_handler->get_handle ();
00637   if (this->invalid_handle (event_handle))
00638     return -1;
00639 
00640   size_t current_size = this->max_handlep1_ +
00641     this->handles_to_be_added_ -
00642     this->handles_to_be_deleted_ +
00643     this->suspended_handles_;
00644 
00645   // Make sure that there's room in the table and that total pending
00646   // additions should not exceed what the <to_be_added_info_> array
00647   // can hold.
00648   if (current_size < this->max_size_ &&
00649       this->handles_to_be_added_ < this->max_size_)
00650     {
00651       // Cache this set into the <to_be_added_info_>, till we come
00652       // around to actually adding this to the <current_info_>
00653       this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00654                                                                io_entry,
00655                                                                event_handler,
00656                                                                io_handle,
00657                                                                network_events,
00658                                                                delete_event);
00659 
00660       ++this->handles_to_be_added_;
00661 
00662       event_handler->add_reference ();
00663 
00664       // Wake up all threads in WaitForMultipleObjects so that they can
00665       // reconsult the handle set
00666       this->wfmo_reactor_.wakeup_all_threads ();
00667     }
00668   else
00669     {
00670       errno = EMFILE;   // File descriptor table is full (better than nothing)
00671       return -1;
00672     }
00673 
00674   return 0;
00675 }
00676 
00677 int
00678 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00679 {
00680   // Go through the entire valid array and check for all handles that
00681   // have been schedule for deletion
00682   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00683     {
00684       size_t i = 0;
00685       while (i < this->max_handlep1_)
00686         {
00687           // This stuff is necessary here, since we should not make
00688           // the upcall until all the internal data structures have
00689           // been updated.  This is to protect against upcalls that
00690           // try to deregister again.
00691           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00692           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00693           ACE_Event_Handler *event_handler = 0;
00694 
00695           // See if this entry is scheduled for deletion
00696           if (this->current_info_[i].delete_entry_)
00697             {
00698               // Calling the <handle_close> method here will ensure that we
00699               // will only call it once per deregistering <Event_Handler>.
00700               // This is essential in the case when the <Event_Handler> will
00701               // do something like delete itself and we have multiple
00702               // threads in WFMO_Reactor.
00703               //
00704               // Make sure that the DONT_CALL mask is not set
00705               masks = this->current_info_[i].close_masks_;
00706               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00707                 {
00708                   // Grab the correct handle depending on the type entry
00709                   if (this->current_info_[i].io_entry_)
00710                     handle = this->current_info_[i].io_handle_;
00711                   else
00712                     handle = this->current_handles_[i];
00713 
00714                   // Event handler
00715                   event_handler = this->current_info_[i].event_handler_;
00716                 }
00717 
00718               // If <WFMO_Reactor> created the event, we need to clean it up
00719               if (this->current_info_[i].delete_event_)
00720                 ACE_OS::event_destroy (&this->current_handles_[i]);
00721 
00722               // Reduce count by one
00723               --this->handles_to_be_deleted_;
00724             }
00725 
00726           // See if this entry is scheduled for suspension
00727           else if (this->current_info_[i].suspend_entry_)
00728             {
00729               this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00730                                                                             this->current_info_[i]);
00731               // Increase number of suspended handles
00732               ++this->suspended_handles_;
00733 
00734               // Reduce count by one
00735               --this->handles_to_be_suspended_;
00736             }
00737 
00738           // See if this entry is scheduled for deletion or suspension
00739           // If so we need to clean up
00740           if (this->current_info_[i].delete_entry_ ||
00741               this->current_info_[i].suspend_entry_  )
00742             {
00743               size_t last_valid_slot = this->max_handlep1_ - 1;
00744               // If this is the last handle in the set, no need to swap
00745               // places. Simply remove it.
00746               if (i < last_valid_slot)
00747                 // Swap this handle with the last valid handle
00748                 {
00749                   // Struct copy
00750                   this->current_info_[i] =
00751                     this->current_info_[last_valid_slot];
00752                   this->current_handles_[i] =
00753                     this->current_handles_[last_valid_slot];
00754                 }
00755               // Reset the info in this slot
00756               this->current_info_[last_valid_slot].reset ();
00757               this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00758               --this->max_handlep1_;
00759             }
00760           else
00761             {
00762               // This current entry is not up for deletion or
00763               // suspension.  Proceed to the next entry in the current
00764               // handles.
00765               ++i;
00766             }
00767 
00768           // Now that all internal structures have been updated, make
00769           // the upcall.
00770           if (event_handler != 0)
00771             {
00772               int requires_reference_counting =
00773                 event_handler->reference_counting_policy ().value () ==
00774                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00775 
00776               event_handler->handle_close (handle, masks);
00777 
00778               if (requires_reference_counting)
00779                 {
00780                   event_handler->remove_reference ();
00781                 }
00782             }
00783         }
00784     }
00785 
00786   return 0;
00787 }
00788 
00789 int
00790 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00791 {
00792   // Go through the <suspended_handle> array
00793   if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00794     {
00795       size_t i = 0;
00796       while (i < this->suspended_handles_)
00797         {
00798           // This stuff is necessary here, since we should not make
00799           // the upcall until all the internal data structures have
00800           // been updated.  This is to protect against upcalls that
00801           // try to deregister again.
00802           ACE_HANDLE handle = ACE_INVALID_HANDLE;
00803           ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00804           ACE_Event_Handler *event_handler = 0;
00805 
00806           // See if this entry is scheduled for deletion
00807           if (this->current_suspended_info_[i].delete_entry_)
00808             {
00809               // Calling the <handle_close> method here will ensure that we
00810               // will only call it once per deregistering <Event_Handler>.
00811               // This is essential in the case when the <Event_Handler> will
00812               // do something like delete itself and we have multiple
00813               // threads in WFMO_Reactor.
00814               //
00815               // Make sure that the DONT_CALL mask is not set
00816               masks = this->current_suspended_info_[i].close_masks_;
00817               if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00818                 {
00819                   // Grab the correct handle depending on the type entry
00820                   if (this->current_suspended_info_[i].io_entry_)
00821                     handle = this->current_suspended_info_[i].io_handle_;
00822                   else
00823                     handle = this->current_suspended_info_[i].event_handle_;
00824 
00825                   // Upcall
00826                   event_handler = this->current_suspended_info_[i].event_handler_;
00827                 }
00828 
00829               // If <WFMO_Reactor> created the event, we need to clean it up
00830               if (this->current_suspended_info_[i].delete_event_)
00831                 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00832 
00833               // Reduce count by one
00834               --this->handles_to_be_deleted_;
00835             }
00836 
00837           else if (this->current_suspended_info_[i].resume_entry_)
00838             {
00839               // Add to the end of the current handles set
00840               this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00841               // Struct copy
00842               this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00843               ++this->max_handlep1_;
00844 
00845               // Reduce count by one
00846               --this->handles_to_be_resumed_;
00847             }
00848 
00849           // If an entry needs to be removed, either because it
00850           // was deleted or resumed, remove it now before doing
00851           // the upcall.
00852           if (this->current_suspended_info_[i].resume_entry_ ||
00853               this->current_suspended_info_[i].delete_entry_)
00854             {
00855               size_t last_valid_slot = this->suspended_handles_ - 1;
00856               // Net effect is that we're removing an entry and
00857               // compressing the list from the end.  So, if removing
00858               // an entry from the middle, copy the last valid one to the
00859               // removed slot.  Reset the end and decrement the number
00860               // of suspended handles.
00861               if (i < last_valid_slot)
00862                 // Struct copy
00863                 this->current_suspended_info_[i] =
00864                   this->current_suspended_info_[last_valid_slot];
00865               this->current_suspended_info_[last_valid_slot].reset ();
00866               --this->suspended_handles_;
00867             }
00868           else
00869             {
00870               // This current entry is not up for deletion or
00871               // resumption.  Proceed to the next entry in the
00872               // suspended handles.
00873               ++i;
00874             }
00875 
00876           // Now that all internal structures have been updated, make
00877           // the upcall.
00878           if (event_handler != 0)
00879             {
00880               int requires_reference_counting =
00881                 event_handler->reference_counting_policy ().value () ==
00882                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00883 
00884               event_handler->handle_close (handle, masks);
00885 
00886               if (requires_reference_counting)
00887                 {
00888                   event_handler->remove_reference ();
00889                 }
00890             }
00891         }
00892     }
00893 
00894   return 0;
00895 }
00896 
00897 int
00898 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00899 {
00900   // Go through the <to_be_added_*> arrays
00901   for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00902     {
00903       // This stuff is necessary here, since we should not make
00904       // the upcall until all the internal data structures have
00905       // been updated.  This is to protect against upcalls that
00906       // try to deregister again.
00907       ACE_HANDLE handle = ACE_INVALID_HANDLE;
00908       ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00909       ACE_Event_Handler *event_handler = 0;
00910 
00911       // See if this entry is scheduled for deletion
00912       if (this->to_be_added_info_[i].delete_entry_)
00913         {
00914           // Calling the <handle_close> method here will ensure that we
00915           // will only call it once per deregistering <Event_Handler>.
00916           // This is essential in the case when the <Event_Handler> will
00917           // do something like delete itself and we have multiple
00918           // threads in WFMO_Reactor.
00919           //
00920           // Make sure that the DONT_CALL mask is not set
00921           masks = this->to_be_added_info_[i].close_masks_;
00922           if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00923             {
00924               // Grab the correct handle depending on the type entry
00925               if (this->to_be_added_info_[i].io_entry_)
00926                 handle = this->to_be_added_info_[i].io_handle_;
00927               else
00928                 handle = this->to_be_added_info_[i].event_handle_;
00929 
00930               // Upcall
00931               event_handler = this->to_be_added_info_[i].event_handler_;
00932             }
00933 
00934           // If <WFMO_Reactor> created the event, we need to clean it up
00935           if (this->to_be_added_info_[i].delete_event_)
00936             ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00937 
00938           // Reduce count by one
00939           --this->handles_to_be_deleted_;
00940         }
00941 
00942       // See if this entry is scheduled for suspension
00943       else if (this->to_be_added_info_[i].suspend_entry_)
00944         {
00945           this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00946                                                                         this->to_be_added_info_[i]);
00947           // Increase number of suspended handles
00948           ++this->suspended_handles_;
00949 
00950           // Reduce count by one
00951           --this->handles_to_be_suspended_;
00952         }
00953 
00954       // If neither of the two flags are on, add to current
00955       else
00956         {
00957           // Add to the end of the current handles set
00958           this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00959           // Struct copy
00960           this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00961           ++this->max_handlep1_;
00962         }
00963 
00964       // Reset the <to_be_added_info_>
00965       this->to_be_added_info_[i].reset ();
00966 
00967       // Now that all internal structures have been updated, make the
00968       // upcall.
00969       if (event_handler != 0)
00970         {
00971           int requires_reference_counting =
00972             event_handler->reference_counting_policy ().value () ==
00973             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00974 
00975           event_handler->handle_close (handle, masks);
00976 
00977           if (requires_reference_counting)
00978             {
00979               event_handler->remove_reference ();
00980             }
00981         }
00982     }
00983 
00984   // Since all to be added handles have been taken care of, reset the
00985   // counter
00986   this->handles_to_be_added_ = 0;
00987 
00988   return 0;
00989 }
00990 
00991 void
00992 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00993 {
00994 #if defined (ACE_HAS_DUMP)
00995   size_t i = 0;
00996 
00997   ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00998 
00999   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01000 
01001   ACE_DEBUG ((LM_DEBUG,
01002               ACE_LIB_TEXT ("Max size = %d\n"),
01003               this->max_size_));
01004 
01005   ACE_DEBUG ((LM_DEBUG,
01006               ACE_LIB_TEXT ("Current info table\n\n")));
01007   ACE_DEBUG ((LM_DEBUG,
01008               ACE_LIB_TEXT ("\tSize = %d\n"),
01009               this->max_handlep1_));
01010   ACE_DEBUG ((LM_DEBUG,
01011               ACE_LIB_TEXT ("\tHandles to be suspended = %d\n"),
01012               this->handles_to_be_suspended_));
01013 
01014   for (i = 0; i < this->max_handlep1_; ++i)
01015     this->current_info_[i].dump (this->current_handles_[i]);
01016 
01017   ACE_DEBUG ((LM_DEBUG,
01018               ACE_LIB_TEXT ("\n")));
01019 
01020   ACE_DEBUG ((LM_DEBUG,
01021               ACE_LIB_TEXT ("To-be-added info table\n\n")));
01022   ACE_DEBUG ((LM_DEBUG,
01023               ACE_LIB_TEXT ("\tSize = %d\n"),
01024               this->handles_to_be_added_));
01025 
01026   for (i = 0; i < this->handles_to_be_added_; ++i)
01027     this->to_be_added_info_[i].dump ();
01028 
01029   ACE_DEBUG ((LM_DEBUG,
01030               ACE_LIB_TEXT ("\n")));
01031 
01032   ACE_DEBUG ((LM_DEBUG,
01033               ACE_LIB_TEXT ("Suspended info table\n\n")));
01034   ACE_DEBUG ((LM_DEBUG,
01035               ACE_LIB_TEXT ("\tSize = %d\n"),
01036               this->suspended_handles_));
01037   ACE_DEBUG ((LM_DEBUG,
01038               ACE_LIB_TEXT ("\tHandles to be resumed = %d\n"),
01039               this->handles_to_be_resumed_));
01040 
01041   for (i = 0; i < this->suspended_handles_; ++i)
01042     this->current_suspended_info_[i].dump ();
01043 
01044   ACE_DEBUG ((LM_DEBUG,
01045               ACE_LIB_TEXT ("\n")));
01046 
01047   ACE_DEBUG ((LM_DEBUG,
01048               ACE_LIB_TEXT ("Total handles to be deleted = %d\n"),
01049               this->handles_to_be_deleted_));
01050 
01051   ACE_DEBUG ((LM_DEBUG,
01052               ACE_END_DUMP));
01053 #endif /* ACE_HAS_DUMP */
01054 }
01055 
01056 /************************************************************/
01057 
01058 int
01059 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01060 {
01061   ACE_NOTSUP_RETURN (-1);
01062 }
01063 
01064 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01065                                     ACE_Timer_Queue *tq,
01066                                     ACE_Reactor_Notify *notify)
01067   : signal_handler_ (0),
01068     delete_signal_handler_ (0),
01069     timer_queue_ (0),
01070     delete_timer_queue_ (0),
01071     delete_handler_rep_ (0),
01072     delete_notify_handler_ (0),
01073     lock_adapter_ (lock_),
01074     handler_rep_ (*this),
01075     // this event is initially signaled
01076     ok_to_wait_ (1),
01077     // this event is initially unsignaled
01078     wakeup_all_threads_ (0),
01079     // this event is initially unsignaled
01080     waiting_to_change_state_ (0),
01081     active_threads_ (0),
01082     owner_ (ACE_Thread::self ()),
01083     new_owner_ (0),
01084     change_state_thread_ (0),
01085     open_for_business_ (0),
01086     deactivated_ (0)
01087 {
01088   if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01089     ACE_ERROR ((LM_ERROR,
01090                 ACE_LIB_TEXT ("%p\n"),
01091                 ACE_LIB_TEXT ("WFMO_Reactor")));
01092 }
01093 
01094 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01095                                     int unused,
01096                                     ACE_Sig_Handler *sh,
01097                                     ACE_Timer_Queue *tq,
01098                                     ACE_Reactor_Notify *notify)
01099   : signal_handler_ (0),
01100     delete_signal_handler_ (0),
01101     timer_queue_ (0),
01102     delete_timer_queue_ (0),
01103     delete_handler_rep_ (0),
01104     delete_notify_handler_ (0),
01105     lock_adapter_ (lock_),
01106     handler_rep_ (*this),
01107     // this event is initially signaled
01108     ok_to_wait_ (1),
01109     // this event is initially unsignaled
01110     wakeup_all_threads_ (0),
01111     // this event is initially unsignaled
01112     waiting_to_change_state_ (0),
01113     active_threads_ (0),
01114     owner_ (ACE_Thread::self ()),
01115     new_owner_ (0),
01116     change_state_thread_ (0),
01117     open_for_business_ (0),
01118     deactivated_ (0)
01119 {
01120   ACE_UNUSED_ARG (unused);
01121 
01122   if (this->open (size, 0, sh, tq, 0, notify) == -1)
01123     ACE_ERROR ((LM_ERROR,
01124                 ACE_LIB_TEXT ("%p\n"),
01125                 ACE_LIB_TEXT ("WFMO_Reactor")));
01126 }
01127 
01128 int
01129 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01130 {
01131   return -1;
01132 }
01133 
01134 int
01135 ACE_WFMO_Reactor::open (size_t size,
01136                         int unused,
01137                         ACE_Sig_Handler *sh,
01138                         ACE_Timer_Queue *tq,
01139                         int disable_notify_pipe,
01140                         ACE_Reactor_Notify *notify)
01141 {
01142   ACE_UNUSED_ARG (unused);
01143   ACE_UNUSED_ARG (disable_notify_pipe);
01144 
01145   // This GUARD is necessary since we are updating shared state.
01146   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01147 
01148   // If we are already open, return -1
01149   if (this->open_for_business_)
01150     return -1;
01151 
01152   // Timer Queue
01153   if (this->delete_timer_queue_)
01154     delete this->timer_queue_;
01155 
01156   if (tq == 0)
01157     {
01158       ACE_NEW_RETURN (this->timer_queue_,
01159                       ACE_Timer_Heap,
01160                       -1);
01161       this->delete_timer_queue_ = 1;
01162     }
01163   else
01164     {
01165       this->timer_queue_ = tq;
01166       this->delete_timer_queue_ = 0;
01167     }
01168 
01169   // Signal Handler
01170   if (this->delete_signal_handler_)
01171     delete this->signal_handler_;
01172 
01173   if (sh == 0)
01174     {
01175       ACE_NEW_RETURN (this->signal_handler_,
01176                       ACE_Sig_Handler,
01177                       -1);
01178       this->delete_signal_handler_ = 1;
01179     }
01180   else
01181     {
01182       this->signal_handler_ = sh;
01183       this->delete_signal_handler_ = 0;
01184     }
01185 
01186   // Setup the atomic wait array (used later in <handle_events>)
01187   this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01188   this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01189 
01190   // This is to guard against reopens of WFMO_Reactor
01191   if (this->delete_handler_rep_)
01192     this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01193 
01194   // Open the handle repository.  Two additional handles for internal
01195   // purposes
01196   if (this->handler_rep_.open (size + 2) == -1)
01197     ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("%p\n"),
01198                        ACE_LIB_TEXT ("opening handler repository")),
01199                       -1);
01200   else
01201     this->delete_handler_rep_ = 1;
01202 
01203   if (this->notify_handler_ != 0
01204       && this->delete_notify_handler_ != 0)
01205     delete this->notify_handler_;
01206 
01207   this->notify_handler_ = notify;
01208 
01209   if (this->notify_handler_ == 0)
01210     {
01211       ACE_NEW_RETURN (this->notify_handler_,
01212                       ACE_WFMO_Reactor_Notify,
01213                       -1);
01214 
01215       if (this->notify_handler_ == 0)
01216         return -1;
01217       else
01218         this->delete_notify_handler_ = 1;
01219     }
01220 
01221   /* NOTE */
01222   // The order of the following two registrations is very important
01223 
01224   // Open the notification handler
01225   if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01226     ACE_ERROR_RETURN ((LM_ERROR,
01227                        ACE_LIB_TEXT ("%p\n"),
01228                        ACE_LIB_TEXT ("opening notify handler ")),
01229                       -1);
01230 
01231   // Register for <wakeup_all_threads> event
01232   if (this->register_handler (&this->wakeup_all_threads_handler_,
01233                               this->wakeup_all_threads_.handle ()) == -1)
01234     ACE_ERROR_RETURN ((LM_ERROR,
01235                        ACE_LIB_TEXT ("%p\n"),
01236                        ACE_LIB_TEXT ("registering thread wakeup handler")),
01237                       -1);
01238 
01239   // Since we have added two handles into the handler repository,
01240   // update the <handler_repository_>
01241   if (this->handler_rep_.changes_required ())
01242     {
01243       // Make necessary changes to the handler repository
01244       this->handler_rep_.make_changes ();
01245       // Turn off <wakeup_all_threads_> since all necessary changes
01246       // have completed
01247       this->wakeup_all_threads_.reset ();
01248     }
01249 
01250   // We are open for business
01251   this->open_for_business_ = 1;
01252 
01253   return 0;
01254 }
01255 
01256 int
01257 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01258 {
01259   if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0)
01260     delete this->signal_handler_;
01261   this->signal_handler_ = signal_handler;
01262   this->delete_signal_handler_ = 0;
01263   return 0;
01264 }
01265 
01266 ACE_Timer_Queue *
01267 ACE_WFMO_Reactor::timer_queue (void) const
01268 {
01269   return this->timer_queue_;
01270 }
01271 
01272 int
01273 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01274 {
01275   if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0)
01276     delete this->timer_queue_;
01277   this->timer_queue_ = tq;
01278   this->delete_timer_queue_ = 0;
01279   return 0;
01280 }
01281 
01282 int
01283 ACE_WFMO_Reactor::close (void)
01284 {
01285   // This GUARD is necessary since we are updating shared state.
01286   ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01287 
01288   // If we are already closed, return error
01289   if (!this->open_for_business_)
01290     return -1;
01291 
01292   // We are now closed
01293   this->open_for_business_ = 0;
01294   // This will unregister all handles
01295   this->handler_rep_.close ();
01296 
01297   return 0;
01298 }
01299 
01300 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01301 {
01302   // Assumption: No threads are left in the Reactor when this method
01303   // is called (i.e., active_threads_ == 0)
01304 
01305   // Close down
01306   this->close ();
01307 
01308   // Make necessary changes to the handler repository that we caused
01309   // by <close>.
01310   this->handler_rep_.make_changes ();
01311 
01312   if (this->delete_timer_queue_)
01313     {
01314       delete this->timer_queue_;
01315       this->timer_queue_ = 0;
01316       this->delete_timer_queue_ = 0;
01317     }
01318 
01319   if (this->delete_signal_handler_)
01320     {
01321       delete this->signal_handler_;
01322       this->signal_handler_ = 0;
01323       this->delete_signal_handler_ = 0;
01324     }
01325 
01326   if (this->delete_notify_handler_)
01327     {
01328       delete this->notify_handler_;
01329       this->notify_handler_ = 0;
01330       this->delete_notify_handler_ = 0;
01331     }
01332 }
01333 
01334 int
01335 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01336                                       ACE_HANDLE io_handle,
01337                                       ACE_Event_Handler *event_handler,
01338                                       ACE_Reactor_Mask new_masks)
01339 {
01340   // If this is a Winsock 1 system, the underlying event assignment will
01341   // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
01342   // reacting to socket activity.
01343 
01344 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01345 
01346   ACE_UNUSED_ARG (event_handle);
01347   ACE_UNUSED_ARG (io_handle);
01348   ACE_UNUSED_ARG (event_handler);
01349   ACE_UNUSED_ARG (new_masks);
01350   ACE_NOTSUP_RETURN (-1);
01351 
01352 #else
01353 
01354   // Make sure that the <handle> is valid
01355   if (io_handle == ACE_INVALID_HANDLE)
01356     io_handle = event_handler->get_handle ();
01357 
01358   if (this->handler_rep_.invalid_handle (io_handle))
01359     {
01360       errno = ERROR_INVALID_HANDLE;
01361       return -1;
01362     }
01363 
01364   long new_network_events = 0;
01365   int delete_event = 0;
01366   auto_ptr <ACE_Auto_Event> event;
01367 
01368   // Look up the repository to see if the <event_handler> is already
01369   // there.
01370   ACE_Reactor_Mask old_masks;
01371   int found = this->handler_rep_.modify_network_events_i (io_handle,
01372                                                           new_masks,
01373                                                           old_masks,
01374                                                           new_network_events,
01375                                                           event_handle,
01376                                                           delete_event,
01377                                                           ACE_Reactor::ADD_MASK);
01378 
01379   // Check to see if the user passed us a valid event; If not then we
01380   // need to create one
01381   if (event_handle == ACE_INVALID_HANDLE)
01382     {
01383       // Note: don't change this since some C++ compilers have
01384       // <auto_ptr>s that don't work properly...
01385       auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01386       event = tmp;
01387       event_handle = event->handle ();
01388       delete_event = 1;
01389     }
01390 
01391   int result = ::WSAEventSelect ((SOCKET) io_handle,
01392                                  event_handle,
01393                                  new_network_events);
01394   // If we had found the <Event_Handler> there is nothing more to do
01395   if (found)
01396     return result;
01397   else if (result != SOCKET_ERROR &&
01398            this->handler_rep_.bind_i (1,
01399                                       event_handler,
01400                                       new_network_events,
01401                                       io_handle,
01402                                       event_handle,
01403                                       delete_event) != -1)
01404     {
01405       // The <event_handler> was not found in the repository, add to
01406       // the repository.
01407       if (delete_event)
01408         {
01409           // Clear out the handle in the ACE_Auto_Event so that when
01410           // it is destroyed, the handle isn't closed out from under
01411           // the reactor. After setting it, running down the event
01412           // (via auto_ptr<> event, above) at function return will
01413           // cause an error because it'll try to close an invalid handle.
01414           // To avoid that smashing the errno value, save the errno
01415           // here, explicitly remove the event so the dtor won't do it
01416           // again, then restore errno.
01417           ACE_Errno_Guard guard (errno);
01418           event->handle (ACE_INVALID_HANDLE);
01419           event->remove ();
01420         }
01421       return 0;
01422     }
01423   else
01424     return -1;
01425 
01426 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
01427 
01428 }
01429 
01430 int
01431 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01432                               ACE_Reactor_Mask new_masks,
01433                               int operation)
01434 {
01435   // Make sure that the <handle> is valid
01436   if (this->handler_rep_.invalid_handle (io_handle))
01437     return -1;
01438 
01439   long new_network_events = 0;
01440   int delete_event = 0;
01441   ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01442 
01443   // Look up the repository to see if the <Event_Handler> is already
01444   // there.
01445   ACE_Reactor_Mask old_masks;
01446   int found = this->handler_rep_.modify_network_events_i (io_handle,
01447                                                           new_masks,
01448                                                           old_masks,
01449                                                           new_network_events,
01450                                                           event_handle,
01451                                                           delete_event,
01452                                                           operation);
01453   if (found)
01454     {
01455       int result = ::WSAEventSelect ((SOCKET) io_handle,
01456                                      event_handle,
01457                                      new_network_events);
01458       if (result == 0)
01459         return old_masks;
01460       else
01461         return result;
01462     }
01463   else
01464     return -1;
01465 }
01466 
01467 
01468 
01469 int
01470 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01471                                                               ACE_Reactor_Mask new_masks,
01472                                                               ACE_Reactor_Mask &old_masks,
01473                                                               long &new_network_events,
01474                                                               ACE_HANDLE &event_handle,
01475                                                               int &delete_event,
01476                                                               int operation)
01477 {
01478   long *modified_network_events = &new_network_events;
01479   int found = 0;
01480   size_t i;
01481 
01482   // First go through the current entries
01483   //
01484   // Look for all entries in the current handles for matching handle
01485   // (except those that have been scheduled for deletion)
01486   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01487     if (io_handle == this->current_info_[i].io_handle_ &&
01488         !this->current_info_[i].delete_entry_)
01489       {
01490         found = 1;
01491         modified_network_events = &this->current_info_[i].network_events_;
01492         delete_event = this->current_info_[i].delete_event_;
01493         event_handle = this->current_handles_[i];
01494       }
01495 
01496   // Then pass through the suspended handles
01497   //
01498   // Look for all entries in the suspended handles for matching handle
01499   // (except those that have been scheduled for deletion)
01500   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01501     if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01502         !this->current_suspended_info_[i].delete_entry_)
01503       {
01504         found = 1;
01505         modified_network_events = &this->current_suspended_info_[i].network_events_;
01506         delete_event = this->current_suspended_info_[i].delete_event_;
01507         event_handle = this->current_suspended_info_[i].event_handle_;
01508       }
01509 
01510   // Then check the to_be_added handles
01511   //
01512   // Look for all entries in the to_be_added handles for matching
01513   // handle (except those that have been scheduled for deletion)
01514   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01515     if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01516         !this->to_be_added_info_[i].delete_entry_)
01517       {
01518         found = 1;
01519         modified_network_events = &this->to_be_added_info_[i].network_events_;
01520         delete_event = this->to_be_added_info_[i].delete_event_;
01521         event_handle = this->to_be_added_info_[i].event_handle_;
01522       }
01523 
01524   old_masks = this->bit_ops (*modified_network_events,
01525                              new_masks,
01526                              operation);
01527 
01528   new_network_events = *modified_network_events;
01529 
01530   return found;
01531 }
01532 
01533 ACE_Event_Handler *
01534 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01535 {
01536   long existing_masks_ignored = 0;
01537   return this->handler (handle,
01538                         existing_masks_ignored);
01539 }
01540 
01541 ACE_Event_Handler *
01542 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01543                                               long &existing_masks)
01544 {
01545   int found = 0;
01546   size_t i = 0;
01547   ACE_Event_Handler *event_handler = 0;
01548   existing_masks = 0;
01549 
01550   // Look for the handle first
01551 
01552   // First go through the current entries
01553   //
01554   // Look for all entries in the current handles for matching handle
01555   // (except those that have been scheduled for deletion)
01556   for (i = 0; i < this->max_handlep1_ && !found; ++i)
01557     if ((handle == this->current_info_[i].io_handle_ ||
01558          handle == this->current_handles_[i]) &&
01559         !this->current_info_[i].delete_entry_)
01560       {
01561         found = 1;
01562         event_handler = this->current_info_[i].event_handler_;
01563         existing_masks = this->current_info_[i].network_events_;
01564       }
01565 
01566   // Then pass through the suspended handles
01567   //
01568   // Look for all entries in the suspended handles for matching handle
01569   // (except those that have been scheduled for deletion)
01570   for (i = 0; i < this->suspended_handles_ && !found; ++i)
01571     if ((handle == this->current_suspended_info_[i].io_handle_ ||
01572          handle == this->current_suspended_info_[i].event_handle_) &&
01573         !this->current_suspended_info_[i].delete_entry_)
01574       {
01575         found = 1;
01576         event_handler = this->current_suspended_info_[i].event_handler_;
01577         existing_masks = this->current_suspended_info_[i].network_events_;
01578       }
01579 
01580   // Then check the to_be_added handles
01581   //
01582   // Look for all entries in the to_be_added handles for matching
01583   // handle (except those that have been scheduled for deletion)
01584   for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01585     if ((handle == this->to_be_added_info_[i].io_handle_ ||
01586          handle == this->to_be_added_info_[i].event_handle_) &&
01587         !this->to_be_added_info_[i].delete_entry_)
01588       {
01589         found = 1;
01590         event_handler = this->to_be_added_info_[i].event_handler_;
01591         existing_masks = this->to_be_added_info_[i].network_events_;
01592       }
01593 
01594   if (event_handler)
01595     event_handler->add_reference ();
01596 
01597   return event_handler;
01598 }
01599 
01600 int
01601 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01602                                               ACE_Reactor_Mask user_masks,
01603                                               ACE_Event_Handler **user_event_handler)
01604 {
01605   long existing_masks = 0;
01606   int found = 0;
01607 
01608   ACE_Event_Handler_var safe_event_handler =
01609     this->handler (handle,
01610                    existing_masks);
01611 
01612   if (safe_event_handler.handler ())
01613     found = 1;
01614 
01615   if (!found)
01616     return -1;
01617 
01618   // Otherwise, make sure that the masks that the user is looking for
01619   // are on.
01620   if (found &&
01621       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01622     if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01623         !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01624       found = 0;
01625 
01626   if (found &&
01627       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01628     if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01629       found = 0;
01630 
01631   if (found &&
01632       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01633     if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01634       found = 0;
01635 
01636   if (found &&
01637       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01638     if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01639       found = 0;
01640 
01641   if (found &&
01642       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01643     if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01644       found = 0;
01645 
01646   if (found &&
01647       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01648     if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01649       found = 0;
01650 
01651   if (found &&
01652       ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01653     if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01654       found = 0;
01655 
01656   if (found &&
01657       user_event_handler)
01658     *user_event_handler = safe_event_handler.release ();
01659 
01660   if (found)
01661     return 0;
01662   else
01663     return -1;
01664 }
01665 
01666 // Waits for and dispatches all events.  Returns -1 on error, 0 if
01667 // max_wait_time expired, or the number of events that were dispatched.
01668 int
01669 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01670                                   int alertable)
01671 {
01672   ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01673 
01674   // Make sure we are not closed
01675   if (!this->open_for_business_ || this->deactivated_)
01676     return -1;
01677 
01678   // Stash the current time -- the destructor of this object will
01679   // automatically compute how much time elapsed since this method was
01680   // called.
01681   ACE_Countdown_Time countdown (max_wait_time);
01682 
01683   int result;
01684   do
01685     {
01686       // Check to see if it is ok to enter ::WaitForMultipleObjects
01687       // This will acquire <this->lock_> on success On failure, the
01688       // lock will not be acquired
01689       result = this->ok_to_wait (max_wait_time, alertable);
01690       if (result != 1)
01691         return result;
01692 
01693       // Increment the number of active threads
01694       ++this->active_threads_;
01695 
01696       // Release the <lock_>
01697       this->lock_.release ();
01698 
01699       // Update the countdown to reflect time waiting to play with the
01700       // mut and event.
01701       countdown.update ();
01702 
01703       // Calculate timeout
01704       int timeout = this->calculate_timeout (max_wait_time);
01705 
01706       // Wait for event to happen
01707       DWORD wait_status = this->wait_for_multiple_events (timeout,
01708                                                           alertable);
01709 
01710       // Upcall
01711       result = this->safe_dispatch (wait_status);
01712       if (0 == result)
01713         {
01714           // wait_for_multiple_events timed out without dispatching
01715           // anything.  Because of rounding and conversion errors and
01716           // such, it could be that the wait loop timed out, but
01717           // the timer queue said it wasn't quite ready to expire a
01718           // timer. In this case, max_wait_time won't have quite been
01719           // reduced to 0, and we need to go around again. If max_wait_time
01720           // is all the way to 0, just return, as the entire time the
01721           // caller wanted to wait has been used up.
01722           countdown.update ();     // Reflect time waiting for events
01723           if (0 == max_wait_time || max_wait_time->usec () == 0)
01724             break;
01725         }
01726     }
01727   while (result == 0);
01728 
01729   return result;
01730 }
01731 
01732 int
01733 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01734                               int alertable)
01735 {
01736   // Calculate the max time we should spend here
01737   //
01738   // Note: There is really no need to involve the <timer_queue_> here
01739   // because even if a timeout in the <timer_queue_> does expire we
01740   // will not be able to dispatch it
01741   int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01742 
01743   // Atomically wait for both the <lock_> and <ok_to_wait_> event
01744   DWORD result = 0;
01745   while (1)
01746     {
01747 #if defined (ACE_HAS_PHARLAP)
01748       // PharLap doesn't implement WaitForMultipleObjectsEx, and doesn't
01749       // do async I/O, so it's not needed in this case anyway.
01750       result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01751                                          this->atomic_wait_array_,
01752                                          TRUE,
01753                                          timeout);
01754 
01755       if (result != WAIT_IO_COMPLETION)
01756         break;
01757 
01758 #elif defined (ACE_HAS_WINCE)
01759       result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01760                                          this->atomic_wait_array_,
01761                                          TRUE,
01762                                          timeout);
01763       break;  // CE does not have WAIT_IO_COMPLETION defined.
01764 #else
01765       result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01766                                            this->atomic_wait_array_,
01767                                            TRUE,
01768                                            timeout,
01769                                            alertable);
01770 
01771       if (result != WAIT_IO_COMPLETION)
01772         break;
01773 
01774 #endif /* ACE_HAS_PHARLAP */
01775     }
01776 
01777   switch (result)
01778     {
01779     case WAIT_TIMEOUT:
01780       errno = ETIME;
01781       return 0;
01782     case WAIT_FAILED:
01783     case WAIT_ABANDONED_0:
01784       ACE_OS::set_errno_to_last_error ();
01785       return -1;
01786     default:
01787       break;
01788     }
01789 
01790   // It is ok to enter ::WaitForMultipleObjects
01791   return 1;
01792 }
01793 
01794 DWORD
01795 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01796                                             int alertable)
01797 {
01798   // Wait for any of handles_ to be active, or until timeout expires.
01799   // If <alertable> is enabled allow asynchronous completion of
01800   // ReadFile and WriteFile operations.
01801 
01802 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01803   // PharLap doesn't do async I/O and doesn't implement
01804   // WaitForMultipleObjectsEx, so use WaitForMultipleObjects.
01805   ACE_UNUSED_ARG (alertable);
01806   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01807                                    this->handler_rep_.handles (),
01808                                    FALSE,
01809                                    timeout);
01810 #else
01811   return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01812                                      this->handler_rep_.handles (),
01813                                      FALSE,
01814                                      timeout,
01815                                      alertable);
01816 #endif /* ACE_HAS_PHARLAP */
01817 }
01818 
01819 DWORD
01820 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01821 {
01822   return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01823                                    this->handler_rep_.handles () + slot,
01824                                    FALSE,
01825                                    0);
01826 }
01827 
01828 int
01829 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01830 {
01831   ACE_Time_Value *time = 0;
01832   if (this->owner_ == ACE_Thread::self ())
01833     time = this->timer_queue_->calculate_timeout (max_wait_time);
01834   else
01835     time = max_wait_time;
01836 
01837   if (time == 0)
01838     return INFINITE;
01839   else
01840     return time->msec ();
01841 }
01842 
01843 
01844 int
01845 ACE_WFMO_Reactor::expire_timers (void)
01846 {
01847   // If "owner" thread
01848   if (ACE_Thread::self () == this->owner_)
01849     // expire all pending timers.
01850     return this->timer_queue_->expire ();
01851 
01852   else
01853     // Nothing to expire
01854     return 0;
01855 }
01856 
01857 int
01858 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01859 {
01860   int handlers_dispatched = 0;
01861 
01862   // Expire timers
01863   handlers_dispatched += this->expire_timers ();
01864 
01865   switch (wait_status)
01866     {
01867     case WAIT_FAILED: // Failure.
01868       ACE_OS::set_errno_to_last_error ();
01869       return -1;
01870 
01871     case WAIT_TIMEOUT: // Timeout.
01872       errno = ETIME;
01873       return handlers_dispatched;
01874 
01875 #ifndef ACE_HAS_WINCE
01876     case WAIT_IO_COMPLETION: // APC.
01877       return handlers_dispatched;
01878 #endif  // ACE_HAS_WINCE
01879 
01880     default:  // Dispatch.
01881       // We'll let dispatch worry about abandoned mutes.
01882       handlers_dispatched += this->dispatch_handles (wait_status);
01883       return handlers_dispatched;
01884     }
01885 }
01886 
01887 // Dispatches any active handles from <handles_[slot]> to
01888 // <handles_[max_handlep1_]>, polling through our handle set looking
01889 // for active handles.
01890 int
01891 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01892 {
01893   // dispatch_slot is the absolute slot.  Only += is used to
01894   // increment it.
01895   DWORD dispatch_slot = 0;
01896 
01897   // Cache this value, this is the absolute value.
01898   DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01899 
01900   // nCount starts off at <max_handlep1>, this is a transient count of
01901   // handles last waited on.
01902   DWORD nCount = max_handlep1;
01903 
01904   for (int number_of_handlers_dispatched = 1;
01905        ;
01906        ++number_of_handlers_dispatched)
01907     {
01908       const bool ok = (
01909 #if ! (defined(__BORLANDC__) && (__BORLANDC__ >= 0x0530)) \
01910     && !defined (ghs) \
01911     && !defined (__MINGW32__) \
01912     && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01913                  // wait_status is unsigned in Borland, Green Hills,
01914                  // mingw32 and MSVC++ >= 7.1.
01915                  // This >= is always true, with a warning.
01916                  wait_status >= WAIT_OBJECT_0 &&
01917 #endif
01918                  wait_status <= (WAIT_OBJECT_0 + nCount));
01919 
01920       if (ok)
01921         dispatch_slot += wait_status - WAIT_OBJECT_0;
01922       else
01923         // Otherwise, a handle was abandoned.
01924         dispatch_slot += wait_status - WAIT_ABANDONED_0;
01925 
01926       // Dispatch handler
01927       if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01928         return -1;
01929 
01930       // Increment slot
01931       ++dispatch_slot;
01932 
01933       // We're done.
01934       if (dispatch_slot >= max_handlep1)
01935         return number_of_handlers_dispatched;
01936 
01937       // Readjust nCount
01938       nCount = max_handlep1 - dispatch_slot;
01939 
01940       // Check the remaining handles
01941       wait_status = this->poll_remaining_handles (dispatch_slot);
01942       switch (wait_status)
01943         {
01944         case WAIT_FAILED: // Failure.
01945           ACE_OS::set_errno_to_last_error ();
01946           /* FALLTHRU */
01947         case WAIT_TIMEOUT:
01948           // There are no more handles ready, we can return.
01949           return number_of_handlers_dispatched;
01950         }
01951     }
01952 }
01953 
01954 int
01955 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
01956                                     DWORD max_handlep1)
01957 {
01958   // Check if there are window messages that need to be dispatched
01959   if (slot == max_handlep1)
01960     return this->dispatch_window_messages ();
01961 
01962   // Dispatch the handler if it has not been scheduled for deletion.
01963   // Note that this is a very week test if there are multiple threads
01964   // dispatching this slot as no locks are held here. Generally, you
01965   // do not want to do something like deleting the this pointer in
01966   // handle_close() if you have registered multiple times and there is
01967   // more than one thread in WFMO_Reactor->handle_events().
01968   else if (!this->handler_rep_.scheduled_for_deletion (slot))
01969     {
01970       ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
01971 
01972       if (this->handler_rep_.current_info ()[slot].io_entry_)
01973         return this->complex_dispatch_handler (slot,
01974                                                event_handle);
01975       else
01976         return this->simple_dispatch_handler (slot,
01977                                               event_handle);
01978     }
01979   else
01980     // The handle was scheduled for deletion, so we will skip it.
01981     return 0;
01982 }
01983 
01984 int
01985 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
01986                                            ACE_HANDLE event_handle)
01987 {
01988   // This dispatch is used for non-I/O entires
01989 
01990   // Assign the ``signaled'' HANDLE so that callers can get it.
01991   // siginfo_t is an ACE - specific fabrication. Constructor exists.
01992   siginfo_t sig (event_handle);
01993 
01994   ACE_Event_Handler *event_handler =
01995     this->handler_rep_.current_info ()[slot].event_handler_;
01996 
01997   int requires_reference_counting =
01998     event_handler->reference_counting_policy ().value () ==
01999     ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02000 
02001   if (requires_reference_counting)
02002     {
02003       event_handler->add_reference ();
02004     }
02005 
02006   // Upcall
02007   if (event_handler->handle_signal (0, &sig) == -1)
02008     this->handler_rep_.unbind (event_handle,
02009                                ACE_Event_Handler::NULL_MASK);
02010 
02011   // Call remove_reference() if needed.
02012   if (requires_reference_counting)
02013     {
02014       event_handler->remove_reference ();
02015     }
02016 
02017   return 0;
02018 }
02019 
02020 int
02021 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02022                                             ACE_HANDLE event_handle)
02023 {
02024   // This dispatch is used for I/O entires.
02025 
02026   ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
02027     this->handler_rep_.current_info ()[slot];
02028 
02029   WSANETWORKEVENTS events;
02030   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02031   if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02032                               event_handle,
02033                               &events) == SOCKET_ERROR)
02034     problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02035   else
02036     {
02037       // Prepare for upcalls. Clear the bits from <events> representing
02038       // events the handler is not interested in. If there are any left,
02039       // do the upcall(s). upcall will replace events.lNetworkEvents
02040       // with bits representing any functions that requested a repeat
02041       // callback before checking handles again. In this case, continue
02042       // to call back unless the handler is unregistered as a result of
02043       // one of the upcalls. The way this is written, the upcalls will
02044       // keep being done even if one or more upcalls reported problems.
02045       // In practice this may turn out not so good, but let's see. If any
02046       // problems, please notify Steve Huston <shuston@riverace.com>
02047       // before or after you change this code.
02048       events.lNetworkEvents &= current_info.network_events_;
02049       while (events.lNetworkEvents != 0)
02050         {
02051           ACE_Event_Handler *event_handler =
02052             current_info.event_handler_;
02053 
02054           int reference_counting_required =
02055             event_handler->reference_counting_policy ().value () ==
02056             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02057 
02058           // Call add_reference() if needed.
02059           if (reference_counting_required)
02060             {
02061               event_handler->add_reference ();
02062             }
02063 
02064           // Upcall
02065           problems |= this->upcall (current_info.event_handler_,
02066                                     current_info.io_handle_,
02067                                     events);
02068 
02069           // Call remove_reference() if needed.
02070           if (reference_counting_required)
02071             {
02072               event_handler->remove_reference ();
02073             }
02074 
02075           if (this->handler_rep_.scheduled_for_deletion (slot))
02076             break;
02077         }
02078     }
02079 
02080   if (problems != ACE_Event_Handler::NULL_MASK
02081       && !this->handler_rep_.scheduled_for_deletion (slot)  )
02082     this->handler_rep_.unbind (event_handle, problems);
02083 
02084   return 0;
02085 }
02086 
02087 ACE_Reactor_Mask
02088 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02089                           ACE_HANDLE io_handle,
02090                           WSANETWORKEVENTS &events)
02091 {
02092   // This method figures out what exactly has happened to the socket
02093   // and then calls appropriate methods.
02094   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02095 
02096   // Go through the events and do the indicated upcalls. If the handler
02097   // doesn't want to be called back, clear the bit for that event.
02098   // At the end, set the bits back to <events> to request a repeat call.
02099 
02100   long actual_events = events.lNetworkEvents;
02101   int action;
02102 
02103   if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02104     {
02105       action = event_handler->handle_output (io_handle);
02106       if (action <= 0)
02107         {
02108           ACE_CLR_BITS (actual_events, FD_WRITE);
02109           if (action == -1)
02110             ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02111         }
02112     }
02113 
02114   if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02115     {
02116       if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02117         {
02118           // Successful connect
02119           action = event_handler->handle_output (io_handle);
02120           if (action <= 0)
02121             {
02122               ACE_CLR_BITS (actual_events, FD_CONNECT);
02123               if (action == -1)
02124                 ACE_SET_BITS (problems,
02125                               ACE_Event_Handler::CONNECT_MASK);
02126             }
02127         }
02128       // Unsuccessful connect
02129       else
02130         {
02131           action = event_handler->handle_input (io_handle);
02132           if (action <= 0)
02133             {
02134               ACE_CLR_BITS (actual_events, FD_CONNECT);
02135               if (action == -1)
02136                 ACE_SET_BITS (problems,
02137                               ACE_Event_Handler::CONNECT_MASK);
02138             }
02139         }
02140     }
02141 
02142   if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02143     {
02144       action = event_handler->handle_exception (io_handle);
02145       if (action <= 0)
02146         {
02147           ACE_CLR_BITS (actual_events, FD_OOB);
02148           if (action == -1)
02149             ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02150         }
02151     }
02152 
02153   if (ACE_BIT_ENABLED (actual_events, FD_READ))
02154     {
02155       action = event_handler->handle_input (io_handle);
02156       if (action <= 0)
02157         {
02158           ACE_CLR_BITS (actual_events, FD_READ);
02159           if (action == -1)
02160             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02161         }
02162     }
02163 
02164   if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02165       && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02166     {
02167       action = event_handler->handle_input (io_handle);
02168       if (action <= 0)
02169         {
02170           ACE_CLR_BITS (actual_events, FD_CLOSE);
02171           if (action == -1)
02172             ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02173         }
02174     }
02175 
02176           if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02177     {
02178       action = event_handler->handle_input (io_handle);
02179       if (action <= 0)
02180         {
02181           ACE_CLR_BITS (actual_events, FD_ACCEPT);
02182           if (action == -1)
02183             ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02184         }
02185     }
02186 
02187   if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02188     {
02189       action = event_handler->handle_qos (io_handle);
02190       if (action <= 0)
02191         {
02192           ACE_CLR_BITS (actual_events, FD_QOS);
02193           if (action == -1)
02194             ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02195         }
02196     }
02197 
02198   if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02199     {
02200       action = event_handler->handle_group_qos (io_handle);
02201       if (action <= 0)
02202         {
02203           ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02204           if (action == -1)
02205             ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02206         }
02207     }
02208 
02209   events.lNetworkEvents = actual_events;
02210   return problems;
02211 }
02212 
02213 
02214 int
02215 ACE_WFMO_Reactor::update_state (void)
02216 {
02217   // This GUARD is necessary since we are updating shared state.
02218   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02219 
02220   // Decrement active threads
02221   --this->active_threads_;
02222 
02223   // Check if the state of the handler repository has changed or new
02224   // owner has to be set
02225   if (this->handler_rep_.changes_required () || this->new_owner ())
02226     {
02227       if (this->change_state_thread_ == 0)
02228         // Try to become the thread which will be responsible for the
02229         // changes
02230         {
02231           this->change_state_thread_ = ACE_Thread::self ();
02232           // Make sure no new threads are allowed to enter
02233           this->ok_to_wait_.reset ();
02234 
02235           if (this->active_threads_ > 0)
02236             // Check for other active threads
02237             {
02238               // Wake up all other threads
02239               this->wakeup_all_threads_.signal ();
02240               // Release <lock_>
02241               monitor.release ();
02242               // Go to sleep waiting for all other threads to get done
02243               this->waiting_to_change_state_.wait ();
02244               // Re-acquire <lock_> again
02245               monitor.acquire ();
02246             }
02247 
02248           // Note that make_changes() calls into user code which can
02249           // request other changes.  So keep looping until all
02250           // requested changes are completed.
02251           while (this->handler_rep_.changes_required ())
02252             // Make necessary changes to the handler repository
02253             this->handler_rep_.make_changes ();
02254           if (this->new_owner ())
02255             // Update the owner
02256             this->change_owner ();
02257           // Turn off <wakeup_all_threads_>
02258           this->wakeup_all_threads_.reset ();
02259           // Let everyone know that it is ok to go ahead
02260           this->ok_to_wait_.signal ();
02261           // Reset this flag
02262           this->change_state_thread_ = 0;
02263         }
02264       else if (this->active_threads_ == 0)
02265         // This thread did not get a chance to become the change
02266         // thread. If it is the last one out, it will wakeup the
02267         // change thread
02268         this->waiting_to_change_state_.signal ();
02269     }
02270   // This is if we were woken up explicitily by the user and there are
02271   // no state changes required.
02272   else if (this->active_threads_ == 0)
02273     // Turn off <wakeup_all_threads_>
02274     this->wakeup_all_threads_.reset ();
02275 
02276   return 0;
02277 }
02278 
02279 void
02280 ACE_WFMO_Reactor::dump (void) const
02281 {
02282 #if defined (ACE_HAS_DUMP)
02283   ACE_TRACE ("ACE_WFMO_Reactor::dump");
02284 
02285   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02286 
02287   ACE_DEBUG ((LM_DEBUG,
02288               ACE_LIB_TEXT ("Count of currently active threads = %d\n"),
02289               this->active_threads_));
02290 
02291   ACE_DEBUG ((LM_DEBUG,
02292               ACE_LIB_TEXT ("ID of owner thread = %d\n"),
02293               this->owner_));
02294 
02295   this->handler_rep_.dump ();
02296   this->signal_handler_->dump ();
02297   this->timer_queue_->dump ();
02298 
02299   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02300 #endif /* ACE_HAS_DUMP */
02301 }
02302 
02303 int
02304 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
02305                                                  ACE_Handle_Set & /*rd_mask*/)
02306 {
02307   return -1;
02308 }
02309 
02310 int
02311 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & /*buffer*/)
02312 {
02313   return 0;
02314 }
02315 
// Always returns ACE_INVALID_HANDLE; nothing else is done.
// NOTE(review): presumably this notifier has no separate notification
// handle to report (get_handle() returns the wakeup event instead) --
// confirm against the ACE_Reactor_Notify interface.
02316 ACE_HANDLE
02317 ACE_WFMO_Reactor_Notify::notify_handle (void)
02318 {
02319   return ACE_INVALID_HANDLE;
02320 }
02321 
02322 int
02323 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02324                                            ACE_Notification_Buffer &)
02325 {
02326   return 0;
02327 }
02328 
02329 int
02330 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02331 {
02332   return 0;
02333 }
02334 
// Always returns -1 without touching any state.
// NOTE(review): looks like closing the notifier is deliberately
// unsupported here -- confirm with callers that check the result.
02335 int
02336 ACE_WFMO_Reactor_Notify::close (void)
02337 {
02338   return -1;
02339 }
02340 
// Builds the notifier.  <timer_queue_> starts out null (open()
// installs the real one) and <max_notify_iterations_> starts at -1.
// <message_queue_> is constructed with
// max_notifies * sizeof (ACE_Notification_Buffer) for both of its
// arguments.
// NOTE(review): those two arguments are presumably the queue's high
// and low water marks -- confirm against ACE_Message_Queue.
02341 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02342   : timer_queue_ (0),
02343     message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02344                     max_notifies * sizeof (ACE_Notification_Buffer)),
02345     max_notify_iterations_ (-1)
02346 {
02347 }
02348 
02349 int
02350 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02351                                ACE_Timer_Queue *timer_queue,
02352                                int ignore_notify)
02353 {
02354   ACE_UNUSED_ARG (ignore_notify);
02355   timer_queue_ = timer_queue;
02356   return wfmo_reactor->register_handler (this);
02357 }
02358 
02359 ACE_HANDLE
02360 ACE_WFMO_Reactor_Notify::get_handle (void) const
02361 {
02362   return this->wakeup_one_thread_.handle ();
02363 }
02364 
02365 // Handle all pending notifications.
02366 
02367 int
02368 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02369                                         siginfo_t *siginfo,
02370                                         ucontext_t *)
02371 {
02372   ACE_UNUSED_ARG (signum);
02373 
02374   // Just check for sanity...
02375   if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02376     return -1;
02377 
02378   // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
02379   // is signaled.
02380   //  ACE_DEBUG ((LM_DEBUG,
02381   //             ACE_LIB_TEXT ("(%t) waking up to handle internal notifications\n")));
02382 
02383   for (int i = 1; ; ++i)
02384     {
02385       ACE_Message_Block *mb = 0;
02386       // Copy ACE_Time_Value::zero since dequeue_head will modify it.
02387       ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02388       if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02389         {
02390           if (errno == EWOULDBLOCK)
02391             // We've reached the end of the processing, return
02392             // normally.
02393             return 0;
02394           else
02395             return -1; // Something weird happened...
02396         }
02397       else
02398         {
02399           ACE_Notification_Buffer *buffer =
02400             reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02401 
02402           // If eh == 0 then we've got major problems!  Otherwise, we
02403           // need to dispatch the appropriate handle_* method on the
02404           // ACE_Event_Handler pointer we've been passed.
02405 
02406           if (buffer->eh_ != 0)
02407             {
02408               ACE_Event_Handler *event_handler =
02409                 buffer->eh_;
02410 
02411               int requires_reference_counting =
02412                 event_handler->reference_counting_policy ().value () ==
02413                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02414 
02415               int result = 0;
02416 
02417               switch (buffer->mask_)
02418                 {
02419                 case ACE_Event_Handler::READ_MASK:
02420                 case ACE_Event_Handler::ACCEPT_MASK:
02421                   result = event_handler->handle_input (ACE_INVALID_HANDLE);
02422                   break;
02423                 case ACE_Event_Handler::WRITE_MASK:
02424                   result = event_handler->handle_output (ACE_INVALID_HANDLE);
02425                   break;
02426                 case ACE_Event_Handler::EXCEPT_MASK:
02427                   result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02428                   break;
02429                 case ACE_Event_Handler::QOS_MASK:
02430                   result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02431                   break;
02432                 case ACE_Event_Handler::GROUP_QOS_MASK:
02433                   result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02434                   break;
02435                 default:
02436                   ACE_ERROR ((LM_ERROR,
02437                               ACE_LIB_TEXT ("invalid mask = %d\n"),
02438                               buffer->mask_));
02439                   break;
02440                 }
02441 
02442               if (result == -1)
02443                 event_handler->handle_close (ACE_INVALID_HANDLE,
02444                                              ACE_Event_Handler::EXCEPT_MASK);
02445 
02446               if (requires_reference_counting)
02447                 {
02448                   event_handler->remove_reference ();
02449                 }
02450             }
02451 
02452           // Make sure to delete the memory regardless of success or
02453           // failure!
02454           mb->release ();
02455 
02456           // Bail out if we've reached the <max_notify_iterations_>.
02457           // Note that by default <max_notify_iterations_> is -1, so
02458           // we'll loop until we're done.
02459           if (i == this->max_notify_iterations_)
02460             {
02461               // If there are still notification in the queue, we need
02462               // to wake up again
02463               if (!this->message_queue_.is_empty ())
02464                 this->wakeup_one_thread_.signal ();
02465 
02466               // Break the loop as we have reached max_notify_iterations_
02467               return 0;
02468             }
02469         }
02470     }
02471 }
02472 
02473 // Notify the WFMO_Reactor, potentially enqueueing the
02474 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
02475 // thread of control.
02476 
02477 int
02478 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02479                                  ACE_Reactor_Mask mask,
02480                                  ACE_Time_Value *timeout)
02481 {
02482   if (event_handler != 0)
02483     {
02484       ACE_Message_Block *mb = 0;
02485       ACE_NEW_RETURN (mb,
02486                       ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02487                       -1);
02488 
02489       ACE_Notification_Buffer *buffer =
02490         (ACE_Notification_Buffer *) mb->base ();
02491       buffer->eh_ = event_handler;
02492       buffer->mask_ = mask;
02493 
02494       // Convert from relative time to absolute time by adding the
02495       // current time of day.  This is what <ACE_Message_Queue>
02496       // expects.
02497       if (timeout != 0)
02498         *timeout += timer_queue_->gettimeofday ();
02499 
02500       if (this->message_queue_.enqueue_tail
02501           (mb, timeout) == -1)
02502         {
02503           mb->release ();
02504           return -1;
02505         }
02506 
02507       event_handler->add_reference ();
02508     }
02509 
02510   return this->wakeup_one_thread_.signal ();
02511 }
02512 
02513 void
02514 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02515 {
02516   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02517   // Must always be > 0 or < 0 to optimize the loop exit condition.
02518   if (iterations == 0)
02519     iterations = 1;
02520 
02521   this->max_notify_iterations_ = iterations;
02522 }
02523 
02524 int
02525 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02526 {
02527   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02528   return this->max_notify_iterations_;
02529 }
02530 
02531 int
02532 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02533                                                       ACE_Reactor_Mask mask)
02534 {
02535   ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02536 
02537   // Go over message queue and take out all the matching event
02538   // handlers.  If eh == 0, purge all. Note that reactor notifies (no
02539   // handler specified) are never purged, as this may lose a needed
02540   // notify the reactor queued for itself.
02541 
02542   if (this->message_queue_.is_empty ())
02543     return 0;
02544 
02545   // Guard against new and/or delivered notifications while purging.
02546   // WARNING!!! The use of the notification queue's lock object for
02547   // this guard makes use of the knowledge that on Win32, the mutex
02548   // protecting the queue is really a CriticalSection, which is
02549   // recursive. This is how we can get away with locking it down here
02550   // and still calling member functions on the queue object.
02551   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02552 
02553   // first, copy all to our own local queue. Since we've locked everyone out
02554   // of here, there's no need to use any synchronization on this queue.
02555   ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02556 
02557   size_t queue_size  = this->message_queue_.message_count ();
02558   int number_purged = 0;
02559 
02560   size_t index;
02561 
02562   for (index = 0; index < queue_size; ++index)
02563     {
02564       ACE_Message_Block *mb = 0;
02565       if (-1 == this->message_queue_.dequeue_head (mb))
02566         return -1;        // This shouldn't happen...
02567 
02568       ACE_Notification_Buffer *buffer =
02569         reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02570 
02571       // If this is not a Reactor notify (it is for a particular handler),
02572       // and it matches the specified handler (or purging all),
02573       // and applying the mask would totally eliminate the notification, then
02574       // release it and count the number purged.
02575       if ((0 != buffer->eh_) &&
02576           (0 == eh || eh == buffer->eh_) &&
02577           ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
02578                                                    // is left with nothing when
02579                                                    // applying the mask
02580         {
02581           ACE_Event_Handler *event_handler = buffer->eh_;
02582 
02583           event_handler->remove_reference ();
02584 
02585           mb->release ();
02586           ++number_purged;
02587         }
02588       else
02589         {
02590           // To preserve it, move it to the local_queue.  But first, if
02591           // this is not a Reactor notify (it is for a
02592           // particularhandler), and it matches the specified handler
02593           // (or purging all), then apply the mask
02594           if ((0 != buffer->eh_) &&
02595               (0 == eh || eh == buffer->eh_))
02596             ACE_CLR_BITS(buffer->mask_, mask);
02597           if (-1 == local_queue.enqueue_head (mb))
02598             return -1;
02599         }
02600     }
02601 
02602   if (this->message_queue_.message_count ())
02603     { // Should be empty!
02604       ACE_ASSERT (0);
02605       return -1;
02606     }
02607 
02608   // Now copy back from the local queue to the class queue, taking
02609   // care to preserve the original order...
02610   queue_size  = local_queue.message_count ();
02611   for (index = 0; index < queue_size; ++index)
02612     {
02613       ACE_Message_Block  *mb = 0;
02614       if (-1 == local_queue.dequeue_head (mb))
02615         {
02616           ACE_ASSERT (0);
02617           return -1;
02618         }
02619 
02620       if (-1 == this->message_queue_.enqueue_head (mb))
02621         {
02622           ACE_ASSERT (0);
02623           return -1;
02624         }
02625     }
02626 
02627   return number_purged;
02628 }
02629 
02630 void
02631 ACE_WFMO_Reactor_Notify::dump (void) const
02632 {
02633 #if defined (ACE_HAS_DUMP)
02634   ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02635   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02636   this->timer_queue_->dump ();
02637   ACE_DEBUG ((LM_DEBUG,
02638               ACE_LIB_TEXT ("Max. iteration: %d\n"),
02639               this->max_notify_iterations_));
02640   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02641 #endif /* ACE_HAS_DUMP */
02642 }
02643 
02644 void
02645 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02646 {
02647   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02648   ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02649 
02650   // Must always be > 0 or < 0 to optimize the loop exit condition.
02651   this->notify_handler_->max_notify_iterations (iterations);
02652 }
02653 
02654 int
02655 ACE_WFMO_Reactor::max_notify_iterations (void)
02656 {
02657   ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02658   ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02659 
02660   return this->notify_handler_->max_notify_iterations ();
02661 }
02662 
02663 int
02664 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02665                                                ACE_Reactor_Mask mask)
02666 {
02667   ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02668   if (this->notify_handler_ == 0)
02669     return 0;
02670   else
02671     return this->notify_handler_->purge_pending_notifications (eh, mask);
02672 }
02673 
02674 int
02675 ACE_WFMO_Reactor::resumable_handler (void)
02676 {
02677   ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02678   return 0;
02679 }
02680 
02681 
02682 // No-op WinSOCK2 methods to help WFMO_Reactor compile
02683 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02684 int
02685 WSAEventSelect (SOCKET /* s */,
02686                 WSAEVENT /* hEventObject */,
02687                 long /* lNetworkEvents */)
02688 {
02689   return -1;
02690 }
02691 
02692 int
02693 WSAEnumNetworkEvents (SOCKET /* s */,
02694                       WSAEVENT /* hEventObject */,
02695                       LPWSANETWORKEVENTS /* lpNetworkEvents */)
02696 {
02697   return -1;
02698 }
02699 #endif /* !defined ACE_HAS_WINSOCK2 */
02700 
02701 ACE_END_VERSIONED_NAMESPACE_DECL
02702 
02703 #endif /* ACE_WIN32 */

Generated on Thu Nov 9 09:42:09 2006 for ACE by doxygen 1.3.6