TP_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: TP_Reactor.cpp 80826 2008-03-04 14:51:23Z wotte $
00002 
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Sig_Handler.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (ace,
00015            TP_Reactor,
00016            "$Id: TP_Reactor.cpp 80826 2008-03-04 14:51:23Z wotte $")
00017 
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021 
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025   ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026 
00027   // The order of these events is very subtle, modify with care.
00028 
00029   // Try to grab the lock.  If someone if already there, don't wake
00030   // them up, just queue up in the thread pool.
00031   int result = 0;
00032 
00033   if (max_wait_time)
00034     {
00035       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036       tv += *max_wait_time;
00037 
00038       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039                                                   0,
00040                                                   &tv));
00041     }
00042   else
00043     {
00044       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045     }
00046 
00047   // Check for timeouts and errors.
00048   if (result == -1)
00049     {
00050       if (errno == ETIME)
00051         return 0;
00052       else
00053         return -1;
00054     }
00055 
00056   // We got the token and so let us mark ourselves as owner
00057   this->owner_ = 1;
00058 
00059   return result;
00060 }
00061 
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065   ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066 
00067   // Try to grab the lock.  If someone if already there, don't wake
00068   // them up, just queue up in the thread pool.
00069   int result = 0;
00070 
00071   if (max_wait_time)
00072     {
00073       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074       tv += *max_wait_time;
00075 
00076       ACE_MT (result = this->token_.acquire (0,
00077                                              0,
00078                                              &tv));
00079     }
00080   else
00081     {
00082       ACE_MT (result = this->token_.acquire ());
00083     }
00084 
00085   // Check for timeouts and errors.
00086   if (result == -1)
00087     {
00088       if (errno == ETIME)
00089         return 0;
00090       else
00091         return -1;
00092     }
00093 
00094   // We got the token and so let us mark ourselves as owner
00095   this->owner_ = 1;
00096 
00097   return result;
00098 }
00099 
00100 
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102                                 ACE_Timer_Queue *tq,
00103                                 int mask_signals,
00104                                 int s_queue)
00105   : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }
00110 
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112                                 int restart,
00113                                 ACE_Sig_Handler *sh,
00114                                 ACE_Timer_Queue *tq,
00115                                 int mask_signals,
00116                                 int s_queue)
00117   : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }
00122 
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }
00132 
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }
00141 
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   // Stash the current time -- the destructor of this object will
00148   // automatically compute how much time elapsed since this method was
00149   // called.
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   //
00153   // The order of these events is very subtle, modify with care.
00154   //
00155 
00156   // Instantiate the token guard which will try grabbing the token for
00157   // this thread.
00158   ACE_TP_Token_Guard guard (this->token_);
00159 
00160   int const result = guard.acquire_read_token (max_wait_time);
00161 
00162   // If the guard is NOT the owner just return the retval
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   // After getting the lock just just for deactivation..
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   // Update the countdown to reflect time waiting for the token.
00171   countdown.update ();
00172 
00173   return this->dispatch_i (max_wait_time, guard);
00174 }
00175 
00176 int
00177 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00178                             ACE_TP_Token_Guard &guard)
00179 {
00180   int event_count = this->get_event_for_dispatching (max_wait_time);
00181 
00182   // We use this count to detect potential infinite loops as described
00183   // in bug 2540.
00184   int const initial_event_count = event_count;
00185 
00186   int result = 0;
00187 
00188   // Note: We are passing the <event_count> around, to have record of
00189   // how many events still need processing. May be this could be
00190   // useful in future.
00191 
00192 #if 0
00193   // @Ciju
00194   // signal handling isn't in a production state yet.
00195   // Commenting it out for now.
00196 
00197   // Dispatch signals
00198   if (event_count == -1)
00199     {
00200       // Looks like we dont do any upcalls in dispatch signals. If at
00201       // a later point of time, we decide to handle signals we have to
00202       // release the lock before we make any upcalls.. What is here
00203       // now is not the right thing...
00204       //
00205       // @@ We need to do better..
00206       return this->handle_signals (event_count, guard);
00207     }
00208 #endif // #if 0
00209 
00210   // If there are no signals and if we had received a proper
00211   // event_count then first look at dispatching timeouts. We need to
00212   // handle timers early since they may have higher latency
00213   // constraints than I/O handlers.  Ideally, the order of dispatching
00214   // should be a strategy...
00215 
00216   // NOTE: The event count does not have the number of timers that
00217   // needs dispatching. But we are still passing this along. We dont
00218   // need to do that. In the future we *may* have the timers also
00219   // returned through the <event_count>. Just passing that along for
00220   // that day.
00221   result = this->handle_timer_events (event_count, guard);
00222 
00223   if (result > 0)
00224     return result;
00225 
00226   // Else just go ahead fall through for further handling.
00227 
00228   if (event_count > 0)
00229     {
00230       // Next dispatch the notification handlers (if there are any to
00231       // dispatch).  These are required to handle multiple-threads
00232       // that are trying to update the <Reactor>.
00233       result = this->handle_notify_events (event_count, guard);
00234 
00235       if (result > 0)
00236         return result;
00237 
00238       // Else just fall through for further handling
00239     }
00240 
00241   if (event_count > 0)
00242     {
00243       // Handle socket events
00244       result = this->handle_socket_events (event_count, guard);
00245     }
00246 
00247   if (event_count != 0 && event_count == initial_event_count)
00248     {
00249       this->state_changed_ = true;
00250     }
00251 
00252   return result;
00253 }
00254 
00255 
#if 0
  // @Ciju
  // signal handling isn't in a production state yet.
  // Commenting it out for now.

