00001 
00002 
00003 #include "ace/POSIX_Proactor.h"
00004 
00005 #if defined (ACE_HAS_AIO_CALLS)
00006 
00007 #if !defined (__ACE_INLINE__)
00008 #include "ace/POSIX_Proactor.inl"
00009 #endif 
00010 
00011 # if defined (ACE_HAS_SYSINFO)
00012 #   include  <sys/systeminfo.h>
00013 # endif 
00014 
00015 #include "ace/ACE.h"
00016 #include "ace/Flag_Manip.h"
00017 #include "ace/Task_T.h"
00018 #include "ace/Log_Msg.h"
00019 #include "ace/Object_Manager.h"
00020 #include "ace/OS_NS_sys_socket.h"
00021 #include "ace/OS_NS_signal.h"
00022 #include "ace/OS_NS_unistd.h"
00023 
00024 #if defined (sun)
00025 #  include "ace/OS_NS_strings.h"
00026 #endif 
00027 
00028 
00029 
00030 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
00040 {
00041 public:
00042 
00043   ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
00044                                const void *act = 0,
00045                                ACE_HANDLE event = ACE_INVALID_HANDLE,
00046                                int priority = 0,
00047                                int signal_number = ACE_SIGRTMIN);
00048 
00049 
00050   virtual ~ACE_POSIX_Wakeup_Completion (void);
00051 
00052 
00053 
00054   virtual void complete (size_t bytes_transferred = 0,
00055                          int success = 1,
00056                          const void *completion_key = 0,
00057                          u_long error = 0);
00058 };
00059 
00060 
00061 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
00062   :  os_id_ (ACE_OS_UNDEFINED)
00063 {
00064 #if defined(sun)
00065 
00066   os_id_ = ACE_OS_SUN; 
00067 
00068   char Buf [32];
00069 
00070   ::memset(Buf,0,sizeof(Buf));
00071 
00072   ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
00073 
00074   if (ACE_OS::strcasecmp (Buf , "5.6") == 0)
00075     os_id_ = ACE_OS_SUN_56;
00076   else if (ACE_OS::strcasecmp (Buf , "5.7") == 0)
00077     os_id_ = ACE_OS_SUN_57;
00078   else if (ACE_OS::strcasecmp (Buf , "5.8") == 0)
00079     os_id_ = ACE_OS_SUN_58;
00080 
00081 #elif defined(HPUX)
00082 
00083   os_id_ = ACE_OS_HPUX;   
00084 
00085 #elif defined(__sgi)
00086 
00087   os_id_ = ACE_OS_IRIX;   
00088 
00089 #elif defined(__OpenBSD)
00090 
00091   os_id_ = ACE_OS_OPENBSD; 
00092 
00093   
00094 
00095 
00096 
00097 #endif
00098 }
00099 
00100 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
00101 {
00102   this->close ();
00103 }
00104 
00105 int
00106 ACE_POSIX_Proactor::close (void)
00107 {
00108   return 0;
00109 }
00110 
00111 int
00112 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
00113                                      const void *completion_key)
00114 {
00115   ACE_UNUSED_ARG (handle);
00116   ACE_UNUSED_ARG (completion_key);
00117   return 0;
00118 }
00119 
00120 int
00121 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
00122 {
00123   return 0;
00124 }
00125 
00126 int
00127 ACE_POSIX_Proactor::close_dispatch_threads (int)
00128 {
00129   return 0;
00130 }
00131 
00132 size_t
00133 ACE_POSIX_Proactor::number_of_threads (void) const
00134 {
00135   
00136   ACE_NOTSUP_RETURN (0);
00137 }
00138 
00139 void
00140 ACE_POSIX_Proactor::number_of_threads (size_t threads)
00141 {
00142   
00143   ACE_UNUSED_ARG (threads);
00144 }
00145 
00146 ACE_HANDLE
00147 ACE_POSIX_Proactor::get_handle (void) const
00148 {
00149   return ACE_INVALID_HANDLE;
00150 }
00151 
00152 ACE_Asynch_Read_Stream_Impl *
00153 ACE_POSIX_Proactor::create_asynch_read_stream (void)
00154 {
00155   ACE_Asynch_Read_Stream_Impl *implementation = 0;
00156   ACE_NEW_RETURN (implementation,
00157                   ACE_POSIX_Asynch_Read_Stream (this),
00158                   0);
00159   return implementation;
00160 }
00161 
00162 ACE_Asynch_Read_Stream_Result_Impl *
00163 ACE_POSIX_Proactor::create_asynch_read_stream_result
00164   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00165    ACE_HANDLE handle,
00166    ACE_Message_Block &message_block,
00167    size_t bytes_to_read,
00168    const void* act,
00169    ACE_HANDLE event,
00170    int priority,
00171    int signal_number)
00172 {
00173   ACE_Asynch_Read_Stream_Result_Impl *implementation;
00174   ACE_NEW_RETURN (implementation,
00175                   ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
00176                                                        handle,
00177                                                        message_block,
00178                                                        bytes_to_read,
00179                                                        act,
00180                                                        event,
00181                                                        priority,
00182                                                        signal_number),
00183                   0);
00184   return implementation;
00185 }
00186 
00187 
00188 ACE_Asynch_Write_Stream_Impl *
00189 ACE_POSIX_Proactor::create_asynch_write_stream (void)
00190 {
00191   ACE_Asynch_Write_Stream_Impl *implementation = 0;
00192   ACE_NEW_RETURN (implementation,
00193                   ACE_POSIX_Asynch_Write_Stream (this),
00194                   0);
00195   return implementation;
00196 }
00197 
00198 ACE_Asynch_Write_Stream_Result_Impl *
00199 ACE_POSIX_Proactor::create_asynch_write_stream_result
00200   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00201    ACE_HANDLE handle,
00202    ACE_Message_Block &message_block,
00203    size_t bytes_to_write,
00204    const void* act,
00205    ACE_HANDLE event,
00206    int priority,
00207    int signal_number)
00208 {
00209   ACE_Asynch_Write_Stream_Result_Impl *implementation;
00210   ACE_NEW_RETURN (implementation,
00211                   ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
00212                                                         handle,
00213                                                         message_block,
00214                                                         bytes_to_write,
00215                                                         act,
00216                                                         event,
00217                                                         priority,
00218                                                         signal_number),
00219                   0);
00220   return implementation;
00221 }
00222 
00223 
00224 ACE_Asynch_Read_File_Impl *
00225 ACE_POSIX_Proactor::create_asynch_read_file (void)
00226 {
00227   ACE_Asynch_Read_File_Impl *implementation = 0;
00228   ACE_NEW_RETURN (implementation,
00229                   ACE_POSIX_Asynch_Read_File (this),
00230                   0);
00231   return  implementation;
00232 }
00233 
00234 ACE_Asynch_Read_File_Result_Impl *
00235 ACE_POSIX_Proactor::create_asynch_read_file_result
00236   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00237    ACE_HANDLE handle,
00238    ACE_Message_Block &message_block,
00239    size_t bytes_to_read,
00240    const void* act,
00241    u_long offset,
00242    u_long offset_high,
00243    ACE_HANDLE event,
00244    int priority,
00245    int signal_number)
00246 {
00247   ACE_Asynch_Read_File_Result_Impl *implementation;
00248   ACE_NEW_RETURN (implementation,
00249                   ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
00250                                                      handle,
00251                                                      message_block,
00252                                                      bytes_to_read,
00253                                                      act,
00254                                                      offset,
00255                                                      offset_high,
00256                                                      event,
00257                                                      priority,
00258                                                      signal_number),
00259                   0);
00260   return implementation;
00261 }
00262 
00263 
00264 ACE_Asynch_Write_File_Impl *
00265 ACE_POSIX_Proactor::create_asynch_write_file (void)
00266 {
00267   ACE_Asynch_Write_File_Impl *implementation = 0;
00268   ACE_NEW_RETURN (implementation,
00269                   ACE_POSIX_Asynch_Write_File (this),
00270                   0);
00271   return  implementation;
00272 }
00273 
00274 ACE_Asynch_Write_File_Result_Impl *
00275 ACE_POSIX_Proactor::create_asynch_write_file_result
00276   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00277    ACE_HANDLE handle,
00278    ACE_Message_Block &message_block,
00279    size_t bytes_to_write,
00280    const void* act,
00281    u_long offset,
00282    u_long offset_high,
00283    ACE_HANDLE event,
00284    int priority,
00285    int signal_number)
00286 {
00287   ACE_Asynch_Write_File_Result_Impl *implementation;
00288   ACE_NEW_RETURN (implementation,
00289                   ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
00290                                                       handle,
00291                                                       message_block,
00292                                                       bytes_to_write,
00293                                                       act,
00294                                                       offset,
00295                                                       offset_high,
00296                                                       event,
00297                                                       priority,
00298                                                       signal_number),
00299                   0);
00300   return implementation;
00301 }
00302 
00303 
00304 ACE_Asynch_Read_Dgram_Impl *
00305 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
00306 {
00307     ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00308     ACE_NEW_RETURN (implementation,
00309                     ACE_POSIX_Asynch_Read_Dgram (this),
00310                     0);
00311     return implementation;
00312 }
00313 
00314 ACE_Asynch_Read_Dgram_Result_Impl *
00315 ACE_POSIX_Proactor::create_asynch_read_dgram_result
00316   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00317    ACE_HANDLE handle,
00318    ACE_Message_Block *message_block,
00319    size_t bytes_to_read,
00320    int flags,
00321    int protocol_family,
00322    const void* act,
00323    ACE_HANDLE event ,
00324    int priority ,
00325    int signal_number)
00326 {
00327   ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
00328   ACE_NEW_RETURN (implementation,
00329                   ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
00330                                                      handle,
00331                                                      message_block,
00332                                                      bytes_to_read,
00333                                                      flags,
00334                                                      protocol_family,
00335                                                      act,
00336                                                      event,
00337                                                      priority,
00338                                                      signal_number),
00339                   0);
00340 
00341   return implementation;
00342 }
00343 
00344 
00345 ACE_Asynch_Write_Dgram_Impl *
00346 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
00347 {
00348         ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00349         ACE_NEW_RETURN (implementation,
00350                         ACE_POSIX_Asynch_Write_Dgram (this),
00351                         0);
00352 
00353     return implementation;
00354 }
00355 
00356 ACE_Asynch_Write_Dgram_Result_Impl *
00357 ACE_POSIX_Proactor::create_asynch_write_dgram_result
00358   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00359    ACE_HANDLE handle,
00360    ACE_Message_Block *message_block,
00361    size_t bytes_to_write,
00362    int flags,
00363    const void* act,
00364    ACE_HANDLE event,
00365    int priority ,
00366    int signal_number)
00367 {
00368   ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
00369   ACE_NEW_RETURN (implementation,
00370                   ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
00371                                                       handle,
00372                                                       message_block,
00373                                                       bytes_to_write,
00374                                                       flags,
00375                                                       act,
00376                                                       event,
00377                                                       priority,
00378                                                       signal_number),
00379                   0);
00380 
00381   return implementation;
00382 }
00383 
00384 
00385 ACE_Asynch_Accept_Impl *
00386 ACE_POSIX_Proactor::create_asynch_accept (void)
00387 {
00388   ACE_Asynch_Accept_Impl *implementation = 0;
00389   ACE_NEW_RETURN (implementation,
00390                   ACE_POSIX_Asynch_Accept (this),
00391                   0);
00392 
00393   return implementation;
00394 }
00395 
00396 ACE_Asynch_Accept_Result_Impl *
00397 ACE_POSIX_Proactor::create_asynch_accept_result
00398   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00399    ACE_HANDLE listen_handle,
00400    ACE_HANDLE accept_handle,
00401    ACE_Message_Block &message_block,
00402    size_t bytes_to_read,
00403    const void* act,
00404    ACE_HANDLE event,
00405    int priority,
00406    int signal_number)
00407 {
00408   ACE_Asynch_Accept_Result_Impl *implementation;
00409   ACE_NEW_RETURN (implementation,
00410                   ACE_POSIX_Asynch_Accept_Result (handler_proxy,
00411                                                   listen_handle,
00412                                                   accept_handle,
00413                                                   message_block,
00414                                                   bytes_to_read,
00415                                                   act,
00416                                                   event,
00417                                                   priority,
00418                                                   signal_number),
00419                   0);
00420   return implementation;
00421 }
00422 
00423 
00424 ACE_Asynch_Connect_Impl *
00425 ACE_POSIX_Proactor::create_asynch_connect (void)
00426 {
00427   ACE_Asynch_Connect_Impl *implementation = 0;
00428   ACE_NEW_RETURN (implementation,
00429                   ACE_POSIX_Asynch_Connect (this),
00430                   0);
00431 
00432   return implementation;
00433 }
00434 
00435 ACE_Asynch_Connect_Result_Impl *
00436 ACE_POSIX_Proactor::create_asynch_connect_result
00437   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00438    ACE_HANDLE connect_handle,
00439    const void* act,
00440    ACE_HANDLE event,
00441    int priority,
00442    int signal_number)
00443 {
00444   ACE_Asynch_Connect_Result_Impl *implementation;
00445   ACE_NEW_RETURN (implementation,
00446                   ACE_POSIX_Asynch_Connect_Result (handler_proxy,
00447                                                    connect_handle,
00448                                                    act,
00449                                                    event,
00450                                                    priority,
00451                                                    signal_number),
00452                   0);
00453   return implementation;
00454 }
00455 
00456 
00457 ACE_Asynch_Transmit_File_Impl *
00458 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
00459 {
00460   ACE_Asynch_Transmit_File_Impl *implementation = 0;
00461   ACE_NEW_RETURN (implementation,
00462                   ACE_POSIX_Asynch_Transmit_File (this),
00463                   0);
00464   return implementation;
00465 }
00466 
00467 ACE_Asynch_Transmit_File_Result_Impl *
00468 ACE_POSIX_Proactor::create_asynch_transmit_file_result
00469   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00470    ACE_HANDLE socket,
00471    ACE_HANDLE file,
00472    ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00473    size_t bytes_to_write,
00474    u_long offset,
00475    u_long offset_high,
00476    size_t bytes_per_send,
00477    u_long flags,
00478    const void *act,
00479    ACE_HANDLE event,
00480    int priority,
00481    int signal_number)
00482 {
00483   ACE_Asynch_Transmit_File_Result_Impl *implementation;
00484   ACE_NEW_RETURN (implementation,
00485                   ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
00486                                                          socket,
00487                                                          file,
00488                                                          header_and_trailer,
00489                                                          bytes_to_write,
00490                                                          offset,
00491                                                          offset_high,
00492                                                          bytes_per_send,
00493                                                          flags,
00494                                                          act,
00495                                                          event,
00496                                                          priority,
00497                                                          signal_number),
00498                   0);
00499   return implementation;
00500 }
00501 
00502 ACE_Asynch_Result_Impl *
00503 ACE_POSIX_Proactor::create_asynch_timer
00504   (const ACE_Handler::Proxy_Ptr &handler_proxy,
00505    const void *act,
00506    const ACE_Time_Value &tv,
00507    ACE_HANDLE event,
00508    int priority,
00509    int signal_number)
00510 {
00511   ACE_POSIX_Asynch_Timer *implementation;
00512   ACE_NEW_RETURN (implementation,
00513                   ACE_POSIX_Asynch_Timer (handler_proxy,
00514                                           act,
00515                                           tv,
00516                                           event,
00517                                           priority,
00518                                           signal_number),
00519                   0);
00520   return implementation;
00521 }
00522 
#if 0
// NOTE: everything up to the matching #endif is compiled out.

