00001
00002
00003 #include "ace/POSIX_Proactor.h"
00004
00005 #if defined (ACE_HAS_AIO_CALLS)
00006
00007 #if !defined (__ACE_INLINE__)
00008 #include "ace/POSIX_Proactor.inl"
00009 #endif
00010
00011 # if defined (ACE_HAS_SYSINFO)
00012 # include <sys/systeminfo.h>
00013 # endif
00014
00015 #include "ace/ACE.h"
00016 #include "ace/Flag_Manip.h"
00017 #include "ace/Task_T.h"
00018 #include "ace/Log_Msg.h"
00019 #include "ace/Object_Manager.h"
00020 #include "ace/OS_NS_sys_socket.h"
00021 #include "ace/OS_NS_signal.h"
00022 #include "ace/OS_NS_unistd.h"
00023
00024 #if defined (sun)
00025 # include "ace/OS_NS_strings.h"
00026 #endif
00027
00028
00029
00030 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032
00033
00034
00035
00036
00037
00038
00039 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
00040 {
00041 public:
00042
00043 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
00044 const void *act = 0,
00045 ACE_HANDLE event = ACE_INVALID_HANDLE,
00046 int priority = 0,
00047 int signal_number = ACE_SIGRTMIN);
00048
00049
00050 virtual ~ACE_POSIX_Wakeup_Completion (void);
00051
00052
00053
00054 virtual void complete (size_t bytes_transferred = 0,
00055 int success = 1,
00056 const void *completion_key = 0,
00057 u_long error = 0);
00058 };
00059
00060
00061 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
00062 : os_id_ (ACE_OS_UNDEFINED)
00063 {
00064 #if defined(sun)
00065
00066 os_id_ = ACE_OS_SUN;
00067
00068 char Buf [32];
00069
00070 ::memset(Buf,0,sizeof(Buf));
00071
00072 ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
00073
00074 if (ACE_OS::strcasecmp (Buf , "5.6") == 0)
00075 os_id_ = ACE_OS_SUN_56;
00076 else if (ACE_OS::strcasecmp (Buf , "5.7") == 0)
00077 os_id_ = ACE_OS_SUN_57;
00078 else if (ACE_OS::strcasecmp (Buf , "5.8") == 0)
00079 os_id_ = ACE_OS_SUN_58;
00080
00081 #elif defined(HPUX)
00082
00083 os_id_ = ACE_OS_HPUX;
00084
00085 #elif defined(__sgi)
00086
00087 os_id_ = ACE_OS_IRIX;
00088
00089 #elif defined(__OpenBSD)
00090
00091 os_id_ = ACE_OS_OPENBSD;
00092
00093
00094
00095
00096
00097 #endif
00098 }
00099
00100 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
00101 {
00102 this->close ();
00103 }
00104
00105 int
00106 ACE_POSIX_Proactor::close (void)
00107 {
00108 return 0;
00109 }
00110
00111 int
00112 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
00113 const void *completion_key)
00114 {
00115 ACE_UNUSED_ARG (handle);
00116 ACE_UNUSED_ARG (completion_key);
00117 return 0;
00118 }
00119
00120 int
00121 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
00122 {
00123 return 0;
00124 }
00125
00126 int
00127 ACE_POSIX_Proactor::close_dispatch_threads (int)
00128 {
00129 return 0;
00130 }
00131
00132 size_t
00133 ACE_POSIX_Proactor::number_of_threads (void) const
00134 {
00135
00136 ACE_NOTSUP_RETURN (0);
00137 }
00138
00139 void
00140 ACE_POSIX_Proactor::number_of_threads (size_t threads)
00141 {
00142
00143 ACE_UNUSED_ARG (threads);
00144 }
00145
00146 ACE_HANDLE
00147 ACE_POSIX_Proactor::get_handle (void) const
00148 {
00149 return ACE_INVALID_HANDLE;
00150 }
00151
00152 ACE_Asynch_Read_Stream_Impl *
00153 ACE_POSIX_Proactor::create_asynch_read_stream (void)
00154 {
00155 ACE_Asynch_Read_Stream_Impl *implementation = 0;
00156 ACE_NEW_RETURN (implementation,
00157 ACE_POSIX_Asynch_Read_Stream (this),
00158 0);
00159 return implementation;
00160 }
00161
00162 ACE_Asynch_Read_Stream_Result_Impl *
00163 ACE_POSIX_Proactor::create_asynch_read_stream_result
00164 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00165 ACE_HANDLE handle,
00166 ACE_Message_Block &message_block,
00167 size_t bytes_to_read,
00168 const void* act,
00169 ACE_HANDLE event,
00170 int priority,
00171 int signal_number)
00172 {
00173 ACE_Asynch_Read_Stream_Result_Impl *implementation;
00174 ACE_NEW_RETURN (implementation,
00175 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
00176 handle,
00177 message_block,
00178 bytes_to_read,
00179 act,
00180 event,
00181 priority,
00182 signal_number),
00183 0);
00184 return implementation;
00185 }
00186
00187
00188 ACE_Asynch_Write_Stream_Impl *
00189 ACE_POSIX_Proactor::create_asynch_write_stream (void)
00190 {
00191 ACE_Asynch_Write_Stream_Impl *implementation = 0;
00192 ACE_NEW_RETURN (implementation,
00193 ACE_POSIX_Asynch_Write_Stream (this),
00194 0);
00195 return implementation;
00196 }
00197
00198 ACE_Asynch_Write_Stream_Result_Impl *
00199 ACE_POSIX_Proactor::create_asynch_write_stream_result
00200 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00201 ACE_HANDLE handle,
00202 ACE_Message_Block &message_block,
00203 size_t bytes_to_write,
00204 const void* act,
00205 ACE_HANDLE event,
00206 int priority,
00207 int signal_number)
00208 {
00209 ACE_Asynch_Write_Stream_Result_Impl *implementation;
00210 ACE_NEW_RETURN (implementation,
00211 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
00212 handle,
00213 message_block,
00214 bytes_to_write,
00215 act,
00216 event,
00217 priority,
00218 signal_number),
00219 0);
00220 return implementation;
00221 }
00222
00223
00224 ACE_Asynch_Read_File_Impl *
00225 ACE_POSIX_Proactor::create_asynch_read_file (void)
00226 {
00227 ACE_Asynch_Read_File_Impl *implementation = 0;
00228 ACE_NEW_RETURN (implementation,
00229 ACE_POSIX_Asynch_Read_File (this),
00230 0);
00231 return implementation;
00232 }
00233
00234 ACE_Asynch_Read_File_Result_Impl *
00235 ACE_POSIX_Proactor::create_asynch_read_file_result
00236 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00237 ACE_HANDLE handle,
00238 ACE_Message_Block &message_block,
00239 size_t bytes_to_read,
00240 const void* act,
00241 u_long offset,
00242 u_long offset_high,
00243 ACE_HANDLE event,
00244 int priority,
00245 int signal_number)
00246 {
00247 ACE_Asynch_Read_File_Result_Impl *implementation;
00248 ACE_NEW_RETURN (implementation,
00249 ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
00250 handle,
00251 message_block,
00252 bytes_to_read,
00253 act,
00254 offset,
00255 offset_high,
00256 event,
00257 priority,
00258 signal_number),
00259 0);
00260 return implementation;
00261 }
00262
00263
00264 ACE_Asynch_Write_File_Impl *
00265 ACE_POSIX_Proactor::create_asynch_write_file (void)
00266 {
00267 ACE_Asynch_Write_File_Impl *implementation = 0;
00268 ACE_NEW_RETURN (implementation,
00269 ACE_POSIX_Asynch_Write_File (this),
00270 0);
00271 return implementation;
00272 }
00273
00274 ACE_Asynch_Write_File_Result_Impl *
00275 ACE_POSIX_Proactor::create_asynch_write_file_result
00276 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00277 ACE_HANDLE handle,
00278 ACE_Message_Block &message_block,
00279 size_t bytes_to_write,
00280 const void* act,
00281 u_long offset,
00282 u_long offset_high,
00283 ACE_HANDLE event,
00284 int priority,
00285 int signal_number)
00286 {
00287 ACE_Asynch_Write_File_Result_Impl *implementation;
00288 ACE_NEW_RETURN (implementation,
00289 ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
00290 handle,
00291 message_block,
00292 bytes_to_write,
00293 act,
00294 offset,
00295 offset_high,
00296 event,
00297 priority,
00298 signal_number),
00299 0);
00300 return implementation;
00301 }
00302
00303
00304 ACE_Asynch_Read_Dgram_Impl *
00305 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
00306 {
00307 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00308 ACE_NEW_RETURN (implementation,
00309 ACE_POSIX_Asynch_Read_Dgram (this),
00310 0);
00311 return implementation;
00312 }
00313
00314 ACE_Asynch_Read_Dgram_Result_Impl *
00315 ACE_POSIX_Proactor::create_asynch_read_dgram_result
00316 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00317 ACE_HANDLE handle,
00318 ACE_Message_Block *message_block,
00319 size_t bytes_to_read,
00320 int flags,
00321 int protocol_family,
00322 const void* act,
00323 ACE_HANDLE event ,
00324 int priority ,
00325 int signal_number)
00326 {
00327 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
00328 ACE_NEW_RETURN (implementation,
00329 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
00330 handle,
00331 message_block,
00332 bytes_to_read,
00333 flags,
00334 protocol_family,
00335 act,
00336 event,
00337 priority,
00338 signal_number),
00339 0);
00340
00341 return implementation;
00342 }
00343
00344
00345 ACE_Asynch_Write_Dgram_Impl *
00346 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
00347 {
00348 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00349 ACE_NEW_RETURN (implementation,
00350 ACE_POSIX_Asynch_Write_Dgram (this),
00351 0);
00352
00353 return implementation;
00354 }
00355
00356 ACE_Asynch_Write_Dgram_Result_Impl *
00357 ACE_POSIX_Proactor::create_asynch_write_dgram_result
00358 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00359 ACE_HANDLE handle,
00360 ACE_Message_Block *message_block,
00361 size_t bytes_to_write,
00362 int flags,
00363 const void* act,
00364 ACE_HANDLE event,
00365 int priority ,
00366 int signal_number)
00367 {
00368 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
00369 ACE_NEW_RETURN (implementation,
00370 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
00371 handle,
00372 message_block,
00373 bytes_to_write,
00374 flags,
00375 act,
00376 event,
00377 priority,
00378 signal_number),
00379 0);
00380
00381 return implementation;
00382 }
00383
00384
00385 ACE_Asynch_Accept_Impl *
00386 ACE_POSIX_Proactor::create_asynch_accept (void)
00387 {
00388 ACE_Asynch_Accept_Impl *implementation = 0;
00389 ACE_NEW_RETURN (implementation,
00390 ACE_POSIX_Asynch_Accept (this),
00391 0);
00392
00393 return implementation;
00394 }
00395
00396 ACE_Asynch_Accept_Result_Impl *
00397 ACE_POSIX_Proactor::create_asynch_accept_result
00398 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00399 ACE_HANDLE listen_handle,
00400 ACE_HANDLE accept_handle,
00401 ACE_Message_Block &message_block,
00402 size_t bytes_to_read,
00403 const void* act,
00404 ACE_HANDLE event,
00405 int priority,
00406 int signal_number)
00407 {
00408 ACE_Asynch_Accept_Result_Impl *implementation;
00409 ACE_NEW_RETURN (implementation,
00410 ACE_POSIX_Asynch_Accept_Result (handler_proxy,
00411 listen_handle,
00412 accept_handle,
00413 message_block,
00414 bytes_to_read,
00415 act,
00416 event,
00417 priority,
00418 signal_number),
00419 0);
00420 return implementation;
00421 }
00422
00423
00424 ACE_Asynch_Connect_Impl *
00425 ACE_POSIX_Proactor::create_asynch_connect (void)
00426 {
00427 ACE_Asynch_Connect_Impl *implementation = 0;
00428 ACE_NEW_RETURN (implementation,
00429 ACE_POSIX_Asynch_Connect (this),
00430 0);
00431
00432 return implementation;
00433 }
00434
00435 ACE_Asynch_Connect_Result_Impl *
00436 ACE_POSIX_Proactor::create_asynch_connect_result
00437 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00438 ACE_HANDLE connect_handle,
00439 const void* act,
00440 ACE_HANDLE event,
00441 int priority,
00442 int signal_number)
00443 {
00444 ACE_Asynch_Connect_Result_Impl *implementation;
00445 ACE_NEW_RETURN (implementation,
00446 ACE_POSIX_Asynch_Connect_Result (handler_proxy,
00447 connect_handle,
00448 act,
00449 event,
00450 priority,
00451 signal_number),
00452 0);
00453 return implementation;
00454 }
00455
00456
00457 ACE_Asynch_Transmit_File_Impl *
00458 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
00459 {
00460 ACE_Asynch_Transmit_File_Impl *implementation = 0;
00461 ACE_NEW_RETURN (implementation,
00462 ACE_POSIX_Asynch_Transmit_File (this),
00463 0);
00464 return implementation;
00465 }
00466
00467 ACE_Asynch_Transmit_File_Result_Impl *
00468 ACE_POSIX_Proactor::create_asynch_transmit_file_result
00469 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00470 ACE_HANDLE socket,
00471 ACE_HANDLE file,
00472 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00473 size_t bytes_to_write,
00474 u_long offset,
00475 u_long offset_high,
00476 size_t bytes_per_send,
00477 u_long flags,
00478 const void *act,
00479 ACE_HANDLE event,
00480 int priority,
00481 int signal_number)
00482 {
00483 ACE_Asynch_Transmit_File_Result_Impl *implementation;
00484 ACE_NEW_RETURN (implementation,
00485 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
00486 socket,
00487 file,
00488 header_and_trailer,
00489 bytes_to_write,
00490 offset,
00491 offset_high,
00492 bytes_per_send,
00493 flags,
00494 act,
00495 event,
00496 priority,
00497 signal_number),
00498 0);
00499 return implementation;
00500 }
00501
00502 ACE_Asynch_Result_Impl *
00503 ACE_POSIX_Proactor::create_asynch_timer
00504 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00505 const void *act,
00506 const ACE_Time_Value &tv,
00507 ACE_HANDLE event,
00508 int priority,
00509 int signal_number)
00510 {
00511 ACE_POSIX_Asynch_Timer *implementation;
00512 ACE_NEW_RETURN (implementation,
00513 ACE_POSIX_Asynch_Timer (handler_proxy,
00514 act,
00515 tv,
00516 event,
00517 priority,
00518 signal_number),
00519 0);
00520 return implementation;
00521 }
00522
#if 0
// Disabled code: these two definitions are excluded from compilation
// by the surrounding "#if 0".

