TP_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: TP_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Sig_Handler.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (ace,
00015            TP_Reactor,
00016            "$Id: TP_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00017 
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021 
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025   ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026 
00027   // The order of these events is very subtle, modify with care.
00028 
00029   // Try to grab the lock.  If someone if already there, don't wake
00030   // them up, just queue up in the thread pool.
00031   int result = 0;
00032 
00033   if (max_wait_time)
00034     {
00035       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036       tv += *max_wait_time;
00037 
00038       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039                                                   0,
00040                                                   &tv));
00041     }
00042   else
00043     {
00044       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045     }
00046 
00047   // Check for timeouts and errors.
00048   if (result == -1)
00049     {
00050       if (errno == ETIME)
00051         return 0;
00052       else
00053         return -1;
00054     }
00055 
00056   // We got the token and so let us mark ourselves as owner
00057   this->owner_ = 1;
00058 
00059   return result;
00060 }
00061 
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065   ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066 
00067   // Try to grab the lock.  If someone if already there, don't wake
00068   // them up, just queue up in the thread pool.
00069   int result = 0;
00070 
00071   if (max_wait_time)
00072     {
00073       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074       tv += *max_wait_time;
00075 
00076       ACE_MT (result = this->token_.acquire (0,
00077                                              0,
00078                                              &tv));
00079     }
00080   else
00081     {
00082       ACE_MT (result = this->token_.acquire ());
00083     }
00084 
00085   // Check for timeouts and errors.
00086   if (result == -1)
00087     {
00088       if (errno == ETIME)
00089         return 0;
00090       else
00091         return -1;
00092     }
00093 
00094   // We got the token and so let us mark ourselves as owner
00095   this->owner_ = 1;
00096 
00097   return result;
00098 }
00099 
00100 
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102                                 ACE_Timer_Queue *tq,
00103                                 int mask_signals,
00104                                 int s_queue)
00105   : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }
00110 
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112                                 int restart,
00113                                 ACE_Sig_Handler *sh,
00114                                 ACE_Timer_Queue *tq,
00115                                 int mask_signals,
00116                                 int s_queue)
00117   : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }
00122 
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }
00132 
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }
00141 
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   // Stash the current time -- the destructor of this object will
00148   // automatically compute how much time elapsed since this method was
00149   // called.
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   //
00153   // The order of these events is very subtle, modify with care.
00154   //
00155 
00156   // Instantiate the token guard which will try grabbing the token for
00157   // this thread.
00158   ACE_TP_Token_Guard guard (this->token_);
00159 
00160   int const result = guard.acquire_read_token (max_wait_time);
00161 
00162   // If the guard is NOT the owner just return the retval
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   // After getting the lock just just for deactivation..
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   // Update the countdown to reflect time waiting for the token.
00171   countdown.update ();
00172 
00173   return this->dispatch_i (max_wait_time,
00174                            guard);
00175 }
00176 
00177 int
00178 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00179                             ACE_TP_Token_Guard &guard)
00180 {
00181   int event_count =
00182     this->get_event_for_dispatching (max_wait_time);
00183 
00184   // We use this count to detect potential infinite loops as described
00185   // in bug 2540.
00186   int initial_event_count = event_count;
00187 
00188   int result = 0;
00189 
00190   // Note: We are passing the <event_count> around, to have record of
00191   // how many events still need processing. May be this could be
00192   // useful in future.
00193 
00194 #if 0
00195   // @Ciju
00196   // signal handling isn't in a production state yet.
00197   // Commenting it out for now.
00198 
00199   // Dispatch signals
00200   if (event_count == -1)
00201     {
00202       // Looks like we dont do any upcalls in dispatch signals. If at
00203       // a later point of time, we decide to handle signals we have to
00204       // release the lock before we make any upcalls.. What is here
00205       // now is not the right thing...
00206       //
00207       // @@ We need to do better..
00208       return this->handle_signals (event_count,
00209                                    guard);
00210     }
00211 #endif // #if 0
00212 
00213   // If there are no signals and if we had received a proper
00214   // event_count then first look at dispatching timeouts. We need to
00215   // handle timers early since they may have higher latency
00216   // constraints than I/O handlers.  Ideally, the order of dispatching
00217   // should be a strategy...
00218 
00219   // NOTE: The event count does not have the number of timers that
00220   // needs dispatching. But we are still passing this along. We dont
00221   // need to do that. In the future we *may* have the timers also
00222   // returned through the <event_count>. Just passing that along for
00223   // that day.
00224   result = this->handle_timer_events (event_count,
00225                                       guard);
00226 
00227   if (result > 0)
00228     return result;
00229 
00230   // Else just go ahead fall through for further handling.
00231 
00232   if (event_count > 0)
00233     {
00234       // Next dispatch the notification handlers (if there are any to
00235       // dispatch).  These are required to handle multiple-threads
00236       // that are trying to update the <Reactor>.
00237       result = this->handle_notify_events (event_count,
00238                                            guard);
00239 
00240       if (result > 0)
00241         return result;
00242 
00243       // Else just fall through for further handling
00244     }
00245 
00246   if (event_count > 0)
00247     {
00248       // Handle socket events
00249       result = this->handle_socket_events (event_count,
00250                                            guard);
00251     }
00252 
00253   if (event_count != 0
00254       && event_count == initial_event_count)
00255     {
00256       this->state_changed_ = true;
00257     }
00258 
00259   return result;
00260 }
00261 
00262 
#if 0
  // @Ciju
  // signal handling isn't in a production state yet.
  // Commenting it out for now.

