00001
00002
00003 #include "ace/POSIX_Proactor.h"
00004
00005 #if defined (ACE_HAS_AIO_CALLS)
00006
00007 #if !defined (__ACE_INLINE__)
00008 #include "ace/POSIX_Proactor.inl"
00009 #endif
00010
00011 # if defined (ACE_HAS_SYS_SYSTEMINFO_H)
00012 # include <sys/systeminfo.h>
00013 # endif
00014
00015 #include "ace/ACE.h"
00016 #include "ace/Flag_Manip.h"
00017 #include "ace/Task_T.h"
00018 #include "ace/Log_Msg.h"
00019 #include "ace/Object_Manager.h"
00020 #include "ace/OS_NS_sys_socket.h"
00021 #include "ace/OS_NS_signal.h"
00022 #include "ace/OS_NS_unistd.h"
00023
00024 #if defined (sun)
00025 # include "ace/OS_NS_strings.h"
00026 #endif
00027
00028
00029
00030 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032
00033
00034
00035
00036
00037
00038
00039 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
00040 {
00041 public:
00042
00043 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
00044 const void *act = 0,
00045 ACE_HANDLE event = ACE_INVALID_HANDLE,
00046 int priority = 0,
00047 int signal_number = ACE_SIGRTMIN);
00048
00049
00050 virtual ~ACE_POSIX_Wakeup_Completion (void);
00051
00052
00053
00054 virtual void complete (size_t bytes_transferred = 0,
00055 int success = 1,
00056 const void *completion_key = 0,
00057 u_long error = 0);
00058 };
00059
00060
00061 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
00062 : os_id_ (ACE_OS_UNDEFINED)
00063 {
00064 #if defined(sun)
00065
00066 os_id_ = ACE_OS_SUN;
00067
00068 char Buf [32];
00069
00070 ::memset(Buf,0,sizeof(Buf));
00071
00072 ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
00073
00074 if (ACE_OS::strcasecmp (Buf , "5.6") == 0)
00075 os_id_ = ACE_OS_SUN_56;
00076 else if (ACE_OS::strcasecmp (Buf , "5.7") == 0)
00077 os_id_ = ACE_OS_SUN_57;
00078 else if (ACE_OS::strcasecmp (Buf , "5.8") == 0)
00079 os_id_ = ACE_OS_SUN_58;
00080
00081 #elif defined(HPUX)
00082
00083 os_id_ = ACE_OS_HPUX;
00084
00085 #elif defined(__sgi)
00086
00087 os_id_ = ACE_OS_IRIX;
00088
00089 #elif defined(__OpenBSD)
00090
00091 os_id_ = ACE_OS_OPENBSD;
00092
00093
00094
00095
00096
00097 #endif
00098 }
00099
00100 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
00101 {
00102 this->close ();
00103 }
00104
00105 int
00106 ACE_POSIX_Proactor::close (void)
00107 {
00108 return 0;
00109 }
00110
00111 int
00112 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
00113 const void *completion_key)
00114 {
00115 ACE_UNUSED_ARG (handle);
00116 ACE_UNUSED_ARG (completion_key);
00117 return 0;
00118 }
00119
00120 int
00121 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
00122 {
00123 return 0;
00124 }
00125
00126 int
00127 ACE_POSIX_Proactor::close_dispatch_threads (int)
00128 {
00129 return 0;
00130 }
00131
00132 size_t
00133 ACE_POSIX_Proactor::number_of_threads (void) const
00134 {
00135
00136 ACE_NOTSUP_RETURN (0);
00137 }
00138
00139 void
00140 ACE_POSIX_Proactor::number_of_threads (size_t threads)
00141 {
00142
00143 ACE_UNUSED_ARG (threads);
00144 }
00145
00146 ACE_HANDLE
00147 ACE_POSIX_Proactor::get_handle (void) const
00148 {
00149 return ACE_INVALID_HANDLE;
00150 }
00151
00152 ACE_Asynch_Read_Stream_Impl *
00153 ACE_POSIX_Proactor::create_asynch_read_stream (void)
00154 {
00155 ACE_Asynch_Read_Stream_Impl *implementation = 0;
00156 ACE_NEW_RETURN (implementation,
00157 ACE_POSIX_Asynch_Read_Stream (this),
00158 0);
00159 return implementation;
00160 }
00161
00162 ACE_Asynch_Read_Stream_Result_Impl *
00163 ACE_POSIX_Proactor::create_asynch_read_stream_result
00164 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00165 ACE_HANDLE handle,
00166 ACE_Message_Block &message_block,
00167 size_t bytes_to_read,
00168 const void* act,
00169 ACE_HANDLE event,
00170 int priority,
00171 int signal_number)
00172 {
00173 ACE_Asynch_Read_Stream_Result_Impl *implementation;
00174 ACE_NEW_RETURN (implementation,
00175 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
00176 handle,
00177 message_block,
00178 bytes_to_read,
00179 act,
00180 event,
00181 priority,
00182 signal_number),
00183 0);
00184 return implementation;
00185 }
00186
00187
00188 ACE_Asynch_Write_Stream_Impl *
00189 ACE_POSIX_Proactor::create_asynch_write_stream (void)
00190 {
00191 ACE_Asynch_Write_Stream_Impl *implementation = 0;
00192 ACE_NEW_RETURN (implementation,
00193 ACE_POSIX_Asynch_Write_Stream (this),
00194 0);
00195 return implementation;
00196 }
00197
00198 ACE_Asynch_Write_Stream_Result_Impl *
00199 ACE_POSIX_Proactor::create_asynch_write_stream_result
00200 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00201 ACE_HANDLE handle,
00202 ACE_Message_Block &message_block,
00203 size_t bytes_to_write,
00204 const void* act,
00205 ACE_HANDLE event,
00206 int priority,
00207 int signal_number)
00208 {
00209 ACE_Asynch_Write_Stream_Result_Impl *implementation;
00210 ACE_NEW_RETURN (implementation,
00211 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
00212 handle,
00213 message_block,
00214 bytes_to_write,
00215 act,
00216 event,
00217 priority,
00218 signal_number),
00219 0);
00220 return implementation;
00221 }
00222
00223
00224 ACE_Asynch_Read_File_Impl *
00225 ACE_POSIX_Proactor::create_asynch_read_file (void)
00226 {
00227 ACE_Asynch_Read_File_Impl *implementation = 0;
00228 ACE_NEW_RETURN (implementation,
00229 ACE_POSIX_Asynch_Read_File (this),
00230 0);
00231 return implementation;
00232 }
00233
00234 ACE_Asynch_Read_File_Result_Impl *
00235 ACE_POSIX_Proactor::create_asynch_read_file_result
00236 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00237 ACE_HANDLE handle,
00238 ACE_Message_Block &message_block,
00239 size_t bytes_to_read,
00240 const void* act,
00241 u_long offset,
00242 u_long offset_high,
00243 ACE_HANDLE event,
00244 int priority,
00245 int signal_number)
00246 {
00247 ACE_Asynch_Read_File_Result_Impl *implementation;
00248 ACE_NEW_RETURN (implementation,
00249 ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
00250 handle,
00251 message_block,
00252 bytes_to_read,
00253 act,
00254 offset,
00255 offset_high,
00256 event,
00257 priority,
00258 signal_number),
00259 0);
00260 return implementation;
00261 }
00262
00263
00264 ACE_Asynch_Write_File_Impl *
00265 ACE_POSIX_Proactor::create_asynch_write_file (void)
00266 {
00267 ACE_Asynch_Write_File_Impl *implementation = 0;
00268 ACE_NEW_RETURN (implementation,
00269 ACE_POSIX_Asynch_Write_File (this),
00270 0);
00271 return implementation;
00272 }
00273
00274 ACE_Asynch_Write_File_Result_Impl *
00275 ACE_POSIX_Proactor::create_asynch_write_file_result
00276 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00277 ACE_HANDLE handle,
00278 ACE_Message_Block &message_block,
00279 size_t bytes_to_write,
00280 const void* act,
00281 u_long offset,
00282 u_long offset_high,
00283 ACE_HANDLE event,
00284 int priority,
00285 int signal_number)
00286 {
00287 ACE_Asynch_Write_File_Result_Impl *implementation;
00288 ACE_NEW_RETURN (implementation,
00289 ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
00290 handle,
00291 message_block,
00292 bytes_to_write,
00293 act,
00294 offset,
00295 offset_high,
00296 event,
00297 priority,
00298 signal_number),
00299 0);
00300 return implementation;
00301 }
00302
00303
00304 ACE_Asynch_Read_Dgram_Impl *
00305 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
00306 {
00307 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00308 ACE_NEW_RETURN (implementation,
00309 ACE_POSIX_Asynch_Read_Dgram (this),
00310 0);
00311 return implementation;
00312 }
00313
00314 ACE_Asynch_Read_Dgram_Result_Impl *
00315 ACE_POSIX_Proactor::create_asynch_read_dgram_result
00316 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00317 ACE_HANDLE handle,
00318 ACE_Message_Block *message_block,
00319 size_t bytes_to_read,
00320 int flags,
00321 int protocol_family,
00322 const void* act,
00323 ACE_HANDLE event ,
00324 int priority ,
00325 int signal_number)
00326 {
00327 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
00328 ACE_NEW_RETURN (implementation,
00329 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
00330 handle,
00331 message_block,
00332 bytes_to_read,
00333 flags,
00334 protocol_family,
00335 act,
00336 event,
00337 priority,
00338 signal_number),
00339 0);
00340
00341 return implementation;
00342 }
00343
00344
00345 ACE_Asynch_Write_Dgram_Impl *
00346 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
00347 {
00348 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00349 ACE_NEW_RETURN (implementation,
00350 ACE_POSIX_Asynch_Write_Dgram (this),
00351 0);
00352
00353 return implementation;
00354 }
00355
00356 ACE_Asynch_Write_Dgram_Result_Impl *
00357 ACE_POSIX_Proactor::create_asynch_write_dgram_result
00358 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00359 ACE_HANDLE handle,
00360 ACE_Message_Block *message_block,
00361 size_t bytes_to_write,
00362 int flags,
00363 const void* act,
00364 ACE_HANDLE event,
00365 int priority ,
00366 int signal_number)
00367 {
00368 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
00369 ACE_NEW_RETURN (implementation,
00370 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
00371 handle,
00372 message_block,
00373 bytes_to_write,
00374 flags,
00375 act,
00376 event,
00377 priority,
00378 signal_number),
00379 0);
00380
00381 return implementation;
00382 }
00383
00384
00385 ACE_Asynch_Accept_Impl *
00386 ACE_POSIX_Proactor::create_asynch_accept (void)
00387 {
00388 ACE_Asynch_Accept_Impl *implementation = 0;
00389 ACE_NEW_RETURN (implementation,
00390 ACE_POSIX_Asynch_Accept (this),
00391 0);
00392
00393 return implementation;
00394 }
00395
00396 ACE_Asynch_Accept_Result_Impl *
00397 ACE_POSIX_Proactor::create_asynch_accept_result
00398 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00399 ACE_HANDLE listen_handle,
00400 ACE_HANDLE accept_handle,
00401 ACE_Message_Block &message_block,
00402 size_t bytes_to_read,
00403 const void* act,
00404 ACE_HANDLE event,
00405 int priority,
00406 int signal_number)
00407 {
00408 ACE_Asynch_Accept_Result_Impl *implementation;
00409 ACE_NEW_RETURN (implementation,
00410 ACE_POSIX_Asynch_Accept_Result (handler_proxy,
00411 listen_handle,
00412 accept_handle,
00413 message_block,
00414 bytes_to_read,
00415 act,
00416 event,
00417 priority,
00418 signal_number),
00419 0);
00420 return implementation;
00421 }
00422
00423
00424 ACE_Asynch_Connect_Impl *
00425 ACE_POSIX_Proactor::create_asynch_connect (void)
00426 {
00427 ACE_Asynch_Connect_Impl *implementation = 0;
00428 ACE_NEW_RETURN (implementation,
00429 ACE_POSIX_Asynch_Connect (this),
00430 0);
00431
00432 return implementation;
00433 }
00434
00435 ACE_Asynch_Connect_Result_Impl *
00436 ACE_POSIX_Proactor::create_asynch_connect_result
00437 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00438 ACE_HANDLE connect_handle,
00439 const void* act,
00440 ACE_HANDLE event,
00441 int priority,
00442 int signal_number)
00443 {
00444 ACE_Asynch_Connect_Result_Impl *implementation;
00445 ACE_NEW_RETURN (implementation,
00446 ACE_POSIX_Asynch_Connect_Result (handler_proxy,
00447 connect_handle,
00448 act,
00449 event,
00450 priority,
00451 signal_number),
00452 0);
00453 return implementation;
00454 }
00455
00456
00457 ACE_Asynch_Transmit_File_Impl *
00458 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
00459 {
00460 ACE_Asynch_Transmit_File_Impl *implementation = 0;
00461 ACE_NEW_RETURN (implementation,
00462 ACE_POSIX_Asynch_Transmit_File (this),
00463 0);
00464 return implementation;
00465 }
00466
00467 ACE_Asynch_Transmit_File_Result_Impl *
00468 ACE_POSIX_Proactor::create_asynch_transmit_file_result
00469 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00470 ACE_HANDLE socket,
00471 ACE_HANDLE file,
00472 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00473 size_t bytes_to_write,
00474 u_long offset,
00475 u_long offset_high,
00476 size_t bytes_per_send,
00477 u_long flags,
00478 const void *act,
00479 ACE_HANDLE event,
00480 int priority,
00481 int signal_number)
00482 {
00483 ACE_Asynch_Transmit_File_Result_Impl *implementation;
00484 ACE_NEW_RETURN (implementation,
00485 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
00486 socket,
00487 file,
00488 header_and_trailer,
00489 bytes_to_write,
00490 offset,
00491 offset_high,
00492 bytes_per_send,
00493 flags,
00494 act,
00495 event,
00496 priority,
00497 signal_number),
00498 0);
00499 return implementation;
00500 }
00501
00502 ACE_Asynch_Result_Impl *
00503 ACE_POSIX_Proactor::create_asynch_timer
00504 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00505 const void *act,
00506 const ACE_Time_Value &tv,
00507 ACE_HANDLE event,
00508 int priority,
00509 int signal_number)
00510 {
00511 ACE_POSIX_Asynch_Timer *implementation;
00512 ACE_NEW_RETURN (implementation,
00513 ACE_POSIX_Asynch_Timer (handler_proxy,
00514 act,
00515 tv,
00516 event,
00517 priority,
00518 signal_number),
00519 0);
00520 return implementation;
00521 }
00522
#if 0
// Everything in this region is compiled out.