/// Repeatedly polls handle_events () with a zero timeout until it
/// reports something other than 0 or errno becomes ETIME.
int
ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
{
  ACE_Time_Value timeout (0, 0);
  int result = 0;

  do
    result = this->handle_events (timeout);
  while (result == 0 && errno != ETIME);

  // Collapse every non-error outcome to 0.
  return result == -1 ? -1 : 0;
}

/// Ignores both arguments and closes the proactor.
int
ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
                                  ACE_Reactor_Mask close_mask)
{
  ACE_UNUSED_ARG (handle);
  ACE_UNUSED_ARG (close_mask);

  return this->close ();
}
#endif
00555
00556 void
00557 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
00558 size_t bytes_transferred,
00559 const void *,
00560 u_long error)
00561 {
00562 ACE_SEH_TRY
00563 {
00564
00565 asynch_result->complete (bytes_transferred,
00566 error ? 0 : 1,
00567 0,
00568 error);
00569 }
00570 ACE_SEH_FINALLY
00571 {
00572
00573 delete asynch_result;
00574 }
00575 }
00576
00577 int
00578 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
00579 {
00580 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
00581
00582 for (int ci = 0; ci < how_many; ci++)
00583 {
00584 ACE_NEW_RETURN
00585 (wakeup_completion,
00586 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
00587 -1);
00588 if (this->post_completion (wakeup_completion) == -1)
00589 return -1;
00590 }
00591
00592 return 0;
00593 }
00594
00595 ACE_POSIX_Proactor::Proactor_Type
00596 ACE_POSIX_Proactor::get_impl_type (void)
00597 {
00598 return PROACTOR_POSIX;
00599 }
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
00630 {
00631 public:
00632
00633
00634 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
00635
00636
00637 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
00638
00639
00640 int notify ();
00641
00642
00643
00644 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
00645
00646 private:
00647
00648 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
00649
00650
00651 ACE_Message_Block message_block_;
00652
00653
00654
00655 ACE_Pipe pipe_;
00656
00657
00658 ACE_POSIX_Asynch_Read_Stream read_stream_;
00659
00660
00661 ACE_AIOCB_Notify_Pipe_Manager (void);
00662 };
00663
00664 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
00665 : posix_aiocb_proactor_ (posix_aiocb_proactor),
00666 message_block_ (sizeof (2)),
00667 read_stream_ (posix_aiocb_proactor)
00668 {
00669
00670 this->pipe_.open ();
00671
00672
00673 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
00674
00675
00676 ACE::set_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
00677
00678
00679 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
00680
00681
00682 if (this->read_stream_.open (this->proxy (),
00683 this->pipe_.read_handle (),
00684 0,
00685 0)
00686 == -1)
00687 ACE_ERROR ((LM_ERROR,
00688 "%N:%l:%p\n",
00689 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00690 "Open on Read Stream failed"));
00691
00692
00693 if (this->read_stream_.read (this->message_block_,
00694 1,
00695 0,
00696 0)
00697 == -1)
00698 ACE_ERROR ((LM_ERROR,
00699 "%N:%l:%p\n",
00700 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00701 "Read from pipe failed"));
00702 }
00703
00704 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
00705 {
00706
00707 this->read_stream_.cancel ();
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719 ACE_HANDLE h = this->pipe_.write_handle ();
00720 if (h != ACE_INVALID_HANDLE)
00721 ACE_OS::closesocket (h);
00722
00723 h = this->pipe_.read_handle ();
00724 if ( h != ACE_INVALID_HANDLE)
00725 ACE_OS::closesocket (h);
00726
00727 }
00728
00729
00730 int
00731 ACE_AIOCB_Notify_Pipe_Manager::notify ()
00732 {
00733
00734 char char_send = 0;
00735 ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
00736 &char_send,
00737 sizeof (char_send));
00738
00739 if (ret_val < 0)
00740 {
00741 if (errno != EWOULDBLOCK)
00742 #if 0
00743 ACE_ERROR ((LM_ERROR,
00744 ACE_LIB_TEXT ("(%P %t):%p\n"),
00745 ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
00746 ACE_LIB_TEXT ("Error:Writing on to notify pipe failed")));
00747 #endif
00748 return -1;
00749 }
00750
00751 return 0;
00752 }
00753
00754 void
00755 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
00756 (const ACE_Asynch_Read_Stream::Result & )
00757 {
00758
00759
00760
00761
00762 if (this->message_block_.length () > 0)
00763 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
00764
00765
00766
00767 if (-1 == this->read_stream_.read (this->message_block_,
00768 1,
00769 0,
00770 0))
00771 ACE_ERROR ((LM_ERROR,
00772 ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"),
00773 ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
00774 ACE_LIB_TEXT ("Read from pipe failed")));
00775
00776
00777
00778
00779 }
00780
00781
00782 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
00783 : aiocb_notify_pipe_manager_ (0),
00784 aiocb_list_ (0),
00785 result_list_ (0),
00786 aiocb_list_max_size_ (max_aio_operations),
00787 aiocb_list_cur_size_ (0),
00788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00789 num_deferred_aiocb_ (0),
00790 num_started_aio_ (0)
00791 {
00792
00793 check_max_aio_num ();
00794
00795 this->create_result_aiocb_list ();
00796
00797 this->create_notify_manager ();
00798
00799
00800
00801 this->get_asynch_pseudo_task().start ();
00802
00803 }
00804
00805
00806 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
00807 ACE_POSIX_Proactor::Proactor_Type)
00808 : aiocb_notify_pipe_manager_ (0),
00809 aiocb_list_ (0),
00810 result_list_ (0),
00811 aiocb_list_max_size_ (max_aio_operations),
00812 aiocb_list_cur_size_ (0),
00813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00814 num_deferred_aiocb_ (0),
00815 num_started_aio_ (0)
00816 {
00817
00818 this->check_max_aio_num ();
00819
00820 this->create_result_aiocb_list ();
00821
00822
00823
00824 }
00825
00826
00827 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
00828 {
00829 this->close();
00830 }
00831
00832 ACE_POSIX_Proactor::Proactor_Type
00833 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
00834 {
00835 return PROACTOR_AIOCB;
00836 }
00837
00838
00839 int
00840 ACE_POSIX_AIOCB_Proactor::close (void)
00841 {
00842
00843 this->get_asynch_pseudo_task().stop ();
00844
00845 this->delete_notify_manager ();
00846
00847 this->clear_result_queue ();
00848
00849 return this->delete_result_aiocb_list ();
00850 }
00851
00852 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
00853 {
00854 notify_pipe_read_handle_ = h;
00855 }
00856
00857 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
00858 {
00859 if (aiocb_list_ != 0)
00860 return 0;
00861
00862 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863
00864 ACE_NEW_RETURN (result_list_,
00865 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866 -1);
00867
00868
00869 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870 {
00871 aiocb_list_[ai] = 0;
00872 result_list_[ai] = 0;
00873 }
00874
00875 return 0;
00876 }
00877
00878 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
00879 {
00880 if (aiocb_list_ == 0)
00881 return 0;
00882
00883 size_t ai;
00884
00885
00886
00887 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888 if (this->aiocb_list_[ai] != 0)
00889 this->cancel_aiocb (result_list_[ai]);
00890
00891 int num_pending = 0;
00892
00893 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894 {
00895 if (this->aiocb_list_[ai] == 0 )
00896 continue;
00897
00898
00899 int error_status = 0;
00900 size_t transfer_count = 0;
00901 int flg_completed = this->get_result_status (result_list_[ai],
00902 error_status,
00903 transfer_count);
00904
00905
00906 if (flg_completed == 0)
00907 {
00908 num_pending++;
00909 #if 0
00910 char * errtxt = ACE_OS::strerror (error_status);
00911 if (errtxt == 0)
00912 errtxt ="?????????";
00913
00914 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915 "WRITE":"READ" ;
00916
00917
00918 ACE_ERROR ((LM_ERROR,
00919 ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920 ai,
00921 op,
00922 error_status,
00923 transfer_count,
00924 errtxt));
00925 #endif
00926 }
00927 else
00928 {
00929 delete this->result_list_[ai];
00930 this->result_list_[ai] = 0;
00931 this->aiocb_list_[ai] = 0;
00932 }
00933 }
00934
00935
00936
00937
00938
00939
00940 ACE_DEBUG
00941 ((LM_DEBUG,
00942 ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943 ACE_LIB_TEXT(" number pending AIO=%d\n"),
00944 num_pending));
00945
00946 delete [] this->aiocb_list_;
00947 this->aiocb_list_ = 0;
00948
00949 delete [] this->result_list_;
00950 this->result_list_ = 0;
00951
00952 return (num_pending == 0 ? 0 : -1);
00953
00954 }
00955
00956 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
00957 {
00958 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959
00960
00961
00962
00963
00964 if (max_os_aio_num > 0 &&
00965 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966 aiocb_list_max_size_ = max_os_aio_num;
00967
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969
00970
00971
00972
00973 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974 if (max_os_listio_num > 0
00975 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976 aiocb_list_max_size_ = max_os_listio_num;
00977 #endif
00978
00979
00980
00981
00982 if (aiocb_list_max_size_ <= 0
00983 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985
00986
00987
00988 int max_num_files = ACE::max_handles ();
00989
00990 if (max_num_files > 0
00991 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992 {
00993 ACE::set_handle_limit (aiocb_list_max_size_);
00994
00995 max_num_files = ACE::max_handles ();
00996 }
00997
00998 if (max_num_files > 0
00999 && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000 aiocb_list_max_size_ = (unsigned long) max_num_files;
01001
01002 ACE_DEBUG ((LM_DEBUG,
01003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004 aiocb_list_max_size_));
01005
01006 #if defined(__sgi)
01007
01008 ACE_DEBUG((LM_DEBUG,
01009 ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020
01021 aioinit_t aioinit;
01022
01023 aioinit.aio_threads = 10;
01024 aioinit.aio_locks = 20;
01025
01026 aioinit.aio_num = aiocb_list_max_size_;
01027 aioinit.aio_usedba = 0;
01028 aioinit.aio_debug = 0;
01029 aioinit.aio_numusers = 100;
01030 aioinit.aio_reserved[0] = 0;
01031 aioinit.aio_reserved[1] = 0;
01032 aioinit.aio_reserved[2] = 0;
01033
01034 aio_sgi_init (&aioinit);
01035
01036 #endif
01037
01038 return;
01039 }
01040
01041 void
01042 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
01043 {
01044
01045
01046
01047 if (aiocb_notify_pipe_manager_ == 0)
01048 ACE_NEW (aiocb_notify_pipe_manager_,
01049 ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }
01051
01052 void
01053 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
01054 {
01055
01056
01057
01058 delete aiocb_notify_pipe_manager_;
01059 aiocb_notify_pipe_manager_ = 0;
01060 }
01061
01062 int
01063 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
01064 {
01065
01066 ACE_Countdown_Time countdown (&wait_time);
01067 return this->handle_events_i (wait_time.msec ());
01068 }
01069
01070 int
01071 ACE_POSIX_AIOCB_Proactor::handle_events (void)
01072 {
01073 return this->handle_events_i (ACE_INFINITE);
01074 }
01075
01076 int
01077 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
01078 {
01079 ACE_UNUSED_ARG (sig_num);
01080
01081 return this->aiocb_notify_pipe_manager_->notify ();
01082 }
01083
01084 int
01085 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
01086 {
01087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088
01089 int ret_val = this->putq_result (result);
01090
01091 return ret_val;
01092 }
01093
01094 int
01095 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
01096 {
01097
01098
01099
01100 if (!result)
01101 return -1;
01102
01103 int sig_num = result->signal_number ();
01104 int ret_val = this->result_queue_.enqueue_tail (result);
01105
01106 if (ret_val == -1)
01107 ACE_ERROR_RETURN ((LM_ERROR,
01108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109 -1);
01110
01111 this->notify_completion (sig_num);
01112
01113 return 0;
01114 }
01115
01116 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
01117 {
01118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119
01120
01121 ACE_POSIX_Asynch_Result* result = 0;
01122
01123 if (this->result_queue_.dequeue_head (result) != 0)
01124 return 0;
01125
01126
01127
01128
01129
01130
01131
01132
01133 return result;
01134 }
01135
01136 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
01137 {
01138 int ret_val = 0;
01139 ACE_POSIX_Asynch_Result* result = 0;
01140
01141 while ((result = this->getq_result ()) != 0)
01142 {
01143 delete result;
01144 ret_val++;
01145 }
01146
01147 return ret_val;
01148 }
01149
01150 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
01151 {
01152 int ret_val = 0;
01153 ACE_POSIX_Asynch_Result* result = 0;
01154
01155 while ((result = this->getq_result ()) != 0)
01156 {
01157 this->application_specific_code
01158 (result,
01159 result->bytes_transferred(),
01160 0,
01161 result->error());
01162
01163 ret_val++;
01164 }
01165
01166 return ret_val;
01167 }
01168
01169 int
01170 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
01171 {
01172 int result_suspend = 0;
01173 int retval= 0;
01174
01175 if (milli_seconds == ACE_INFINITE)
01176
01177 result_suspend = aio_suspend (aiocb_list_,
01178 aiocb_list_max_size_,
01179 0);
01180 else
01181 {
01182
01183 timespec timeout;
01184 timeout.tv_sec = milli_seconds / 1000;
01185 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186 result_suspend = aio_suspend (aiocb_list_,
01187 aiocb_list_max_size_,
01188 &timeout);
01189 }
01190
01191
01192 if (result_suspend == -1)
01193 {
01194 if (errno != EAGAIN &&
01195 errno != EINTR )
01196 ACE_ERROR ((LM_ERROR,
01197 "%N:%l:(%P | %t)::%p\n",
01198 "ACE_POSIX_AIOCB_Proactor::handle_events:"
01199 "aio_suspend failed\n"));
01200
01201
01202
01203 }
01204 else
01205 {
01206 size_t index = 0;
01207 size_t count = aiocb_list_max_size_;
01208 int error_status = 0;
01209 size_t transfer_count = 0;
01210
01211 for (;; retval++)
01212 {
01213 ACE_POSIX_Asynch_Result *asynch_result =
01214 find_completed_aio (error_status,
01215 transfer_count,
01216 index,
01217 count);
01218
01219 if (asynch_result == 0)
01220 break;
01221
01222
01223 this->application_specific_code (asynch_result,
01224 transfer_count,
01225 0,
01226 error_status);
01227 }
01228 }
01229
01230
01231 retval += this->process_result_queue ();
01232
01233 return retval > 0 ? 1 : 0;
01234 }
01235
01236 int
01237 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
01238 int &error_status,
01239 size_t &transfer_count)
01240 {
01241 transfer_count = 0;
01242
01243
01244 error_status = aio_error (asynch_result);
01245 if (error_status == EINPROGRESS)
01246 return 0;
01247
01248 ssize_t op_return = aio_return (asynch_result);
01249 if (op_return > 0)
01250 transfer_count = static_cast<size_t> (op_return);
01251
01252 return 1;
01253 }
01254
01255 ACE_POSIX_Asynch_Result *
01256 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
01257 size_t &transfer_count,
01258 size_t &index,
01259 size_t &count)
01260 {
01261
01262
01263
01264 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01265
01266 ACE_POSIX_Asynch_Result *asynch_result = 0;
01267
01268 if (num_started_aio_ == 0)
01269 return 0;
01270
01271 for (; count > 0; index++ , count--)
01272 {
01273 if (index >= aiocb_list_max_size_)
01274 index = 0;
01275
01276 if (aiocb_list_[index] == 0)
01277 continue;
01278
01279 if (0 != this->get_result_status (result_list_[index],
01280 error_status,
01281 transfer_count))
01282 break;
01283
01284 }
01285
01286 if (count == 0)
01287 return 0;
01288 asynch_result = result_list_[index];
01289
01290 aiocb_list_[index] = 0;
01291 result_list_[index] = 0;
01292 aiocb_list_cur_size_--;
01293
01294 num_started_aio_--;
01295 index++;
01296 count--;
01297
01298 this->start_deferred_aio ();
01299
01300
01301
01302 return asynch_result;
01303 }
01304
01305
01306 int
01307 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
01308 ACE_POSIX_Proactor::Opcode op)
01309 {
01310 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01311
01312 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01313
01314 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01315
01316 if (result == 0)
01317 return ret_val;
01318
01319
01320 switch (op)
01321 {
01322 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01323 result->aio_lio_opcode = LIO_READ;
01324 break;
01325
01326 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01327 result->aio_lio_opcode = LIO_WRITE;
01328 break;
01329
01330 default:
01331 ACE_ERROR_RETURN ((LM_ERROR,
01332 "%N:%l:(%P | %t)::\n"
01333 "start_aio: Invalid operation code\n"),
01334 -1);
01335 }
01336
01337 if (ret_val != 0)
01338 {
01339 errno = EAGAIN;
01340 return -1;
01341 }
01342
01343
01344
01345 ssize_t slot = allocate_aio_slot (result);
01346
01347 if (slot < 0)
01348 return -1;
01349
01350 size_t index = static_cast<size_t> (slot);
01351
01352 result_list_[index] = result;
01353 aiocb_list_cur_size_++;
01354
01355 ret_val = start_aio_i (result);
01356 switch (ret_val)
01357 {
01358 case 0:
01359 aiocb_list_[index] = result;
01360 return 0;
01361
01362 case 1:
01363 num_deferred_aiocb_ ++;
01364 return 0;
01365
01366 default:
01367 break;
01368 }
01369
01370 result_list_[index] = 0;
01371 aiocb_list_cur_size_--;
01372 return -1;
01373 }
01374
01375 ssize_t
01376 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01377 {
01378 size_t i = 0;
01379
01380
01381
01382
01383 if (notify_pipe_read_handle_ == result->aio_fildes)
01384 {
01385 if (result_list_[i] != 0)
01386 {
01387 errno = EAGAIN;
01388 ACE_ERROR_RETURN ((LM_ERROR,
01389 "%N:%l:(%P | %t)::\n"
01390 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01391 "internal Proactor error 0\n"),
01392 -1);
01393 }
01394 }
01395 else
01396 {
01397 for (i= 1; i < this->aiocb_list_max_size_; i++)
01398 if (result_list_[i] == 0)
01399 break;
01400 }
01401
01402 if (i >= this->aiocb_list_max_size_)
01403 ACE_ERROR_RETURN ((LM_ERROR,
01404 "%N:%l:(%P | %t)::\n"
01405 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01406 "internal Proactor error 1\n"),
01407 -1);
01408
01409
01410 result->aio_sigevent.sigev_notify = SIGEV_NONE;
01411
01412 return static_cast<ssize_t> (i);
01413 }
01414
01415
01416
01417
01418
01419
01420 int
01421 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
01422 {
01423 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01424
01425 int ret_val;
01426 const ACE_TCHAR *ptype;
01427
01428
01429
01430 switch (result->aio_lio_opcode )
01431 {
01432 case LIO_READ :
01433 ptype = ACE_LIB_TEXT ("read ");
01434 ret_val = aio_read (result);
01435 break;
01436 case LIO_WRITE :
01437 ptype = ACE_LIB_TEXT ("write");
01438 ret_val = aio_write (result);
01439 break;
01440 default:
01441 ptype = ACE_LIB_TEXT ("?????");
01442 ret_val = -1;
01443 break;
01444 }
01445
01446 if (ret_val == 0)
01447 this->num_started_aio_++;
01448 else
01449 {
01450 if (errno == EAGAIN || errno == ENOMEM)
01451 ret_val = 1;
01452 else
01453 ACE_ERROR ((LM_ERROR,
01454 ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01455 ptype,
01456 ACE_LIB_TEXT ("queueing failed\n")));
01457 }
01458
01459 return ret_val;
01460 }
01461
01462
01463 int
01464 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
01465 {
01466 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01467
01468
01469
01470
01471
01472
01473
01474
01475
01476 if (num_deferred_aiocb_ == 0)
01477 return 0;
01478
01479 size_t i = 0;
01480
01481 for (i= 0; i < this->aiocb_list_max_size_; i++)
01482 if (result_list_[i] !=0
01483 && aiocb_list_[i] ==0)
01484 break;
01485
01486 if (i >= this->aiocb_list_max_size_)
01487 ACE_ERROR_RETURN ((LM_ERROR,
01488 "%N:%l:(%P | %t)::\n"
01489 "start_deferred_aio:"
01490 "internal Proactor error 3\n"),
01491 -1);
01492
01493 ACE_POSIX_Asynch_Result *result = result_list_[i];
01494
01495 int ret_val = start_aio_i (result);
01496
01497 switch (ret_val)
01498 {
01499 case 0 :
01500 aiocb_list_[i] = result;
01501 num_deferred_aiocb_ --;
01502 return 0;
01503
01504 case 1 :
01505 return 0;
01506
01507 default :
01508 break;
01509 }
01510
01511
01512
01513 result_list_[i] = 0;
01514 aiocb_list_cur_size_--;
01515
01516 num_deferred_aiocb_ --;
01517
01518 result->set_error (errno);
01519 result->set_bytes_transferred (0);
01520 this->putq_result (result);
01521
01522 return -1;
01523 }
01524
01525 int
01526 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
01527 {
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540
01541 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01542
01543 int num_total = 0;
01544 int num_cancelled = 0;
01545
01546 {
01547 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01548
01549 size_t ai = 0;
01550
01551 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01552 {
01553 if (this->result_list_[ai] == 0)
01554 continue;
01555
01556 if (this->result_list_[ai]->aio_fildes != handle)
01557 continue;
01558
01559 num_total++;
01560
01561 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01562
01563 if (this->aiocb_list_[ai] == 0)
01564 {
01565 num_cancelled++;
01566 this->num_deferred_aiocb_--;
01567
01568 this->aiocb_list_[ai] = 0;
01569 this->result_list_[ai] = 0;
01570 this->aiocb_list_cur_size_--;
01571
01572 asynch_result->set_error (ECANCELED);
01573 asynch_result->set_bytes_transferred (0);
01574 this->putq_result (asynch_result);
01575
01576 }
01577 else
01578 {
01579 int rc_cancel = this->cancel_aiocb (asynch_result);
01580
01581 if (rc_cancel == 0)
01582 num_cancelled++;
01583 }
01584 }
01585
01586 }
01587
01588 if (num_total == 0)
01589 return 1;
01590
01591 if (num_cancelled == num_total)
01592 return 0;
01593
01594 return 2;
01595 }
01596
01597 int
01598 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
01599 {
01600
01601
01602 int rc = ::aio_cancel (0, result);
01603
01604
01605 if (rc == AIO_CANCELED)
01606 return 0;
01607 else if (rc == AIO_ALLDONE)
01608 return 1;
01609 else
01610 return 2;
01611 }
01612
01613
01614
01615
01616 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
01617
01618 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
01619 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01620 ACE_POSIX_Proactor::PROACTOR_SIG)
01621 {
01622
01623
01624
01625
01626
01627 ACE_OS::sigemptyset (&this->RT_completion_signals_);
01628
01629
01630 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
01631 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
01632 ACE_LIB_TEXT ("sigaddset")));
01633 this->block_signals ();
01634
01635 this->setup_signal_handler (ACE_SIGRTMIN);
01636
01637
01638
01639
01640
01641 this->get_asynch_pseudo_task().start ();
01642 return;
01643 }
01644
01645 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
01646 size_t max_aio_operations)
01647 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01648 ACE_POSIX_Proactor::PROACTOR_SIG)
01649 {
01650
01651
01652
01653
01654
01655
01656 if (sigemptyset (&this->RT_completion_signals_) == -1)
01657 ACE_ERROR ((LM_ERROR,
01658 "Error:(%P | %t):%p\n",
01659 "sigemptyset failed"));
01660
01661
01662
01663
01664 int member = 0;
01665 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
01666 {
01667 member = sigismember (&signal_set,
01668 si);
01669 if (member == -1)
01670 ACE_ERROR ((LM_ERROR,
01671 "%N:%l:(%P | %t)::%p\n",
01672 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01673 "sigismember failed"));
01674 else if (member == 1)
01675 {
01676 sigaddset (&this->RT_completion_signals_, si);
01677 this->setup_signal_handler (si);
01678 }
01679 }
01680
01681
01682 this->block_signals ();
01683
01684
01685
01686
01687
01688 this->get_asynch_pseudo_task().start ();
01689 return;
01690 }
01691
01692 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
01693 {
01694 this->close ();
01695
01696
01697 }
01698
01699 ACE_POSIX_Proactor::Proactor_Type
01700 ACE_POSIX_SIG_Proactor::get_impl_type (void)
01701 {
01702 return PROACTOR_SIG;
01703 }
01704
01705 int
01706 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
01707 {
01708
01709 ACE_Countdown_Time countdown (&wait_time);
01710 return this->handle_events_i (&wait_time);
01711 }
01712
01713 int
01714 ACE_POSIX_SIG_Proactor::handle_events (void)
01715 {
01716 return this->handle_events_i (0);
01717 }
01718
01719 int
01720 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
01721 {
01722
01723 pid_t pid = ACE_OS::getpid ();
01724 if (pid == (pid_t) -1)
01725 ACE_ERROR_RETURN ((LM_ERROR,
01726 "Error:%N:%l(%P | %t):%p",
01727 "<getpid> failed"),
01728 -1);
01729
01730
01731 sigval value;
01732 #if defined (__FreeBSD__)
01733 value.sigval_int = -1;
01734 #else
01735 value.sival_int = -1;
01736 #endif
01737
01738
01739 if (sigqueue (pid, sig_num, value) == 0)
01740 return 0;
01741
01742 if (errno != EAGAIN)
01743 ACE_ERROR_RETURN ((LM_ERROR,
01744 "Error:%N:%l:(%P | %t):%p\n",
01745 "<sigqueue> failed"),
01746 -1);
01747 return -1;
01748 }
01749
01750 ACE_Asynch_Result_Impl *
01751 ACE_POSIX_SIG_Proactor::create_asynch_timer
01752 (const ACE_Handler::Proxy_Ptr &handler_proxy,
01753 const void *act,
01754 const ACE_Time_Value &tv,
01755 ACE_HANDLE event,
01756 int priority,
01757 int signal_number)
01758 {
01759 int is_member = 0;
01760
01761
01762 if (signal_number == -1)
01763 {
01764 int si;
01765 for (si = ACE_SIGRTMAX;
01766 (is_member == 0) && (si >= ACE_SIGRTMIN);
01767 si--)
01768 {
01769 is_member = sigismember (&this->RT_completion_signals_,
01770 si);
01771 if (is_member == -1)
01772 ACE_ERROR_RETURN ((LM_ERROR,
01773 "%N:%l:(%P | %t)::%s\n",
01774 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
01775 "sigismember failed"),
01776 0);
01777 }
01778
01779 if (is_member == 0)
01780 ACE_ERROR_RETURN ((LM_ERROR,
01781 "Error:%N:%l:(%P | %t)::%s\n",
01782 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01783 "Signal mask set empty"),
01784 0);
01785 else
01786
01787 signal_number = si + 1;
01788 }
01789
01790 ACE_Asynch_Result_Impl *implementation;
01791 ACE_NEW_RETURN (implementation,
01792 ACE_POSIX_Asynch_Timer (handler_proxy,
01793 act,
01794 tv,
01795 event,
01796 priority,
01797 signal_number),
01798 0);
01799 return implementation;
01800 }
01801
#if 0
// Disabled: a signal handler that only logs the number of the signal
// it receives.  It is referenced solely by the equally disabled code
// in setup_signal_handler().
static void
sig_handler (int sig_num, siginfo_t *, ucontext_t *)
{
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
              sig_num));
}
#endif /* 0 */
01812
01813 int
01814 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
01815 {
01816
01817
01818
01819
01820
01821
01822
01823
01824
01825
01826 #if 0
01827 struct sigaction reaction;
01828 sigemptyset (&reaction.sa_mask);
01829 reaction.sa_flags = SA_SIGINFO;
01830 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler);
01831 int sigaction_return = ACE_OS::sigaction (signal_number,
01832 &reaction,
01833 0);
01834 if (sigaction_return == -1)
01835 ACE_ERROR_RETURN ((LM_ERROR,
01836 "Error:%p\n",
01837 "Proactor couldnt do sigaction for the RT SIGNAL"),
01838 -1);
01839 #else
01840 ACE_UNUSED_ARG(signal_number);
01841 #endif
01842 return 0;
01843 }
01844
01845
01846 int
01847 ACE_POSIX_SIG_Proactor::block_signals (void) const
01848 {
01849 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
01850 }
01851
01852 ssize_t
01853 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01854 {
01855 size_t i = 0;
01856
01857
01858 for (i = 0; i < this->aiocb_list_max_size_; i++)
01859 if (result_list_[i] == 0)
01860 break;
01861
01862 if (i >= this->aiocb_list_max_size_)
01863 ACE_ERROR_RETURN ((LM_ERROR,
01864 "%N:%l:(%P | %t)::\n"
01865 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
01866 "internal Proactor error 1\n"),
01867 -1);
01868
01869
01870
01871 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
01872 result->aio_sigevent.sigev_signo = result->signal_number ();
01873 #if defined (__FreeBSD__)
01874 result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
01875 #else
01876 result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
01877 #endif
01878
01879 return static_cast<ssize_t> (i);
01880 }
01881
01882 int
01883 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
01884 {
01885 int result_sigwait = 0;
01886 siginfo_t sig_info;
01887
01888 do
01889 {
01890
01891 if (timeout == 0)
01892 {
01893 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
01894 &sig_info);
01895 }
01896 else
01897 {
01898 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
01899 &sig_info,
01900 timeout);
01901 if (result_sigwait == -1 && errno == EAGAIN)
01902 return 0;
01903 }
01904 }
01905 while (result_sigwait == -1 && errno == EINTR);
01906
01907 if (result_sigwait == -1)
01908 return -1;
01909
01910
01911
01912
01913 int flg_aio = 0;
01914
01915 size_t index = 0;
01916 size_t count = 1;
01917 int error_status = 0;
01918 size_t transfer_count = 0;
01919
01920 if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56)
01921 {
01922 flg_aio = 1;
01923
01924
01925 #if defined (__FreeBSD__)
01926 index = static_cast<size_t> (sig_info.si_value.sigval_int);
01927 #else
01928 index = static_cast<size_t> (sig_info.si_value.sival_int);
01929 #endif
01930
01931
01932
01933
01934 if (os_id_ == ACE_OS_SUN_56)
01935 {
01936
01937
01938
01939
01940
01941
01942
01943 count = aiocb_list_max_size_;
01944 }
01945 }
01946 else if (sig_info.si_code != SI_QUEUE)
01947 {
01948
01949
01950
01951
01952 ACE_ERROR ((LM_DEBUG,
01953 ACE_LIB_TEXT ("%N:%l:(%P | %t): ")
01954 ACE_LIB_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
01955 ACE_LIB_TEXT ("Unexpected signal code (%d) returned ")
01956 ACE_LIB_TEXT ("from sigwait; expecting %d\n"),
01957 result_sigwait, sig_info.si_code));
01958 flg_aio = 1;
01959 }
01960
01961 int ret_aio = 0;
01962 int ret_que = 0;
01963
01964 if (flg_aio)
01965 for (;; ret_aio++)
01966 {
01967 ACE_POSIX_Asynch_Result *asynch_result =
01968 find_completed_aio (error_status,
01969 transfer_count,
01970 index,
01971 count);
01972
01973 if (asynch_result == 0)
01974 break;
01975
01976
01977 this->application_specific_code (asynch_result,
01978 transfer_count,
01979 0,
01980 error_status);
01981 }
01982
01983
01984 ret_que = this->process_result_queue ();
01985
01986
01987
01988 #if 0
01989 ACE_DEBUG ((LM_DEBUG,
01990 "(%t) NumAIO=%d NumQueue=%d\n",
01991 ret_aio, ret_que));
01992 #endif
01993
01994 return ret_aio + ret_que > 0 ? 1 : 0;
01995 }
01996
01997 #endif
01998
01999
02000
02001 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
02002 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02003 const void *act,
02004 const ACE_Time_Value &tv,
02005 ACE_HANDLE event,
02006 int priority,
02007 int signal_number)
02008 : ACE_POSIX_Asynch_Result
02009 (handler_proxy, act, event, 0, 0, priority, signal_number),
02010 time_ (tv)
02011 {
02012 }
02013
02014 void
02015 ACE_POSIX_Asynch_Timer::complete (size_t ,
02016 int ,
02017 const void * ,
02018 u_long )
02019 {
02020 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02021 if (handler != 0)
02022 handler->handle_time_out (this->time_, this->act ());
02023 }
02024
02025
02026
02027
02028 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
02029 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02030 const void *act,
02031 ACE_HANDLE event,
02032 int priority,
02033 int signal_number)
02034 : ACE_Asynch_Result_Impl (),
02035 ACE_POSIX_Asynch_Result (handler_proxy,
02036 act,
02037 event,
02038 0,
02039 0,
02040 priority,
02041 signal_number)
02042 {
02043 }
02044
02045 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
02046 {
02047 }
02048
02049 void
02050 ACE_POSIX_Wakeup_Completion::complete (size_t ,
02051 int ,
02052 const void * ,
02053 u_long )
02054 {
02055
02056 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02057 if (handler != 0)
02058 handler->handle_wakeup ();
02059 }
02060
02061 ACE_END_VERSIONED_NAMESPACE_DECL
02062
02063 #endif