#include <POSIX_Proactor.h>
Inheritance diagram for ACE_POSIX_AIOCB_Proactor:


Public Member Functions | |
| ACE_POSIX_AIOCB_Proactor (size_t nmaxop=ACE_AIO_DEFAULT_SIZE) | |
| virtual Proactor_Type | get_impl_type (void) |
| virtual | ~ACE_POSIX_AIOCB_Proactor (void) |
| Destructor. | |
| virtual int | close (void) |
| Close down the Proactor. | |
| virtual int | handle_events (ACE_Time_Value &wait_time) |
| virtual int | handle_events (void) |
| virtual int | post_completion (ACE_POSIX_Asynch_Result *result) |
| Post a result to the completion port of the Proactor. | |
| virtual int | start_aio (ACE_POSIX_Asynch_Result *result, ACE_POSIX_Proactor::Opcode op) |
| virtual int | cancel_aio (ACE_HANDLE h) |
Protected Member Functions | |
| ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype) | |
| virtual int | get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, size_t &transfer_count) |
| int | create_result_aiocb_list (void) |
| Create aiocb list. | |
| int | delete_result_aiocb_list (void) |
| void | create_notify_manager (void) |
| void | delete_notify_manager (void) |
| void | check_max_aio_num (void) |
| void | set_notify_handle (ACE_HANDLE h) |
| To identify requests from Notify_Pipe_Manager. | |
| int | handle_events_i (u_long milli_seconds) |
| int | start_deferred_aio (void) |
| Start deferred AIO if necessary. | |
| virtual int | cancel_aiocb (ACE_POSIX_Asynch_Result *result) |
| Cancel running or deferred AIO. | |
| ACE_POSIX_Asynch_Result * | find_completed_aio (int &error_status, size_t &transfer_count, size_t &index, size_t &count) |
| Extract the results of aio. | |
| virtual ssize_t | allocate_aio_slot (ACE_POSIX_Asynch_Result *result) |
| Find free slot to store result and aiocb pointer. | |
| virtual int | start_aio_i (ACE_POSIX_Asynch_Result *result) |
| Initiate an aio operation. | |
| virtual int | notify_completion (int sig_num) |
| int | putq_result (ACE_POSIX_Asynch_Result *result) |
| Put "post_completed" result into the internal queue. | |
| ACE_POSIX_Asynch_Result * | getq_result (void) |
| Get "post_completed" result from the internal queue. | |
| int | clear_result_queue (void) |
| Clear the internal results queue. | |
| int | process_result_queue (void) |
| Process the internal results queue. | |
Protected Attributes | |
| ACE_AIOCB_Notify_Pipe_Manager * | aiocb_notify_pipe_manager_ |
| aiocb ** | aiocb_list_ |
| ACE_POSIX_Asynch_Result ** | result_list_ |
| size_t | aiocb_list_max_size_ |
| To maintain the maximum size of the array (list). | |
| size_t | aiocb_list_cur_size_ |
| To maintain the current size of the array (list). | |
| ACE_SYNCH_MUTEX | mutex_ |
| Mutex to protect work with lists. | |
| ACE_HANDLE | notify_pipe_read_handle_ |
| size_t | num_deferred_aiocb_ |
| size_t | num_started_aio_ |
| Number active,i.e. running requests. | |
| ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * > | result_queue_ |
| Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's. | |
Friends | |
| class | ACE_AIOCB_Notify_Pipe_Manager |
| Handler needs to call application specific code. | |
| class | ACE_POSIX_Asynch_Operation |
| class | ACE_POSIX_Asynch_Accept |
| class | ACE_POSIX_Asynch_Connect |
Definition at line 327 of file POSIX_Proactor.h.
|
|
Constructor defines max number asynchronous operations which can be started at the same time Definition at line 782 of file POSIX_Proactor.cpp. References check_max_aio_num(), create_notify_manager(), create_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::start().
00783 : aiocb_notify_pipe_manager_ (0), 00784 aiocb_list_ (0), 00785 result_list_ (0), 00786 aiocb_list_max_size_ (max_aio_operations), 00787 aiocb_list_cur_size_ (0), 00788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE), 00789 num_deferred_aiocb_ (0), 00790 num_started_aio_ (0) 00791 { 00792 // Check for correct value for max_aio_operations 00793 check_max_aio_num (); 00794 00795 this->create_result_aiocb_list (); 00796 00797 this->create_notify_manager (); 00798 00799 // start pseudo-asynchronous accept task 00800 // one per all future acceptors 00801 this->get_asynch_pseudo_task().start (); 00802 00803 } |
|
|
Destructor.
Definition at line 827 of file POSIX_Proactor.cpp. References close().
00828 {
00829 this->close();
00830 }
|
|
||||||||||||
|
Special constructor for ACE_SUN_Proactor and ACE_POSIX_SIG_Proactor Definition at line 806 of file POSIX_Proactor.cpp. References check_max_aio_num(), and create_result_aiocb_list().
00808 : aiocb_notify_pipe_manager_ (0), 00809 aiocb_list_ (0), 00810 result_list_ (0), 00811 aiocb_list_max_size_ (max_aio_operations), 00812 aiocb_list_cur_size_ (0), 00813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE), 00814 num_deferred_aiocb_ (0), 00815 num_started_aio_ (0) 00816 { 00817 //check for correct value for max_aio_operations 00818 this->check_max_aio_num (); 00819 00820 this->create_result_aiocb_list (); 00821 00822 // @@ We should create Notify_Pipe_Manager in the derived class to 00823 // provide correct calls for virtual functions !!! 00824 } |
|
|
Find free slot to store result and aiocb pointer.
Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1376 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, aiocb_list_max_size_, LM_ERROR, notify_pipe_read_handle_, and result_list_. Referenced by ACE_POSIX_CB_Proactor::allocate_aio_slot(), and start_aio().
01377 {
01378 size_t i = 0;
01379
01380 // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
01381 // so make check for ACE_AIOCB_Notify_Pipe_Manager request
01382
01383 if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ?
01384 { // should be free,
01385 if (result_list_[i] != 0) // only 1 request
01386 { // is allowed
01387 errno = EAGAIN;
01388 ACE_ERROR_RETURN ((LM_ERROR,
01389 "%N:%l:(%P | %t)::\n"
01390 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01391 "internal Proactor error 0\n"),
01392 -1);
01393 }
01394 }
01395 else //try to find free slot as usual, but starting from 1
01396 {
01397 for (i= 1; i < this->aiocb_list_max_size_; i++)
01398 if (result_list_[i] == 0)
01399 break;
01400 }
01401
01402 if (i >= this->aiocb_list_max_size_)
01403 ACE_ERROR_RETURN ((LM_ERROR,
01404 "%N:%l:(%P | %t)::\n"
01405 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01406 "internal Proactor error 1\n"),
01407 -1);
01408
01409 //setup OS notification methods for this aio
01410 result->aio_sigevent.sigev_notify = SIGEV_NONE;
01411
01412 return static_cast<ssize_t> (i);
01413 }
|
|
|
This method should be called from ACE_POSIX_Asynch_Operation::cancel() instead of usual ::aio_cancel. For all deferred AIO requests with handle "h" it removes its from the lists and notifies user. For all running AIO requests with handle "h" it calls ::aio_cancel. According to the POSIX standards we will receive ECANCELED for all ::aio_canceled AIO requests later on return from ::aio_suspend Implements ACE_POSIX_Proactor. Definition at line 1526 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, cancel_aiocb(), num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), and ACE_POSIX_Asynch_Result::set_error().
01527 {
01528 // This new method should be called from
01529 // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
01530 // It scans the result_list_ and defines all AIO requests
01531 // that were issued for handle "handle"
01532 //
01533 // For all deferred AIO requests with handle "handle"
01534 // it removes its from the lists and notifies user
01535 //
01536 // For all running AIO requests with handle "handle"
01537 // it calls ::aio_cancel. According to the POSIX standards
01538 // we will receive ECANCELED for all ::aio_canceled AIO requests
01539 // later on return from ::aio_suspend
01540
01541 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01542
01543 int num_total = 0;
01544 int num_cancelled = 0;
01545
01546 {
01547 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01548
01549 size_t ai = 0;
01550
01551 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01552 {
01553 if (this->result_list_[ai] == 0) // Skip empty slot
01554 continue;
01555
01556 if (this->result_list_[ai]->aio_fildes != handle) // Not ours
01557 continue;
01558
01559 num_total++;
01560
01561 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01562
01563 if (this->aiocb_list_[ai] == 0) // Canceling a deferred operation
01564 {
01565 num_cancelled++;
01566 this->num_deferred_aiocb_--;
01567
01568 this->aiocb_list_[ai] = 0;
01569 this->result_list_[ai] = 0;
01570 this->aiocb_list_cur_size_--;
01571
01572 asynch_result->set_error (ECANCELED);
01573 asynch_result->set_bytes_transferred (0);
01574 this->putq_result (asynch_result);
01575 // we are with locked mutex_ here !
01576 }
01577 else // Cancel started aio
01578 {
01579 int rc_cancel = this->cancel_aiocb (asynch_result);
01580
01581 if (rc_cancel == 0) //notification in the future
01582 num_cancelled++; //it is OS responsiblity
01583 }
01584 }
01585
01586 } // release mutex_
01587
01588 if (num_total == 0)
01589 return 1; // ALLDONE
01590
01591 if (num_cancelled == num_total)
01592 return 0; // CANCELLED
01593
01594 return 2; // NOT CANCELLED
01595 }
|
|
|
Cancel running or deferred AIO.
Definition at line 1598 of file POSIX_Proactor.cpp. Referenced by cancel_aio(), and delete_result_aiocb_list().
01599 {
01600 // This method is called from cancel_aio
01601 // to cancel a previously submitted AIO request
01602 int rc = ::aio_cancel (0, result);
01603
01604 // Check the return value and return 0/1/2 appropriately.
01605 if (rc == AIO_CANCELED)
01606 return 0;
01607 else if (rc == AIO_ALLDONE)
01608 return 1;
01609 else // (rc == AIO_NOTCANCELED)
01610 return 2;
01611 }
|
|
|
Define the maximum number of asynchronous I/O requests for the current OS Definition at line 956 of file POSIX_Proactor.cpp. References ACE_AIO_MAX_SIZE, ACE_DEBUG, ACE_LIB_TEXT, aiocb_list_max_size_, LM_DEBUG, ACE::max_handles(), ACE::set_handle_limit(), and ACE_OS::sysconf(). Referenced by ACE_POSIX_AIOCB_Proactor().
00957 {
00958 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959
00960 // Define max limit AIO's for concrete OS
00961 // -1 means that there is no limit, but it is not true
00962 // (example, SunOS 5.6)
00963
00964 if (max_os_aio_num > 0 &&
00965 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966 aiocb_list_max_size_ = max_os_aio_num;
00967
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969 // Although HPUX 11.00 allows to start 2048 AIO's for all process in
00970 // system it has a limit 256 max elements for aio_suspend () It is a
00971 // pity, but ...
00972
00973 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974 if (max_os_listio_num > 0
00975 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976 aiocb_list_max_size_ = max_os_listio_num;
00977 #endif /* HPUX || __FreeBSD__ */
00978
00979 // check for user-defined value
00980 // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
00981
00982 if (aiocb_list_max_size_ <= 0
00983 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985
00986 // check for max number files to open
00987
00988 int max_num_files = ACE::max_handles ();
00989
00990 if (max_num_files > 0
00991 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992 {
00993 ACE::set_handle_limit (aiocb_list_max_size_);
00994
00995 max_num_files = ACE::max_handles ();
00996 }
00997
00998 if (max_num_files > 0
00999 && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000 aiocb_list_max_size_ = (unsigned long) max_num_files;
01001
01002 ACE_DEBUG ((LM_DEBUG,
01003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004 aiocb_list_max_size_));
01005
01006 #if defined(__sgi)
01007
01008 ACE_DEBUG((LM_DEBUG,
01009 ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
01010
01011 //typedef struct aioinit {
01012 // int aio_threads; /* The number of aio threads to start (5) */
01013 // int aio_locks; /* Initial number of preallocated locks (3) */
01014 // int aio_num; /* estimated total simultanious aiobc structs (1000) */
01015 // int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */
01016 // int aio_debug; /* turn on debugging (0) */
01017 // int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
01018 // int aio_reserved[3];
01019 //} aioinit_t;
01020
01021 aioinit_t aioinit;
01022
01023 aioinit.aio_threads = 10; /* The number of aio threads to start (5) */
01024 aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */
01025 /* estimated total simultaneous aiobc structs (1000) */
01026 aioinit.aio_num = aiocb_list_max_size_;
01027 aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */
01028 aioinit.aio_debug = 0; /* turn on debugging (0) */
01029 aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */
01030 aioinit.aio_reserved[0] = 0;
01031 aioinit.aio_reserved[1] = 0;
01032 aioinit.aio_reserved[2] = 0;
01033
01034 aio_sgi_init (&aioinit);
01035
01036 #endif
01037
01038 return;
01039 }
|
|
|
Clear the internal results queue.
Definition at line 1136 of file POSIX_Proactor.cpp. References getq_result(). Referenced by close().
01137 {
01138 int ret_val = 0;
01139 ACE_POSIX_Asynch_Result* result = 0;
01140
01141 while ((result = this->getq_result ()) != 0)
01142 {
01143 delete result;
01144 ret_val++;
01145 }
01146
01147 return ret_val;
01148 }
|
|
|
Close down the Proactor.
Reimplemented from ACE_POSIX_Proactor. Definition at line 840 of file POSIX_Proactor.cpp. References clear_result_queue(), delete_notify_manager(), delete_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::stop(). Referenced by ~ACE_POSIX_AIOCB_Proactor(), and ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor().
00841 {
00842 // stop asynch accept task
00843 this->get_asynch_pseudo_task().stop ();
00844
00845 this->delete_notify_manager ();
00846
00847 this->clear_result_queue ();
00848
00849 return this->delete_result_aiocb_list ();
00850 }
|
|
|
Call these methods from derived class when virtual table is built. Definition at line 1042 of file POSIX_Proactor.cpp. References ACE_NEW, and aiocb_notify_pipe_manager_. Referenced by ACE_POSIX_AIOCB_Proactor().
01043 {
01044 // Remember! this issues a Asynch_Read
01045 // on the notify pipe for doing the Asynch_Accept/Connect.
01046
01047 if (aiocb_notify_pipe_manager_ == 0)
01048 ACE_NEW (aiocb_notify_pipe_manager_,
01049 ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }
|
|
|
Create aiocb list.
Definition at line 857 of file POSIX_Proactor.cpp. References ACE_NEW_RETURN, aiocb_list_, aiocb_list_max_size_, and result_list_. Referenced by ACE_POSIX_AIOCB_Proactor().
00858 {
00859 if (aiocb_list_ != 0)
00860 return 0;
00861
00862 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863
00864 ACE_NEW_RETURN (result_list_,
00865 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866 -1);
00867
00868 // Initialize the array.
00869 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870 {
00871 aiocb_list_[ai] = 0;
00872 result_list_[ai] = 0;
00873 }
00874
00875 return 0;
00876 }
|
|
|
Definition at line 1053 of file POSIX_Proactor.cpp. References aiocb_notify_pipe_manager_. Referenced by close().
01054 {
01055 // We are responsible for delete as all pointers set to 0 after
01056 // delete, it is save to delete twice
01057
01058 delete aiocb_notify_pipe_manager_;
01059 aiocb_notify_pipe_manager_ = 0;
01060 }
|
|
|
Call this method from derived class when virtual table is built. Definition at line 878 of file POSIX_Proactor.cpp. References ACE_DEBUG, ACE_ERROR, ACE_LIB_TEXT, aiocb_list_, aiocb_list_max_size_, cancel_aiocb(), get_result_status(), LM_DEBUG, LM_ERROR, result_list_, and ACE_OS::strerror(). Referenced by close().
00879 {
00880 if (aiocb_list_ == 0) // already deleted
00881 return 0;
00882
00883 size_t ai;
00884
00885 // Try to cancel all uncomlpeted operarion POSIX systems may have
00886 // hidden system threads that still can work with our aiocb's!
00887 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888 if (this->aiocb_list_[ai] != 0) // active operation
00889 this->cancel_aiocb (result_list_[ai]);
00890
00891 int num_pending = 0;
00892
00893 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894 {
00895 if (this->aiocb_list_[ai] == 0 ) // not active operation
00896 continue;
00897
00898 // Get the error and return status of the aio_ operation.
00899 int error_status = 0;
00900 size_t transfer_count = 0;
00901 int flg_completed = this->get_result_status (result_list_[ai],
00902 error_status,
00903 transfer_count);
00904
00905 //don't delete uncompleted AIOCB's
00906 if (flg_completed == 0) // not completed !!!
00907 {
00908 num_pending++;
00909 #if 0
00910 char * errtxt = ACE_OS::strerror (error_status);
00911 if (errtxt == 0)
00912 errtxt ="?????????";
00913
00914 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915 "WRITE":"READ" ;
00916
00917
00918 ACE_ERROR ((LM_ERROR,
00919 ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920 ai,
00921 op,
00922 error_status,
00923 transfer_count,
00924 errtxt));
00925 #endif /* 0 */
00926 }
00927 else // completed , OK
00928 {
00929 delete this->result_list_[ai];
00930 this->result_list_[ai] = 0;
00931 this->aiocb_list_[ai] = 0;
00932 }
00933 }
00934
00935 // If it is not possible cancel some operation (num_pending > 0 ),
00936 // we can do only one thing -report about this
00937 // and complain about POSIX implementation.
00938 // We know that we have memory leaks, but it is better than
00939 // segmentation fault!
00940 ACE_DEBUG
00941 ((LM_DEBUG,
00942 ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943 ACE_LIB_TEXT(" number pending AIO=%d\n"),
00944 num_pending));
00945
00946 delete [] this->aiocb_list_;
00947 this->aiocb_list_ = 0;
00948
00949 delete [] this->result_list_;
00950 this->result_list_ = 0;
00951
00952 return (num_pending == 0 ? 0 : -1);
00953 // ?? or just always return 0;
00954 }
|
|
||||||||||||||||||||
|
Extract the results of aio.
Definition at line 1256 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, get_result_status(), num_started_aio_, result_list_, and start_deferred_aio(). Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().
01260 {
01261 // parameter index defines initial slot to scan
01262 // parameter count tells us how many slots should we scan
01263
01264 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01265
01266 ACE_POSIX_Asynch_Result *asynch_result = 0;
01267
01268 if (num_started_aio_ == 0) // save time
01269 return 0;
01270
01271 for (; count > 0; index++ , count--)
01272 {
01273 if (index >= aiocb_list_max_size_) // like a wheel
01274 index = 0;
01275
01276 if (aiocb_list_[index] == 0) // Dont process null blocks.
01277 continue;
01278
01279 if (0 != this->get_result_status (result_list_[index],
01280 error_status,
01281 transfer_count)) // completed
01282 break;
01283
01284 } // end for
01285
01286 if (count == 0) // all processed , nothing found
01287 return 0;
01288 asynch_result = result_list_[index];
01289
01290 aiocb_list_[index] = 0;
01291 result_list_[index] = 0;
01292 aiocb_list_cur_size_--;
01293
01294 num_started_aio_--; // decrement count active aios
01295 index++; // for next iteration
01296 count--; // for next iteration
01297
01298 this->start_deferred_aio ();
01299 //make attempt to start deferred AIO
01300 //It is safe as we are protected by mutex_
01301
01302 return asynch_result;
01303 }
|
|
|
Reimplemented from ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 833 of file POSIX_Proactor.cpp.
00834 {
00835 return PROACTOR_AIOCB;
00836 }
|
|
||||||||||||||||
|
Check AIO for completion, error and result status Return: 1 - AIO completed , 0 - not completed yet Definition at line 1237 of file POSIX_Proactor.cpp. References EINPROGRESS, and ssize_t. Referenced by delete_result_aiocb_list(), and find_completed_aio().
01240 {
01241 transfer_count = 0;
01242
01243 // Get the error status of the aio_ operation.
01244 error_status = aio_error (asynch_result);
01245 if (error_status == EINPROGRESS)
01246 return 0; // not completed
01247
01248 ssize_t op_return = aio_return (asynch_result);
01249 if (op_return > 0)
01250 transfer_count = static_cast<size_t> (op_return);
01251 // else transfer_count is already 0, error_status reports the error.
01252 return 1; // completed
01253 }
|
|
|
Get "post_completed" result from the internal queue.
Definition at line 1116 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::dequeue_head(). Referenced by clear_result_queue(), and process_result_queue().
01117 {
01118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119
01120
01121 ACE_POSIX_Asynch_Result* result = 0;
01122
01123 if (this->result_queue_.dequeue_head (result) != 0)
01124 return 0;
01125
01126 // don;t waste time if queue is empty - it is normal
01127 // or check queue size before dequeue_head
01128 // ACE_ERROR_RETURN ((LM_ERROR,
01129 // "%N:%l:(%P | %t):%p\n",
01130 // "ACE_POSIX_AIOCB_Proactor::getq_result failed"),
01131 // 0);
01132
01133 return result;
01134 }
|
|
|
Block indefinitely until at least one event is dispatched. Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly. Implements ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1071 of file POSIX_Proactor.cpp. References ACE_INFINITE, and handle_events_i().
01072 {
01073 return this->handle_events_i (ACE_INFINITE);
01074 }
|
|
|
Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly. Implements ACE_POSIX_Proactor. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1063 of file POSIX_Proactor.cpp. References handle_events_i(), and ACE_Time_Value::msec().
01064 {
01065 // Decrement <wait_time> with the amount of time spent in the method
01066 ACE_Countdown_Time countdown (&wait_time);
01067 return this->handle_events_i (wait_time.msec ());
01068 }
|
|
|
Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 if a completion dispatched. Return -1 on errors. Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1170 of file POSIX_Proactor.cpp. References ACE_ERROR, ACE_INFINITE, aiocb_list_, aiocb_list_max_size_, ACE_POSIX_Proactor::application_specific_code(), find_completed_aio(), LM_ERROR, process_result_queue(), timespec::tv_nsec, and timespec::tv_sec. Referenced by handle_events().
01171 {
01172 int result_suspend = 0;
01173 int retval= 0;
01174
01175 if (milli_seconds == ACE_INFINITE)
01176 // Indefinite blocking.
01177 result_suspend = aio_suspend (aiocb_list_,
01178 aiocb_list_max_size_,
01179 0);
01180 else
01181 {
01182 // Block on <aio_suspend> for <milli_seconds>
01183 timespec timeout;
01184 timeout.tv_sec = milli_seconds / 1000;
01185 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186 result_suspend = aio_suspend (aiocb_list_,
01187 aiocb_list_max_size_,
01188 &timeout);
01189 }
01190
01191 // Check for errors
01192 if (result_suspend == -1)
01193 {
01194 if (errno != EAGAIN && // Timeout
01195 errno != EINTR ) // Interrupted call
01196 ACE_ERROR ((LM_ERROR,
01197 "%N:%l:(%P | %t)::%p\n",
01198 "ACE_POSIX_AIOCB_Proactor::handle_events:"
01199 "aio_suspend failed\n"));
01200
01201 // let continue work
01202 // we should check "post_completed" queue
01203 }
01204 else
01205 {
01206 size_t index = 0;
01207 size_t count = aiocb_list_max_size_; // max number to iterate
01208 int error_status = 0;
01209 size_t transfer_count = 0;
01210
01211 for (;; retval++)
01212 {
01213 ACE_POSIX_Asynch_Result *asynch_result =
01214 find_completed_aio (error_status,
01215 transfer_count,
01216 index,
01217 count);
01218
01219 if (asynch_result == 0)
01220 break;
01221
01222 // Call the application code.
01223 this->application_specific_code (asynch_result,
01224 transfer_count,
01225 0, // No completion key.
01226 error_status);
01227 }
01228 }
01229
01230 // process post_completed results
01231 retval += this->process_result_queue ();
01232
01233 return retval > 0 ? 1 : 0;
01234 }
|
|
|
Notify queue of "post_completed" ACE_POSIX_Asynch_Results called from post_completion method Reimplemented in ACE_POSIX_CB_Proactor. Definition at line 1077 of file POSIX_Proactor.cpp. References aiocb_notify_pipe_manager_, and ACE_AIOCB_Notify_Pipe_Manager::notify(). Referenced by putq_result().
01078 {
01079 ACE_UNUSED_ARG (sig_num);
01080
01081 return this->aiocb_notify_pipe_manager_->notify ();
01082 }
|
|
|
Post a result to the completion port of the Proactor.
Implements ACE_POSIX_Proactor. Definition at line 1085 of file POSIX_Proactor.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and putq_result().
01086 {
01087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088
01089 int ret_val = this->putq_result (result);
01090
01091 return ret_val;
01092 }
|
|
|
Process the internal results queue.
Definition at line 1150 of file POSIX_Proactor.cpp. References ACE_POSIX_Asynch_Result::bytes_transferred(), and getq_result(). Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().
01151 {
01152 int ret_val = 0;
01153 ACE_POSIX_Asynch_Result* result = 0;
01154
01155 while ((result = this->getq_result ()) != 0)
01156 {
01157 this->application_specific_code
01158 (result,
01159 result->bytes_transferred(), // 0, No bytes transferred.
01160 0, // No completion key.
01161 result->error()); //0, No error.
01162
01163 ret_val++;
01164 }
01165
01166 return ret_val;
01167 }
|
|
|
Put "post_completed" result into the internal queue.
Definition at line 1095 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::enqueue_tail(), LM_ERROR, notify_completion(), and ACE_POSIX_Asynch_Result::signal_number(). Referenced by cancel_aio(), post_completion(), and start_deferred_aio().
01096 {
01097 // this protected method should be called with locked mutex_
01098 // we can't use GUARD as Proactor uses non-recursive mutex
01099
01100 if (!result)
01101 return -1;
01102
01103 int sig_num = result->signal_number ();
01104 int ret_val = this->result_queue_.enqueue_tail (result);
01105
01106 if (ret_val == -1)
01107 ACE_ERROR_RETURN ((LM_ERROR,
01108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109 -1);
01110
01111 this->notify_completion (sig_num);
01112
01113 return 0;
01114 }
|
|
|
To identify requests from Notify_Pipe_Manager.
Definition at line 852 of file POSIX_Proactor.cpp. References notify_pipe_read_handle_. Referenced by ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager().
00853 {
00854 notify_pipe_read_handle_ = h;
00855 }
|
|
||||||||||||
|
Definition at line 1307 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, allocate_aio_slot(), LM_ERROR, num_deferred_aiocb_, result_list_, ssize_t, and start_aio_i().
01309 {
01310 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01311
01312 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01313
01314 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01315
01316 if (result == 0) // Just check the status of the list
01317 return ret_val;
01318
01319 // Save operation code in the aiocb
01320 switch (op)
01321 {
01322 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01323 result->aio_lio_opcode = LIO_READ;
01324 break;
01325
01326 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01327 result->aio_lio_opcode = LIO_WRITE;
01328 break;
01329
01330 default:
01331 ACE_ERROR_RETURN ((LM_ERROR,
01332 "%N:%l:(%P | %t)::\n"
01333 "start_aio: Invalid operation code\n"),
01334 -1);
01335 }
01336
01337 if (ret_val != 0) // No free slot
01338 {
01339 errno = EAGAIN;
01340 return -1;
01341 }
01342
01343 // Find a free slot and store.
01344
01345 ssize_t slot = allocate_aio_slot (result);
01346
01347 if (slot < 0)
01348 return -1;
01349
01350 size_t index = static_cast<size_t> (slot);
01351
01352 result_list_[index] = result; //Store result ptr anyway
01353 aiocb_list_cur_size_++;
01354
01355 ret_val = start_aio_i (result);
01356 switch (ret_val)
01357 {
01358 case 0: // started OK
01359 aiocb_list_[index] = result;
01360 return 0;
01361
01362 case 1: // OS AIO queue overflow
01363 num_deferred_aiocb_ ++;
01364 return 0;
01365
01366 default: // Invalid request, there is no point
01367 break; // to start it later
01368 }
01369
01370 result_list_[index] = 0;
01371 aiocb_list_cur_size_--;
01372 return -1;
01373 }
|
|
|
Initiate an aio operation.
Definition at line 1421 of file POSIX_Proactor.cpp. References ACE_ERROR, ACE_LIB_TEXT, ACE_TCHAR, ACE_TRACE, LM_ERROR, and num_started_aio_. Referenced by start_aio(), and start_deferred_aio().
01422 {
01423 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01424
01425 int ret_val;
01426 const ACE_TCHAR *ptype;
01427
01428 // Start IO
01429
01430 switch (result->aio_lio_opcode )
01431 {
01432 case LIO_READ :
01433 ptype = ACE_LIB_TEXT ("read ");
01434 ret_val = aio_read (result);
01435 break;
01436 case LIO_WRITE :
01437 ptype = ACE_LIB_TEXT ("write");
01438 ret_val = aio_write (result);
01439 break;
01440 default:
01441 ptype = ACE_LIB_TEXT ("?????");
01442 ret_val = -1;
01443 break;
01444 }
01445
01446 if (ret_val == 0)
01447 this->num_started_aio_++;
01448 else // if (ret_val == -1)
01449 {
01450 if (errno == EAGAIN || errno == ENOMEM) //Ok, it will be deferred AIO
01451 ret_val = 1;
01452 else
01453 ACE_ERROR ((LM_ERROR,
01454 ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01455 ptype,
01456 ACE_LIB_TEXT ("queueing failed\n")));
01457 }
01458
01459 return ret_val;
01460 }
|
|
|
Start deferred AIO if necessary.
Definition at line 1464 of file POSIX_Proactor.cpp. References ACE_ERROR_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, LM_ERROR, num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), ACE_POSIX_Asynch_Result::set_error(), and start_aio_i(). Referenced by find_completed_aio().
01465 {
01466 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01467
01468 // This protected method is called from
01469 // find_completed_aio after any AIO completion
01470 // We should call this method always with locked
01471 // ACE_POSIX_AIOCB_Proactor::mutex_
01472 //
01473 // It tries to start the first deferred AIO
01474 // if such exists
01475
01476 if (num_deferred_aiocb_ == 0)
01477 return 0; // nothing to do
01478
01479 size_t i = 0;
01480
01481 for (i= 0; i < this->aiocb_list_max_size_; i++)
01482 if (result_list_[i] !=0 // check for
01483 && aiocb_list_[i] ==0) // deferred AIO
01484 break;
01485
01486 if (i >= this->aiocb_list_max_size_)
01487 ACE_ERROR_RETURN ((LM_ERROR,
01488 "%N:%l:(%P | %t)::\n"
01489 "start_deferred_aio:"
01490 "internal Proactor error 3\n"),
01491 -1);
01492
01493 ACE_POSIX_Asynch_Result *result = result_list_[i];
01494
01495 int ret_val = start_aio_i (result);
01496
01497 switch (ret_val)
01498 {
01499 case 0 : //started OK , decrement count of deferred AIOs
01500 aiocb_list_[i] = result;
01501 num_deferred_aiocb_ --;
01502 return 0;
01503
01504 case 1 :
01505 return 0; //try again later
01506
01507 default : // Invalid Parameters , should never be
01508 break;
01509 }
01510
01511 //AL notify user
01512
01513 result_list_[i] = 0;
01514 aiocb_list_cur_size_--;
01515
01516 num_deferred_aiocb_ --;
01517
01518 result->set_error (errno);
01519 result->set_bytes_transferred (0);
01520 this->putq_result (result); // we are with locked mutex_ here !
01521
01522 return -1;
01523 }
|
|
|
Handler needs to call application specific code.
Definition at line 331 of file POSIX_Proactor.h. |
|
|
Definition at line 336 of file POSIX_Proactor.h. |
|
|
Definition at line 337 of file POSIX_Proactor.h. |
|
|
This class does the registering of Asynch Operations with the Proactor which is necessary in the AIOCB strategy. Definition at line 335 of file POSIX_Proactor.h. |
|
|
Use a dynamically allocated array to keep track of all the aio's issued currently. Definition at line 469 of file POSIX_Proactor.h. Referenced by cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio(). |
|
|
To maintain the current size of the array (list).
Definition at line 476 of file POSIX_Proactor.h. Referenced by cancel_aio(), find_completed_aio(), start_aio(), and start_deferred_aio(). |
|
|
To maintain the maximum size of the array (list).
Definition at line 473 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), cancel_aio(), check_max_aio_num(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio(). |
|
|
This class takes care of doing when we use AIO_CONTROL_BLOCKS strategy. Definition at line 465 of file POSIX_Proactor.h. Referenced by create_notify_manager(), delete_notify_manager(), and notify_completion(). |
|
|
Mutex to protect work with lists.
Definition at line 479 of file POSIX_Proactor.h. |
|
|
The purpose of this member is only to identify asynchronous request from NotifyManager. We will reserve for it always slot 0 in the list of aiocb's to be sure that don't lose notifications. Definition at line 484 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), and set_notify_handle(). |
|
|
Number of ACE_POSIX_Asynch_Result's waiting for start i.e. deferred AIOs Definition at line 488 of file POSIX_Proactor.h. Referenced by cancel_aio(), start_aio(), and start_deferred_aio(). |
|
|
Number active,i.e. running requests.
Definition at line 491 of file POSIX_Proactor.h. Referenced by find_completed_aio(), and start_aio_i(). |
|
|
Definition at line 470 of file POSIX_Proactor.h. Referenced by allocate_aio_slot(), cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), start_aio(), and start_deferred_aio(). |
|
|
Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's.
Definition at line 494 of file POSIX_Proactor.h. |
1.3.6