// Disabled signal-handling step.  When a signal is pending, clears
// the pending flag and returns 1; otherwise returns -1.  Both
// parameters are unused.
int
ACE_TP_Reactor::handle_signals (int & /*event_count*/,
                                ACE_TP_Token_Guard & /*guard*/)
{
  ACE_TRACE ("ACE_TP_Reactor::handle_signals");

  /*
   *
   *             THIS METHOD SEEMS BROKEN
   *
   *
   */
  // First check for interrupts.
  // Bail out -- we got here since <select> was interrupted.
  if (ACE_Sig_Handler::sig_pending () != 0)
    {
      ACE_Sig_Handler::sig_pending (0);

      // This piece of code comes from the old TP_Reactor. We did not
      // handle signals at all then. If we happen to handle signals
      // in the TP_Reactor, we should then start worrying about this
      // - Bala 21-Aug- 01
#if 0
      // Not sure if this should be done in the TP_Reactor
      // case... leave it out for now.   -Steve Huston 22-Aug-00

      // If any HANDLES in the <ready_set_> are activated as a
      // result of signals they should be dispatched since
      // they may be time critical...
      active_handle_count = this->any_ready (dispatch_set);
#else
      // active_handle_count = 0;
#endif

      // Record the fact that the Reactor has dispatched a
      // handle_signal() method.  We need this to return the
      // appropriate count.
      return 1;
    }

  return -1;
}
#endif // #if 0
00304 
00305 
00306 int
00307 ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
00308                                      ACE_TP_Token_Guard &guard)
00309 {
00310   if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
00311     { // Empty timer queue so cannot have any expired timers.
00312       return 0;
00313     }
00314 
00315   // Get the current time
00316   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00317                            this->timer_queue_->timer_skew ());
00318 
00319   // Look for a node in the timer queue whose timer <= the present
00320   // time.
00321   ACE_Timer_Node_Dispatch_Info info;
00322 
00323   if (this->timer_queue_->dispatch_info (cur_time, info))
00324     {
00325       const void *upcall_act = 0;
00326 
00327       // Preinvoke.
00328       this->timer_queue_->preinvoke (info, cur_time, upcall_act);
00329 
00330       // Release the token before dispatching notifies...
00331       guard.release_token ();
00332 
00333       // call the functor
00334       this->timer_queue_->upcall (info, cur_time);
00335 
00336       // Postinvoke
00337       this->timer_queue_->postinvoke (info, cur_time, upcall_act);
00338 
00339       // We have dispatched a timer
00340       return 1;
00341     }
00342 
00343   return 0;
00344 }
00345 
00346 int
00347 ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
00348                                       ACE_TP_Token_Guard &guard)
00349 {
00350   // Get the handle on which notify calls could have occured
00351   ACE_HANDLE notify_handle = this->get_notify_handle ();
00352 
00353   int result = 0;
00354 
00355   // The notify was not in the list returned by
00356   // wait_for_multiple_events ().
00357   if (notify_handle == ACE_INVALID_HANDLE)
00358     return result;
00359 
00360   // Now just do a read on the pipe..
00361   ACE_Notification_Buffer buffer;
00362 
00363   // Clear the handle of the read_mask of our <ready_set_>
00364   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00365 
00366   // Keep reading notifies till we empty it or till we have a
00367   // dispatchable buffer
00368   while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0)
00369     {
00370       // Just figure out whether we can read any buffer that has
00371       // dispatchable info. If not we have just been unblocked by
00372       // another thread trying to update the reactor. If we get any
00373       // buffer that needs dispatching we will dispatch that after
00374       // releasing the lock
00375       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00376         {
00377           // Release the token before dispatching notifies...
00378           guard.release_token ();
00379 
00380           // Dispatch the upcall for the notify
00381           this->notify_handler_->dispatch_notify (buffer);
00382 
00383           // We had a successful dispatch.
00384           result = 1;
00385 
00386           // break out of the while loop
00387           break;
00388         }
00389     }
00390 
00391   // If we did some work, then we just return 1 which will allow us
00392   // to get out of here. If we return 0, then we will be asked to do
00393   // some work ie. dispacth socket events
00394   return result;
00395 }
00396 
00397 int
00398 ACE_TP_Reactor::handle_socket_events (int &event_count,
00399                                       ACE_TP_Token_Guard &guard)
00400 {
00401 
00402   // We got the lock, lets handle some I/O events.
00403   ACE_EH_Dispatch_Info dispatch_info;
00404 
00405   this->get_socket_event_info (dispatch_info);
00406 
00407   // If there is any event handler that is ready to be dispatched, the
00408   // dispatch information is recorded in dispatch_info.
00409   if (!dispatch_info.dispatch ())
00410     {
00411       // Check for removed handlers.
00412       if (dispatch_info.event_handler_ == 0)
00413         {
00414           this->handler_rep_.unbind(dispatch_info.handle_,
00415                                     dispatch_info.mask_);
00416         }
00417 
00418 
00419       return 0;
00420     }
00421 
00422   // Suspend the handler so that other threads don't start dispatching
00423   // it, if we can't suspend then return directly
00424   //
00425   // NOTE: This check was performed in older versions of the
00426   // TP_Reactor. Looks like it is a waste..
00427   if (dispatch_info.event_handler_ != this->notify_handler_)
00428     if (this->suspend_i (dispatch_info.handle_) == -1)
00429       return 0;
00430 
00431   // Call add_reference() if needed.
00432   if (dispatch_info.reference_counting_required_)
00433     dispatch_info.event_handler_->add_reference ();
00434 
00435   // Release the lock.  Others threads can start waiting.
00436   guard.release_token ();
00437 
00438   int result = 0;
00439 
00440   // If there was an event handler ready, dispatch it.
00441   // Decrement the event left
00442   --event_count;
00443 
00444   // Dispatched an event
00445   if (this->dispatch_socket_event (dispatch_info) == 0)
00446     ++result;
00447 
00448   return result;
00449 }
00450 
00451 int
00452 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00453 {
00454   // If the reactor handler state has changed, clear any remembered
00455   // ready bits and re-scan from the master wait_set.
00456   if (this->state_changed_)
00457     {
00458       this->ready_set_.rd_mask_.reset ();
00459       this->ready_set_.wr_mask_.reset ();
00460       this->ready_set_.ex_mask_.reset ();
00461 
00462       this->state_changed_ = false;
00463     }
00464   else
00465     {
00466       // This is a hack... somewhere, under certain conditions (which
00467       // I don't understand...) the mask will have all of its bits clear,
00468       // yet have a size_ > 0. This is an attempt to remedy the affect,
00469       // without knowing why it happens.
00470 
00471       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00472       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00473       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00474     }
00475 
00476   return this->wait_for_multiple_events (this->ready_set_, max_wait_time);
00477 }
00478 
00479 int
00480 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00481 {
00482   // Check for dispatch in write, except, read. Only catch one, but if
00483   // one is caught, be sure to clear the handle from each mask in case
00484   // there is more than one mask set for it. This would cause problems
00485   // if the handler is suspended for dispatching, but its set bit in
00486   // another part of ready_set_ kept it from being dispatched.
00487   int found_io = 0;
00488   ACE_HANDLE handle;
00489 
00490   // @@todo: We can do quite a bit of code reduction here. Let me get
00491   // it to work before I do this.
00492   {
00493     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00494 
00495     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00496       {
00497         if (this->is_suspended_i (handle))
00498           continue;
00499 
00500         // Remember this info
00501         event.set (handle,
00502                    this->handler_rep_.find (handle),
00503                    ACE_Event_Handler::WRITE_MASK,
00504                    &ACE_Event_Handler::handle_output);
00505 
00506         this->clear_handle_read_set (handle);
00507         found_io = 1;
00508       }
00509   }
00510 
00511   if (!found_io)
00512     {
00513       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00514 
00515       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00516         {
00517           if (this->is_suspended_i (handle))
00518             continue;
00519 
00520           // Remember this info
00521           event.set (handle,
00522                      this->handler_rep_.find (handle),
00523                      ACE_Event_Handler::EXCEPT_MASK,
00524                      &ACE_Event_Handler::handle_exception);
00525 
00526           this->clear_handle_read_set (handle);
00527 
00528           found_io = 1;
00529         }
00530     }
00531 
00532   if (!found_io)
00533     {
00534       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00535 
00536       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00537         {
00538           if (this->is_suspended_i (handle))
00539             continue;
00540 
00541           // Remember this info
00542           event.set (handle,
00543                      this->handler_rep_.find (handle),
00544                      ACE_Event_Handler::READ_MASK,
00545                      &ACE_Event_Handler::handle_input);
00546 
00547           this->clear_handle_read_set (handle);
00548           found_io = 1;
00549         }
00550     }
00551 
00552   return found_io;
00553 }
00554 
00555 // Dispatches a single event handler
00556 int
00557 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00558 {
00559   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00560 
00561   ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00562   ACE_EH_PTMF const callback = dispatch_info.callback_;
00563 
00564   // Check for removed handlers.
00565   if (event_handler == 0)
00566     return -1;
00567 
00568   // Upcall. If the handler returns positive value (requesting a
00569   // reactor callback) don't set the ready-bit because it will be
00570   // ignored if the reactor state has changed. Just call back
00571   // as many times as the handler requests it. Other threads are off
00572   // handling other things.
00573   int status = 1;
00574   while (status > 0)
00575     status = (event_handler->*callback) (dispatch_info.handle_);
00576 
00577   // Post process socket event
00578   return this->post_process_socket_event (dispatch_info, status);
00579 }
00580 
00581 int
00582 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00583                                            int status)
00584 {
00585   int result = 0;
00586 
00587   // First check if we really have to post process something, if not, then
00588   // we don't acquire the token which saves us a lot of time.
00589   if (status < 0 ||
00590      (dispatch_info.event_handler_ != this->notify_handler_ &&
00591       dispatch_info.resume_flag_ ==
00592         ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00593     {
00594       // Get the reactor token and with this token acquired remove first the
00595       // handler and resume it at the same time. This must be atomic, see also
00596       // bugzilla 2395. When this is not atomic it can be that we resume the
00597       // handle after it is reused by the OS.
00598       ACE_TP_Token_Guard guard (this->token_);
00599 
00600       result = guard.acquire_token ();
00601 
00602       // If the guard is NOT the owner just return the retval
00603       if (!guard.is_owner ())
00604         return result;
00605 
00606       // A different event handler may have been registered during the
00607       // upcall if the handle was closed and then reopened, for
00608       // example.  Make sure we're removing and/or resuming the event
00609       // handler used during the upcall.
00610       ACE_Event_Handler const * const eh =
00611         this->handler_rep_.find (dispatch_info.handle_);
00612 
00613       // Only remove or resume the event handler used during the
00614       // upcall.
00615       if (eh == dispatch_info.event_handler_)
00616         {
00617           if (status < 0)
00618             {
00619               result =
00620                 this->remove_handler_i (dispatch_info.handle_,
00621                                         dispatch_info.mask_);
00622             }
00623 
00624           // Resume handler if required.
00625           if (dispatch_info.event_handler_ != this->notify_handler_ &&
00626               dispatch_info.resume_flag_ ==
00627               ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00628             this->resume_i (dispatch_info.handle_);
00629         }
00630     }
00631 
00632   // Call remove_reference() if needed.
00633   if (dispatch_info.reference_counting_required_)
00634     dispatch_info.event_handler_->remove_reference ();
00635 
00636   return result;
00637 }
00638 
00639 int
00640 ACE_TP_Reactor::resumable_handler (void)
00641 {
00642   return 1;
00643 }
00644 
00645 int
00646 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00647 {
00648   return this->handle_events (&max_wait_time);
00649 }
00650 
00651 void
00652 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00653                                ACE_Reactor_Mask,
00654                                ACE_Handle_Set &,
00655                                ACE_Event_Handler *eh,
00656                                ACE_EH_PTMF)
00657 {
00658   ACE_ERROR ((LM_ERROR,
00659               ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
00660               ACE_TEXT ("Wrong version of notify_handle() got called \n")));
00661 
00662   ACE_ASSERT (eh == 0);
00663   ACE_UNUSED_ARG (eh);
00664 }
00665 
00666 ACE_HANDLE
00667 ACE_TP_Reactor::get_notify_handle (void)
00668 {
00669   // Call the notify handler to get a handle on which we would have a
00670   // notify waiting
00671   ACE_HANDLE const read_handle =
00672     this->notify_handler_->notify_handle ();
00673 
00674   // Check whether the rd_mask has been set on that handle. If so
00675   // return the handle.
00676   if (read_handle != ACE_INVALID_HANDLE &&
00677       this->ready_set_.rd_mask_.is_set (read_handle))
00678     {
00679       return read_handle;
00680     }
00681 
00682   // None found..
00683   return ACE_INVALID_HANDLE;
00684 }
00685 
00686 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:18:43 2010 for ACE by  doxygen 1.4.7