/// Polls handle_events () with a zero timeout until it returns non-zero
/// or errno reads ETIME.  Yields -1 if the last poll failed, else 0.
int
ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
{
  ACE_Time_Value timeout (0, 0);
  int result = 0;

  do
    result = this->handle_events (timeout);
  while (result == 0 && errno != ETIME);

  return result == -1 ? -1 : 0;
}

/// Ignores both arguments and forwards to close ().
int
ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
                                  ACE_Reactor_Mask close_mask)
{
  ACE_UNUSED_ARG (handle);
  ACE_UNUSED_ARG (close_mask);

  return this->close ();
}
#endif
00555
00556 void
00557 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
00558 size_t bytes_transferred,
00559 const void *,
00560 u_long error)
00561 {
00562 ACE_SEH_TRY
00563 {
00564
00565 asynch_result->complete (bytes_transferred,
00566 error ? 0 : 1,
00567 0,
00568 error);
00569 }
00570 ACE_SEH_FINALLY
00571 {
00572
00573 delete asynch_result;
00574 }
00575 }
00576
00577 int
00578 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
00579 {
00580 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
00581
00582 for (int ci = 0; ci < how_many; ci++)
00583 {
00584 ACE_NEW_RETURN
00585 (wakeup_completion,
00586 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
00587 -1);
00588 if (this->post_completion (wakeup_completion) == -1)
00589 return -1;
00590 }
00591
00592 return 0;
00593 }
00594
00595 ACE_POSIX_Proactor::Proactor_Type
00596 ACE_POSIX_Proactor::get_impl_type (void)
00597 {
00598 return PROACTOR_POSIX;
00599 }
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
00630 {
00631 public:
00632
00633
00634 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
00635
00636
00637 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
00638
00639
00640 int notify ();
00641
00642
00643
00644 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
00645
00646 private:
00647
00648 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
00649
00650
00651 ACE_Message_Block message_block_;
00652
00653
00654
00655 ACE_Pipe pipe_;
00656
00657
00658 ACE_POSIX_Asynch_Read_Stream read_stream_;
00659
00660
00661 ACE_AIOCB_Notify_Pipe_Manager (void);
00662 };
00663
00664 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
00665 : posix_aiocb_proactor_ (posix_aiocb_proactor),
00666 message_block_ (sizeof (2)),
00667 read_stream_ (posix_aiocb_proactor)
00668 {
00669
00670 this->pipe_.open ();
00671
00672
00673 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
00674
00675
00676 ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
00677
00678
00679 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
00680
00681
00682 if (this->read_stream_.open (this->proxy (),
00683 this->pipe_.read_handle (),
00684 0,
00685 0)
00686 == -1)
00687 ACE_ERROR ((LM_ERROR,
00688 ACE_TEXT("%N:%l:%p\n"),
00689 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
00690 ACE_TEXT("Open on Read Stream failed")));
00691
00692
00693 if (this->read_stream_.read (this->message_block_,
00694 1,
00695 0,
00696 0)
00697 == -1)
00698 ACE_ERROR ((LM_ERROR,
00699 ACE_TEXT("%N:%l:%p\n"),
00700 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
00701 ACE_TEXT("Read from pipe failed")));
00702 }
00703
00704 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
00705 {
00706
00707 this->read_stream_.cancel ();
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719 ACE_HANDLE h = this->pipe_.write_handle ();
00720 if (h != ACE_INVALID_HANDLE)
00721 ACE_OS::closesocket (h);
00722
00723 h = this->pipe_.read_handle ();
00724 if ( h != ACE_INVALID_HANDLE)
00725 ACE_OS::closesocket (h);
00726
00727 }
00728
00729
00730 int
00731 ACE_AIOCB_Notify_Pipe_Manager::notify ()
00732 {
00733
00734 char char_send = 0;
00735 ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
00736 &char_send,
00737 sizeof (char_send));
00738
00739 if (ret_val < 0)
00740 {
00741 if (errno != EWOULDBLOCK)
00742 #if 0
00743 ACE_ERROR ((LM_ERROR,
00744 ACE_TEXT ("(%P %t):%p\n"),
00745 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
00746 ACE_TEXT ("Error:Writing on to notify pipe failed")));
00747 #endif
00748 return -1;
00749 }
00750
00751 return 0;
00752 }
00753
00754 void
00755 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
00756 (const ACE_Asynch_Read_Stream::Result & )
00757 {
00758
00759
00760
00761
00762 if (this->message_block_.length () > 0)
00763 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
00764
00765
00766
00767 if (-1 == this->read_stream_.read (this->message_block_,
00768 1,
00769 0,
00770 0))
00771 ACE_ERROR ((LM_ERROR,
00772 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
00773 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
00774 ACE_TEXT ("Read from pipe failed")));
00775
00776
00777
00778
00779 }
00780
00781
00782 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
00783 : aiocb_notify_pipe_manager_ (0),
00784 aiocb_list_ (0),
00785 result_list_ (0),
00786 aiocb_list_max_size_ (max_aio_operations),
00787 aiocb_list_cur_size_ (0),
00788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00789 num_deferred_aiocb_ (0),
00790 num_started_aio_ (0)
00791 {
00792
00793 check_max_aio_num ();
00794
00795 this->create_result_aiocb_list ();
00796
00797 this->create_notify_manager ();
00798
00799
00800
00801 this->get_asynch_pseudo_task().start ();
00802
00803 }
00804
00805
00806 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
00807 ACE_POSIX_Proactor::Proactor_Type)
00808 : aiocb_notify_pipe_manager_ (0),
00809 aiocb_list_ (0),
00810 result_list_ (0),
00811 aiocb_list_max_size_ (max_aio_operations),
00812 aiocb_list_cur_size_ (0),
00813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00814 num_deferred_aiocb_ (0),
00815 num_started_aio_ (0)
00816 {
00817
00818 this->check_max_aio_num ();
00819
00820 this->create_result_aiocb_list ();
00821
00822
00823
00824 }
00825
00826
00827 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
00828 {
00829 this->close();
00830 }
00831
00832 ACE_POSIX_Proactor::Proactor_Type
00833 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
00834 {
00835 return PROACTOR_AIOCB;
00836 }
00837
00838
00839 int
00840 ACE_POSIX_AIOCB_Proactor::close (void)
00841 {
00842
00843 this->get_asynch_pseudo_task().stop ();
00844
00845 this->delete_notify_manager ();
00846
00847 this->clear_result_queue ();
00848
00849 return this->delete_result_aiocb_list ();
00850 }
00851
00852 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
00853 {
00854 notify_pipe_read_handle_ = h;
00855 }
00856
00857 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
00858 {
00859 if (aiocb_list_ != 0)
00860 return 0;
00861
00862 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863
00864 ACE_NEW_RETURN (result_list_,
00865 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866 -1);
00867
00868
00869 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870 {
00871 aiocb_list_[ai] = 0;
00872 result_list_[ai] = 0;
00873 }
00874
00875 return 0;
00876 }
00877
00878 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
00879 {
00880 if (aiocb_list_ == 0)
00881 return 0;
00882
00883 size_t ai;
00884
00885
00886
00887 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888 if (this->aiocb_list_[ai] != 0)
00889 this->cancel_aiocb (result_list_[ai]);
00890
00891 int num_pending = 0;
00892
00893 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894 {
00895 if (this->aiocb_list_[ai] == 0 )
00896 continue;
00897
00898
00899 int error_status = 0;
00900 size_t transfer_count = 0;
00901 int flg_completed = this->get_result_status (result_list_[ai],
00902 error_status,
00903 transfer_count);
00904
00905
00906 if (flg_completed == 0)
00907 {
00908 num_pending++;
00909 #if 0
00910 char * errtxt = ACE_OS::strerror (error_status);
00911 if (errtxt == 0)
00912 errtxt ="?????????";
00913
00914 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915 "WRITE":"READ" ;
00916
00917
00918 ACE_ERROR ((LM_ERROR,
00919 ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920 ai,
00921 op,
00922 error_status,
00923 transfer_count,
00924 errtxt));
00925 #endif
00926 }
00927 else
00928 {
00929 delete this->result_list_[ai];
00930 this->result_list_[ai] = 0;
00931 this->aiocb_list_[ai] = 0;
00932 }
00933 }
00934
00935
00936
00937
00938
00939
00940 ACE_DEBUG
00941 ((LM_DEBUG,
00942 ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943 ACE_TEXT(" number pending AIO=%d\n"),
00944 num_pending));
00945
00946 delete [] this->aiocb_list_;
00947 this->aiocb_list_ = 0;
00948
00949 delete [] this->result_list_;
00950 this->result_list_ = 0;
00951
00952 return (num_pending == 0 ? 0 : -1);
00953
00954 }
00955
00956 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
00957 {
00958 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959
00960
00961
00962
00963
00964 if (max_os_aio_num > 0 &&
00965 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966 aiocb_list_max_size_ = max_os_aio_num;
00967
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969
00970
00971
00972
00973 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974 if (max_os_listio_num > 0
00975 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976 aiocb_list_max_size_ = max_os_listio_num;
00977 #endif
00978
00979
00980
00981
00982 if (aiocb_list_max_size_ <= 0
00983 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985
00986
00987
00988 int max_num_files = ACE::max_handles ();
00989
00990 if (max_num_files > 0
00991 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992 {
00993 ACE::set_handle_limit (aiocb_list_max_size_);
00994
00995 max_num_files = ACE::max_handles ();
00996 }
00997
00998 if (max_num_files > 0
00999 && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000 aiocb_list_max_size_ = (unsigned long) max_num_files;
01001
01002 ACE_DEBUG ((LM_DEBUG,
01003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004 aiocb_list_max_size_));
01005
01006 #if defined(__sgi)
01007
01008 ACE_DEBUG((LM_DEBUG,
01009 ACE_TEXT( "SGI IRIX specific: aio_init!\n")));
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020
01021 aioinit_t aioinit;
01022
01023 aioinit.aio_threads = 10;
01024 aioinit.aio_locks = 20;
01025
01026 aioinit.aio_num = aiocb_list_max_size_;
01027 aioinit.aio_usedba = 0;
01028 aioinit.aio_debug = 0;
01029 aioinit.aio_numusers = 100;
01030 aioinit.aio_reserved[0] = 0;
01031 aioinit.aio_reserved[1] = 0;
01032 aioinit.aio_reserved[2] = 0;
01033
01034 aio_sgi_init (&aioinit);
01035
01036 #endif
01037
01038 return;
01039 }
01040
01041 void
01042 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
01043 {
01044
01045
01046
01047 if (aiocb_notify_pipe_manager_ == 0)
01048 ACE_NEW (aiocb_notify_pipe_manager_,
01049 ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }
01051
01052 void
01053 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
01054 {
01055
01056
01057
01058 delete aiocb_notify_pipe_manager_;
01059 aiocb_notify_pipe_manager_ = 0;
01060 }
01061
01062 int
01063 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
01064 {
01065
01066 ACE_Countdown_Time countdown (&wait_time);
01067 return this->handle_events_i (wait_time.msec ());
01068 }
01069
01070 int
01071 ACE_POSIX_AIOCB_Proactor::handle_events (void)
01072 {
01073 return this->handle_events_i (ACE_INFINITE);
01074 }
01075
01076 int
01077 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
01078 {
01079 ACE_UNUSED_ARG (sig_num);
01080
01081 return this->aiocb_notify_pipe_manager_->notify ();
01082 }
01083
01084 int
01085 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
01086 {
01087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088
01089 int ret_val = this->putq_result (result);
01090
01091 return ret_val;
01092 }
01093
01094 int
01095 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
01096 {
01097
01098
01099
01100 if (!result)
01101 return -1;
01102
01103 int sig_num = result->signal_number ();
01104 int ret_val = this->result_queue_.enqueue_tail (result);
01105
01106 if (ret_val == -1)
01107 ACE_ERROR_RETURN ((LM_ERROR,
01108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109 -1);
01110
01111 this->notify_completion (sig_num);
01112
01113 return 0;
01114 }
01115
01116 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
01117 {
01118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119
01120
01121 ACE_POSIX_Asynch_Result* result = 0;
01122
01123 if (this->result_queue_.dequeue_head (result) != 0)
01124 return 0;
01125
01126
01127
01128
01129
01130
01131
01132
01133 return result;
01134 }
01135
01136 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
01137 {
01138 int ret_val = 0;
01139 ACE_POSIX_Asynch_Result* result = 0;
01140
01141 while ((result = this->getq_result ()) != 0)
01142 {
01143 delete result;
01144 ret_val++;
01145 }
01146
01147 return ret_val;
01148 }
01149
01150 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
01151 {
01152 int ret_val = 0;
01153 ACE_POSIX_Asynch_Result* result = 0;
01154
01155 while ((result = this->getq_result ()) != 0)
01156 {
01157 this->application_specific_code
01158 (result,
01159 result->bytes_transferred(),
01160 0,
01161 result->error());
01162
01163 ret_val++;
01164 }
01165
01166 return ret_val;
01167 }
01168
01169 int
01170 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
01171 {
01172 int result_suspend = 0;
01173 int retval= 0;
01174
01175 if (milli_seconds == ACE_INFINITE)
01176
01177 result_suspend = aio_suspend (aiocb_list_,
01178 aiocb_list_max_size_,
01179 0);
01180 else
01181 {
01182
01183 timespec timeout;
01184 timeout.tv_sec = milli_seconds / 1000;
01185 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186 result_suspend = aio_suspend (aiocb_list_,
01187 aiocb_list_max_size_,
01188 &timeout);
01189 }
01190
01191
01192 if (result_suspend == -1)
01193 {
01194 if (errno != EAGAIN &&
01195 errno != EINTR )
01196 ACE_ERROR ((LM_ERROR,
01197 ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
01198 ACE_TEXT ("handle_events: aio_suspend failed")));
01199
01200
01201 }
01202 else
01203 {
01204 size_t index = 0;
01205 size_t count = aiocb_list_max_size_;
01206 int error_status = 0;
01207 size_t transfer_count = 0;
01208
01209 for (;; retval++)
01210 {
01211 ACE_POSIX_Asynch_Result *asynch_result =
01212 find_completed_aio (error_status,
01213 transfer_count,
01214 index,
01215 count);
01216
01217 if (asynch_result == 0)
01218 break;
01219
01220
01221 this->application_specific_code (asynch_result,
01222 transfer_count,
01223 0,
01224 error_status);
01225 }
01226 }
01227
01228
01229 retval += this->process_result_queue ();
01230
01231 return retval > 0 ? 1 : 0;
01232 }
01233
01234 int
01235 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
01236 int &error_status,
01237 size_t &transfer_count)
01238 {
01239 transfer_count = 0;
01240
01241
01242
01243
01244 aiocb *aio_ptr (asynch_result);
01245 error_status = aio_error (aio_ptr);
01246 if (error_status == EINPROGRESS)
01247 return 0;
01248
01249 ssize_t op_return = aio_return (aio_ptr);
01250 if (op_return > 0)
01251 transfer_count = static_cast<size_t> (op_return);
01252
01253 return 1;
01254 }
01255
01256 ACE_POSIX_Asynch_Result *
01257 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
01258 size_t &transfer_count,
01259 size_t &index,
01260 size_t &count)
01261 {
01262
01263
01264
01265 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01266
01267 ACE_POSIX_Asynch_Result *asynch_result = 0;
01268
01269 if (num_started_aio_ == 0)
01270 return 0;
01271
01272 for (; count > 0; index++ , count--)
01273 {
01274 if (index >= aiocb_list_max_size_)
01275 index = 0;
01276
01277 if (aiocb_list_[index] == 0)
01278 continue;
01279
01280 if (0 != this->get_result_status (result_list_[index],
01281 error_status,
01282 transfer_count))
01283 break;
01284
01285 }
01286
01287 if (count == 0)
01288 return 0;
01289 asynch_result = result_list_[index];
01290
01291 aiocb_list_[index] = 0;
01292 result_list_[index] = 0;
01293 aiocb_list_cur_size_--;
01294
01295 num_started_aio_--;
01296 index++;
01297 count--;
01298
01299 this->start_deferred_aio ();
01300
01301
01302
01303 return asynch_result;
01304 }
01305
01306
01307 int
01308 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
01309 ACE_POSIX_Proactor::Opcode op)
01310 {
01311 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01312
01313 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01314
01315 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01316
01317 if (result == 0)
01318 return ret_val;
01319
01320
01321 switch (op)
01322 {
01323 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01324 result->aio_lio_opcode = LIO_READ;
01325 break;
01326
01327 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01328 result->aio_lio_opcode = LIO_WRITE;
01329 break;
01330
01331 default:
01332 ACE_ERROR_RETURN ((LM_ERROR,
01333 ACE_TEXT ("%N:%l:(%P|%t)::")
01334 ACE_TEXT ("start_aio: Invalid op code %d\n"),
01335 op),
01336 -1);
01337 }
01338
01339 if (ret_val != 0)
01340 {
01341 errno = EAGAIN;
01342 return -1;
01343 }
01344
01345
01346
01347 ssize_t slot = allocate_aio_slot (result);
01348
01349 if (slot < 0)
01350 return -1;
01351
01352 size_t index = static_cast<size_t> (slot);
01353
01354 result_list_[index] = result;
01355 aiocb_list_cur_size_++;
01356
01357 ret_val = start_aio_i (result);
01358 switch (ret_val)
01359 {
01360 case 0:
01361 aiocb_list_[index] = result;
01362 return 0;
01363
01364 case 1:
01365 num_deferred_aiocb_ ++;
01366 return 0;
01367
01368 default:
01369 break;
01370 }
01371
01372 result_list_[index] = 0;
01373 aiocb_list_cur_size_--;
01374 return -1;
01375 }
01376
01377 ssize_t
01378 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01379 {
01380 size_t i = 0;
01381
01382
01383
01384
01385 if (notify_pipe_read_handle_ == result->aio_fildes)
01386 {
01387 if (result_list_[i] != 0)
01388 {
01389 errno = EAGAIN;
01390 ACE_ERROR_RETURN ((LM_ERROR,
01391 "%N:%l:(%P | %t)::\n"
01392 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01393 "internal Proactor error 0\n"),
01394 -1);
01395 }
01396 }
01397 else
01398 {
01399 for (i= 1; i < this->aiocb_list_max_size_; i++)
01400 if (result_list_[i] == 0)
01401 break;
01402 }
01403
01404 if (i >= this->aiocb_list_max_size_)
01405 ACE_ERROR_RETURN ((LM_ERROR,
01406 "%N:%l:(%P | %t)::\n"
01407 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01408 "internal Proactor error 1\n"),
01409 -1);
01410
01411
01412 result->aio_sigevent.sigev_notify = SIGEV_NONE;
01413
01414 return static_cast<ssize_t> (i);
01415 }
01416
01417
01418
01419
01420
01421
01422 int
01423 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
01424 {
01425 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01426
01427 int ret_val;
01428 const ACE_TCHAR *ptype = 0;
01429
01430
01431
01432
01433 aiocb * aio_ptr (result);
01434 switch (result->aio_lio_opcode )
01435 {
01436 case LIO_READ :
01437 ptype = ACE_TEXT ("read ");
01438 ret_val = aio_read (aio_ptr);
01439 break;
01440 case LIO_WRITE :
01441 ptype = ACE_TEXT ("write");
01442 ret_val = aio_write (aio_ptr);
01443 break;
01444 default:
01445 ptype = ACE_TEXT ("?????");
01446 ret_val = -1;
01447 break;
01448 }
01449
01450 if (ret_val == 0)
01451 {
01452 ++this->num_started_aio_;
01453 }
01454 else
01455 {
01456 if (errno == EAGAIN || errno == ENOMEM)
01457 ret_val = 1;
01458 else
01459 ACE_ERROR ((LM_ERROR,
01460 ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01461 ptype,
01462 ACE_TEXT ("queueing failed")));
01463 }
01464
01465 return ret_val;
01466 }
01467
01468
01469 int
01470 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
01471 {
01472 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01473
01474
01475
01476
01477
01478
01479
01480
01481
01482 if (num_deferred_aiocb_ == 0)
01483 return 0;
01484
01485 size_t i = 0;
01486
01487 for (i= 0; i < this->aiocb_list_max_size_; i++)
01488 if (result_list_[i] !=0
01489 && aiocb_list_[i] ==0)
01490 break;
01491
01492 if (i >= this->aiocb_list_max_size_)
01493 ACE_ERROR_RETURN ((LM_ERROR,
01494 "%N:%l:(%P | %t)::\n"
01495 "start_deferred_aio:"
01496 "internal Proactor error 3\n"),
01497 -1);
01498
01499 ACE_POSIX_Asynch_Result *result = result_list_[i];
01500
01501 int ret_val = start_aio_i (result);
01502
01503 switch (ret_val)
01504 {
01505 case 0 :
01506 aiocb_list_[i] = result;
01507 num_deferred_aiocb_ --;
01508 return 0;
01509
01510 case 1 :
01511 return 0;
01512
01513 default :
01514 break;
01515 }
01516
01517
01518
01519 result_list_[i] = 0;
01520 --aiocb_list_cur_size_;
01521
01522 --num_deferred_aiocb_;
01523
01524 result->set_error (errno);
01525 result->set_bytes_transferred (0);
01526 this->putq_result (result);
01527
01528 return -1;
01529 }
01530
01531 int
01532 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
01533 {
01534
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546
01547 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01548
01549 int num_total = 0;
01550 int num_cancelled = 0;
01551
01552 {
01553 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01554
01555 size_t ai = 0;
01556
01557 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01558 {
01559 if (this->result_list_[ai] == 0)
01560 continue;
01561
01562 if (this->result_list_[ai]->aio_fildes != handle)
01563 continue;
01564
01565 ++num_total;
01566
01567 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01568
01569 if (this->aiocb_list_[ai] == 0)
01570 {
01571 num_cancelled++;
01572 this->num_deferred_aiocb_--;
01573
01574 this->aiocb_list_[ai] = 0;
01575 this->result_list_[ai] = 0;
01576 this->aiocb_list_cur_size_--;
01577
01578 asynch_result->set_error (ECANCELED);
01579 asynch_result->set_bytes_transferred (0);
01580 this->putq_result (asynch_result);
01581
01582 }
01583 else
01584 {
01585 int rc_cancel = this->cancel_aiocb (asynch_result);
01586
01587 if (rc_cancel == 0)
01588 num_cancelled++;
01589 }
01590 }
01591
01592 }
01593
01594 if (num_total == 0)
01595 return 1;
01596
01597 if (num_cancelled == num_total)
01598 return 0;
01599
01600 return 2;
01601 }
01602
01603 int
01604 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
01605 {
01606
01607
01608 int rc = ::aio_cancel (0, result);
01609
01610
01611 if (rc == AIO_CANCELED)
01612 return 0;
01613 else if (rc == AIO_ALLDONE)
01614 return 1;
01615 else
01616 return 2;
01617 }
01618
01619
01620
01621
01622 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
01623
01624 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
01625 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01626 ACE_POSIX_Proactor::PROACTOR_SIG)
01627 {
01628
01629
01630
01631
01632
01633 ACE_OS::sigemptyset (&this->RT_completion_signals_);
01634
01635
01636 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
01637 ACE_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
01638 ACE_TEXT ("sigaddset")));
01639 this->block_signals ();
01640
01641 this->setup_signal_handler (ACE_SIGRTMIN);
01642
01643
01644
01645
01646
01647 this->get_asynch_pseudo_task().start ();
01648 return;
01649 }
01650
01651 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
01652 size_t max_aio_operations)
01653 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01654 ACE_POSIX_Proactor::PROACTOR_SIG)
01655 {
01656
01657
01658
01659
01660
01661
01662 if (sigemptyset (&this->RT_completion_signals_) == -1)
01663 ACE_ERROR ((LM_ERROR,
01664 ACE_TEXT("Error:(%P | %t):%p\n"),
01665 ACE_TEXT("sigemptyset failed")));
01666
01667
01668
01669
01670 int member = 0;
01671 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
01672 {
01673 member = sigismember (&signal_set,
01674 si);
01675 if (member == -1)
01676 ACE_ERROR ((LM_ERROR,
01677 ACE_TEXT("%N:%l:(%P | %t)::%p\n"),
01678 ACE_TEXT("ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:")
01679 ACE_TEXT("sigismember failed")));
01680 else if (member == 1)
01681 {
01682 sigaddset (&this->RT_completion_signals_, si);
01683 this->setup_signal_handler (si);
01684 }
01685 }
01686
01687
01688 this->block_signals ();
01689
01690
01691
01692
01693
01694 this->get_asynch_pseudo_task().start ();
01695 return;
01696 }
01697
01698 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
01699 {
01700 this->close ();
01701
01702
01703 }
01704
01705 ACE_POSIX_Proactor::Proactor_Type
01706 ACE_POSIX_SIG_Proactor::get_impl_type (void)
01707 {
01708 return PROACTOR_SIG;
01709 }
01710
01711 int
01712 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
01713 {
01714
01715 ACE_Countdown_Time countdown (&wait_time);
01716 return this->handle_events_i (&wait_time);
01717 }
01718
01719 int
01720 ACE_POSIX_SIG_Proactor::handle_events (void)
01721 {
01722 return this->handle_events_i (0);
01723 }
01724
01725 int
01726 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
01727 {
01728
01729 pid_t const pid = ACE_OS::getpid ();
01730 if (pid == (pid_t) -1)
01731 ACE_ERROR_RETURN ((LM_ERROR,
01732 ACE_TEXT("Error:%N:%l(%P | %t):%p"),
01733 ACE_TEXT("<getpid> failed")),
01734 -1);
01735
01736
01737 sigval value;
01738 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01739 value.sigval_int = -1;
01740 #else
01741 value.sival_int = -1;
01742 #endif
01743
01744
01745 if (sigqueue (pid, sig_num, value) == 0)
01746 return 0;
01747
01748 if (errno != EAGAIN)
01749 ACE_ERROR_RETURN ((LM_ERROR,
01750 ACE_TEXT("Error:%N:%l:(%P | %t):%p\n"),
01751 ACE_TEXT("<sigqueue> failed")),
01752 -1);
01753 return -1;
01754 }
01755
01756 ACE_Asynch_Result_Impl *
01757 ACE_POSIX_SIG_Proactor::create_asynch_timer
01758 (const ACE_Handler::Proxy_Ptr &handler_proxy,
01759 const void *act,
01760 const ACE_Time_Value &tv,
01761 ACE_HANDLE event,
01762 int priority,
01763 int signal_number)
01764 {
01765 int is_member = 0;
01766
01767
01768 if (signal_number == -1)
01769 {
01770 int si;
01771 for (si = ACE_SIGRTMAX;
01772 (is_member == 0) && (si >= ACE_SIGRTMIN);
01773 si--)
01774 {
01775 is_member = sigismember (&this->RT_completion_signals_,
01776 si);
01777 if (is_member == -1)
01778 ACE_ERROR_RETURN ((LM_ERROR,
01779 "%N:%l:(%P | %t)::%s\n",
01780 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
01781 "sigismember failed"),
01782 0);
01783 }
01784
01785 if (is_member == 0)
01786 ACE_ERROR_RETURN ((LM_ERROR,
01787 "Error:%N:%l:(%P | %t)::%s\n",
01788 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01789 "Signal mask set empty"),
01790 0);
01791 else
01792
01793 signal_number = si + 1;
01794 }
01795
01796 ACE_Asynch_Result_Impl *implementation;
01797 ACE_NEW_RETURN (implementation,
01798 ACE_POSIX_Asynch_Timer (handler_proxy,
01799 act,
01800 tv,
01801 event,
01802 priority,
01803 signal_number),
01804 0);
01805 return implementation;
01806 }
01807
#if 0
// Compiled out.  Signal handler that only logs the number of the
// signal it received.
static void
sig_handler (int sig_num, siginfo_t *, ucontext_t *)
{
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
              sig_num));
}
#endif /* 0 */
01818
01819 int
01820 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
01821 {
01822
01823
01824
01825
01826
01827
01828
01829
01830
01831
01832 #if 0
01833 struct sigaction reaction;
01834 sigemptyset (&reaction.sa_mask);
01835 reaction.sa_flags = SA_SIGINFO;
01836 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler);
01837 int sigaction_return = ACE_OS::sigaction (signal_number,
01838 &reaction,
01839 0);
01840 if (sigaction_return == -1)
01841 ACE_ERROR_RETURN ((LM_ERROR,
01842 ACE_TEXT("Error:%p\n"),
01843 ACE_TEXT("Proactor couldnt do sigaction for the RT SIGNAL")),
01844 -1);
01845 #else
01846 ACE_UNUSED_ARG(signal_number);
01847 #endif
01848 return 0;
01849 }
01850
01851
01852 int
01853 ACE_POSIX_SIG_Proactor::block_signals (void) const
01854 {
01855 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
01856 }
01857
01858 ssize_t
01859 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01860 {
01861 size_t i = 0;
01862
01863
01864 for (i = 0; i < this->aiocb_list_max_size_; i++)
01865 if (result_list_[i] == 0)
01866 break;
01867
01868 if (i >= this->aiocb_list_max_size_)
01869 ACE_ERROR_RETURN ((LM_ERROR,
01870 "%N:%l:(%P | %t)::\n"
01871 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
01872 "internal Proactor error 1\n"),
01873 -1);
01874
01875
01876
01877 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
01878 result->aio_sigevent.sigev_signo = result->signal_number ();
01879 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01880 result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
01881 #else
01882 result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
01883 #endif
01884
01885 return static_cast<ssize_t> (i);
01886 }
01887
01888 int
01889 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
01890 {
01891 int result_sigwait = 0;
01892 siginfo_t sig_info;
01893
01894 do
01895 {
01896
01897 if (timeout == 0)
01898 {
01899 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
01900 &sig_info);
01901 }
01902 else
01903 {
01904 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
01905 &sig_info,
01906 timeout);
01907 if (result_sigwait == -1 && errno == EAGAIN)
01908 return 0;
01909 }
01910 }
01911 while (result_sigwait == -1 && errno == EINTR);
01912
01913 if (result_sigwait == -1)
01914 return -1;
01915
01916
01917
01918
01919 int flg_aio = 0;
01920
01921 size_t index = 0;
01922 size_t count = 1;
01923 int error_status = 0;
01924 size_t transfer_count = 0;
01925
01926 if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56)
01927 {
01928 flg_aio = 1;
01929
01930
01931 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
01932 index = static_cast<size_t> (sig_info.si_value.sigval_int);
01933 #else
01934 index = static_cast<size_t> (sig_info.si_value.sival_int);
01935 #endif
01936
01937
01938
01939
01940 if (os_id_ == ACE_OS_SUN_56)
01941 {
01942
01943
01944
01945
01946
01947
01948
01949 count = aiocb_list_max_size_;
01950 }
01951 }
01952 else if (sig_info.si_code != SI_QUEUE)
01953 {
01954
01955
01956
01957
01958 ACE_ERROR ((LM_DEBUG,
01959 ACE_TEXT ("%N:%l:(%P | %t): ")
01960 ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
01961 ACE_TEXT ("Unexpected signal code (%d) returned ")
01962 ACE_TEXT ("from sigwait; expecting %d\n"),
01963 result_sigwait, sig_info.si_code));
01964 flg_aio = 1;
01965 }
01966
01967 int ret_aio = 0;
01968 int ret_que = 0;
01969
01970 if (flg_aio)
01971 for (;; ret_aio++)
01972 {
01973 ACE_POSIX_Asynch_Result *asynch_result =
01974 find_completed_aio (error_status,
01975 transfer_count,
01976 index,
01977 count);
01978
01979 if (asynch_result == 0)
01980 break;
01981
01982
01983 this->application_specific_code (asynch_result,
01984 transfer_count,
01985 0,
01986 error_status);
01987 }
01988
01989
01990 ret_que = this->process_result_queue ();
01991
01992
01993
01994 #if 0
01995 ACE_DEBUG ((LM_DEBUG,
01996 "(%t) NumAIO=%d NumQueue=%d\n",
01997 ret_aio, ret_que));
01998 #endif
01999
02000 return ret_aio + ret_que > 0 ? 1 : 0;
02001 }
02002
02003 #endif
02004
02005
02006
02007 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
02008 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02009 const void *act,
02010 const ACE_Time_Value &tv,
02011 ACE_HANDLE event,
02012 int priority,
02013 int signal_number)
02014 : ACE_POSIX_Asynch_Result
02015 (handler_proxy, act, event, 0, 0, priority, signal_number),
02016 time_ (tv)
02017 {
02018 }
02019
02020 void
02021 ACE_POSIX_Asynch_Timer::complete (size_t ,
02022 int ,
02023 const void * ,
02024 u_long )
02025 {
02026 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02027 if (handler != 0)
02028 handler->handle_time_out (this->time_, this->act ());
02029 }
02030
02031
02032
02033
02034 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
02035 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02036 const void *act,
02037 ACE_HANDLE event,
02038 int priority,
02039 int signal_number)
02040 : ACE_Asynch_Result_Impl (),
02041 ACE_POSIX_Asynch_Result (handler_proxy,
02042 act,
02043 event,
02044 0,
02045 0,
02046 priority,
02047 signal_number)
02048 {
02049 }
02050
02051 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
02052 {
02053 }
02054
02055 void
02056 ACE_POSIX_Wakeup_Completion::complete (size_t ,
02057 int ,
02058 const void * ,
02059 u_long )
02060 {
02061
02062 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02063 if (handler != 0)
02064 handler->handle_wakeup ();
02065 }
02066
02067 ACE_END_VERSIONED_NAMESPACE_DECL
02068
02069 #endif