// Drains completions with a zero timeout: handle_events () is called
// repeatedly until it returns non-zero or errno reports ETIME.  Only a
// result of -1 is passed on as failure; anything else becomes 0.
int
ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
{
  ACE_Time_Value timeout (0, 0);
  int result = 0;

  do
    result = this->handle_events (timeout);
  while (result == 0 && errno != ETIME);

  return result == -1 ? -1 : 0;
}

// Ignores both arguments and simply closes the proactor.
int
ACE_POSIX_Proactor::handle_close (ACE_HANDLE,
                                  ACE_Reactor_Mask)
{
  return this->close ();
}
#endif /* 0 */
00555 
00556 void
00557 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
00558                                                size_t bytes_transferred,
00559                                                const void *,
00560                                                u_long error)
00561 {
00562   ACE_SEH_TRY
00563     {
00564       
00565       asynch_result->complete (bytes_transferred,
00566                                error ? 0 : 1,
00567                                0, 
00568                                error);
00569     }
00570   ACE_SEH_FINALLY
00571     {
00572       
00573       delete asynch_result;
00574     }
00575 }
00576 
00577 int
00578 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
00579 {
00580   ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
00581 
00582   for (int ci = 0; ci < how_many; ci++)
00583     {
00584       ACE_NEW_RETURN
00585         (wakeup_completion,
00586          ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
00587          -1);
00588       if (this->post_completion (wakeup_completion) == -1)
00589         return -1;
00590     }
00591 
00592   return 0;
00593 }
00594 
00595 ACE_POSIX_Proactor::Proactor_Type
00596 ACE_POSIX_Proactor::get_impl_type (void)
00597 {
00598   return PROACTOR_POSIX;
00599 }
00600 
00601 
00602 
00603 
00604 
00605 
00606 
00607 
00608 
00609 
00610 
00611 
00612 
00613 
00614 
00615 
00616 
00617 
00618 
00619 
00620 
00621 
00622 
00623 
00624 
00625 
00626 
00627 
00628 
00629 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
00630 {
00631 public:
00632 
00633 
00634   ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
00635 
00636 
00637   virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
00638 
00639 
00640   int notify ();
00641 
00642 
00643 
00644   virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
00645 
00646 private:
00647 
00648   ACE_POSIX_AIOCB_Proactor  *posix_aiocb_proactor_;
00649 
00650 
00651   ACE_Message_Block message_block_;
00652 
00653 
00654 
00655   ACE_Pipe pipe_;
00656 
00657 
00658   ACE_POSIX_Asynch_Read_Stream read_stream_;
00659 
00660 
00661   ACE_AIOCB_Notify_Pipe_Manager (void);
00662 };
00663 
00664 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
00665   : posix_aiocb_proactor_ (posix_aiocb_proactor),
00666     message_block_ (sizeof (2)),
00667     read_stream_ (posix_aiocb_proactor)
00668 {
00669   
00670   this->pipe_.open ();
00671 
00672   
00673   ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
00674 
00675   
00676   ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
00677 
00678   
00679   posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
00680 
00681   
00682   if (this->read_stream_.open (this->proxy (),
00683                                this->pipe_.read_handle (),
00684                                0, 
00685                                0) 
00686       == -1)
00687     ACE_ERROR ((LM_ERROR,
00688                 "%N:%l:%p\n",
00689                 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00690                 "Open on Read Stream failed"));
00691 
00692   
00693   if (this->read_stream_.read (this->message_block_,
00694                                1, 
00695                                0, 
00696                                0) 
00697       == -1)
00698     ACE_ERROR ((LM_ERROR,
00699                 "%N:%l:%p\n",
00700                 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00701                 "Read from pipe failed"));
00702 }
00703 
00704 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
00705 {
00706   
00707   this->read_stream_.cancel ();
00708 
00709   
00710   
00711   
00712   
00713   
00714   
00715   
00716   
00717   
00718 
00719   ACE_HANDLE h = this->pipe_.write_handle ();
00720   if (h != ACE_INVALID_HANDLE)
00721      ACE_OS::closesocket (h);
00722 
00723   h = this->pipe_.read_handle ();
00724   if ( h != ACE_INVALID_HANDLE)
00725      ACE_OS::closesocket (h);
00726 
00727 }
00728 
00729 
00730 int
00731 ACE_AIOCB_Notify_Pipe_Manager::notify ()
00732 {
00733   
00734   char char_send = 0;
00735   ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
00736                                &char_send,
00737                                sizeof (char_send));
00738 
00739   if (ret_val < 0)
00740     {
00741       if (errno != EWOULDBLOCK)
00742 #if 0
00743         ACE_ERROR ((LM_ERROR,
00744                     ACE_TEXT ("(%P %t):%p\n"),
00745                     ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
00746                     ACE_TEXT ("Error:Writing on to notify pipe failed")));
00747 #endif 
00748       return -1;
00749     }
00750 
00751   return 0;
00752 }
00753 
00754 void
00755 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
00756   (const ACE_Asynch_Read_Stream::Result & )
00757 {
00758   
00759 
00760   
00761   
00762   if (this->message_block_.length () > 0)
00763       this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
00764 
00765   
00766   
00767   if (-1 == this->read_stream_.read (this->message_block_,
00768                                      1,   
00769                                      0,   
00770                                      0))  
00771     ACE_ERROR ((LM_ERROR,
00772                 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
00773                 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
00774                 ACE_TEXT ("Read from pipe failed")));
00775 
00776 
00777   
00778   
00779 }
00780 
00781 
00782 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
00783   : aiocb_notify_pipe_manager_ (0),
00784     aiocb_list_ (0),
00785     result_list_ (0),
00786     aiocb_list_max_size_ (max_aio_operations),
00787     aiocb_list_cur_size_ (0),
00788     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00789     num_deferred_aiocb_ (0),
00790     num_started_aio_ (0)
00791 {
00792   
00793   check_max_aio_num ();
00794 
00795   this->create_result_aiocb_list ();
00796 
00797   this->create_notify_manager ();
00798 
00799   
00800   
00801   this->get_asynch_pseudo_task().start ();
00802 
00803 }
00804 
00805 
00806 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
00807                                                     ACE_POSIX_Proactor::Proactor_Type)
00808   : aiocb_notify_pipe_manager_ (0),
00809     aiocb_list_ (0),
00810     result_list_ (0),
00811     aiocb_list_max_size_ (max_aio_operations),
00812     aiocb_list_cur_size_ (0),
00813     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00814     num_deferred_aiocb_ (0),
00815     num_started_aio_ (0)
00816 {
00817   
00818   this->check_max_aio_num ();
00819 
00820   this->create_result_aiocb_list ();
00821 
00822   
00823   
00824 }
00825 
00826 
00827 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
00828 {
00829   this->close();
00830 }
00831 
00832 ACE_POSIX_Proactor::Proactor_Type
00833 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
00834 {
00835   return PROACTOR_AIOCB;
00836 }
00837 
00838 
00839 int
00840 ACE_POSIX_AIOCB_Proactor::close (void)
00841 {
00842   
00843   this->get_asynch_pseudo_task().stop ();
00844 
00845   this->delete_notify_manager ();
00846 
00847   this->clear_result_queue ();
00848 
00849   return this->delete_result_aiocb_list ();
00850 }
00851 
00852 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
00853 {
00854   notify_pipe_read_handle_ = h;
00855 }
00856 
00857 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
00858 {
00859   if (aiocb_list_ != 0)
00860     return 0;
00861 
00862   ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863 
00864   ACE_NEW_RETURN (result_list_,
00865                   ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866                   -1);
00867 
00868   
00869   for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870     {
00871       aiocb_list_[ai] = 0;
00872       result_list_[ai] = 0;
00873     }
00874 
00875   return 0;
00876 }
00877 
00878 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
00879 {
00880   if (aiocb_list_ == 0)  
00881     return 0;
00882 
00883   size_t ai;
00884 
00885   
00886   
00887   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888     if (this->aiocb_list_[ai] != 0)  
00889       this->cancel_aiocb (result_list_[ai]);
00890 
00891   int num_pending = 0;
00892 
00893   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894     {
00895       if (this->aiocb_list_[ai] == 0 ) 
00896         continue;
00897 
00898       
00899       int error_status  = 0;
00900       size_t transfer_count = 0;
00901       int flg_completed = this->get_result_status (result_list_[ai],
00902                                                    error_status,
00903                                                    transfer_count);
00904 
00905       
00906       if (flg_completed == 0)  
00907         {
00908           num_pending++;
00909 #if 0
00910           char * errtxt = ACE_OS::strerror (error_status);
00911           if (errtxt == 0)
00912               errtxt ="?????????";
00913 
00914           char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915             "WRITE":"READ" ;
00916 
00917 
00918           ACE_ERROR ((LM_ERROR,
00919                       ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920                       ai,
00921                       op,
00922                       error_status,
00923                       transfer_count,
00924                       errtxt));
00925 #endif 
00926         }
00927       else 
00928         {
00929           delete this->result_list_[ai];
00930           this->result_list_[ai] = 0;
00931           this->aiocb_list_[ai] = 0;
00932         }
00933     }
00934 
00935   
00936   
00937   
00938   
00939   
00940   ACE_DEBUG
00941     ((LM_DEBUG,
00942       ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943       ACE_TEXT(" number pending AIO=%d\n"),
00944       num_pending));
00945 
00946   delete [] this->aiocb_list_;
00947   this->aiocb_list_ = 0;
00948 
00949   delete [] this->result_list_;
00950   this->result_list_ = 0;
00951 
00952   return (num_pending == 0 ? 0 : -1);
00953   
00954 }
00955 
00956 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
00957 {
00958   long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959 
00960   
00961   
00962   
00963 
00964   if (max_os_aio_num > 0 &&
00965       aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966      aiocb_list_max_size_ = max_os_aio_num;
00967 
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969   
00970   
00971   
00972 
00973   long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974   if (max_os_listio_num > 0
00975       && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976     aiocb_list_max_size_ = max_os_listio_num;
00977 #endif 
00978 
00979   
00980   
00981 
00982   if (aiocb_list_max_size_ <= 0
00983      || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984      aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985 
00986   
00987 
00988   int max_num_files = ACE::max_handles ();
00989 
00990   if (max_num_files > 0
00991       && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992     {
00993       ACE::set_handle_limit (aiocb_list_max_size_);
00994 
00995       max_num_files = ACE::max_handles ();
00996     }
00997 
00998   if (max_num_files > 0
00999       && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000     aiocb_list_max_size_ = (unsigned long) max_num_files;
01001 
01002   ACE_DEBUG ((LM_DEBUG,
01003              "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004               aiocb_list_max_size_));
01005 
01006 #if defined(__sgi)
01007 
01008    ACE_DEBUG((LM_DEBUG,
01009               ACE_TEXT( "SGI IRIX specific: aio_init!\n")));
01010 
01011 
01012 
01013 
01014 
01015 
01016 
01017 
01018 
01019 
01020 
01021     aioinit_t  aioinit;
01022 
01023     aioinit.aio_threads = 10; 
01024     aioinit.aio_locks = 20;   
01025                        
01026     aioinit.aio_num = aiocb_list_max_size_;
01027     aioinit.aio_usedba = 0;   
01028     aioinit.aio_debug = 0;    
01029     aioinit.aio_numusers = 100; 
01030     aioinit.aio_reserved[0] = 0;
01031     aioinit.aio_reserved[1] = 0;
01032     aioinit.aio_reserved[2] = 0;
01033 
01034     aio_sgi_init (&aioinit);
01035 
01036 #endif
01037 
01038     return;
01039 }
01040 
01041 void
01042 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
01043 {
01044   
01045   
01046 
01047   if (aiocb_notify_pipe_manager_ == 0)
01048     ACE_NEW (aiocb_notify_pipe_manager_,
01049              ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }
01051 
01052 void
01053 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
01054 {
01055   
01056   
01057 
01058   delete aiocb_notify_pipe_manager_;
01059   aiocb_notify_pipe_manager_ = 0;
01060 }
01061 
01062 int
01063 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
01064 {
01065   
01066   ACE_Countdown_Time countdown (&wait_time);
01067   return this->handle_events_i (wait_time.msec ());
01068 }
01069 
01070 int
01071 ACE_POSIX_AIOCB_Proactor::handle_events (void)
01072 {
01073   return this->handle_events_i (ACE_INFINITE);
01074 }
01075 
01076 int
01077 ACE_POSIX_AIOCB_Proactor::notify_completion(int  sig_num)
01078 {
01079   ACE_UNUSED_ARG (sig_num);
01080 
01081   return this->aiocb_notify_pipe_manager_->notify ();
01082 }
01083 
01084 int
01085 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
01086 {
01087   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088 
01089   int ret_val = this->putq_result (result);
01090 
01091   return ret_val;
01092 }
01093 
01094 int
01095 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
01096 {
01097   
01098   
01099 
01100   if (!result)
01101     return -1;
01102 
01103   int sig_num = result->signal_number ();
01104   int ret_val = this->result_queue_.enqueue_tail (result);
01105 
01106   if (ret_val == -1)
01107     ACE_ERROR_RETURN ((LM_ERROR,
01108                        "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109                       -1);
01110 
01111   this->notify_completion (sig_num);
01112 
01113   return 0;
01114 }
01115 
01116 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
01117 {
01118   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119 
01120 
01121   ACE_POSIX_Asynch_Result* result = 0;
01122 
01123   if (this->result_queue_.dequeue_head (result) != 0)
01124     return 0;
01125 
01126 
01127 
01128 
01129 
01130 
01131 
01132 
01133   return result;
01134 }
01135 
01136 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
01137 {
01138   int ret_val = 0;
01139   ACE_POSIX_Asynch_Result* result = 0;
01140 
01141   while ((result = this->getq_result ()) != 0)
01142     {
01143       delete result;
01144       ret_val++;
01145     }
01146 
01147   return ret_val;
01148 }
01149 
01150 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
01151 {
01152   int ret_val = 0;
01153   ACE_POSIX_Asynch_Result* result = 0;
01154 
01155   while ((result = this->getq_result ()) != 0)
01156     {
01157       this->application_specific_code
01158             (result,
01159              result->bytes_transferred(), 
01160              0,  
01161              result->error());   
01162 
01163       ret_val++;
01164     }
01165 
01166   return ret_val;
01167 }
01168 
01169 int
01170 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
01171 {
01172   int result_suspend = 0;
01173   int retval= 0;
01174 
01175   if (milli_seconds == ACE_INFINITE)
01176     
01177     result_suspend = aio_suspend (aiocb_list_,
01178                                   aiocb_list_max_size_,
01179                                   0);
01180   else
01181     {
01182       
01183       timespec timeout;
01184       timeout.tv_sec = milli_seconds / 1000;
01185       timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186       result_suspend = aio_suspend (aiocb_list_,
01187                                     aiocb_list_max_size_,
01188                                     &timeout);
01189     }
01190 
01191   
01192   if (result_suspend == -1)
01193     {
01194       if (errno != EAGAIN &&   
01195           errno != EINTR )    
01196           ACE_ERROR ((LM_ERROR,
01197                       ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
01198                       ACE_TEXT ("handle_events: aio_suspend failed")));
01199       
01200       
01201     }
01202   else
01203     {
01204       size_t index = 0;
01205       size_t count = aiocb_list_max_size_;  
01206       int error_status = 0;
01207       size_t transfer_count = 0;
01208 
01209       for (;; retval++)
01210         {
01211           ACE_POSIX_Asynch_Result *asynch_result =
01212             find_completed_aio (error_status,
01213                                 transfer_count,
01214                                 index,
01215                                 count);
01216 
01217           if (asynch_result == 0)
01218             break;
01219 
01220           
01221           this->application_specific_code (asynch_result,
01222                                            transfer_count,
01223                                            0,             
01224                                            error_status);
01225         }
01226     }
01227 
01228   
01229   retval += this->process_result_queue ();
01230 
01231   return retval > 0 ? 1 : 0;
01232 }
01233 
01234 int
01235 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
01236                                              int &error_status,
01237                                              size_t &transfer_count)
01238 {
01239   transfer_count = 0;
01240 
01241   
01242   error_status  = aio_error (asynch_result);
01243   if (error_status == EINPROGRESS)
01244     return 0;  
01245 
01246   ssize_t op_return = aio_return (asynch_result);
01247   if (op_return > 0)
01248     transfer_count = static_cast<size_t> (op_return);
01249   
01250   return 1; 
01251 }
01252 
// Looks for the next finished operation among the started slots.
//
// <index> and <count> form an in/out scan cursor: the scan starts at
// <index>, examines at most <count> slots and leaves both updated so
// that a subsequent call resumes after the slot that was returned.
// <error_status> and <transfer_count> are filled in by
// get_result_status() for the operation that is returned.
//
// Returns the finished result after releasing its slot, or 0 when
// nothing has been started or no examined slot has finished.
ACE_POSIX_Asynch_Result *
ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
                                              size_t &transfer_count,
                                              size_t &index,
                                              size_t &count)
{
  
  

  // The guard macro is given 0 as its failure return value.
  ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));

  ACE_POSIX_Asynch_Result *asynch_result = 0;

  // Nothing has been started, so nothing can have finished.
  if (num_started_aio_ == 0)  
    return 0;

  for (; count > 0; index++ , count--)
    {
      // Wrap the cursor back to the first slot.
      if (index >= aiocb_list_max_size_) 
        index = 0;

      // Slots with no entry in aiocb_list_ are skipped.
      if (aiocb_list_[index] == 0) 
        continue;

      // A non-zero status means this operation is no longer in progress.
      if (0 != this->get_result_status (result_list_[index],
                                        error_status,
                                        transfer_count))  
        break;

    } 

  // The scan budget ran out without finding a finished operation.
  if (count == 0) 
    return 0;
  asynch_result = result_list_[index];

  // Release the slot in both parallel arrays.
  aiocb_list_[index] = 0;
  result_list_[index] = 0;
  aiocb_list_cur_size_--;

  // The loop was left via break, so the loop increment did not run:
  // step the cursor past the slot that is being returned.
  num_started_aio_--;  
  index++;             
  count--;             

  // A slot has been freed; give a deferred request the chance to start.
  this->start_deferred_aio ();
  
  

  return asynch_result;
}
01302 
01303 
01304 int
01305 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
01306                                      ACE_POSIX_Proactor::Opcode op)
01307 {
01308   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01309 
01310   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01311 
01312   int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01313 
01314   if (result == 0) 
01315     return ret_val;
01316 
01317   
01318   switch (op)
01319     {
01320     case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01321       result->aio_lio_opcode = LIO_READ;
01322       break;
01323 
01324     case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01325       result->aio_lio_opcode = LIO_WRITE;
01326       break;
01327 
01328     default:
01329       ACE_ERROR_RETURN ((LM_ERROR,
01330                          ACE_TEXT ("%N:%l:(%P|%t)::")
01331                          ACE_TEXT ("start_aio: Invalid op code %d\n"),
01332                          op),
01333                         -1);
01334     }
01335 
01336   if (ret_val != 0)   
01337     {
01338       errno = EAGAIN;
01339       return -1;
01340     }
01341 
01342   
01343 
01344   ssize_t slot = allocate_aio_slot (result);
01345 
01346   if (slot < 0)
01347     return -1;
01348 
01349   size_t index = static_cast<size_t> (slot);
01350 
01351   result_list_[index] = result;   
01352   aiocb_list_cur_size_++;
01353 
01354   ret_val = start_aio_i (result);
01355   switch (ret_val)
01356     {
01357     case 0:     
01358       aiocb_list_[index] = result;
01359       return 0;
01360 
01361     case 1:     
01362       num_deferred_aiocb_ ++;
01363       return 0;
01364 
01365     default:    
01366       break;    
01367     }
01368 
01369   result_list_[index] = 0;
01370   aiocb_list_cur_size_--;
01371   return -1;
01372 }
01373 
01374 ssize_t
01375 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01376 {
01377   size_t i = 0;
01378 
01379   
01380   
01381 
01382   if (notify_pipe_read_handle_ == result->aio_fildes) 
01383     {                                       
01384       if (result_list_[i] != 0)           
01385         {                                   
01386           errno   = EAGAIN;
01387           ACE_ERROR_RETURN ((LM_ERROR,
01388                      "%N:%l:(%P | %t)::\n"
01389                      "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01390                      "internal Proactor error 0\n"),
01391                      -1);
01392         }
01393     }
01394   else  
01395     {
01396       for (i= 1; i < this->aiocb_list_max_size_; i++)
01397         if (result_list_[i] == 0)
01398           break;
01399     }
01400 
01401   if (i >= this->aiocb_list_max_size_)
01402     ACE_ERROR_RETURN ((LM_ERROR,
01403               "%N:%l:(%P | %t)::\n"
01404               "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01405               "internal Proactor error 1\n"),
01406               -1);
01407 
01408   
01409   result->aio_sigevent.sigev_notify = SIGEV_NONE;
01410 
01411   return static_cast<ssize_t> (i);
01412 }
01413 
01414 
01415 
01416 
01417 
01418 
01419 int
01420 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
01421 {
01422   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01423 
01424   int ret_val;
01425   const ACE_TCHAR *ptype;
01426 
01427   
01428 
01429   switch (result->aio_lio_opcode )
01430     {
01431     case LIO_READ :
01432       ptype = ACE_TEXT ("read ");
01433       ret_val = aio_read (result);
01434       break;
01435     case LIO_WRITE :
01436       ptype = ACE_TEXT ("write");
01437       ret_val = aio_write (result);
01438       break;
01439     default:
01440       ptype = ACE_TEXT ("?????");
01441       ret_val = -1;
01442       break;
01443     }
01444 
01445   if (ret_val == 0)
01446     this->num_started_aio_++;
01447   else 
01448     {
01449       if (errno == EAGAIN || errno == ENOMEM)  
01450         ret_val = 1;
01451       else
01452         ACE_ERROR ((LM_ERROR,
01453                     ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01454                     ptype,
01455                     ACE_TEXT ("queueing failed")));
01456     }
01457 
01458   return ret_val;
01459 }
01460 
01461 
01462 int
01463 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
01464 {
01465   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01466 
01467   
01468   
01469   
01470   
01471   
01472   
01473   
01474 
01475   if (num_deferred_aiocb_ == 0)
01476     return 0;  
01477 
01478   size_t i = 0;
01479 
01480   for (i= 0; i < this->aiocb_list_max_size_; i++)
01481     if (result_list_[i] !=0       
01482        && aiocb_list_[i]  ==0)     
01483       break;
01484 
01485   if (i >= this->aiocb_list_max_size_)
01486     ACE_ERROR_RETURN ((LM_ERROR,
01487                  "%N:%l:(%P | %t)::\n"
01488                  "start_deferred_aio:"
01489                  "internal Proactor error 3\n"),
01490                  -1);
01491 
01492   ACE_POSIX_Asynch_Result *result = result_list_[i];
01493 
01494   int ret_val = start_aio_i (result);
01495 
01496   switch (ret_val)
01497     {
01498     case 0 :    
01499       aiocb_list_[i] = result;
01500       num_deferred_aiocb_ --;
01501       return 0;
01502 
01503     case 1 :
01504       return 0;  
01505 
01506     default :     
01507       break;
01508     }
01509 
01510   
01511 
01512   result_list_[i] = 0;
01513   aiocb_list_cur_size_--;
01514 
01515   num_deferred_aiocb_ --;
01516 
01517   result->set_error (errno);
01518   result->set_bytes_transferred (0);
01519   this->putq_result (result);  
01520 
01521   return -1;
01522 }
01523 
01524 int
01525 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
01526 {
01527   
01528   
01529   
01530   
01531   
01532   
01533   
01534   
01535   
01536   
01537   
01538   
01539 
01540   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01541 
01542   int    num_total     = 0;
01543   int    num_cancelled = 0;
01544 
01545   {
01546     ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01547 
01548     size_t ai = 0;
01549 
01550     for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01551       {
01552         if (this->result_list_[ai] == 0)    
01553           continue;
01554 
01555         if (this->result_list_[ai]->aio_fildes != handle)  
01556           continue;
01557 
01558         num_total++;
01559 
01560         ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01561 
01562         if (this->aiocb_list_[ai] == 0)  
01563           {
01564             num_cancelled++;
01565             this->num_deferred_aiocb_--;
01566 
01567             this->aiocb_list_[ai] = 0;
01568             this->result_list_[ai] = 0;
01569             this->aiocb_list_cur_size_--;
01570 
01571             asynch_result->set_error (ECANCELED);
01572             asynch_result->set_bytes_transferred (0);
01573             this->putq_result (asynch_result);
01574             
01575           }
01576         else      
01577           {
01578             int rc_cancel = this->cancel_aiocb (asynch_result);
01579 
01580             if (rc_cancel == 0)    
01581               num_cancelled++;     
01582           }
01583       }
01584 
01585   } 
01586 
01587   if (num_total == 0)
01588     return 1;  
01589 
01590   if (num_cancelled == num_total)
01591     return 0;  
01592 
01593   return 2; 
01594 }
01595 
01596 int
01597 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
01598 {
01599   
01600   
01601   int rc = ::aio_cancel (0, result);
01602 
01603   
01604   if (rc == AIO_CANCELED)
01605     return 0;
01606   else if (rc == AIO_ALLDONE)
01607     return 1;
01608   else 
01609     return 2;
01610 }
01611 
01612 
01613 
01614 
01615 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
01616 
01617 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
01618   :  ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01619                                ACE_POSIX_Proactor::PROACTOR_SIG)
01620 {
01621   
01622   
01623   
01624 
01625   
01626   ACE_OS::sigemptyset (&this->RT_completion_signals_);
01627 
01628   
01629   if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
01630     ACE_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
01631                 ACE_TEXT ("sigaddset")));
01632   this->block_signals ();
01633   
01634   this->setup_signal_handler (ACE_SIGRTMIN);
01635 
01636   
01637   
01638   
01639 
01640   this->get_asynch_pseudo_task().start ();
01641   return;
01642 }
01643 
01644 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
01645                                                 size_t max_aio_operations)
01646   :  ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01647                                ACE_POSIX_Proactor::PROACTOR_SIG)
01648 {
01649   
01650   
01651 
01652   
01653 
01654   
01655   if (sigemptyset (&this->RT_completion_signals_) == -1)
01656     ACE_ERROR ((LM_ERROR,
01657                 "Error:(%P | %t):%p\n",
01658                 "sigemptyset failed"));
01659 
01660   
01661   
01662   
01663   int member = 0;
01664   for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
01665     {
01666       member = sigismember (&signal_set,
01667                             si);
01668       if (member == -1)
01669         ACE_ERROR ((LM_ERROR,
01670                     "%N:%l:(%P | %t)::%p\n",
01671                     "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01672                     "sigismember failed"));
01673       else if (member == 1)
01674         {
01675           sigaddset (&this->RT_completion_signals_, si);
01676           this->setup_signal_handler (si);
01677         }
01678     }
01679 
01680   
01681   this->block_signals ();
01682 
01683   
01684   
01685   
01686 
01687   this->get_asynch_pseudo_task().start ();
01688   return;
01689 }
01690 
01691 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
01692 {
01693   this->close ();
01694 
01695   
01696 }
01697 
01698 ACE_POSIX_Proactor::Proactor_Type
01699 ACE_POSIX_SIG_Proactor::get_impl_type (void)
01700 {
01701   return PROACTOR_SIG;
01702 }
01703 
01704 int
01705 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
01706 {
01707   
01708   ACE_Countdown_Time countdown (&wait_time);
01709   return this->handle_events_i (&wait_time);
01710 }
01711 
01712 int
01713 ACE_POSIX_SIG_Proactor::handle_events (void)
01714 {
01715   return this->handle_events_i (0);
01716 }
01717 
01718 int
01719 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
01720 {
01721   
01722   pid_t const pid = ACE_OS::getpid ();
01723   if (pid == (pid_t) -1)
01724     ACE_ERROR_RETURN ((LM_ERROR,
01725                        "Error:%N:%l(%P | %t):%p",
01726                        "<getpid> failed"),
01727                       -1);
01728 
01729   
01730   sigval value;
01731 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01732   value.sigval_int = -1;
01733 #else
01734   value.sival_int = -1;
01735 #endif 
01736 
01737   
01738   if (sigqueue (pid, sig_num, value) == 0)
01739     return 0;
01740 
01741   if (errno != EAGAIN)
01742     ACE_ERROR_RETURN ((LM_ERROR,
01743                        "Error:%N:%l:(%P | %t):%p\n",
01744                        "<sigqueue> failed"),
01745                       -1);
01746   return -1;
01747 }
01748 
01749 ACE_Asynch_Result_Impl *
01750 ACE_POSIX_SIG_Proactor::create_asynch_timer
01751   (const ACE_Handler::Proxy_Ptr &handler_proxy,
01752    const void *act,
01753    const ACE_Time_Value &tv,
01754    ACE_HANDLE event,
01755    int priority,
01756    int signal_number)
01757 {
01758   int is_member = 0;
01759 
01760   
01761   if (signal_number == -1)
01762     {
01763       int si;
01764       for (si = ACE_SIGRTMAX;
01765            (is_member == 0) && (si >= ACE_SIGRTMIN);
01766            si--)
01767         {
01768           is_member = sigismember (&this->RT_completion_signals_,
01769                                    si);
01770           if (is_member == -1)
01771             ACE_ERROR_RETURN ((LM_ERROR,
01772                                "%N:%l:(%P | %t)::%s\n",
01773                                "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
01774                                "sigismember failed"),
01775                               0);
01776         }
01777 
01778       if (is_member == 0)
01779         ACE_ERROR_RETURN ((LM_ERROR,
01780                            "Error:%N:%l:(%P | %t)::%s\n",
01781                            "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01782                            "Signal mask set empty"),
01783                           0);
01784       else
01785         
01786         signal_number = si + 1;
01787     }
01788 
01789   ACE_Asynch_Result_Impl *implementation;
01790   ACE_NEW_RETURN (implementation,
01791                   ACE_POSIX_Asynch_Timer (handler_proxy,
01792                                           act,
01793                                           tv,
01794                                           event,
01795                                           priority,
01796                                           signal_number),
01797                   0);
01798   return implementation;
01799 }
01800 
// Compiled out (#if 0): a signal handler that only logs the number of
// the signal it received.  It is referenced by the equally disabled
// code in setup_signal_handler().
#if 0
static void
sig_handler (int sig_num, siginfo_t *, ucontext_t *)
{
  
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
               sig_num));
}
#endif 
01811 
// Hook for installing a handler for <signal_number>.
// As compiled, this does nothing and always returns 0: the code that
// would install sig_handler via sigaction() is disabled with #if 0, so
// the argument is merely marked as unused.
int
ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
{
  
  
  
  

  
  
  
  
  
#if 0
  // Disabled: would register sig_handler with SA_SIGINFO and an empty
  // signal mask, returning -1 if sigaction() fails.
  struct sigaction reaction;
  sigemptyset (&reaction.sa_mask);   
  reaction.sa_flags = SA_SIGINFO;    
  reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler); 
  int sigaction_return = ACE_OS::sigaction (signal_number,
                                            &reaction,
                                            0);
  if (sigaction_return == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Error:%p\n",
                       "Proactor couldnt do sigaction for the RT SIGNAL"),
                      -1);
#else
  // Active path: no handler is installed.
  ACE_UNUSED_ARG(signal_number);
#endif
  return 0;
}
01843 
01844 
01845 int
01846 ACE_POSIX_SIG_Proactor::block_signals (void) const
01847 {
01848   return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
01849 }
01850 
01851 ssize_t
01852 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01853 {
01854   size_t i = 0;
01855 
01856   
01857   for (i = 0; i < this->aiocb_list_max_size_; i++)
01858     if (result_list_[i] == 0)
01859       break;
01860 
01861   if (i >= this->aiocb_list_max_size_)
01862     ACE_ERROR_RETURN ((LM_ERROR,
01863               "%N:%l:(%P | %t)::\n"
01864               "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
01865               "internal Proactor error 1\n"),
01866               -1);
01867 
01868   
01869   
01870   result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
01871   result->aio_sigevent.sigev_signo = result->signal_number ();
01872 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01873   result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
01874 #else
01875   result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
01876 #endif 
01877 
01878   return static_cast<ssize_t> (i);
01879 }
01880 
01881 int
01882 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
01883 {
01884   int result_sigwait = 0;
01885   siginfo_t sig_info;
01886 
01887   do
01888     {
01889       
01890       if (timeout == 0)
01891         {
01892           result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
01893                                                 &sig_info);
01894         }
01895       else
01896         {
01897           result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
01898                                                  &sig_info,
01899                                                  timeout);
01900           if (result_sigwait == -1 && errno == EAGAIN)
01901             return 0;
01902         }
01903     }
01904   while (result_sigwait == -1 && errno == EINTR);
01905 
01906   if (result_sigwait == -1)  
01907     return -1;
01908 
01909   
01910   
01911   
01912   int flg_aio = 0;           
01913 
01914   size_t index = 0;          
01915   size_t count = 1;          
01916   int error_status = 0;
01917   size_t transfer_count = 0;
01918 
01919   if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56)
01920     {
01921       flg_aio = 1;  
01922       
01923       
01924 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01925       index = static_cast<size_t> (sig_info.si_value.sigval_int);
01926 #else
01927       index = static_cast<size_t> (sig_info.si_value.sival_int);
01928 #endif 
01929       
01930       
01931       
01932       
01933       if (os_id_ == ACE_OS_SUN_56) 
01934         {
01935           
01936           
01937           
01938           
01939           
01940           
01941           
01942           count = aiocb_list_max_size_;
01943         }
01944     }
01945   else if (sig_info.si_code != SI_QUEUE)
01946     {
01947       
01948       
01949       
01950       
01951       ACE_ERROR ((LM_DEBUG,
01952                   ACE_TEXT ("%N:%l:(%P | %t): ")
01953                   ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
01954                   ACE_TEXT ("Unexpected signal code (%d) returned ")
01955                   ACE_TEXT ("from sigwait; expecting %d\n"),
01956                   result_sigwait, sig_info.si_code));
01957       flg_aio = 1;
01958     }
01959 
01960   int ret_aio = 0;
01961   int ret_que = 0;
01962 
01963   if (flg_aio)
01964     for (;; ret_aio++)
01965       {
01966         ACE_POSIX_Asynch_Result *asynch_result =
01967           find_completed_aio (error_status,
01968                               transfer_count,
01969                               index,
01970                               count);
01971 
01972         if (asynch_result == 0)
01973           break;
01974 
01975         
01976         this->application_specific_code (asynch_result,
01977                                          transfer_count,
01978                                          0,             
01979                                          error_status); 
01980       }
01981 
01982   
01983   ret_que = this->process_result_queue ();
01984 
01985   
01986   
01987 #if 0
01988   ACE_DEBUG ((LM_DEBUG,
01989               "(%t) NumAIO=%d NumQueue=%d\n",
01990               ret_aio, ret_que));
01991 #endif
01992 
01993   return ret_aio + ret_que > 0 ? 1 : 0;
01994 }
01995 
01996 #endif 
01997 
01998 
01999 
// Builds a timer result.  Everything except <tv> is forwarded to the
// ACE_POSIX_Asynch_Result base class (together with two literal 0
// arguments); <tv> is stored in time_ and later handed to the handler
// by complete().
ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
  (const ACE_Handler::Proxy_Ptr &handler_proxy,
   const void *act,
   const ACE_Time_Value &tv,
   ACE_HANDLE event,
   int priority,
   int signal_number)
  : ACE_POSIX_Asynch_Result
     (handler_proxy, act, event, 0, 0, priority, signal_number),
    time_ (tv)
{
}
02012 
02013 void
02014 ACE_POSIX_Asynch_Timer::complete (size_t       ,
02015                                   int          ,
02016                                   const void * ,
02017                                   u_long       )
02018 {
02019   ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02020   if (handler != 0)
02021     handler->handle_time_out (this->time_, this->act ());
02022 }
02023 
02024 
02025 
02026 
// Builds a wakeup completion.  All arguments are forwarded to the
// ACE_POSIX_Asynch_Result base class, together with two literal 0
// arguments; ACE_Asynch_Result_Impl is initialised explicitly first.
ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
  (const ACE_Handler::Proxy_Ptr &handler_proxy,
   const void *act,
   ACE_HANDLE event,
   int priority,
   int signal_number)
  : ACE_Asynch_Result_Impl (),
    ACE_POSIX_Asynch_Result (handler_proxy,
                             act,
                             event,
                             0,
                             0,
                             priority,
                             signal_number)
{
}
02043 
// Destructor: the body is empty, so no cleanup is done at this level.
ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
{
}
02047 
02048 void
02049 ACE_POSIX_Wakeup_Completion::complete (size_t       ,
02050                                        int          ,
02051                                        const void * ,
02052                                        u_long       )
02053 {
02054 
02055   ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02056   if (handler != 0)
02057     handler->handle_wakeup ();
02058 }
02059 
02060 ACE_END_VERSIONED_NAMESPACE_DECL
02061 
02062 #endif