00001 
00002 
00003 #include "ace/TP_Reactor.h"
00004 #include "ace/Thread.h"
00005 #include "ace/Timer_Queue.h"
00006 #include "ace/Signal.h"
00007 #include "ace/Log_Msg.h"
00008 #include "ace/OS_NS_sys_time.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/TP_Reactor.inl"
00012 #endif 
00013 
00014 ACE_RCSID (ace,
00015            TP_Reactor,
00016            "TP_Reactor.cpp,v 4.79 2006/03/20 10:10:49 jwillemsen Exp")
00017 
00018 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00021 
00022 int
00023 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
00024 {
00025   ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
00026 
00027   
00028 
00029   
00030   
00031   int result = 0;
00032 
00033   if (max_wait_time)
00034     {
00035       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00036       tv += *max_wait_time;
00037 
00038       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00039                                                   0,
00040                                                   &tv));
00041     }
00042   else
00043     {
00044       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00045     }
00046 
00047   
00048   if (result == -1)
00049     {
00050       if (errno == ETIME)
00051         return 0;
00052       else
00053         return -1;
00054     }
00055 
00056   
00057   this->owner_ = 1;
00058 
00059   return result;
00060 }
00061 
00062 int
00063 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00064 {
00065   ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00066 
00067   
00068   
00069   int result = 0;
00070 
00071   if (max_wait_time)
00072     {
00073       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00074       tv += *max_wait_time;
00075 
00076       ACE_MT (result = this->token_.acquire (0,
00077                                              0,
00078                                              &tv));
00079     }
00080   else
00081     {
00082       ACE_MT (result = this->token_.acquire ());
00083     }
00084 
00085   
00086   if (result == -1)
00087     {
00088       if (errno == ETIME)
00089         return 0;
00090       else
00091         return -1;
00092     }
00093 
00094   
00095   this->owner_ = 1;
00096 
00097   return result;
00098 }
00099 
00100 
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102                                 ACE_Timer_Queue *tq,
00103                                 int mask_signals,
00104                                 int s_queue)
00105   : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }
00110 
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
00112                                 int restart,
00113                                 ACE_Sig_Handler *sh,
00114                                 ACE_Timer_Queue *tq,
00115                                 int mask_signals,
00116                                 int s_queue)
00117   : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, 0, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }
00122 
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }
00132 
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }
00141 
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   
00148   
00149   
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   
00153   
00154   
00155 
00156   
00157   
00158   ACE_TP_Token_Guard guard (this->token_);
00159 
00160   int const result = guard.acquire_read_token (max_wait_time);
00161 
00162   
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   
00171   countdown.update ();
00172 
00173   return this->dispatch_i (max_wait_time,
00174                            guard);
00175 }
00176 
00177 
00178 
00179 
00180 
00181 int
00182 ACE_TP_Reactor::register_handler (int,
00183                                   ACE_Event_Handler *,
00184                                   ACE_Sig_Action *,
00185                                   ACE_Event_Handler **,
00186                                   ACE_Sig_Action *)
00187 {
00188   ACE_NOTSUP_RETURN (-1);
00189 }
00190 
00191 
00192 int
00193 ACE_TP_Reactor::register_handler (const ACE_Sig_Set &,
00194                                   ACE_Event_Handler *,
00195                                   ACE_Sig_Action *)
00196 {
00197   ACE_NOTSUP_RETURN (-1);
00198 }
00199 
00200 int
00201 ACE_TP_Reactor::register_handler (ACE_Event_Handler *eh,
00202                                   ACE_Reactor_Mask mask)
00203 {
00204   return ACE_Select_Reactor::register_handler (eh,
00205                                                mask);
00206 }
00207 
00208 int
00209 ACE_TP_Reactor::register_handler (ACE_HANDLE handle,
00210                                   ACE_Event_Handler *eh,
00211                                   ACE_Reactor_Mask mask)
00212 {
00213   return ACE_Select_Reactor::register_handler (handle,
00214                                                eh,
00215                                                mask);
00216 }
00217 
00218 #if defined (ACE_WIN32)
00219 
00220 int
00221 ACE_TP_Reactor::register_handler (ACE_Event_Handler *eh,
00222                                   ACE_HANDLE h)
00223 {
00224   return ACE_Select_Reactor::register_handler (eh,
00225                                                h);
00226 }
00227 
00228 #endif 
00229 int
00230 ACE_TP_Reactor::register_handler (ACE_HANDLE event_handle,
00231                                   ACE_HANDLE io_handle,
00232                                   ACE_Event_Handler *event_handler,
00233                                   ACE_Reactor_Mask mask)
00234 {
00235   return ACE_Select_Reactor::register_handler (event_handle,
00236                                                io_handle,
00237                                                event_handler,
00238                                                mask);
00239 }
00240 
00241 int
00242 ACE_TP_Reactor::register_handler (const ACE_Handle_Set &handles,
00243                                   ACE_Event_Handler *eh,
00244                                   ACE_Reactor_Mask mask)
00245 {
00246   return ACE_Select_Reactor::register_handler (handles,
00247                                                eh,
00248                                                mask);
00249 }
00250 
00251 int
00252 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00253                             ACE_TP_Token_Guard &guard)
00254 {
00255   int event_count =
00256     this->get_event_for_dispatching (max_wait_time);
00257 
00258   int result = 0;
00259 
00260   
00261   
00262   
00263 
00264   
00265   if (event_count == -1)
00266     {
00267       
00268       
00269       
00270       
00271       
00272       
00273       return this->handle_signals (event_count,
00274                                    guard);
00275     }
00276 
00277   
00278   
00279   
00280   
00281   
00282 
00283   
00284   
00285   
00286   
00287   
00288   result = this->handle_timer_events (event_count,
00289                                       guard);
00290 
00291   if (result > 0)
00292     return result;
00293 
00294   
00295 
00296   if (event_count > 0)
00297     {
00298       
00299       
00300       
00301       result = this->handle_notify_events (event_count,
00302                                            guard);
00303 
00304       if (result > 0)
00305         return result;
00306 
00307       
00308     }
00309 
00310   if (event_count > 0)
00311     {
00312       
00313       return this->handle_socket_events (event_count,
00314                                          guard);
00315     }
00316 
00317   return 0;
00318 }
00319 
00320 int
00321 ACE_TP_Reactor::handle_signals (int & ,
00322                                 ACE_TP_Token_Guard & )
00323 {
00324   ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00325 
00326   
00327 
00328 
00329 
00330 
00331 
00332   
00333   
00334   if (ACE_Sig_Handler::sig_pending () != 0)
00335     {
00336       ACE_Sig_Handler::sig_pending (0);
00337 
00338       
00339       
00340       
00341       
00342 #if 0
00343       
00344       
00345 
00346       
00347       
00348       
00349       active_handle_count = this->any_ready (dispatch_set);
00350 #else
00351       
00352 #endif
00353 
00354       
00355       
00356       
00357       return 1;
00358     }
00359 
00360   return -1;
00361 }
00362 
00363 
00364 int
00365 ACE_TP_Reactor::handle_timer_events (int & ,
00366                                      ACE_TP_Token_Guard &guard)
00367 {
00368   if (this->timer_queue_->is_empty())
00369     { 
00370       return 0;
00371     }
00372 
00373   
00374   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00375                            this->timer_queue_->timer_skew ());
00376 
00377   
00378   
00379   ACE_Timer_Node_Dispatch_Info info;
00380 
00381   if (this->timer_queue_->dispatch_info (cur_time,
00382                                          info))
00383     {
00384       const void *upcall_act = 0;
00385 
00386       
00387       this->timer_queue_->preinvoke (info,
00388                                      cur_time,
00389                                      upcall_act);
00390 
00391       
00392       guard.release_token ();
00393 
00394       
00395       this->timer_queue_->upcall (info,
00396                                   cur_time);
00397 
00398       
00399       this->timer_queue_->postinvoke (info,
00400                                       cur_time,
00401                                       upcall_act);
00402 
00403       
00404       return 1;
00405     }
00406 
00407   return 0;
00408 }
00409 
00410 int
00411 ACE_TP_Reactor::handle_notify_events (int & ,
00412                                       ACE_TP_Token_Guard &guard)
00413 {
00414   
00415   ACE_HANDLE notify_handle =
00416     this->get_notify_handle ();
00417 
00418   int result = 0;
00419 
00420   
00421   
00422   if (notify_handle == ACE_INVALID_HANDLE)
00423     return result;
00424 
00425   
00426   ACE_Notification_Buffer buffer;
00427 
00428   
00429   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00430 
00431   
00432   
00433   while (this->notify_handler_->read_notify_pipe (notify_handle,
00434                                                   buffer) > 0)
00435     {
00436       
00437       
00438       
00439       
00440       
00441       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00442         {
00443           
00444           guard.release_token ();
00445 
00446           
00447           this->notify_handler_->dispatch_notify (buffer);
00448 
00449           
00450           result = 1;
00451 
00452           
00453           break;
00454         }
00455     }
00456 
00457   
00458   
00459   
00460   return result;
00461 }
00462 
00463 int
00464 ACE_TP_Reactor::handle_socket_events (int &event_count,
00465                                       ACE_TP_Token_Guard &guard)
00466 {
00467 
00468   
00469   ACE_EH_Dispatch_Info dispatch_info;
00470 
00471   this->get_socket_event_info (dispatch_info);
00472 
00473   
00474   
00475   if (!dispatch_info.dispatch ())
00476     {
00477       return 0;
00478     }
00479 
00480   
00481   
00482   
00483   
00484   
00485   if (dispatch_info.event_handler_ != this->notify_handler_)
00486     if (this->suspend_i (dispatch_info.handle_) == -1)
00487       return 0;
00488 
00489   
00490   if (dispatch_info.reference_counting_required_)
00491     dispatch_info.event_handler_->add_reference ();
00492 
00493   
00494   guard.release_token ();
00495 
00496   int result = 0;
00497 
00498   
00499   
00500   --event_count;
00501 
00502   
00503   if (this->dispatch_socket_event (dispatch_info) == 0)
00504     ++result;
00505 
00506   return result;
00507 }
00508 
00509 int
00510 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00511 {
00512   
00513   
00514   if (this->state_changed_)
00515     {
00516       this->ready_set_.rd_mask_.reset ();
00517       this->ready_set_.wr_mask_.reset ();
00518       this->ready_set_.ex_mask_.reset ();
00519 
00520       this->state_changed_ = false;
00521     }
00522   else
00523     {
00524       
00525       
00526       
00527       
00528 
00529       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00530       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00531       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00532     }
00533 
00534   return this->wait_for_multiple_events (this->ready_set_,
00535                                          max_wait_time);
00536 }
00537 
00538 int
00539 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00540 {
00541   
00542   
00543   
00544   
00545   
00546   int found_io = 0;
00547   ACE_HANDLE handle;
00548 
00549   
00550   
00551   {
00552     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00553 
00554     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00555       {
00556         if (this->is_suspended_i (handle))
00557           continue;
00558 
00559         
00560         event.set (handle,
00561                    this->handler_rep_.find (handle),
00562                    ACE_Event_Handler::WRITE_MASK,
00563                    &ACE_Event_Handler::handle_output);
00564 
00565         this->clear_handle_read_set (handle);
00566         found_io = 1;
00567       }
00568   }
00569 
00570   if (!found_io)
00571     {
00572       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00573 
00574       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00575         {
00576           if (this->is_suspended_i (handle))
00577             continue;
00578 
00579           
00580           event.set (handle,
00581                      this->handler_rep_.find (handle),
00582                      ACE_Event_Handler::EXCEPT_MASK,
00583                      &ACE_Event_Handler::handle_exception);
00584 
00585           this->clear_handle_read_set (handle);
00586 
00587           found_io = 1;
00588         }
00589     }
00590 
00591   if (!found_io)
00592     {
00593       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00594 
00595       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00596         {
00597           if (this->is_suspended_i (handle))
00598             continue;
00599 
00600           
00601           event.set (handle,
00602                      this->handler_rep_.find (handle),
00603                      ACE_Event_Handler::READ_MASK,
00604                      &ACE_Event_Handler::handle_input);
00605 
00606           this->clear_handle_read_set (handle);
00607           found_io = 1;
00608         }
00609     }
00610 
00611   return found_io;
00612 }
00613 
00614 
00615 int
00616 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00617 {
00618   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00619 
00620   ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
00621   ACE_EH_PTMF const callback = dispatch_info.callback_;
00622 
00623   
00624   if (event_handler == 0)
00625     return -1;
00626 
00627   
00628   
00629   
00630   
00631   
00632   int status = 1;
00633   while (status > 0)
00634     status = (event_handler->*callback) (dispatch_info.handle_);
00635 
00636   
00637   return this->post_process_socket_event (dispatch_info, status);
00638 }
00639 
00640 int
00641 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
00642                                            int status)
00643 {
00644   int result = 0;
00645 
00646   
00647   
00648   if (status < 0 ||
00649      (dispatch_info.event_handler_ != this->notify_handler_ &&
00650       dispatch_info.resume_flag_ ==
00651         ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
00652     {
00653       
00654       
00655       
00656       
00657       ACE_TP_Token_Guard guard (this->token_);
00658 
00659       result = guard.acquire_token ();
00660 
00661       
00662       if (!guard.is_owner ())
00663         return result;
00664 
00665       if (status < 0)
00666         {
00667           result =
00668             this->remove_handler_i (dispatch_info.handle_, dispatch_info.mask_);
00669         }
00670 
00671       
00672       if (dispatch_info.event_handler_ != this->notify_handler_ &&
00673           dispatch_info.resume_flag_ ==
00674             ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00675         this->resume_i (dispatch_info.handle_);
00676     }
00677 
00678   
00679   if (dispatch_info.reference_counting_required_)
00680     dispatch_info.event_handler_->remove_reference ();
00681 
00682   return result;
00683 }
00684 
00685 int
00686 ACE_TP_Reactor::resumable_handler (void)
00687 {
00688   return 1;
00689 }
00690 
00691 int
00692 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00693 {
00694   return this->handle_events (&max_wait_time);
00695 }
00696 
00697 void
00698 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00699                                ACE_Reactor_Mask,
00700                                ACE_Handle_Set &,
00701                                ACE_Event_Handler *eh,
00702                                ACE_EH_PTMF)
00703 {
00704   ACE_ERROR ((LM_ERROR,
00705               ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: ")
00706               ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
00707 
00708   ACE_ASSERT (eh == 0);
00709   ACE_UNUSED_ARG (eh);
00710 }
00711 
00712 ACE_HANDLE
00713 ACE_TP_Reactor::get_notify_handle (void)
00714 {
00715   
00716   
00717   ACE_HANDLE const read_handle =
00718     this->notify_handler_->notify_handle ();
00719 
00720   
00721   
00722   if (read_handle != ACE_INVALID_HANDLE &&
00723       this->ready_set_.rd_mask_.is_set (read_handle))
00724     {
00725       return read_handle;
00726     }
00727 
00728   
00729   return ACE_INVALID_HANDLE;
00730 }
00731 
00732 ACE_END_VERSIONED_NAMESPACE_DECL