#include <POSIX_Proactor.h>
Public Member Functions | |
ACE_POSIX_AIOCB_Proactor (size_t nmaxop=ACE_AIO_DEFAULT_SIZE) | |
virtual Proactor_Type | get_impl_type (void) |
virtual | ~ACE_POSIX_AIOCB_Proactor (void) |
Destructor. | |
virtual int | close (void) |
Close down the Proactor. | |
virtual int | handle_events (ACE_Time_Value &wait_time) |
virtual int | handle_events (void) |
virtual int | post_completion (ACE_POSIX_Asynch_Result *result) |
Post a result to the completion port of the Proactor. | |
virtual int | start_aio (ACE_POSIX_Asynch_Result *result, ACE_POSIX_Proactor::Opcode op) |
virtual int | cancel_aio (ACE_HANDLE h) |
Protected Member Functions | |
ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype) | |
virtual int | get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, size_t &transfer_count) |
int | create_result_aiocb_list (void) |
Create aiocb list. | |
int | delete_result_aiocb_list (void) |
void | create_notify_manager (void) |
void | delete_notify_manager (void) |
void | check_max_aio_num (void) |
void | set_notify_handle (ACE_HANDLE h) |
To identify requests from Notify_Pipe_Manager. | |
int | handle_events_i (u_long milli_seconds) |
int | start_deferred_aio (void) |
Start deferred AIO if necessary. | |
virtual int | cancel_aiocb (ACE_POSIX_Asynch_Result *result) |
Cancel running or deferred AIO. | |
ACE_POSIX_Asynch_Result * | find_completed_aio (int &error_status, size_t &transfer_count, size_t &index, size_t &count) |
Extract the results of aio. | |
virtual ssize_t | allocate_aio_slot (ACE_POSIX_Asynch_Result *result) |
Find free slot to store result and aiocb pointer. | |
virtual int | start_aio_i (ACE_POSIX_Asynch_Result *result) |
Initiate an aio operation. | |
virtual int | notify_completion (int sig_num) |
int | putq_result (ACE_POSIX_Asynch_Result *result) |
Put "post_completed" result into the internal queue. | |
ACE_POSIX_Asynch_Result * | getq_result (void) |
Get "post_completed" result from the internal queue. | |
int | clear_result_queue (void) |
Clear the internal results queue. | |
int | process_result_queue (void) |
Process the internal results queue. | |
Protected Attributes | |
ACE_AIOCB_Notify_Pipe_Manager * | aiocb_notify_pipe_manager_ |
aiocb ** | aiocb_list_ |
ACE_POSIX_Asynch_Result ** | result_list_ |
size_t | aiocb_list_max_size_ |
To maintain the maximum size of the array (list). | |
size_t | aiocb_list_cur_size_ |
To maintain the current size of the array (list). | |
ACE_SYNCH_MUTEX | mutex_ |
Mutex to protect work with lists. | |
ACE_HANDLE | notify_pipe_read_handle_ |
size_t | num_deferred_aiocb_ |
size_t | num_started_aio_ |
Number of active (i.e. running) requests. | |
ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * > | result_queue_ |
Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's. | |
Friends | |
class | ACE_AIOCB_Notify_Pipe_Manager |
Handler needs to call application specific code. | |
class | ACE_POSIX_Asynch_Operation |
class | ACE_POSIX_Asynch_Accept |
class | ACE_POSIX_Asynch_Connect |
Definition at line 327 of file POSIX_Proactor.h.
|
Constructor. Defines the maximum number of asynchronous operations that can be started at the same time. Definition at line 782 of file POSIX_Proactor.cpp. References check_max_aio_num(), create_notify_manager(), create_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::start().
00783 : aiocb_notify_pipe_manager_ (0), 00784 aiocb_list_ (0), 00785 result_list_ (0), 00786 aiocb_list_max_size_ (max_aio_operations), 00787 aiocb_list_cur_size_ (0), 00788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE), 00789 num_deferred_aiocb_ (0), 00790 num_started_aio_ (0) 00791 { 00792 // Check for correct value for max_aio_operations 00793 check_max_aio_num (); 00794 00795 this->create_result_aiocb_list (); 00796 00797 this->create_notify_manager (); 00798 00799 // start pseudo-asynchronous accept task 00800 // one per all future acceptors 00801 this->get_asynch_pseudo_task().start (); 00802 00803 } |
|
Destructor.
Definition at line 827 of file POSIX_Proactor.cpp. References close().
00828 { 00829 this->close(); 00830 } |
|
Special constructor for ACE_SUN_Proactor and ACE_POSIX_SIG_Proactor. Definition at line 806 of file POSIX_Proactor.cpp. References check_max_aio_num(), and create_result_aiocb_list().
00808 : aiocb_notify_pipe_manager_ (0), 00809 aiocb_list_ (0), 00810 result_list_ (0), 00811 aiocb_list_max_size_ (max_aio_operations), 00812 aiocb_list_cur_size_ (0), 00813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE), 00814 num_deferred_aiocb_ (0), 00815 num_started_aio_ (0) 00816 { 00817 //check for correct value for max_aio_operations 00818 this->check_max_aio_num (); 00819 00820 this->create_result_aiocb_list (); 00821 00822 // @@ We should create Notify_Pipe_Manager in the derived class to 00823 // provide correct calls for virtual functions !!! 00824 } |
|
Find free slot to store result and aiocb pointer.
Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1376 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, aiocb_list_max_size_, LM_ERROR, notify_pipe_read_handle_, and result_list_. Referenced by ACE_POSIX_CB_Proactor::allocate_aio_slot(), and start_aio().
01377 { 01378 size_t i = 0; 01379 01380 // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager 01381 // so make check for ACE_AIOCB_Notify_Pipe_Manager request 01382 01383 if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ? 01384 { // should be free, 01385 if (result_list_[i] != 0) // only 1 request 01386 { // is allowed 01387 errno = EAGAIN; 01388 ACE_ERROR_RETURN ((LM_ERROR, 01389 "%N:%l:(%P | %t)::\n" 01390 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" 01391 "internal Proactor error 0\n"), 01392 -1); 01393 } 01394 } 01395 else //try to find free slot as usual, but starting from 1 01396 { 01397 for (i= 1; i < this->aiocb_list_max_size_; i++) 01398 if (result_list_[i] == 0) 01399 break; 01400 } 01401 01402 if (i >= this->aiocb_list_max_size_) 01403 ACE_ERROR_RETURN ((LM_ERROR, 01404 "%N:%l:(%P | %t)::\n" 01405 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" 01406 "internal Proactor error 1\n"), 01407 -1); 01408 01409 //setup OS notification methods for this aio 01410 result->aio_sigevent.sigev_notify = SIGEV_NONE; 01411 01412 return static_cast<ssize_t> (i); 01413 } |
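The slot search in the listing above can be illustrated without any ACE types. The following is a minimal sketch, not the library's code: `SlotTable` and `pick_slot` are hypothetical names, and a null pointer stands in for a free `result_list_` entry. It shows only the rule that slot 0 is reserved for the notify-pipe request while every other request takes the first free slot from index 1 upward.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for result_list_: a null pointer marks a free slot.
using SlotTable = std::vector<const void*>;

// Returns the chosen slot index, or -1 if no suitable slot is free.
// Slot 0 is reserved for the notify-pipe request; all other requests
// scan slots 1..size-1 and take the first free one.
long pick_slot(const SlotTable& slots, bool is_notify_pipe_request)
{
    assert(!slots.empty());

    if (is_notify_pipe_request)
        return slots[0] == nullptr ? 0 : -1;   // only one pipe request allowed

    for (std::size_t i = 1; i < slots.size(); ++i)
        if (slots[i] == nullptr)
            return static_cast<long>(i);

    return -1;                                  // table is full
}
```

Reserving a fixed slot means an ordinary request can never occupy the entry that the notification pipe needs, even when the rest of the table is full.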
|
This method should be called from ACE_POSIX_Asynch_Operation::cancel() instead of the usual ::aio_cancel. Each deferred AIO request on handle "h" is removed from the lists and the user is notified. For each running AIO request on handle "h", ::aio_cancel is called. According to the POSIX standard, we will later receive ECANCELED for every request cancelled this way, on return from ::aio_suspend. Implements ACE_POSIX_Proactor. Definition at line 1526 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, cancel_aiocb(), num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), and ACE_POSIX_Asynch_Result::set_error().
01527 { 01528 // This new method should be called from 01529 // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel 01530 // It scans the result_list_ and defines all AIO requests 01531 // that were issued for handle "handle" 01532 // 01533 // For all deferred AIO requests with handle "handle" 01534 // it removes its from the lists and notifies user 01535 // 01536 // For all running AIO requests with handle "handle" 01537 // it calls ::aio_cancel. According to the POSIX standards 01538 // we will receive ECANCELED for all ::aio_canceled AIO requests 01539 // later on return from ::aio_suspend 01540 01541 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio"); 01542 01543 int num_total = 0; 01544 int num_cancelled = 0; 01545 01546 { 01547 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); 01548 01549 size_t ai = 0; 01550 01551 for (ai = 0; ai < this->aiocb_list_max_size_; ai++) 01552 { 01553 if (this->result_list_[ai] == 0) // Skip empty slot 01554 continue; 01555 01556 if (this->result_list_[ai]->aio_fildes != handle) // Not ours 01557 continue; 01558 01559 num_total++; 01560 01561 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai]; 01562 01563 if (this->aiocb_list_[ai] == 0) // Canceling a deferred operation 01564 { 01565 num_cancelled++; 01566 this->num_deferred_aiocb_--; 01567 01568 this->aiocb_list_[ai] = 0; 01569 this->result_list_[ai] = 0; 01570 this->aiocb_list_cur_size_--; 01571 01572 asynch_result->set_error (ECANCELED); 01573 asynch_result->set_bytes_transferred (0); 01574 this->putq_result (asynch_result); 01575 // we are with locked mutex_ here ! 
01576 } 01577 else // Cancel started aio 01578 { 01579 int rc_cancel = this->cancel_aiocb (asynch_result); 01580 01581 if (rc_cancel == 0) //notification in the future 01582 num_cancelled++; //it is OS responsibility 01583 } 01584 } 01585 01586 } // release mutex_ 01587 01588 if (num_total == 0) 01589 return 1; // ALLDONE 01590 01591 if (num_cancelled == num_total) 01592 return 0; // CANCELLED 01593 01594 return 2; // NOT CANCELLED 01595 } |
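The final return value of cancel_aio() depends only on two counters. As a rough sketch (the enum and function names here are hypothetical, chosen for illustration; only the numeric values 0, 1 and 2 come from the listing), the decision can be written as:

```cpp
#include <cassert>

// Values mirror the return codes shown in the listing above.
enum CancelOutcome { kCancelled = 0, kAllDone = 1, kNotCancelled = 2 };

// num_total:     requests found for the handle
// num_cancelled: requests among those that were cancelled
CancelOutcome summarize_cancel(int num_total, int num_cancelled)
{
    assert(num_cancelled >= 0 && num_cancelled <= num_total);

    if (num_total == 0)
        return kAllDone;        // nothing was pending on this handle
    if (num_cancelled == num_total)
        return kCancelled;      // every pending request was cancelled
    return kNotCancelled;       // at least one request could not be cancelled
}
```

Note that a single request that cannot be cancelled is enough to turn the overall answer into "not cancelled", even if all the others were.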
|
Cancel running or deferred AIO.
Definition at line 1598 of file POSIX_Proactor.cpp. Referenced by cancel_aio(), and delete_result_aiocb_list().
01599 { 01600 // This method is called from cancel_aio 01601 // to cancel a previously submitted AIO request 01602 int rc = ::aio_cancel (0, result); 01603 01604 // Check the return value and return 0/1/2 appropriately. 01605 if (rc == AIO_CANCELED) 01606 return 0; 01607 else if (rc == AIO_ALLDONE) 01608 return 1; 01609 else // (rc == AIO_NOTCANCELED) 01610 return 2; 01611 } |
|
Determine the maximum number of asynchronous I/O requests for the current OS. Definition at line 956 of file POSIX_Proactor.cpp. References ACE_AIO_MAX_SIZE, ACE_DEBUG, ACE_LIB_TEXT, aiocb_list_max_size_, LM_DEBUG, ACE::max_handles(), ACE::set_handle_limit(), and ACE_OS::sysconf(). Referenced by ACE_POSIX_AIOCB_Proactor().
00957 { 00958 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX); 00959 00960 // Define max limit AIO's for concrete OS 00961 // -1 means that there is no limit, but it is not true 00962 // (example, SunOS 5.6) 00963 00964 if (max_os_aio_num > 0 && 00965 aiocb_list_max_size_ > (unsigned long) max_os_aio_num) 00966 aiocb_list_max_size_ = max_os_aio_num; 00967 00968 #if defined (HPUX) || defined (__FreeBSD__) 00969 // Although HPUX 11.00 allows to start 2048 AIO's for all process in 00970 // system it has a limit 256 max elements for aio_suspend () It is a 00971 // pity, but ... 00972 00973 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX); 00974 if (max_os_listio_num > 0 00975 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num) 00976 aiocb_list_max_size_ = max_os_listio_num; 00977 #endif /* HPUX || __FreeBSD__ */ 00978 00979 // check for user-defined value 00980 // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h 00981 00982 if (aiocb_list_max_size_ <= 0 00983 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE) 00984 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE; 00985 00986 // check for max number files to open 00987 00988 int max_num_files = ACE::max_handles (); 00989 00990 if (max_num_files > 0 00991 && aiocb_list_max_size_ > (unsigned long) max_num_files) 00992 { 00993 ACE::set_handle_limit (aiocb_list_max_size_); 00994 00995 max_num_files = ACE::max_handles (); 00996 } 00997 00998 if (max_num_files > 0 00999 && aiocb_list_max_size_ > (unsigned long) max_num_files) 01000 aiocb_list_max_size_ = (unsigned long) max_num_files; 01001 01002 ACE_DEBUG ((LM_DEBUG, 01003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", 01004 aiocb_list_max_size_)); 01005 01006 #if defined(__sgi) 01007 01008 ACE_DEBUG((LM_DEBUG, 01009 ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n"))); 01010 01011 //typedef struct aioinit { 01012 // int aio_threads; /* The number of aio threads to start (5) */ 01013 // int aio_locks; /* Initial number of preallocated locks (3) */ 
01014 // int aio_num; /* estimated total simultanious aiobc structs (1000) */ 01015 // int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */ 01016 // int aio_debug; /* turn on debugging (0) */ 01017 // int aio_numusers; /* max number of user sprocs making aio_* calls (5) */ 01018 // int aio_reserved[3]; 01019 //} aioinit_t; 01020 01021 aioinit_t aioinit; 01022 01023 aioinit.aio_threads = 10; /* The number of aio threads to start (5) */ 01024 aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */ 01025 /* estimated total simultaneous aiobc structs (1000) */ 01026 aioinit.aio_num = aiocb_list_max_size_; 01027 aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */ 01028 aioinit.aio_debug = 0; /* turn on debugging (0) */ 01029 aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */ 01030 aioinit.aio_reserved[0] = 0; 01031 aioinit.aio_reserved[1] = 0; 01032 aioinit.aio_reserved[2] = 0; 01033 01034 aio_sgi_init (&aioinit); 01035 01036 #endif 01037 01038 return; 01039 } |
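The order of the limit checks in check_max_aio_num() matters, so a simplified model may help. The sketch below is an assumption-laden illustration: `clamp_aio_slots` is a hypothetical function, the limits are passed in as plain parameters instead of being queried through sysconf() or ACE::max_handles(), and the platform-specific branches (HPUX/FreeBSD list limit, raising the handle limit, SGI initialisation) are left out.

```cpp
#include <cassert>
#include <cstddef>

// requested:    slot count asked for by the caller
// os_aio_max:   OS limit on AIO requests; <= 0 means "no usable limit reported"
// compiled_max: plays the role of ACE_AIO_MAX_SIZE
// max_handles:  limit on open handles; <= 0 means "unknown"
std::size_t clamp_aio_slots(std::size_t requested, long os_aio_max,
                            std::size_t compiled_max, long max_handles)
{
    assert(compiled_max > 0);
    std::size_t n = requested;

    // 1. Never exceed what the OS says it can queue.
    if (os_aio_max > 0 && n > static_cast<std::size_t>(os_aio_max))
        n = static_cast<std::size_t>(os_aio_max);

    // 2. Fall back to the compiled-in maximum for 0 or oversized values.
    if (n == 0 || n > compiled_max)
        n = compiled_max;

    // 3. Never exceed the number of handles the process may open.
    if (max_handles > 0 && n > static_cast<std::size_t>(max_handles))
        n = static_cast<std::size_t>(max_handles);

    return n;
}
```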
|
Clear the internal results queue.
Definition at line 1136 of file POSIX_Proactor.cpp. References getq_result(). Referenced by close().
01137 { 01138 int ret_val = 0; 01139 ACE_POSIX_Asynch_Result* result = 0; 01140 01141 while ((result = this->getq_result ()) != 0) 01142 { 01143 delete result; 01144 ret_val++; 01145 } 01146 01147 return ret_val; 01148 } |
|
Close down the Proactor.
Reimplemented from ACE_POSIX_Proactor. Definition at line 840 of file POSIX_Proactor.cpp. References clear_result_queue(), delete_notify_manager(), delete_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::stop(). Referenced by ~ACE_POSIX_AIOCB_Proactor(), and ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor().
00841 { 00842 // stop asynch accept task 00843 this->get_asynch_pseudo_task().stop (); 00844 00845 this->delete_notify_manager (); 00846 00847 this->clear_result_queue (); 00848 00849 return this->delete_result_aiocb_list (); 00850 } |
|
Call these methods from a derived class once the virtual table is built. Definition at line 1042 of file POSIX_Proactor.cpp. References ACE_NEW, and aiocb_notify_pipe_manager_. Referenced by ACE_POSIX_AIOCB_Proactor().
01043 { 01044 // Remember! this issues a Asynch_Read 01045 // on the notify pipe for doing the Asynch_Accept/Connect. 01046 01047 if (aiocb_notify_pipe_manager_ == 0) 01048 ACE_NEW (aiocb_notify_pipe_manager_, 01049 ACE_AIOCB_Notify_Pipe_Manager (this)); 01050 } |
|
Create aiocb list.
Definition at line 857 of file POSIX_Proactor.cpp. References ACE_NEW_RETURN, aiocb_list_, aiocb_list_max_size_, and result_list_. Referenced by ACE_POSIX_AIOCB_Proactor().
00858 { 00859 if (aiocb_list_ != 0) 00860 return 0; 00861 00862 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1); 00863 00864 ACE_NEW_RETURN (result_list_, 00865 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_], 00866 -1); 00867 00868 // Initialize the array. 00869 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) 00870 { 00871 aiocb_list_[ai] = 0; 00872 result_list_[ai] = 0; 00873 } 00874 00875 return 0; 00876 } |
|
Definition at line 1053 of file POSIX_Proactor.cpp. References aiocb_notify_pipe_manager_. Referenced by close().
01054 { 01055 // We are responsible for the delete. As all pointers are set to 0 after 01056 // delete, it is safe to delete twice 01057 01058 delete aiocb_notify_pipe_manager_; 01059 aiocb_notify_pipe_manager_ = 0; 01060 } |
|
Call this method from a derived class once the virtual table is built. Definition at line 878 of file POSIX_Proactor.cpp. References ACE_DEBUG, ACE_ERROR, ACE_LIB_TEXT, aiocb_list_, aiocb_list_max_size_, cancel_aiocb(), get_result_status(), LM_DEBUG, LM_ERROR, result_list_, and ACE_OS::strerror(). Referenced by close().
00879 { 00880 if (aiocb_list_ == 0) // already deleted 00881 return 0; 00882 00883 size_t ai; 00884 00885 // Try to cancel all uncompleted operations. POSIX systems may have 00886 // hidden system threads that still can work with our aiocb's! 00887 for (ai = 0; ai < aiocb_list_max_size_; ai++) 00888 if (this->aiocb_list_[ai] != 0) // active operation 00889 this->cancel_aiocb (result_list_[ai]); 00890 00891 int num_pending = 0; 00892 00893 for (ai = 0; ai < aiocb_list_max_size_; ai++) 00894 { 00895 if (this->aiocb_list_[ai] == 0 ) // not active operation 00896 continue; 00897 00898 // Get the error and return status of the aio_ operation. 00899 int error_status = 0; 00900 size_t transfer_count = 0; 00901 int flg_completed = this->get_result_status (result_list_[ai], 00902 error_status, 00903 transfer_count); 00904 00905 //don't delete uncompleted AIOCB's 00906 if (flg_completed == 0) // not completed !!! 00907 { 00908 num_pending++; 00909 #if 0 00910 char * errtxt = ACE_OS::strerror (error_status); 00911 if (errtxt == 0) 00912 errtxt ="?????????"; 00913 00914 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )? 00915 "WRITE":"READ" ; 00916 00917 00918 ACE_ERROR ((LM_ERROR, 00919 ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"), 00920 ai, 00921 op, 00922 error_status, 00923 transfer_count, 00924 errtxt)); 00925 #endif /* 0 */ 00926 } 00927 else // completed , OK 00928 { 00929 delete this->result_list_[ai]; 00930 this->result_list_[ai] = 0; 00931 this->aiocb_list_[ai] = 0; 00932 } 00933 } 00934 00935 // If it is not possible to cancel some operation (num_pending > 0 ), 00936 // we can do only one thing - report this 00937 // and complain about the POSIX implementation. 00938 // We know that we have memory leaks, but it is better than 00939 // segmentation fault! 
00940 ACE_DEBUG 00941 ((LM_DEBUG, 00942 ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") 00943 ACE_LIB_TEXT(" number pending AIO=%d\n"), 00944 num_pending)); 00945 00946 delete [] this->aiocb_list_; 00947 this->aiocb_list_ = 0; 00948 00949 delete [] this->result_list_; 00950 this->result_list_ = 0; 00951 00952 return (num_pending == 0 ? 0 : -1); 00953 // ?? or just always return 0; 00954 } |
|
Extract the results of aio.
Definition at line 1256 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, get_result_status(), num_started_aio_, result_list_, and start_deferred_aio(). Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().
01260 { 01261 // parameter index defines initial slot to scan 01262 // parameter count tells us how many slots should we scan 01263 01264 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0)); 01265 01266 ACE_POSIX_Asynch_Result *asynch_result = 0; 01267 01268 if (num_started_aio_ == 0) // save time 01269 return 0; 01270 01271 for (; count > 0; index++ , count--) 01272 { 01273 if (index >= aiocb_list_max_size_) // like a wheel 01274 index = 0; 01275 01276 if (aiocb_list_[index] == 0) // Dont process null blocks. 01277 continue; 01278 01279 if (0 != this->get_result_status (result_list_[index], 01280 error_status, 01281 transfer_count)) // completed 01282 break; 01283 01284 } // end for 01285 01286 if (count == 0) // all processed , nothing found 01287 return 0; 01288 asynch_result = result_list_[index]; 01289 01290 aiocb_list_[index] = 0; 01291 result_list_[index] = 0; 01292 aiocb_list_cur_size_--; 01293 01294 num_started_aio_--; // decrement count active aios 01295 index++; // for next iteration 01296 count--; // for next iteration 01297 01298 this->start_deferred_aio (); 01299 //make attempt to start deferred AIO 01300 //It is safe as we are protected by mutex_ 01301 01302 return asynch_result; 01303 } |
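The "like a wheel" scan in find_completed_aio() keeps its position in the caller-supplied `index` and `count`, so repeated calls resume where the previous one stopped. The following is a minimal sketch of that iteration pattern only: `next_completed` is a hypothetical name and a `std::vector<bool>` stands in for the real completion check, which the library performs through get_result_status().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Examine at most `count` slots starting at `index`, wrapping to slot 0 at
// the end of the table. Returns the index of the next completed slot, or -1
// once `count` is exhausted. `index` and `count` are updated so that the
// next call continues with the slot after the one just returned.
long next_completed(const std::vector<bool>& completed,
                    std::size_t& index, std::size_t& count)
{
    assert(!completed.empty());

    for (; count > 0; ++index, --count)
    {
        if (index >= completed.size())
            index = 0;                      // wrap around, like a wheel
        if (completed[index])
            break;
    }

    if (count == 0)
        return -1;                          // all slots examined, nothing found

    long found = static_cast<long>(index);
    ++index;                                // resume after this slot next time
    --count;
    return found;
}
```

A caller that starts with `count` equal to the table size therefore visits each slot at most once per dispatch pass, no matter where `index` begins.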
|
Reimplemented from ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 833 of file POSIX_Proactor.cpp.
00834 { 00835 return PROACTOR_AIOCB; 00836 } |
|
Check an AIO for completion, error, and result status. Returns 1 if the AIO has completed, 0 if it has not completed yet. Definition at line 1237 of file POSIX_Proactor.cpp. References EINPROGRESS, and ssize_t. Referenced by delete_result_aiocb_list(), and find_completed_aio().
01240 { 01241 transfer_count = 0; 01242 01243 // Get the error status of the aio_ operation. 01244 error_status = aio_error (asynch_result); 01245 if (error_status == EINPROGRESS) 01246 return 0; // not completed 01247 01248 ssize_t op_return = aio_return (asynch_result); 01249 if (op_return > 0) 01250 transfer_count = static_cast<size_t> (op_return); 01251 // else transfer_count is already 0, error_status reports the error. 01252 return 1; // completed 01253 } |
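The status check above reduces to a small decision on two values. The sketch below is illustrative only: `AioStatus` and `classify` are hypothetical names, and the two inputs are assumed to be values already obtained from aio_error() and aio_return(). In real code aio_return() should be called only after aio_error() no longer reports EINPROGRESS, which is why the second argument is ignored for in-progress requests.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>

struct AioStatus
{
    bool        completed;   // false while the request is still in progress
    int         error;       // 0 on success, otherwise an errno value
    std::size_t bytes;       // bytes transferred; 0 on error or when pending
};

// error_value:  what aio_error() reported for the request
// return_value: what aio_return() reported (ignored while in progress)
AioStatus classify(int error_value, long return_value)
{
    AioStatus s = { false, error_value, 0 };

    if (error_value == EINPROGRESS)
        return s;                            // not completed yet

    s.completed = true;
    if (return_value > 0)
        s.bytes = static_cast<std::size_t>(return_value);
    return s;                                // error field carries any failure
}
```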
|
Get "post_completed" result from the internal queue.
Definition at line 1116 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::dequeue_head(). Referenced by clear_result_queue(), and process_result_queue().
01117 { 01118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0)); 01119 01120 01121 ACE_POSIX_Asynch_Result* result = 0; 01122 01123 if (this->result_queue_.dequeue_head (result) != 0) 01124 return 0; 01125 01126 // don't waste time if queue is empty - it is normal 01127 // or check queue size before dequeue_head 01128 // ACE_ERROR_RETURN ((LM_ERROR, 01129 // "%N:%l:(%P | %t):%p\n", 01130 // "ACE_POSIX_AIOCB_Proactor::getq_result failed"), 01131 // 0); 01132 01133 return result; 01134 } |
|
Block indefinitely until at least one event is dispatched. Dispatch a single set of events. If the timeout elapses before any events occur, return 0. Return 1 on success, i.e., when a completion is dispatched, and -1 on errors, with errno set accordingly. Implements ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1071 of file POSIX_Proactor.cpp. References ACE_INFINITE, and handle_events_i().
01072 { 01073 return this->handle_events_i (ACE_INFINITE); 01074 } |
|
Dispatch a single set of events. If wait_time elapses before any events occur, return 0. Return 1 on success, i.e., when a completion is dispatched, and -1 on errors, with errno set accordingly. Implements ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1063 of file POSIX_Proactor.cpp. References handle_events_i(), and ACE_Time_Value::msec().
01064 { 01065 // Decrement <wait_time> with the amount of time spent in the method 01066 ACE_Countdown_Time countdown (&wait_time); 01067 return this->handle_events_i (wait_time.msec ()); 01068 } |
|
Dispatch a single set of events. If milli_seconds elapses before any events occur, return 0. Return 1 if a completion is dispatched. Return -1 on errors. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1170 of file POSIX_Proactor.cpp. References ACE_ERROR, ACE_INFINITE, aiocb_list_, aiocb_list_max_size_, ACE_POSIX_Proactor::application_specific_code(), find_completed_aio(), LM_ERROR, process_result_queue(), timespec::tv_nsec, and timespec::tv_sec. Referenced by handle_events().
01171 { 01172 int result_suspend = 0; 01173 int retval= 0; 01174 01175 if (milli_seconds == ACE_INFINITE) 01176 // Indefinite blocking. 01177 result_suspend = aio_suspend (aiocb_list_, 01178 aiocb_list_max_size_, 01179 0); 01180 else 01181 { 01182 // Block on <aio_suspend> for <milli_seconds> 01183 timespec timeout; 01184 timeout.tv_sec = milli_seconds / 1000; 01185 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000; 01186 result_suspend = aio_suspend (aiocb_list_, 01187 aiocb_list_max_size_, 01188 &timeout); 01189 } 01190 01191 // Check for errors 01192 if (result_suspend == -1) 01193 { 01194 if (errno != EAGAIN && // Timeout 01195 errno != EINTR ) // Interrupted call 01196 ACE_ERROR ((LM_ERROR, 01197 "%N:%l:(%P | %t)::%p\n", 01198 "ACE_POSIX_AIOCB_Proactor::handle_events:" 01199 "aio_suspend failed\n")); 01200 01201 // let continue work 01202 // we should check "post_completed" queue 01203 } 01204 else 01205 { 01206 size_t index = 0; 01207 size_t count = aiocb_list_max_size_; // max number to iterate 01208 int error_status = 0; 01209 size_t transfer_count = 0; 01210 01211 for (;; retval++) 01212 { 01213 ACE_POSIX_Asynch_Result *asynch_result = 01214 find_completed_aio (error_status, 01215 transfer_count, 01216 index, 01217 count); 01218 01219 if (asynch_result == 0) 01220 break; 01221 01222 // Call the application code. 01223 this->application_specific_code (asynch_result, 01224 transfer_count, 01225 0, // No completion key. 01226 error_status); 01227 } 01228 } 01229 01230 // process post_completed results 01231 retval += this->process_result_queue (); 01232 01233 return retval > 0 ? 1 : 0; 01234 } |
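Before calling aio_suspend with a finite timeout, the listing splits the millisecond value into seconds and nanoseconds. A minimal standalone sketch of that conversion follows; `to_timespec` is a hypothetical helper name, and the remainder is computed with `%` instead of the subtraction used in the listing, which gives the same result.

```cpp
#include <cassert>
#include <time.h>

// Split a millisecond timeout into whole seconds plus leftover nanoseconds,
// the form expected by the timeout argument of aio_suspend.
timespec to_timespec(unsigned long milli_seconds)
{
    timespec ts = {};
    ts.tv_sec  = static_cast<time_t>(milli_seconds / 1000);
    ts.tv_nsec = static_cast<long>(milli_seconds % 1000) * 1000000L;
    return ts;
}
```

For example, a timeout of 2500 ms becomes 2 seconds plus 500,000,000 nanoseconds, so `tv_nsec` always stays below one second.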
|
Notify the queue of "post_completed" ACE_POSIX_Asynch_Results. Called from the post_completion method. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1077 of file POSIX_Proactor.cpp. References aiocb_notify_pipe_manager_, and ACE_AIOCB_Notify_Pipe_Manager::notify(). Referenced by putq_result().
01078 { 01079 ACE_UNUSED_ARG (sig_num); 01080 01081 return this->aiocb_notify_pipe_manager_->notify (); 01082 } |
|
Post a result to the completion port of the Proactor.
Implements ACE_POSIX_Proactor. Definition at line 1085 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and putq_result().
01086 { 01087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1)); 01088 01089 int ret_val = this->putq_result (result); 01090 01091 return ret_val; 01092 } |
|
Process the internal results queue.
Definition at line 1150 of file POSIX_Proactor.cpp. References ACE_POSIX_Asynch_Result::bytes_transferred(), and getq_result(). Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().
01151 { 01152 int ret_val = 0; 01153 ACE_POSIX_Asynch_Result* result = 0; 01154 01155 while ((result = this->getq_result ()) != 0) 01156 { 01157 this->application_specific_code 01158 (result, 01159 result->bytes_transferred(), // 0, No bytes transferred. 01160 0, // No completion key. 01161 result->error()); //0, No error. 01162 01163 ret_val++; 01164 } 01165 01166 return ret_val; 01167 } |
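The drain loop in process_result_queue() can be modelled with standard containers. This is only a sketch under stated assumptions: `PostedResult` and `drain` are hypothetical names, a `std::deque` replaces ACE_Unbounded_Queue, a callback replaces application_specific_code(), and the locking done inside getq_result() is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>

// Hypothetical stand-in for a posted ACE_POSIX_Asynch_Result.
struct PostedResult
{
    std::size_t bytes;   // bytes transferred
    int         error;   // 0 when there was no error
};

// Pop every queued result, hand it to `dispatch`, and return how many
// results were dispatched.
int drain(std::deque<PostedResult>& queue,
          const std::function<void(const PostedResult&)>& dispatch)
{
    int dispatched = 0;
    while (!queue.empty())
    {
        PostedResult r = queue.front();
        queue.pop_front();   // remove before dispatch, as getq_result() does
        dispatch(r);
        ++dispatched;
    }
    return dispatched;
}
```

The returned count is what lets the caller report that at least one completion was dispatched, even when no OS-level AIO finished during the same pass.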
|
Put "post_completed" result into the internal queue.
Definition at line 1095 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::enqueue_tail(), LM_ERROR, notify_completion(), and ACE_POSIX_Asynch_Result::signal_number(). Referenced by cancel_aio(), post_completion(), and start_deferred_aio().
01096 { 01097 // this protected method should be called with locked mutex_ 01098 // we can't use GUARD as Proactor uses non-recursive mutex 01099 01100 if (!result) 01101 return -1; 01102 01103 int sig_num = result->signal_number (); 01104 int ret_val = this->result_queue_.enqueue_tail (result); 01105 01106 if (ret_val == -1) 01107 ACE_ERROR_RETURN ((LM_ERROR, 01108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"), 01109 -1); 01110 01111 this->notify_completion (sig_num); 01112 01113 return 0; 01114 } |
|
To identify requests from Notify_Pipe_Manager.
Definition at line 852 of file POSIX_Proactor.cpp. References notify_pipe_read_handle_. Referenced by ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager().
00853 { 00854 notify_pipe_read_handle_ = h; 00855 } |
|
Definition at line 1307 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, allocate_aio_slot(), LM_ERROR, num_deferred_aiocb_, result_list_, ssize_t, and start_aio_i().
01309 { 01310 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio"); 01311 01312 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); 01313 01314 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0; 01315 01316 if (result == 0) // Just check the status of the list 01317 return ret_val; 01318 01319 // Save operation code in the aiocb 01320 switch (op) 01321 { 01322 case ACE_POSIX_Proactor::ACE_OPCODE_READ: 01323 result->aio_lio_opcode = LIO_READ; 01324 break; 01325 01326 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE: 01327 result->aio_lio_opcode = LIO_WRITE; 01328 break; 01329 01330 default: 01331 ACE_ERROR_RETURN ((LM_ERROR, 01332 "%N:%l:(%P | %t)::\n" 01333 "start_aio: Invalid operation code\n"), 01334 -1); 01335 } 01336 01337 if (ret_val != 0) // No free slot 01338 { 01339 errno = EAGAIN; 01340 return -1; 01341 } 01342 01343 // Find a free slot and store. 01344 01345 ssize_t slot = allocate_aio_slot (result); 01346 01347 if (slot < 0) 01348 return -1; 01349 01350 size_t index = static_cast<size_t> (slot); 01351 01352 result_list_[index] = result; //Store result ptr anyway 01353 aiocb_list_cur_size_++; 01354 01355 ret_val = start_aio_i (result); 01356 switch (ret_val) 01357 { 01358 case 0: // started OK 01359 aiocb_list_[index] = result; 01360 return 0; 01361 01362 case 1: // OS AIO queue overflow 01363 num_deferred_aiocb_ ++; 01364 return 0; 01365 01366 default: // Invalid request, there is no point 01367 break; // to start it later 01368 } 01369 01370 result_list_[index] = 0; 01371 aiocb_list_cur_size_--; 01372 return -1; 01373 } |
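The bookkeeping at the end of start_aio() distinguishes three outcomes of start_aio_i(). The sketch below models just that bookkeeping with hypothetical types (`Slot`, `Book`, `record_start`); booleans replace the real pointers in result_list_ and aiocb_list_, and the start result code is passed in as a parameter.

```cpp
#include <cassert>
#include <cstddef>

struct Slot
{
    bool has_result = false;   // models result_list_[index] != 0
    bool has_aiocb  = false;   // models aiocb_list_[index] != 0
};

struct Book
{
    std::size_t cur_size = 0;  // models aiocb_list_cur_size_
    std::size_t deferred = 0;  // models num_deferred_aiocb_
};

// start_rc: 0 = started, 1 = OS queue full (defer), anything else = invalid.
// Returns 0 when the request is accepted (running or deferred), -1 otherwise.
int record_start(Slot& slot, Book& book, int start_rc)
{
    assert(!slot.has_result);

    slot.has_result = true;    // the result is stored before starting
    ++book.cur_size;

    switch (start_rc)
    {
    case 0:                    // running: both lists hold the request
        slot.has_aiocb = true;
        return 0;
    case 1:                    // deferred: result stored, aiocb entry empty
        ++book.deferred;
        return 0;
    default:                   // invalid request: undo the reservation
        break;
    }

    slot.has_result = false;
    --book.cur_size;
    return -1;
}
```

A deferred request is therefore recognisable later as a slot that has a result but no aiocb entry.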
|
Initiate an aio operation.
Definition at line 1421 of file POSIX_Proactor.cpp. References ACE_ERROR, ACE_LIB_TEXT, ACE_TCHAR, ACE_TRACE, LM_ERROR, and num_started_aio_. Referenced by start_aio(), and start_deferred_aio().
01422 { 01423 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i"); 01424 01425 int ret_val; 01426 const ACE_TCHAR *ptype; 01427 01428 // Start IO 01429 01430 switch (result->aio_lio_opcode ) 01431 { 01432 case LIO_READ : 01433 ptype = ACE_LIB_TEXT ("read "); 01434 ret_val = aio_read (result); 01435 break; 01436 case LIO_WRITE : 01437 ptype = ACE_LIB_TEXT ("write"); 01438 ret_val = aio_write (result); 01439 break; 01440 default: 01441 ptype = ACE_LIB_TEXT ("?????"); 01442 ret_val = -1; 01443 break; 01444 } 01445 01446 if (ret_val == 0) 01447 this->num_started_aio_++; 01448 else // if (ret_val == -1) 01449 { 01450 if (errno == EAGAIN || errno == ENOMEM) //Ok, it will be deferred AIO 01451 ret_val = 1; 01452 else 01453 ACE_ERROR ((LM_ERROR, 01454 ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"), 01455 ptype, 01456 ACE_LIB_TEXT ("queueing failed\n"))); 01457 } 01458 01459 return ret_val; 01460 } |
|
Start deferred AIO if necessary.
Definition at line 1464 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, LM_ERROR, num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), ACE_POSIX_Asynch_Result::set_error(), and start_aio_i(). Referenced by find_completed_aio().
01465 { 01466 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio"); 01467 01468 // This protected method is called from 01469 // find_completed_aio after any AIO completion 01470 // We should call this method always with locked 01471 // ACE_POSIX_AIOCB_Proactor::mutex_ 01472 // 01473 // It tries to start the first deferred AIO 01474 // if such exists 01475 01476 if (num_deferred_aiocb_ == 0) 01477 return 0; // nothing to do 01478 01479 size_t i = 0; 01480 01481 for (i= 0; i < this->aiocb_list_max_size_; i++) 01482 if (result_list_[i] !=0 // check for 01483 && aiocb_list_[i] ==0) // deferred AIO 01484 break; 01485 01486 if (i >= this->aiocb_list_max_size_) 01487 ACE_ERROR_RETURN ((LM_ERROR, 01488 "%N:%l:(%P | %t)::\n" 01489 "start_deferred_aio:" 01490 "internal Proactor error 3\n"), 01491 -1); 01492 01493 ACE_POSIX_Asynch_Result *result = result_list_[i]; 01494 01495 int ret_val = start_aio_i (result); 01496 01497 switch (ret_val) 01498 { 01499 case 0 : //started OK , decrement count of deferred AIOs 01500 aiocb_list_[i] = result; 01501 num_deferred_aiocb_ --; 01502 return 0; 01503 01504 case 1 : 01505 return 0; //try again later 01506 01507 default : // Invalid Parameters , should never be 01508 break; 01509 } 01510 01511 //AL notify user 01512 01513 result_list_[i] = 0; 01514 aiocb_list_cur_size_--; 01515 01516 num_deferred_aiocb_ --; 01517 01518 result->set_error (errno); 01519 result->set_bytes_transferred (0); 01520 this->putq_result (result); // we are with locked mutex_ here ! 01521 01522 return -1; 01523 } |
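The search that start_deferred_aio() performs can be shown in isolation. In this sketch `SlotState` and `first_deferred` are hypothetical names and booleans stand in for the two pointer arrays; it illustrates only how a deferred request is identified, not how it is restarted.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct SlotState
{
    bool has_result;   // models result_list_[i] != 0
    bool has_aiocb;    // models aiocb_list_[i] != 0
};

// A deferred request has a stored result but no aiocb entry yet.
// Returns the index of the first such slot, or -1 if there is none.
long first_deferred(const std::vector<SlotState>& slots)
{
    for (std::size_t i = 0; i < slots.size(); ++i)
        if (slots[i].has_result && !slots[i].has_aiocb)
            return static_cast<long>(i);
    return -1;
}
```

In the listing, failing to find such a slot while num_deferred_aiocb_ is non-zero is treated as an internal error, because the counter and the tables would then disagree.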
|
Handler needs to call application specific code.
Definition at line 331 of file POSIX_Proactor.h. |
|
Definition at line 336 of file POSIX_Proactor.h. |
|
Definition at line 337 of file POSIX_Proactor.h. |
|
This class does the registering of Asynch Operations with the Proactor which is necessary in the AIOCB strategy. Definition at line 335 of file POSIX_Proactor.h. |
|
Use a dynamically allocated array to keep track of all the aio's issued currently. Definition at line 469 of file POSIX_Proactor.h. Referenced by cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio(). |
|
To maintain the current size of the array (list).
Definition at line 476 of file POSIX_Proactor.h. Referenced by cancel_aio(), find_completed_aio(), start_aio(), and start_deferred_aio(). |
|
To maintain the maximum size of the array (list).
Definition at line 473 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), cancel_aio(), check_max_aio_num(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio(). |
|
This class takes care of doing when we use AIO_CONTROL_BLOCKS strategy. Definition at line 465 of file POSIX_Proactor.h. Referenced by create_notify_manager(), delete_notify_manager(), and notify_completion(). |
|
Mutex to protect work with lists.
Definition at line 479 of file POSIX_Proactor.h. |
|
The purpose of this member is only to identify asynchronous requests from NotifyManager. Slot 0 in the list of aiocb's is always reserved for it, to be sure that we don't lose notifications. Definition at line 484 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), and set_notify_handle(). |
|
Number of ACE_POSIX_Asynch_Results waiting to start, i.e. deferred AIOs. Definition at line 488 of file POSIX_Proactor.h. Referenced by cancel_aio(), start_aio(), and start_deferred_aio(). |
|
Number of active (i.e. running) requests.
Definition at line 491 of file POSIX_Proactor.h. Referenced by find_completed_aio(), and start_aio_i(). |
|
Definition at line 470 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), start_aio(), and start_deferred_aio(). |
|
Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's.
Definition at line 494 of file POSIX_Proactor.h. |