// Disabled signal dispatching stage.  Returns 1 if a pending signal
// was noted (and the pending flag cleared), -1 otherwise.  Neither
// argument is used.
int
ACE_TP_Reactor::handle_signals (int & /*event_count*/,
                                ACE_TP_Token_Guard & /*guard*/)
{
  ACE_TRACE ("ACE_TP_Reactor::handle_signals");

  /*
   *
   *             THIS METHOD SEEMS BROKEN
   *
   *
   */
  // First check for interrupts.
  // Bail out -- we got here since <select> was interrupted.
  if (ACE_Sig_Handler::sig_pending () != 0)
    {
      ACE_Sig_Handler::sig_pending (0);

      // This piece of code comes from the old TP_Reactor. We did not
      // handle signals at all then. If we happen to handle signals
      // in the TP_Reactor, we should then start worrying about this
      // - Bala 21-Aug- 01
#if 0
      // Not sure if this should be done in the TP_Reactor
      // case... leave it out for now.   -Steve Huston 22-Aug-00

      // If any HANDLES in the <ready_set_> are activated as a
      // result of signals they should be dispatched since
      // they may be time critical...
      active_handle_count = this->any_ready (dispatch_set);
#else
      // active_handle_count = 0;
#endif

      // Record the fact that the Reactor has dispatched a
      // handle_signal() method.  We need this to return the
      // appropriate count.
      return 1;
    }

  return -1;
}
#endif // #if 0
00311 
00312 
00313 int
00314 ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
00315                                      ACE_TP_Token_Guard &guard)
00316 {
00317   if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
00318     { // Empty timer queue so cannot have any expired timers.
00319       return 0;
00320     }
00321 
00322   // Get the current time
00323   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00324                            this->timer_queue_->timer_skew ());
00325 
00326   // Look for a node in the timer queue whose timer <= the present
00327   // time.
00328   ACE_Timer_Node_Dispatch_Info info;
00329 
00330   if (this->timer_queue_->dispatch_info (cur_time,
00331                                          info))
00332     {
00333       const void *upcall_act = 0;
00334 
00335       // Preinvoke.
00336       this->timer_queue_->preinvoke (info,
00337                                      cur_time,
00338                                      upcall_act);
00339 
00340       // Release the token before dispatching notifies...
00341       guard.release_token ();
00342 
00343       // call the functor
00344       this->timer_queue_->upcall (info,
00345                                   cur_time);
00346 
00347       // Postinvoke
00348       this->timer_queue_->postinvoke (info,
00349                                       cur_time,
00350                                       upcall_act);
00351 
00352       // We have dispatched a timer
00353       return 1;
00354     }
00355 
00356   return 0;
00357 }
00358 
00359 int
00360 ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
00361                                       ACE_TP_Token_Guard &guard)
00362 {
00363   // Get the handle on which notify calls could have occured
00364   ACE_HANDLE notify_handle =
00365     this->get_notify_handle ();
00366 
00367   int result = 0;
00368 
00369   // The notify was not in the list returned by
00370   // wait_for_multiple_events ().
00371   if (notify_handle == ACE_INVALID_HANDLE)
00372     return result;
00373 
00374   // Now just do a read on the pipe..
00375   ACE_Notification_Buffer buffer;
00376 
00377   // Clear the handle of the read_mask of our <ready_set_>
00378   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00379 
00380   // Keep reading notifies till we empty it or till we have a
00381   // dispatchable buffer
00382   while (this->notify_handler_->read_notify_pipe (notify_handle,
00383                                                   buffer) > 0)
00384     {
00385       // Just figure out whether we can read any buffer that has
00386       // dispatchable info. If not we have just been unblocked by
00387       // another thread trying to update the reactor. If we get any
00388       // buffer that needs dispatching we will dispatch that after
00389       // releasing the lock
00390       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00391         {
00392           // Release the token before dispatching notifies...
00393           guard.release_token ();
00394 
00395           // Dispatch the upcall for the notify
00396           this->notify_handler_->dispatch_notify (buffer);
00397 
00398           // We had a successful dispatch.
00399           result = 1;
00400 
00401           // break out of the while loop
00402           break;
00403         }
00404     }
00405 
00406   // If we did some work, then we just return 1 which will allow us
00407   // to get out of here. If we return 0, then we will be asked to do
00408   // some work ie. dispacth socket events
00409   return result;
00410 }
00411 
00412 int
00413 ACE_TP_Reactor::handle_socket_events (int &event_count,
00414                                       ACE_TP_Token_Guard &guard)
00415 {
00416 
00417   // We got the lock, lets handle some I/O events.
00418   ACE_EH_Dispatch_Info dispatch_info;
00419 
00420   this->get_socket_event_info (dispatch_info);
00421 
00422   // If there is any event handler that is ready to be dispatched, the
00423   // dispatch information is recorded in dispatch_info.
00424   if (!dispatch_info.dispatch ())
00425     {
00426       // Check for removed handlers.
00427       if (dispatch_info.event_handler_ == 0)
00428         {
00429           this->handler_rep_.unbind(dispatch_info.handle_,
00430                                     dispatch_info.mask_);
00431         }
00432 
00433 
00434       return 0;
00435     }
00436 
00437   // Suspend the handler so that other threads don't start dispatching
00438   // it, if we can't suspend then return directly
00439   //
00440   // NOTE: This check was performed in older versions of the
00441   // TP_Reactor. Looks like it is a waste..
00442   if (dispatch_info.event_handler_ != this->notify_handler_)
00443     if (this->suspend_i (dispatch_info.handle_) == -1)
00444       return 0;
00445 
00446   // Call add_reference() if needed.
00447   if (dispatch_info.reference_counting_required_)
00448     dispatch_info.event_handler_->add_reference ();
00449 
00450   // Release the lock.  Others threads can start waiting.
00451   guard.release_token ();
00452 
00453   int result = 0;
00454 
00455   // If there was an event handler ready, dispatch it.
00456   // Decrement the event left
00457   --event_count;
00458 
00459   // Dispatched an event
00460   if (this->dispatch_socket_event (dispatch_info) == 0)
00461     ++result;
00462 
00463   return result;
00464 }
00465 
00466 int
00467 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00468 {
00469   // If the reactor handler state has changed, clear any remembered
00470   // ready bits and re-scan from the master wait_set.
00471   if (this->state_changed_)
00472     {
00473       this->ready_set_.rd_mask_.reset ();
00474       this->ready_set_.wr_mask_.reset ();
00475       this->ready_set_.ex_mask_.reset ();
00476 
00477       this->state_changed_ = false;
00478     }
00479   else
00480     {
00481       // This is a hack... somewhere, under certain conditions (which
00482       // I don't understand...) the mask will have all of its bits clear,
00483       // yet have a size_ > 0. This is an attempt to remedy the affect,
00484       // without knowing why it happens.
00485 
00486       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00487       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00488       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00489     }
00490 
00491   return this->wait_for_multiple_events (this->ready_set_,
00492                                          max_wait_time);
00493 }
00494 
00495 int
00496 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00497 {
00498   // Check for dispatch in write, except, read. Only catch one, but if
00499   // one is caught, be sure to clear the handle from each mask in case
00500   // there is more than one mask set for it. This would cause problems
00501   // if the handler is suspended for dispatching, but its set bit in
00502   // another part of ready_set_ kept it from being dispatched.
00503   int found_io = 0;
00504   ACE_HANDLE handle;
00505 
00506   // @@todo: We can do quite a bit of code reduction here. Let me get
00507   // it to work before I do this.
00508   {
00509     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00510 
00511     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00512       {
00513         if (this->is_suspended_i (handle))
00514           continue;
00515 
00516         // Remember this info
00517         event.set (handle,
00518                    this->handler_rep_.find (handle),
00519                    ACE_Event_Handler::WRITE_MASK,
00520                    &ACE_Event_Handler::handle_output);
00521 
00522         this->clear_handle_read_set (handle);
00523         found_io = 1;
00524       }
00525   }
00526 
00527   if (!found_io)
00528     {
00529       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00530 
00531       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00532         {
00533           if (this->is_suspended_i (handle))
00534             continue;
00535 
00536           // Remember this info
00537           event.set (handle,
00538                      this->handler_rep_.find (handle),
00539                      ACE_Event_Handler::EXCEPT_MASK,
00540                      &ACE_Event_Handler::handle_exception);
00541 
00542           this->clear_handle_read_set (handle);
00543 
00544           found_io = 1;
00545         }
00546     }
00547 
00548   if (!found_io)
00549     {
00550       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00551 
00552       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00553         {
00554           if (this->is_suspended_i (handle))
00555             continue;
00556 
00557           // Remember this info
00558           event.set (handle,
00559                      this->handler_rep_.find (handle),
00560                      ACE_Event_Handler::READ_MASK,
00561                      &ACE_Event_Handler::handle_input);
00562 
00563           this->clear_handle_read_set (handle);
00564           found_io = 1;
00565         }
00566     }
00567 
00568   return found_io;
00569 }
00570 
00571 // Dispatches a single event handler
00572 int
00573 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00574 {
00575   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00576 
00577   ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00578   ACE_EH_PTMF const callback = dispatch_info.callback_;
00579 
00580   // Check for removed handlers.
00581   if (event_handler == 0)
00582     return -1;
00583 
00584   // Upcall. If the handler returns positive value (requesting a
00585   // reactor callback) don't set the ready-bit because it will be
00586   // ignored if the reactor state has changed. Just call back
00587   // as many times as the handler requests it. Other threads are off
00588   // handling other things.
00589   int status = 1;
00590   while (status > 0)
00591     status = (event_handler->*callback) (dispatch_info.handle_);
00592 
00593   // Post process socket event
00594   return this->post_process_socket_event (dispatch_info, status);
00595 }
00596 
00597 int
00598 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00599                                            int status)
00600 {
00601   int result = 0;
00602 
00603   // First check if we really have to post process something, if not, then
00604   // we don't acquire the token which saves us a lot of time.
00605   if (status < 0 ||
00606      (dispatch_info.event_handler_ != this->notify_handler_ &&
00607       dispatch_info.resume_flag_ ==
00608         ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00609     {
00610       // Get the reactor token and with this token acquired remove first the
00611       // handler and resume it at the same time. This must be atomic, see also
00612       // bugzilla 2395. When this is not atomic it can be that we resume the
00613       // handle after it is reused by the OS.
00614       ACE_TP_Token_Guard guard (this->token_);
00615 
00616       result = guard.acquire_token ();
00617 
00618       // If the guard is NOT the owner just return the retval
00619       if (!guard.is_owner ())
00620         return result;
00621 
00622       // A different event handler may have been registered during the
00623       // upcall if the handle was closed and then reopened, for
00624       // example.  Make sure we're removing and/or resuming the event
00625       // handler used during the upcall.
00626       ACE_Event_Handler const * const eh =
00627         this->handler_rep_.find (dispatch_info.handle_);
00628 
00629       // Only remove or resume the event handler used during the
00630       // upcall.
00631       if (eh == dispatch_info.event_handler_)
00632         {
00633           if (status < 0)
00634             {
00635               result =
00636                 this->remove_handler_i (dispatch_info.handle_,
00637                                         dispatch_info.mask_);
00638             }
00639 
00640           // Resume handler if required.
00641           if (dispatch_info.event_handler_ != this->notify_handler_ &&
00642               dispatch_info.resume_flag_ ==
00643               ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00644             this->resume_i (dispatch_info.handle_);
00645         }
00646     }
00647 
00648   // Call remove_reference() if needed.
00649   if (dispatch_info.reference_counting_required_)
00650     dispatch_info.event_handler_->remove_reference ();
00651 
00652   return result;
00653 }
00654 
00655 int
00656 ACE_TP_Reactor::resumable_handler (void)
00657 {
00658   return 1;
00659 }
00660 
00661 int
00662 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00663 {
00664   return this->handle_events (&max_wait_time);
00665 }
00666 
00667 void
00668 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00669                                ACE_Reactor_Mask,
00670                                ACE_Handle_Set &,
00671                                ACE_Event_Handler *eh,
00672                                ACE_EH_PTMF)
00673 {
00674   ACE_ERROR ((LM_ERROR,
00675               ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
00676               ACE_TEXT ("Wrong version of notify_handle() got called \n")));
00677 
00678   ACE_ASSERT (eh == 0);
00679   ACE_UNUSED_ARG (eh);
00680 }
00681 
00682 ACE_HANDLE
00683 ACE_TP_Reactor::get_notify_handle (void)
00684 {
00685   // Call the notify handler to get a handle on which we would have a
00686   // notify waiting
00687   ACE_HANDLE const read_handle =
00688     this->notify_handler_->notify_handle ();
00689 
00690   // Check whether the rd_mask has been set on that handle. If so
00691   // return the handle.
00692   if (read_handle != ACE_INVALID_HANDLE &&
00693       this->ready_set_.rd_mask_.is_set (read_handle))
00694     {
00695       return read_handle;
00696     }
00697 
00698   // None found..
00699   return ACE_INVALID_HANDLE;
00700 }
00701 
00702 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 12:05:42 2008 for ACE by doxygen 1.3.6