00001
00002
00003 #include "ace/POSIX_Proactor.h"
00004
00005 #if defined (ACE_HAS_AIO_CALLS)
00006
00007 #if !defined (__ACE_INLINE__)
00008 #include "ace/POSIX_Proactor.inl"
00009 #endif
00010
00011 # if defined (ACE_HAS_SYSINFO)
00012 # include <sys/systeminfo.h>
00013 # endif
00014
00015 #include "ace/ACE.h"
00016 #include "ace/Flag_Manip.h"
00017 #include "ace/Task_T.h"
00018 #include "ace/Log_Msg.h"
00019 #include "ace/Object_Manager.h"
00020 #include "ace/OS_NS_sys_socket.h"
00021 #include "ace/OS_NS_signal.h"
00022 #include "ace/OS_NS_unistd.h"
00023
00024 #if defined (sun)
00025 # include "ace/OS_NS_strings.h"
00026 #endif
00027
00028
00029
00030 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032
00033
00034
00035
00036
00037
00038
00039 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
00040 {
00041 public:
00042
00043 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
00044 const void *act = 0,
00045 ACE_HANDLE event = ACE_INVALID_HANDLE,
00046 int priority = 0,
00047 int signal_number = ACE_SIGRTMIN);
00048
00049
00050 virtual ~ACE_POSIX_Wakeup_Completion (void);
00051
00052
00053
00054 virtual void complete (size_t bytes_transferred = 0,
00055 int success = 1,
00056 const void *completion_key = 0,
00057 u_long error = 0);
00058 };
00059
00060
00061 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
00062 : os_id_ (ACE_OS_UNDEFINED)
00063 {
00064 #if defined(sun)
00065
00066 os_id_ = ACE_OS_SUN;
00067
00068 char Buf [32];
00069
00070 ::memset(Buf,0,sizeof(Buf));
00071
00072 ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
00073
00074 if (ACE_OS::strcasecmp (Buf , "5.6") == 0)
00075 os_id_ = ACE_OS_SUN_56;
00076 else if (ACE_OS::strcasecmp (Buf , "5.7") == 0)
00077 os_id_ = ACE_OS_SUN_57;
00078 else if (ACE_OS::strcasecmp (Buf , "5.8") == 0)
00079 os_id_ = ACE_OS_SUN_58;
00080
00081 #elif defined(HPUX)
00082
00083 os_id_ = ACE_OS_HPUX;
00084
00085 #elif defined(__sgi)
00086
00087 os_id_ = ACE_OS_IRIX;
00088
00089 #elif defined(__OpenBSD)
00090
00091 os_id_ = ACE_OS_OPENBSD;
00092
00093
00094
00095
00096
00097 #endif
00098 }
00099
00100 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
00101 {
00102 this->close ();
00103 }
00104
00105 int
00106 ACE_POSIX_Proactor::close (void)
00107 {
00108 return 0;
00109 }
00110
00111 int
00112 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
00113 const void *completion_key)
00114 {
00115 ACE_UNUSED_ARG (handle);
00116 ACE_UNUSED_ARG (completion_key);
00117 return 0;
00118 }
00119
00120 int
00121 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
00122 {
00123 return 0;
00124 }
00125
00126 int
00127 ACE_POSIX_Proactor::close_dispatch_threads (int)
00128 {
00129 return 0;
00130 }
00131
00132 size_t
00133 ACE_POSIX_Proactor::number_of_threads (void) const
00134 {
00135
00136 ACE_NOTSUP_RETURN (0);
00137 }
00138
00139 void
00140 ACE_POSIX_Proactor::number_of_threads (size_t threads)
00141 {
00142
00143 ACE_UNUSED_ARG (threads);
00144 }
00145
00146 ACE_HANDLE
00147 ACE_POSIX_Proactor::get_handle (void) const
00148 {
00149 return ACE_INVALID_HANDLE;
00150 }
00151
00152 ACE_Asynch_Read_Stream_Impl *
00153 ACE_POSIX_Proactor::create_asynch_read_stream (void)
00154 {
00155 ACE_Asynch_Read_Stream_Impl *implementation = 0;
00156 ACE_NEW_RETURN (implementation,
00157 ACE_POSIX_Asynch_Read_Stream (this),
00158 0);
00159 return implementation;
00160 }
00161
00162 ACE_Asynch_Read_Stream_Result_Impl *
00163 ACE_POSIX_Proactor::create_asynch_read_stream_result
00164 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00165 ACE_HANDLE handle,
00166 ACE_Message_Block &message_block,
00167 size_t bytes_to_read,
00168 const void* act,
00169 ACE_HANDLE event,
00170 int priority,
00171 int signal_number)
00172 {
00173 ACE_Asynch_Read_Stream_Result_Impl *implementation;
00174 ACE_NEW_RETURN (implementation,
00175 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
00176 handle,
00177 message_block,
00178 bytes_to_read,
00179 act,
00180 event,
00181 priority,
00182 signal_number),
00183 0);
00184 return implementation;
00185 }
00186
00187
00188 ACE_Asynch_Write_Stream_Impl *
00189 ACE_POSIX_Proactor::create_asynch_write_stream (void)
00190 {
00191 ACE_Asynch_Write_Stream_Impl *implementation = 0;
00192 ACE_NEW_RETURN (implementation,
00193 ACE_POSIX_Asynch_Write_Stream (this),
00194 0);
00195 return implementation;
00196 }
00197
00198 ACE_Asynch_Write_Stream_Result_Impl *
00199 ACE_POSIX_Proactor::create_asynch_write_stream_result
00200 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00201 ACE_HANDLE handle,
00202 ACE_Message_Block &message_block,
00203 size_t bytes_to_write,
00204 const void* act,
00205 ACE_HANDLE event,
00206 int priority,
00207 int signal_number)
00208 {
00209 ACE_Asynch_Write_Stream_Result_Impl *implementation;
00210 ACE_NEW_RETURN (implementation,
00211 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
00212 handle,
00213 message_block,
00214 bytes_to_write,
00215 act,
00216 event,
00217 priority,
00218 signal_number),
00219 0);
00220 return implementation;
00221 }
00222
00223
00224 ACE_Asynch_Read_File_Impl *
00225 ACE_POSIX_Proactor::create_asynch_read_file (void)
00226 {
00227 ACE_Asynch_Read_File_Impl *implementation = 0;
00228 ACE_NEW_RETURN (implementation,
00229 ACE_POSIX_Asynch_Read_File (this),
00230 0);
00231 return implementation;
00232 }
00233
00234 ACE_Asynch_Read_File_Result_Impl *
00235 ACE_POSIX_Proactor::create_asynch_read_file_result
00236 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00237 ACE_HANDLE handle,
00238 ACE_Message_Block &message_block,
00239 size_t bytes_to_read,
00240 const void* act,
00241 u_long offset,
00242 u_long offset_high,
00243 ACE_HANDLE event,
00244 int priority,
00245 int signal_number)
00246 {
00247 ACE_Asynch_Read_File_Result_Impl *implementation;
00248 ACE_NEW_RETURN (implementation,
00249 ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
00250 handle,
00251 message_block,
00252 bytes_to_read,
00253 act,
00254 offset,
00255 offset_high,
00256 event,
00257 priority,
00258 signal_number),
00259 0);
00260 return implementation;
00261 }
00262
00263
00264 ACE_Asynch_Write_File_Impl *
00265 ACE_POSIX_Proactor::create_asynch_write_file (void)
00266 {
00267 ACE_Asynch_Write_File_Impl *implementation = 0;
00268 ACE_NEW_RETURN (implementation,
00269 ACE_POSIX_Asynch_Write_File (this),
00270 0);
00271 return implementation;
00272 }
00273
00274 ACE_Asynch_Write_File_Result_Impl *
00275 ACE_POSIX_Proactor::create_asynch_write_file_result
00276 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00277 ACE_HANDLE handle,
00278 ACE_Message_Block &message_block,
00279 size_t bytes_to_write,
00280 const void* act,
00281 u_long offset,
00282 u_long offset_high,
00283 ACE_HANDLE event,
00284 int priority,
00285 int signal_number)
00286 {
00287 ACE_Asynch_Write_File_Result_Impl *implementation;
00288 ACE_NEW_RETURN (implementation,
00289 ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
00290 handle,
00291 message_block,
00292 bytes_to_write,
00293 act,
00294 offset,
00295 offset_high,
00296 event,
00297 priority,
00298 signal_number),
00299 0);
00300 return implementation;
00301 }
00302
00303
00304 ACE_Asynch_Read_Dgram_Impl *
00305 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
00306 {
00307 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00308 ACE_NEW_RETURN (implementation,
00309 ACE_POSIX_Asynch_Read_Dgram (this),
00310 0);
00311 return implementation;
00312 }
00313
00314 ACE_Asynch_Read_Dgram_Result_Impl *
00315 ACE_POSIX_Proactor::create_asynch_read_dgram_result
00316 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00317 ACE_HANDLE handle,
00318 ACE_Message_Block *message_block,
00319 size_t bytes_to_read,
00320 int flags,
00321 int protocol_family,
00322 const void* act,
00323 ACE_HANDLE event ,
00324 int priority ,
00325 int signal_number)
00326 {
00327 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
00328 ACE_NEW_RETURN (implementation,
00329 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
00330 handle,
00331 message_block,
00332 bytes_to_read,
00333 flags,
00334 protocol_family,
00335 act,
00336 event,
00337 priority,
00338 signal_number),
00339 0);
00340
00341 return implementation;
00342 }
00343
00344
00345 ACE_Asynch_Write_Dgram_Impl *
00346 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
00347 {
00348 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00349 ACE_NEW_RETURN (implementation,
00350 ACE_POSIX_Asynch_Write_Dgram (this),
00351 0);
00352
00353 return implementation;
00354 }
00355
00356 ACE_Asynch_Write_Dgram_Result_Impl *
00357 ACE_POSIX_Proactor::create_asynch_write_dgram_result
00358 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00359 ACE_HANDLE handle,
00360 ACE_Message_Block *message_block,
00361 size_t bytes_to_write,
00362 int flags,
00363 const void* act,
00364 ACE_HANDLE event,
00365 int priority ,
00366 int signal_number)
00367 {
00368 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
00369 ACE_NEW_RETURN (implementation,
00370 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
00371 handle,
00372 message_block,
00373 bytes_to_write,
00374 flags,
00375 act,
00376 event,
00377 priority,
00378 signal_number),
00379 0);
00380
00381 return implementation;
00382 }
00383
00384
00385 ACE_Asynch_Accept_Impl *
00386 ACE_POSIX_Proactor::create_asynch_accept (void)
00387 {
00388 ACE_Asynch_Accept_Impl *implementation = 0;
00389 ACE_NEW_RETURN (implementation,
00390 ACE_POSIX_Asynch_Accept (this),
00391 0);
00392
00393 return implementation;
00394 }
00395
00396 ACE_Asynch_Accept_Result_Impl *
00397 ACE_POSIX_Proactor::create_asynch_accept_result
00398 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00399 ACE_HANDLE listen_handle,
00400 ACE_HANDLE accept_handle,
00401 ACE_Message_Block &message_block,
00402 size_t bytes_to_read,
00403 const void* act,
00404 ACE_HANDLE event,
00405 int priority,
00406 int signal_number)
00407 {
00408 ACE_Asynch_Accept_Result_Impl *implementation;
00409 ACE_NEW_RETURN (implementation,
00410 ACE_POSIX_Asynch_Accept_Result (handler_proxy,
00411 listen_handle,
00412 accept_handle,
00413 message_block,
00414 bytes_to_read,
00415 act,
00416 event,
00417 priority,
00418 signal_number),
00419 0);
00420 return implementation;
00421 }
00422
00423
00424 ACE_Asynch_Connect_Impl *
00425 ACE_POSIX_Proactor::create_asynch_connect (void)
00426 {
00427 ACE_Asynch_Connect_Impl *implementation = 0;
00428 ACE_NEW_RETURN (implementation,
00429 ACE_POSIX_Asynch_Connect (this),
00430 0);
00431
00432 return implementation;
00433 }
00434
00435 ACE_Asynch_Connect_Result_Impl *
00436 ACE_POSIX_Proactor::create_asynch_connect_result
00437 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00438 ACE_HANDLE connect_handle,
00439 const void* act,
00440 ACE_HANDLE event,
00441 int priority,
00442 int signal_number)
00443 {
00444 ACE_Asynch_Connect_Result_Impl *implementation;
00445 ACE_NEW_RETURN (implementation,
00446 ACE_POSIX_Asynch_Connect_Result (handler_proxy,
00447 connect_handle,
00448 act,
00449 event,
00450 priority,
00451 signal_number),
00452 0);
00453 return implementation;
00454 }
00455
00456
00457 ACE_Asynch_Transmit_File_Impl *
00458 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
00459 {
00460 ACE_Asynch_Transmit_File_Impl *implementation = 0;
00461 ACE_NEW_RETURN (implementation,
00462 ACE_POSIX_Asynch_Transmit_File (this),
00463 0);
00464 return implementation;
00465 }
00466
00467 ACE_Asynch_Transmit_File_Result_Impl *
00468 ACE_POSIX_Proactor::create_asynch_transmit_file_result
00469 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00470 ACE_HANDLE socket,
00471 ACE_HANDLE file,
00472 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00473 size_t bytes_to_write,
00474 u_long offset,
00475 u_long offset_high,
00476 size_t bytes_per_send,
00477 u_long flags,
00478 const void *act,
00479 ACE_HANDLE event,
00480 int priority,
00481 int signal_number)
00482 {
00483 ACE_Asynch_Transmit_File_Result_Impl *implementation;
00484 ACE_NEW_RETURN (implementation,
00485 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
00486 socket,
00487 file,
00488 header_and_trailer,
00489 bytes_to_write,
00490 offset,
00491 offset_high,
00492 bytes_per_send,
00493 flags,
00494 act,
00495 event,
00496 priority,
00497 signal_number),
00498 0);
00499 return implementation;
00500 }
00501
00502 ACE_Asynch_Result_Impl *
00503 ACE_POSIX_Proactor::create_asynch_timer
00504 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00505 const void *act,
00506 const ACE_Time_Value &tv,
00507 ACE_HANDLE event,
00508 int priority,
00509 int signal_number)
00510 {
00511 ACE_POSIX_Asynch_Timer *implementation;
00512 ACE_NEW_RETURN (implementation,
00513 ACE_POSIX_Asynch_Timer (handler_proxy,
00514 act,
00515 tv,
00516 event,
00517 priority,
00518 signal_number),
00519 0);
00520 return implementation;
00521 }
00522
00523 #if 0
00524 int
00525 ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
00526 {
00527
00528
00529
00530 ACE_Time_Value timeout (0, 0);
00531 int result = 0;
00532
00533 for (;;)
00534 {
00535 result = this->handle_events (timeout);
00536 if (result != 0 || errno == ETIME)
00537 break;
00538 }
00539
00540
00541
00542 return result == -1 ? -1 : 0;
00543 }
00544
00545 int
00546 ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
00547 ACE_Reactor_Mask close_mask)
00548 {
00549 ACE_UNUSED_ARG (close_mask);
00550 ACE_UNUSED_ARG (handle);
00551
00552 return this->close ();
00553 }
00554 #endif
00555
00556 void
00557 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
00558 size_t bytes_transferred,
00559 const void *,
00560 u_long error)
00561 {
00562 ACE_SEH_TRY
00563 {
00564
00565 asynch_result->complete (bytes_transferred,
00566 error ? 0 : 1,
00567 0,
00568 error);
00569 }
00570 ACE_SEH_FINALLY
00571 {
00572
00573 delete asynch_result;
00574 }
00575 }
00576
00577 int
00578 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
00579 {
00580 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
00581
00582 for (int ci = 0; ci < how_many; ci++)
00583 {
00584 ACE_NEW_RETURN
00585 (wakeup_completion,
00586 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
00587 -1);
00588 if (this->post_completion (wakeup_completion) == -1)
00589 return -1;
00590 }
00591
00592 return 0;
00593 }
00594
00595 ACE_POSIX_Proactor::Proactor_Type
00596 ACE_POSIX_Proactor::get_impl_type (void)
00597 {
00598 return PROACTOR_POSIX;
00599 }
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
00630 {
00631 public:
00632
00633
00634 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
00635
00636
00637 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
00638
00639
00640 int notify ();
00641
00642
00643
00644 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
00645
00646 private:
00647
00648 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
00649
00650
00651 ACE_Message_Block message_block_;
00652
00653
00654
00655 ACE_Pipe pipe_;
00656
00657
00658 ACE_POSIX_Asynch_Read_Stream read_stream_;
00659
00660
00661 ACE_AIOCB_Notify_Pipe_Manager (void);
00662 };
00663
00664 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
00665 : posix_aiocb_proactor_ (posix_aiocb_proactor),
00666 message_block_ (sizeof (2)),
00667 read_stream_ (posix_aiocb_proactor)
00668 {
00669
00670 this->pipe_.open ();
00671
00672
00673 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
00674
00675
00676 ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
00677
00678
00679 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
00680
00681
00682 if (this->read_stream_.open (this->proxy (),
00683 this->pipe_.read_handle (),
00684 0,
00685 0)
00686 == -1)
00687 ACE_ERROR ((LM_ERROR,
00688 "%N:%l:%p\n",
00689 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00690 "Open on Read Stream failed"));
00691
00692
00693 if (this->read_stream_.read (this->message_block_,
00694 1,
00695 0,
00696 0)
00697 == -1)
00698 ACE_ERROR ((LM_ERROR,
00699 "%N:%l:%p\n",
00700 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00701 "Read from pipe failed"));
00702 }
00703
00704 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
00705 {
00706
00707 this->read_stream_.cancel ();
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719 ACE_HANDLE h = this->pipe_.write_handle ();
00720 if (h != ACE_INVALID_HANDLE)
00721 ACE_OS::closesocket (h);
00722
00723 h = this->pipe_.read_handle ();
00724 if ( h != ACE_INVALID_HANDLE)
00725 ACE_OS::closesocket (h);
00726
00727 }
00728
00729
00730 int
00731 ACE_AIOCB_Notify_Pipe_Manager::notify ()
00732 {
00733
00734 char char_send = 0;
00735 ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
00736 &char_send,
00737 sizeof (char_send));
00738
00739 if (ret_val < 0)
00740 {
00741 if (errno != EWOULDBLOCK)
00742 #if 0
00743 ACE_ERROR ((LM_ERROR,
00744 ACE_TEXT ("(%P %t):%p\n"),
00745 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
00746 ACE_TEXT ("Error:Writing on to notify pipe failed")));
00747 #endif
00748 return -1;
00749 }
00750
00751 return 0;
00752 }
00753
00754 void
00755 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
00756 (const ACE_Asynch_Read_Stream::Result & )
00757 {
00758
00759
00760
00761
00762 if (this->message_block_.length () > 0)
00763 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
00764
00765
00766
00767 if (-1 == this->read_stream_.read (this->message_block_,
00768 1,
00769 0,
00770 0))
00771 ACE_ERROR ((LM_ERROR,
00772 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
00773 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
00774 ACE_TEXT ("Read from pipe failed")));
00775
00776
00777
00778
00779 }
00780
00781
00782 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
00783 : aiocb_notify_pipe_manager_ (0),
00784 aiocb_list_ (0),
00785 result_list_ (0),
00786 aiocb_list_max_size_ (max_aio_operations),
00787 aiocb_list_cur_size_ (0),
00788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00789 num_deferred_aiocb_ (0),
00790 num_started_aio_ (0)
00791 {
00792
00793 check_max_aio_num ();
00794
00795 this->create_result_aiocb_list ();
00796
00797 this->create_notify_manager ();
00798
00799
00800
00801 this->get_asynch_pseudo_task().start ();
00802
00803 }
00804
00805
00806 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
00807 ACE_POSIX_Proactor::Proactor_Type)
00808 : aiocb_notify_pipe_manager_ (0),
00809 aiocb_list_ (0),
00810 result_list_ (0),
00811 aiocb_list_max_size_ (max_aio_operations),
00812 aiocb_list_cur_size_ (0),
00813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00814 num_deferred_aiocb_ (0),
00815 num_started_aio_ (0)
00816 {
00817
00818 this->check_max_aio_num ();
00819
00820 this->create_result_aiocb_list ();
00821
00822
00823
00824 }
00825
00826
00827 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
00828 {
00829 this->close();
00830 }
00831
00832 ACE_POSIX_Proactor::Proactor_Type
00833 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
00834 {
00835 return PROACTOR_AIOCB;
00836 }
00837
00838
00839 int
00840 ACE_POSIX_AIOCB_Proactor::close (void)
00841 {
00842
00843 this->get_asynch_pseudo_task().stop ();
00844
00845 this->delete_notify_manager ();
00846
00847 this->clear_result_queue ();
00848
00849 return this->delete_result_aiocb_list ();
00850 }
00851
00852 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
00853 {
00854 notify_pipe_read_handle_ = h;
00855 }
00856
00857 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
00858 {
00859 if (aiocb_list_ != 0)
00860 return 0;
00861
00862 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863
00864 ACE_NEW_RETURN (result_list_,
00865 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866 -1);
00867
00868
00869 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870 {
00871 aiocb_list_[ai] = 0;
00872 result_list_[ai] = 0;
00873 }
00874
00875 return 0;
00876 }
00877
00878 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
00879 {
00880 if (aiocb_list_ == 0)
00881 return 0;
00882
00883 size_t ai;
00884
00885
00886
00887 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888 if (this->aiocb_list_[ai] != 0)
00889 this->cancel_aiocb (result_list_[ai]);
00890
00891 int num_pending = 0;
00892
00893 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894 {
00895 if (this->aiocb_list_[ai] == 0 )
00896 continue;
00897
00898
00899 int error_status = 0;
00900 size_t transfer_count = 0;
00901 int flg_completed = this->get_result_status (result_list_[ai],
00902 error_status,
00903 transfer_count);
00904
00905
00906 if (flg_completed == 0)
00907 {
00908 num_pending++;
00909 #if 0
00910 char * errtxt = ACE_OS::strerror (error_status);
00911 if (errtxt == 0)
00912 errtxt ="?????????";
00913
00914 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915 "WRITE":"READ" ;
00916
00917
00918 ACE_ERROR ((LM_ERROR,
00919 ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920 ai,
00921 op,
00922 error_status,
00923 transfer_count,
00924 errtxt));
00925 #endif
00926 }
00927 else
00928 {
00929 delete this->result_list_[ai];
00930 this->result_list_[ai] = 0;
00931 this->aiocb_list_[ai] = 0;
00932 }
00933 }
00934
00935
00936
00937
00938
00939
00940 ACE_DEBUG
00941 ((LM_DEBUG,
00942 ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943 ACE_TEXT(" number pending AIO=%d\n"),
00944 num_pending));
00945
00946 delete [] this->aiocb_list_;
00947 this->aiocb_list_ = 0;
00948
00949 delete [] this->result_list_;
00950 this->result_list_ = 0;
00951
00952 return (num_pending == 0 ? 0 : -1);
00953
00954 }
00955
00956 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
00957 {
00958 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959
00960
00961
00962
00963
00964 if (max_os_aio_num > 0 &&
00965 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966 aiocb_list_max_size_ = max_os_aio_num;
00967
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969
00970
00971
00972
00973 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974 if (max_os_listio_num > 0
00975 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976 aiocb_list_max_size_ = max_os_listio_num;
00977 #endif
00978
00979
00980
00981
00982 if (aiocb_list_max_size_ <= 0
00983 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985
00986
00987
00988 int max_num_files = ACE::max_handles ();
00989
00990 if (max_num_files > 0
00991 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992 {
00993 ACE::set_handle_limit (aiocb_list_max_size_);
00994
00995 max_num_files = ACE::max_handles ();
00996 }
00997
00998 if (max_num_files > 0
00999 && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000 aiocb_list_max_size_ = (unsigned long) max_num_files;
01001
01002 ACE_DEBUG ((LM_DEBUG,
01003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004 aiocb_list_max_size_));
01005
01006 #if defined(__sgi)
01007
01008 ACE_DEBUG((LM_DEBUG,
01009 ACE_TEXT( "SGI IRIX specific: aio_init!\n")));
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020
01021 aioinit_t aioinit;
01022
01023 aioinit.aio_threads = 10;
01024 aioinit.aio_locks = 20;
01025
01026 aioinit.aio_num = aiocb_list_max_size_;
01027 aioinit.aio_usedba = 0;
01028 aioinit.aio_debug = 0;
01029 aioinit.aio_numusers = 100;
01030 aioinit.aio_reserved[0] = 0;
01031 aioinit.aio_reserved[1] = 0;
01032 aioinit.aio_reserved[2] = 0;
01033
01034 aio_sgi_init (&aioinit);
01035
01036 #endif
01037
01038 return;
01039 }
01040
01041 void
01042 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
01043 {
01044
01045
01046
01047 if (aiocb_notify_pipe_manager_ == 0)
01048 ACE_NEW (aiocb_notify_pipe_manager_,
01049 ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }
01051
01052 void
01053 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
01054 {
01055
01056
01057
01058 delete aiocb_notify_pipe_manager_;
01059 aiocb_notify_pipe_manager_ = 0;
01060 }
01061
01062 int
01063 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
01064 {
01065
01066 ACE_Countdown_Time countdown (&wait_time);
01067 return this->handle_events_i (wait_time.msec ());
01068 }
01069
01070 int
01071 ACE_POSIX_AIOCB_Proactor::handle_events (void)
01072 {
01073 return this->handle_events_i (ACE_INFINITE);
01074 }
01075
01076 int
01077 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
01078 {
01079 ACE_UNUSED_ARG (sig_num);
01080
01081 return this->aiocb_notify_pipe_manager_->notify ();
01082 }
01083
01084 int
01085 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
01086 {
01087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088
01089 int ret_val = this->putq_result (result);
01090
01091 return ret_val;
01092 }
01093
01094 int
01095 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
01096 {
01097
01098
01099
01100 if (!result)
01101 return -1;
01102
01103 int sig_num = result->signal_number ();
01104 int ret_val = this->result_queue_.enqueue_tail (result);
01105
01106 if (ret_val == -1)
01107 ACE_ERROR_RETURN ((LM_ERROR,
01108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109 -1);
01110
01111 this->notify_completion (sig_num);
01112
01113 return 0;
01114 }
01115
01116 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
01117 {
01118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119
01120
01121 ACE_POSIX_Asynch_Result* result = 0;
01122
01123 if (this->result_queue_.dequeue_head (result) != 0)
01124 return 0;
01125
01126
01127
01128
01129
01130
01131
01132
01133 return result;
01134 }
01135
01136 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
01137 {
01138 int ret_val = 0;
01139 ACE_POSIX_Asynch_Result* result = 0;
01140
01141 while ((result = this->getq_result ()) != 0)
01142 {
01143 delete result;
01144 ret_val++;
01145 }
01146
01147 return ret_val;
01148 }
01149
01150 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
01151 {
01152 int ret_val = 0;
01153 ACE_POSIX_Asynch_Result* result = 0;
01154
01155 while ((result = this->getq_result ()) != 0)
01156 {
01157 this->application_specific_code
01158 (result,
01159 result->bytes_transferred(),
01160 0,
01161 result->error());
01162
01163 ret_val++;
01164 }
01165
01166 return ret_val;
01167 }
01168
01169 int
01170 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
01171 {
01172 int result_suspend = 0;
01173 int retval= 0;
01174
01175 if (milli_seconds == ACE_INFINITE)
01176
01177 result_suspend = aio_suspend (aiocb_list_,
01178 aiocb_list_max_size_,
01179 0);
01180 else
01181 {
01182
01183 timespec timeout;
01184 timeout.tv_sec = milli_seconds / 1000;
01185 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186 result_suspend = aio_suspend (aiocb_list_,
01187 aiocb_list_max_size_,
01188 &timeout);
01189 }
01190
01191
01192 if (result_suspend == -1)
01193 {
01194 if (errno != EAGAIN &&
01195 errno != EINTR )
01196 ACE_ERROR ((LM_ERROR,
01197 ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
01198 ACE_TEXT ("handle_events: aio_suspend failed")));
01199
01200
01201 }
01202 else
01203 {
01204 size_t index = 0;
01205 size_t count = aiocb_list_max_size_;
01206 int error_status = 0;
01207 size_t transfer_count = 0;
01208
01209 for (;; retval++)
01210 {
01211 ACE_POSIX_Asynch_Result *asynch_result =
01212 find_completed_aio (error_status,
01213 transfer_count,
01214 index,
01215 count);
01216
01217 if (asynch_result == 0)
01218 break;
01219
01220
01221 this->application_specific_code (asynch_result,
01222 transfer_count,
01223 0,
01224 error_status);
01225 }
01226 }
01227
01228
01229 retval += this->process_result_queue ();
01230
01231 return retval > 0 ? 1 : 0;
01232 }
01233
01234 int
01235 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
01236 int &error_status,
01237 size_t &transfer_count)
01238 {
01239 transfer_count = 0;
01240
01241
01242 error_status = aio_error (asynch_result);
01243 if (error_status == EINPROGRESS)
01244 return 0;
01245
01246 ssize_t op_return = aio_return (asynch_result);
01247 if (op_return > 0)
01248 transfer_count = static_cast<size_t> (op_return);
01249
01250 return 1;
01251 }
01252
01253 ACE_POSIX_Asynch_Result *
01254 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
01255 size_t &transfer_count,
01256 size_t &index,
01257 size_t &count)
01258 {
01259
01260
01261
01262 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01263
01264 ACE_POSIX_Asynch_Result *asynch_result = 0;
01265
01266 if (num_started_aio_ == 0)
01267 return 0;
01268
01269 for (; count > 0; index++ , count--)
01270 {
01271 if (index >= aiocb_list_max_size_)
01272 index = 0;
01273
01274 if (aiocb_list_[index] == 0)
01275 continue;
01276
01277 if (0 != this->get_result_status (result_list_[index],
01278 error_status,
01279 transfer_count))
01280 break;
01281
01282 }
01283
01284 if (count == 0)
01285 return 0;
01286 asynch_result = result_list_[index];
01287
01288 aiocb_list_[index] = 0;
01289 result_list_[index] = 0;
01290 aiocb_list_cur_size_--;
01291
01292 num_started_aio_--;
01293 index++;
01294 count--;
01295
01296 this->start_deferred_aio ();
01297
01298
01299
01300 return asynch_result;
01301 }
01302
01303
01304 int
01305 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
01306 ACE_POSIX_Proactor::Opcode op)
01307 {
01308 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01309
01310 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01311
01312 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01313
01314 if (result == 0)
01315 return ret_val;
01316
01317
01318 switch (op)
01319 {
01320 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01321 result->aio_lio_opcode = LIO_READ;
01322 break;
01323
01324 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01325 result->aio_lio_opcode = LIO_WRITE;
01326 break;
01327
01328 default:
01329 ACE_ERROR_RETURN ((LM_ERROR,
01330 ACE_TEXT ("%N:%l:(%P|%t)::")
01331 ACE_TEXT ("start_aio: Invalid op code %d\n"),
01332 op),
01333 -1);
01334 }
01335
01336 if (ret_val != 0)
01337 {
01338 errno = EAGAIN;
01339 return -1;
01340 }
01341
01342
01343
01344 ssize_t slot = allocate_aio_slot (result);
01345
01346 if (slot < 0)
01347 return -1;
01348
01349 size_t index = static_cast<size_t> (slot);
01350
01351 result_list_[index] = result;
01352 aiocb_list_cur_size_++;
01353
01354 ret_val = start_aio_i (result);
01355 switch (ret_val)
01356 {
01357 case 0:
01358 aiocb_list_[index] = result;
01359 return 0;
01360
01361 case 1:
01362 num_deferred_aiocb_ ++;
01363 return 0;
01364
01365 default:
01366 break;
01367 }
01368
01369 result_list_[index] = 0;
01370 aiocb_list_cur_size_--;
01371 return -1;
01372 }
01373
01374 ssize_t
01375 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01376 {
01377 size_t i = 0;
01378
01379
01380
01381
01382 if (notify_pipe_read_handle_ == result->aio_fildes)
01383 {
01384 if (result_list_[i] != 0)
01385 {
01386 errno = EAGAIN;
01387 ACE_ERROR_RETURN ((LM_ERROR,
01388 "%N:%l:(%P | %t)::\n"
01389 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01390 "internal Proactor error 0\n"),
01391 -1);
01392 }
01393 }
01394 else
01395 {
01396 for (i= 1; i < this->aiocb_list_max_size_; i++)
01397 if (result_list_[i] == 0)
01398 break;
01399 }
01400
01401 if (i >= this->aiocb_list_max_size_)
01402 ACE_ERROR_RETURN ((LM_ERROR,
01403 "%N:%l:(%P | %t)::\n"
01404 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01405 "internal Proactor error 1\n"),
01406 -1);
01407
01408
01409 result->aio_sigevent.sigev_notify = SIGEV_NONE;
01410
01411 return static_cast<ssize_t> (i);
01412 }
01413
01414
01415
01416
01417
01418
01419 int
01420 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
01421 {
01422 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01423
01424 int ret_val;
01425 const ACE_TCHAR *ptype;
01426
01427
01428
01429 switch (result->aio_lio_opcode )
01430 {
01431 case LIO_READ :
01432 ptype = ACE_TEXT ("read ");
01433 ret_val = aio_read (result);
01434 break;
01435 case LIO_WRITE :
01436 ptype = ACE_TEXT ("write");
01437 ret_val = aio_write (result);
01438 break;
01439 default:
01440 ptype = ACE_TEXT ("?????");
01441 ret_val = -1;
01442 break;
01443 }
01444
01445 if (ret_val == 0)
01446 this->num_started_aio_++;
01447 else
01448 {
01449 if (errno == EAGAIN || errno == ENOMEM)
01450 ret_val = 1;
01451 else
01452 ACE_ERROR ((LM_ERROR,
01453 ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01454 ptype,
01455 ACE_TEXT ("queueing failed")));
01456 }
01457
01458 return ret_val;
01459 }
01460
01461
01462 int
01463 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
01464 {
01465 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01466
01467
01468
01469
01470
01471
01472
01473
01474
01475 if (num_deferred_aiocb_ == 0)
01476 return 0;
01477
01478 size_t i = 0;
01479
01480 for (i= 0; i < this->aiocb_list_max_size_; i++)
01481 if (result_list_[i] !=0
01482 && aiocb_list_[i] ==0)
01483 break;
01484
01485 if (i >= this->aiocb_list_max_size_)
01486 ACE_ERROR_RETURN ((LM_ERROR,
01487 "%N:%l:(%P | %t)::\n"
01488 "start_deferred_aio:"
01489 "internal Proactor error 3\n"),
01490 -1);
01491
01492 ACE_POSIX_Asynch_Result *result = result_list_[i];
01493
01494 int ret_val = start_aio_i (result);
01495
01496 switch (ret_val)
01497 {
01498 case 0 :
01499 aiocb_list_[i] = result;
01500 num_deferred_aiocb_ --;
01501 return 0;
01502
01503 case 1 :
01504 return 0;
01505
01506 default :
01507 break;
01508 }
01509
01510
01511
01512 result_list_[i] = 0;
01513 aiocb_list_cur_size_--;
01514
01515 num_deferred_aiocb_ --;
01516
01517 result->set_error (errno);
01518 result->set_bytes_transferred (0);
01519 this->putq_result (result);
01520
01521 return -1;
01522 }
01523
01524 int
01525 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
01526 {
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01541
01542 int num_total = 0;
01543 int num_cancelled = 0;
01544
01545 {
01546 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01547
01548 size_t ai = 0;
01549
01550 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01551 {
01552 if (this->result_list_[ai] == 0)
01553 continue;
01554
01555 if (this->result_list_[ai]->aio_fildes != handle)
01556 continue;
01557
01558 num_total++;
01559
01560 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01561
01562 if (this->aiocb_list_[ai] == 0)
01563 {
01564 num_cancelled++;
01565 this->num_deferred_aiocb_--;
01566
01567 this->aiocb_list_[ai] = 0;
01568 this->result_list_[ai] = 0;
01569 this->aiocb_list_cur_size_--;
01570
01571 asynch_result->set_error (ECANCELED);
01572 asynch_result->set_bytes_transferred (0);
01573 this->putq_result (asynch_result);
01574
01575 }
01576 else
01577 {
01578 int rc_cancel = this->cancel_aiocb (asynch_result);
01579
01580 if (rc_cancel == 0)
01581 num_cancelled++;
01582 }
01583 }
01584
01585 }
01586
01587 if (num_total == 0)
01588 return 1;
01589
01590 if (num_cancelled == num_total)
01591 return 0;
01592
01593 return 2;
01594 }
01595
01596 int
01597 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
01598 {
01599
01600
01601 int rc = ::aio_cancel (0, result);
01602
01603
01604 if (rc == AIO_CANCELED)
01605 return 0;
01606 else if (rc == AIO_ALLDONE)
01607 return 1;
01608 else
01609 return 2;
01610 }
01611
01612
01613
01614
01615 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
01616
01617 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
01618 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01619 ACE_POSIX_Proactor::PROACTOR_SIG)
01620 {
01621
01622
01623
01624
01625
01626 ACE_OS::sigemptyset (&this->RT_completion_signals_);
01627
01628
01629 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
01630 ACE_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
01631 ACE_TEXT ("sigaddset")));
01632 this->block_signals ();
01633
01634 this->setup_signal_handler (ACE_SIGRTMIN);
01635
01636
01637
01638
01639
01640 this->get_asynch_pseudo_task().start ();
01641 return;
01642 }
01643
01644 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
01645 size_t max_aio_operations)
01646 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01647 ACE_POSIX_Proactor::PROACTOR_SIG)
01648 {
01649
01650
01651
01652
01653
01654
01655 if (sigemptyset (&this->RT_completion_signals_) == -1)
01656 ACE_ERROR ((LM_ERROR,
01657 "Error:(%P | %t):%p\n",
01658 "sigemptyset failed"));
01659
01660
01661
01662
01663 int member = 0;
01664 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
01665 {
01666 member = sigismember (&signal_set,
01667 si);
01668 if (member == -1)
01669 ACE_ERROR ((LM_ERROR,
01670 "%N:%l:(%P | %t)::%p\n",
01671 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01672 "sigismember failed"));
01673 else if (member == 1)
01674 {
01675 sigaddset (&this->RT_completion_signals_, si);
01676 this->setup_signal_handler (si);
01677 }
01678 }
01679
01680
01681 this->block_signals ();
01682
01683
01684
01685
01686
01687 this->get_asynch_pseudo_task().start ();
01688 return;
01689 }
01690
01691 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
01692 {
01693 this->close ();
01694
01695
01696 }
01697
01698 ACE_POSIX_Proactor::Proactor_Type
01699 ACE_POSIX_SIG_Proactor::get_impl_type (void)
01700 {
01701 return PROACTOR_SIG;
01702 }
01703
01704 int
01705 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
01706 {
01707
01708 ACE_Countdown_Time countdown (&wait_time);
01709 return this->handle_events_i (&wait_time);
01710 }
01711
01712 int
01713 ACE_POSIX_SIG_Proactor::handle_events (void)
01714 {
01715 return this->handle_events_i (0);
01716 }
01717
01718 int
01719 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
01720 {
01721
01722 pid_t const pid = ACE_OS::getpid ();
01723 if (pid == (pid_t) -1)
01724 ACE_ERROR_RETURN ((LM_ERROR,
01725 "Error:%N:%l(%P | %t):%p",
01726 "<getpid> failed"),
01727 -1);
01728
01729
01730 sigval value;
01731 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01732 value.sigval_int = -1;
01733 #else
01734 value.sival_int = -1;
01735 #endif
01736
01737
01738 if (sigqueue (pid, sig_num, value) == 0)
01739 return 0;
01740
01741 if (errno != EAGAIN)
01742 ACE_ERROR_RETURN ((LM_ERROR,
01743 "Error:%N:%l:(%P | %t):%p\n",
01744 "<sigqueue> failed"),
01745 -1);
01746 return -1;
01747 }
01748
01749 ACE_Asynch_Result_Impl *
01750 ACE_POSIX_SIG_Proactor::create_asynch_timer
01751 (const ACE_Handler::Proxy_Ptr &handler_proxy,
01752 const void *act,
01753 const ACE_Time_Value &tv,
01754 ACE_HANDLE event,
01755 int priority,
01756 int signal_number)
01757 {
01758 int is_member = 0;
01759
01760
01761 if (signal_number == -1)
01762 {
01763 int si;
01764 for (si = ACE_SIGRTMAX;
01765 (is_member == 0) && (si >= ACE_SIGRTMIN);
01766 si--)
01767 {
01768 is_member = sigismember (&this->RT_completion_signals_,
01769 si);
01770 if (is_member == -1)
01771 ACE_ERROR_RETURN ((LM_ERROR,
01772 "%N:%l:(%P | %t)::%s\n",
01773 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
01774 "sigismember failed"),
01775 0);
01776 }
01777
01778 if (is_member == 0)
01779 ACE_ERROR_RETURN ((LM_ERROR,
01780 "Error:%N:%l:(%P | %t)::%s\n",
01781 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01782 "Signal mask set empty"),
01783 0);
01784 else
01785
01786 signal_number = si + 1;
01787 }
01788
01789 ACE_Asynch_Result_Impl *implementation;
01790 ACE_NEW_RETURN (implementation,
01791 ACE_POSIX_Asynch_Timer (handler_proxy,
01792 act,
01793 tv,
01794 event,
01795 priority,
01796 signal_number),
01797 0);
01798 return implementation;
01799 }
01800
01801 #if 0
01802 static void
01803 sig_handler (int sig_num, siginfo_t *, ucontext_t *)
01804 {
01805
01806 ACE_DEBUG ((LM_DEBUG,
01807 "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
01808 sig_num));
01809 }
01810 #endif
01811
01812 int
01813 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
01814 {
01815
01816
01817
01818
01819
01820
01821
01822
01823
01824
01825 #if 0
01826 struct sigaction reaction;
01827 sigemptyset (&reaction.sa_mask);
01828 reaction.sa_flags = SA_SIGINFO;
01829 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler);
01830 int sigaction_return = ACE_OS::sigaction (signal_number,
01831 &reaction,
01832 0);
01833 if (sigaction_return == -1)
01834 ACE_ERROR_RETURN ((LM_ERROR,
01835 "Error:%p\n",
01836 "Proactor couldnt do sigaction for the RT SIGNAL"),
01837 -1);
01838 #else
01839 ACE_UNUSED_ARG(signal_number);
01840 #endif
01841 return 0;
01842 }
01843
01844
01845 int
01846 ACE_POSIX_SIG_Proactor::block_signals (void) const
01847 {
01848 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
01849 }
01850
01851 ssize_t
01852 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01853 {
01854 size_t i = 0;
01855
01856
01857 for (i = 0; i < this->aiocb_list_max_size_; i++)
01858 if (result_list_[i] == 0)
01859 break;
01860
01861 if (i >= this->aiocb_list_max_size_)
01862 ACE_ERROR_RETURN ((LM_ERROR,
01863 "%N:%l:(%P | %t)::\n"
01864 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
01865 "internal Proactor error 1\n"),
01866 -1);
01867
01868
01869
01870 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
01871 result->aio_sigevent.sigev_signo = result->signal_number ();
01872 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01873 result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
01874 #else
01875 result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
01876 #endif
01877
01878 return static_cast<ssize_t> (i);
01879 }
01880
01881 int
01882 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
01883 {
01884 int result_sigwait = 0;
01885 siginfo_t sig_info;
01886
01887 do
01888 {
01889
01890 if (timeout == 0)
01891 {
01892 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
01893 &sig_info);
01894 }
01895 else
01896 {
01897 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
01898 &sig_info,
01899 timeout);
01900 if (result_sigwait == -1 && errno == EAGAIN)
01901 return 0;
01902 }
01903 }
01904 while (result_sigwait == -1 && errno == EINTR);
01905
01906 if (result_sigwait == -1)
01907 return -1;
01908
01909
01910
01911
01912 int flg_aio = 0;
01913
01914 size_t index = 0;
01915 size_t count = 1;
01916 int error_status = 0;
01917 size_t transfer_count = 0;
01918
01919 if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56)
01920 {
01921 flg_aio = 1;
01922
01923
01924 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01925 index = static_cast<size_t> (sig_info.si_value.sigval_int);
01926 #else
01927 index = static_cast<size_t> (sig_info.si_value.sival_int);
01928 #endif
01929
01930
01931
01932
01933 if (os_id_ == ACE_OS_SUN_56)
01934 {
01935
01936
01937
01938
01939
01940
01941
01942 count = aiocb_list_max_size_;
01943 }
01944 }
01945 else if (sig_info.si_code != SI_QUEUE)
01946 {
01947
01948
01949
01950
01951 ACE_ERROR ((LM_DEBUG,
01952 ACE_TEXT ("%N:%l:(%P | %t): ")
01953 ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
01954 ACE_TEXT ("Unexpected signal code (%d) returned ")
01955 ACE_TEXT ("from sigwait; expecting %d\n"),
01956 result_sigwait, sig_info.si_code));
01957 flg_aio = 1;
01958 }
01959
01960 int ret_aio = 0;
01961 int ret_que = 0;
01962
01963 if (flg_aio)
01964 for (;; ret_aio++)
01965 {
01966 ACE_POSIX_Asynch_Result *asynch_result =
01967 find_completed_aio (error_status,
01968 transfer_count,
01969 index,
01970 count);
01971
01972 if (asynch_result == 0)
01973 break;
01974
01975
01976 this->application_specific_code (asynch_result,
01977 transfer_count,
01978 0,
01979 error_status);
01980 }
01981
01982
01983 ret_que = this->process_result_queue ();
01984
01985
01986
01987 #if 0
01988 ACE_DEBUG ((LM_DEBUG,
01989 "(%t) NumAIO=%d NumQueue=%d\n",
01990 ret_aio, ret_que));
01991 #endif
01992
01993 return ret_aio + ret_que > 0 ? 1 : 0;
01994 }
01995
01996 #endif
01997
01998
01999
02000 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
02001 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02002 const void *act,
02003 const ACE_Time_Value &tv,
02004 ACE_HANDLE event,
02005 int priority,
02006 int signal_number)
02007 : ACE_POSIX_Asynch_Result
02008 (handler_proxy, act, event, 0, 0, priority, signal_number),
02009 time_ (tv)
02010 {
02011 }
02012
02013 void
02014 ACE_POSIX_Asynch_Timer::complete (size_t ,
02015 int ,
02016 const void * ,
02017 u_long )
02018 {
02019 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02020 if (handler != 0)
02021 handler->handle_time_out (this->time_, this->act ());
02022 }
02023
02024
02025
02026
02027 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
02028 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02029 const void *act,
02030 ACE_HANDLE event,
02031 int priority,
02032 int signal_number)
02033 : ACE_Asynch_Result_Impl (),
02034 ACE_POSIX_Asynch_Result (handler_proxy,
02035 act,
02036 event,
02037 0,
02038 0,
02039 priority,
02040 signal_number)
02041 {
02042 }
02043
02044 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
02045 {
02046 }
02047
02048 void
02049 ACE_POSIX_Wakeup_Completion::complete (size_t ,
02050 int ,
02051 const void * ,
02052 u_long )
02053 {
02054
02055 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02056 if (handler != 0)
02057 handler->handle_wakeup ();
02058 }
02059
02060 ACE_END_VERSIONED_NAMESPACE_DECL
02061
02062 #endif