ACE_POSIX_AIOCB_Proactor Class Reference

This Proactor makes use of Asynchronous I/O Control Blocks (AIOCB) to notify/get the completion status of the operations issued. More...

#include <POSIX_Proactor.h>

Inheritance diagram for ACE_POSIX_AIOCB_Proactor:

Inheritance graph
[legend]
Collaboration diagram for ACE_POSIX_AIOCB_Proactor:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ACE_POSIX_AIOCB_Proactor (size_t nmaxop=ACE_AIO_DEFAULT_SIZE)
virtual Proactor_Type get_impl_type (void)
virtual ~ACE_POSIX_AIOCB_Proactor (void)
 Destructor.

virtual int close (void)
 Close down the Proactor.

virtual int handle_events (ACE_Time_Value &wait_time)
virtual int handle_events (void)
virtual int post_completion (ACE_POSIX_Asynch_Result *result)
 Post a result to the completion port of the Proactor.

virtual int start_aio (ACE_POSIX_Asynch_Result *result, ACE_POSIX_Proactor::Opcode op)
virtual int cancel_aio (ACE_HANDLE h)

Protected Member Functions

 ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype)
virtual int get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, size_t &transfer_count)
int create_result_aiocb_list (void)
 Create aiocb list.

int delete_result_aiocb_list (void)
void create_notify_manager (void)
void delete_notify_manager (void)
void check_max_aio_num (void)
void set_notify_handle (ACE_HANDLE h)
 To identify requests from Notify_Pipe_Manager.

int handle_events_i (u_long milli_seconds)
int start_deferred_aio (void)
 Start deferred AIO if necessary.

virtual int cancel_aiocb (ACE_POSIX_Asynch_Result *result)
 Cancel running or deferred AIO.

ACE_POSIX_Asynch_Resultfind_completed_aio (int &error_status, size_t &transfer_count, size_t &index, size_t &count)
 Extract the results of aio.

virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
 Find free slot to store result and aiocb pointer.

virtual int start_aio_i (ACE_POSIX_Asynch_Result *result)
 Initiate an aio operation.

virtual int notify_completion (int sig_num)
int putq_result (ACE_POSIX_Asynch_Result *result)
 Put "post_completed" result into the internal queue.

ACE_POSIX_Asynch_Resultgetq_result (void)
 Get "post_completed" result from the internal queue.

int clear_result_queue (void)
 Clear the internal results queue.

int process_result_queue (void)
 Process the internal results queue.


Protected Attributes

ACE_AIOCB_Notify_Pipe_Manageraiocb_notify_pipe_manager_
aiocb ** aiocb_list_
ACE_POSIX_Asynch_Result ** result_list_
size_t aiocb_list_max_size_
 To maintain the maximum size of the array (list).

size_t aiocb_list_cur_size_
 To maintain the current size of the array (list).

ACE_SYNCH_MUTEX mutex_
 Mutex to protect work with lists.

ACE_HANDLE notify_pipe_read_handle_
size_t num_deferred_aiocb_
size_t num_started_aio_
 Number active,i.e. running requests.

ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * > result_queue_
 Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's.


Friends

class ACE_AIOCB_Notify_Pipe_Manager
 Handler needs to call application specific code.

class ACE_POSIX_Asynch_Operation
class ACE_POSIX_Asynch_Accept
class ACE_POSIX_Asynch_Connect

Detailed Description

This Proactor makes use of Asynchronous I/O Control Blocks (AIOCB) to notify/get the completion status of the operations issued.

Definition at line 327 of file POSIX_Proactor.h.


Constructor & Destructor Documentation

ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor size_t  nmaxop = ACE_AIO_DEFAULT_SIZE  ) 
 

Constructor defines max number asynchronous operations which can be started at the same time

Definition at line 782 of file POSIX_Proactor.cpp.

References check_max_aio_num(), create_notify_manager(), create_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::start().

00783   : aiocb_notify_pipe_manager_ (0),
00784     aiocb_list_ (0),
00785     result_list_ (0),
00786     aiocb_list_max_size_ (max_aio_operations),
00787     aiocb_list_cur_size_ (0),
00788     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00789     num_deferred_aiocb_ (0),
00790     num_started_aio_ (0)
00791 {
00792   // Check for correct value for max_aio_operations
00793   check_max_aio_num ();
00794 
00795   this->create_result_aiocb_list ();
00796 
00797   this->create_notify_manager ();
00798 
00799   // start pseudo-asynchronous accept task
00800   // one per all future acceptors
00801   this->get_asynch_pseudo_task().start ();
00802 
00803 }

ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor void   )  [virtual]
 

Destructor.

Definition at line 827 of file POSIX_Proactor.cpp.

References close().

00828 {
00829   this->close();
00830 }

ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor size_t  nmaxop,
ACE_POSIX_Proactor::Proactor_Type  ptype
[protected]
 

Special constructor for ACE_SUN_Proactor and ACE_POSIX_SIG_Proactor

Definition at line 806 of file POSIX_Proactor.cpp.

References check_max_aio_num(), and create_result_aiocb_list().

00808   : aiocb_notify_pipe_manager_ (0),
00809     aiocb_list_ (0),
00810     result_list_ (0),
00811     aiocb_list_max_size_ (max_aio_operations),
00812     aiocb_list_cur_size_ (0),
00813     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00814     num_deferred_aiocb_ (0),
00815     num_started_aio_ (0)
00816 {
00817   //check for correct value for max_aio_operations
00818   this->check_max_aio_num ();
00819 
00820   this->create_result_aiocb_list ();
00821 
00822   // @@ We should create Notify_Pipe_Manager in the derived class to
00823   // provide correct calls for virtual functions !!!
00824 }


Member Function Documentation

ssize_t ACE_POSIX_AIOCB_Proactor::allocate_aio_slot ACE_POSIX_Asynch_Result result  )  [protected, virtual]
 

Find free slot to store result and aiocb pointer.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1376 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, aiocb_list_max_size_, LM_ERROR, notify_pipe_read_handle_, and result_list_.

Referenced by ACE_POSIX_CB_Proactor::allocate_aio_slot(), and start_aio().

01377 {
01378   size_t i = 0;
01379 
01380   // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
01381   // so make check for ACE_AIOCB_Notify_Pipe_Manager request
01382 
01383   if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ?
01384     {                                       // should be free,
01385       if (result_list_[i] != 0)           // only 1 request
01386         {                                   // is allowed
01387           errno   = EAGAIN;
01388           ACE_ERROR_RETURN ((LM_ERROR,
01389                      "%N:%l:(%P | %t)::\n"
01390                      "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01391                      "internal Proactor error 0\n"),
01392                      -1);
01393         }
01394     }
01395   else  //try to find free slot as usual, but starting from 1
01396     {
01397       for (i= 1; i < this->aiocb_list_max_size_; i++)
01398         if (result_list_[i] == 0)
01399           break;
01400     }
01401 
01402   if (i >= this->aiocb_list_max_size_)
01403     ACE_ERROR_RETURN ((LM_ERROR,
01404               "%N:%l:(%P | %t)::\n"
01405               "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01406               "internal Proactor error 1\n"),
01407               -1);
01408 
01409   //setup OS notification methods for this aio
01410   result->aio_sigevent.sigev_notify = SIGEV_NONE;
01411 
01412   return static_cast<ssize_t> (i);
01413 }

int ACE_POSIX_AIOCB_Proactor::cancel_aio ACE_HANDLE  h  )  [virtual]
 

This method should be called from ACE_POSIX_Asynch_Operation::cancel() instead of usual ::aio_cancel. For all deferred AIO requests with handle "h" it removes its from the lists and notifies user. For all running AIO requests with handle "h" it calls ::aio_cancel. According to the POSIX standards we will receive ECANCELED for all ::aio_canceled AIO requests later on return from ::aio_suspend

Implements ACE_POSIX_Proactor.

Definition at line 1526 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, cancel_aiocb(), num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), and ACE_POSIX_Asynch_Result::set_error().

01527 {
01528   // This new method should be called from
01529   // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
01530   // It scans the result_list_ and defines all AIO requests
01531   // that were issued for handle "handle"
01532   //
01533   // For all deferred AIO requests with handle "handle"
01534   // it removes its from the lists and notifies user
01535   //
01536   // For all running AIO requests with handle "handle"
01537   // it calls ::aio_cancel. According to the POSIX standards
01538   // we will receive ECANCELED  for all ::aio_canceled AIO requests
01539   // later on return from ::aio_suspend
01540 
01541   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01542 
01543   int    num_total     = 0;
01544   int    num_cancelled = 0;
01545 
01546   {
01547     ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01548 
01549     size_t ai = 0;
01550 
01551     for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01552       {
01553         if (this->result_list_[ai] == 0)    // Skip empty slot
01554           continue;
01555 
01556         if (this->result_list_[ai]->aio_fildes != handle)  // Not ours
01557           continue;
01558 
01559         num_total++;
01560 
01561         ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01562 
01563         if (this->aiocb_list_[ai] == 0)  // Canceling a deferred operation
01564           {
01565             num_cancelled++;
01566             this->num_deferred_aiocb_--;
01567 
01568             this->aiocb_list_[ai] = 0;
01569             this->result_list_[ai] = 0;
01570             this->aiocb_list_cur_size_--;
01571 
01572             asynch_result->set_error (ECANCELED);
01573             asynch_result->set_bytes_transferred (0);
01574             this->putq_result (asynch_result);
01575             // we are with locked mutex_ here !
01576           }
01577         else      // Cancel started aio
01578           {
01579             int rc_cancel = this->cancel_aiocb (asynch_result);
01580 
01581             if (rc_cancel == 0)    //notification in the future
01582               num_cancelled++;     //it is OS responsiblity
01583           }
01584       }
01585 
01586   } // release mutex_
01587 
01588   if (num_total == 0)
01589     return 1;  // ALLDONE
01590 
01591   if (num_cancelled == num_total)
01592     return 0;  // CANCELLED
01593 
01594   return 2; // NOT CANCELLED
01595 }

int ACE_POSIX_AIOCB_Proactor::cancel_aiocb ACE_POSIX_Asynch_Result result  )  [protected, virtual]
 

Cancel running or deferred AIO.

Definition at line 1598 of file POSIX_Proactor.cpp.

Referenced by cancel_aio(), and delete_result_aiocb_list().

01599 {
01600   // This method is called from cancel_aio
01601   // to cancel a previously submitted AIO request
01602   int rc = ::aio_cancel (0, result);
01603 
01604   // Check the return value and return 0/1/2 appropriately.
01605   if (rc == AIO_CANCELED)
01606     return 0;
01607   else if (rc == AIO_ALLDONE)
01608     return 1;
01609   else // (rc == AIO_NOTCANCELED)
01610     return 2;
01611 }

void ACE_POSIX_AIOCB_Proactor::check_max_aio_num void   )  [protected]
 

Define the maximum number of asynchronous I/O requests for the current OS

Definition at line 956 of file POSIX_Proactor.cpp.

References ACE_AIO_MAX_SIZE, ACE_DEBUG, ACE_LIB_TEXT, aiocb_list_max_size_, LM_DEBUG, ACE::max_handles(), ACE::set_handle_limit(), and ACE_OS::sysconf().

Referenced by ACE_POSIX_AIOCB_Proactor().

00957 {
00958   long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00959 
00960   // Define max limit AIO's for concrete OS
00961   // -1 means that there is no limit, but it is not true
00962   // (example, SunOS 5.6)
00963 
00964   if (max_os_aio_num > 0 &&
00965       aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00966      aiocb_list_max_size_ = max_os_aio_num;
00967 
00968 #if defined (HPUX) || defined (__FreeBSD__)
00969   // Although HPUX 11.00 allows to start 2048 AIO's for all process in
00970   // system it has a limit 256 max elements for aio_suspend () It is a
00971   // pity, but ...
00972 
00973   long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00974   if (max_os_listio_num > 0
00975       && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00976     aiocb_list_max_size_ = max_os_listio_num;
00977 #endif /* HPUX || __FreeBSD__ */
00978 
00979   // check for user-defined value
00980   // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
00981 
00982   if (aiocb_list_max_size_ <= 0
00983      || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00984      aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00985 
00986   // check for max number files to open
00987 
00988   int max_num_files = ACE::max_handles ();
00989 
00990   if (max_num_files > 0
00991       && aiocb_list_max_size_ > (unsigned long) max_num_files)
00992     {
00993       ACE::set_handle_limit (aiocb_list_max_size_);
00994 
00995       max_num_files = ACE::max_handles ();
00996     }
00997 
00998   if (max_num_files > 0
00999       && aiocb_list_max_size_ > (unsigned long) max_num_files)
01000     aiocb_list_max_size_ = (unsigned long) max_num_files;
01001 
01002   ACE_DEBUG ((LM_DEBUG,
01003              "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
01004               aiocb_list_max_size_));
01005 
01006 #if defined(__sgi)
01007 
01008    ACE_DEBUG((LM_DEBUG,
01009               ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
01010 
01011 //typedef struct aioinit {
01012 //    int aio_threads;  /* The number of aio threads to start (5) */
01013 //    int aio_locks;    /* Initial number of preallocated locks (3) */
01014 //    int aio_num;      /* estimated total simultanious aiobc structs (1000) */
01015 //    int aio_usedba;   /* Try to use DBA for raw I/O in lio_listio (0) */
01016 //    int aio_debug;    /* turn on debugging (0) */
01017 //    int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
01018 //    int aio_reserved[3];
01019 //} aioinit_t;
01020 
01021     aioinit_t  aioinit;
01022 
01023     aioinit.aio_threads = 10; /* The number of aio threads to start (5) */
01024     aioinit.aio_locks = 20;   /* Initial number of preallocated locks (3) */
01025                        /* estimated total simultaneous aiobc structs (1000) */
01026     aioinit.aio_num = aiocb_list_max_size_;
01027     aioinit.aio_usedba = 0;   /* Try to use DBA for raw IO in lio_listio (0) */
01028     aioinit.aio_debug = 0;    /* turn on debugging (0) */
01029     aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */
01030     aioinit.aio_reserved[0] = 0;
01031     aioinit.aio_reserved[1] = 0;
01032     aioinit.aio_reserved[2] = 0;
01033 
01034     aio_sgi_init (&aioinit);
01035 
01036 #endif
01037 
01038     return;
01039 }

int ACE_POSIX_AIOCB_Proactor::clear_result_queue void   )  [protected]
 

Clear the internal results queue.

Definition at line 1136 of file POSIX_Proactor.cpp.

References getq_result().

Referenced by close().

01137 {
01138   int ret_val = 0;
01139   ACE_POSIX_Asynch_Result* result = 0;
01140 
01141   while ((result = this->getq_result ()) != 0)
01142     {
01143       delete result;
01144       ret_val++;
01145     }
01146 
01147   return ret_val;
01148 }

int ACE_POSIX_AIOCB_Proactor::close void   )  [virtual]
 

Close down the Proactor.

Reimplemented from ACE_POSIX_Proactor.

Definition at line 840 of file POSIX_Proactor.cpp.

References clear_result_queue(), delete_notify_manager(), delete_result_aiocb_list(), ACE_POSIX_Proactor::get_asynch_pseudo_task(), and ACE_Asynch_Pseudo_Task::stop().

Referenced by ~ACE_POSIX_AIOCB_Proactor(), and ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor().

00841 {
00842   // stop asynch accept task
00843   this->get_asynch_pseudo_task().stop ();
00844 
00845   this->delete_notify_manager ();
00846 
00847   this->clear_result_queue ();
00848 
00849   return this->delete_result_aiocb_list ();
00850 }

void ACE_POSIX_AIOCB_Proactor::create_notify_manager void   )  [protected]
 

Call these methods from derived class when virtual table is built.

Definition at line 1042 of file POSIX_Proactor.cpp.

References ACE_NEW, and aiocb_notify_pipe_manager_.

Referenced by ACE_POSIX_AIOCB_Proactor().

01043 {
01044   // Remember! this issues a Asynch_Read
01045   // on the notify pipe for doing the Asynch_Accept/Connect.
01046 
01047   if (aiocb_notify_pipe_manager_ == 0)
01048     ACE_NEW (aiocb_notify_pipe_manager_,
01049              ACE_AIOCB_Notify_Pipe_Manager (this));
01050 }

int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list void   )  [protected]
 

Create aiocb list.

Definition at line 857 of file POSIX_Proactor.cpp.

References ACE_NEW_RETURN, aiocb_list_, aiocb_list_max_size_, and result_list_.

Referenced by ACE_POSIX_AIOCB_Proactor().

00858 {
00859   if (aiocb_list_ != 0)
00860     return 0;
00861 
00862   ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00863 
00864   ACE_NEW_RETURN (result_list_,
00865                   ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00866                   -1);
00867 
00868   // Initialize the array.
00869   for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00870     {
00871       aiocb_list_[ai] = 0;
00872       result_list_[ai] = 0;
00873     }
00874 
00875   return 0;
00876 }

void ACE_POSIX_AIOCB_Proactor::delete_notify_manager void   )  [protected]
 

Definition at line 1053 of file POSIX_Proactor.cpp.

References aiocb_notify_pipe_manager_.

Referenced by close().

01054 {
01055   // We are responsible for delete as all pointers set to 0 after
01056   // delete, it is save to delete twice
01057 
01058   delete aiocb_notify_pipe_manager_;
01059   aiocb_notify_pipe_manager_ = 0;
01060 }

int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list void   )  [protected]
 

Call this method from derived class when virtual table is built.

Definition at line 878 of file POSIX_Proactor.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_LIB_TEXT, aiocb_list_, aiocb_list_max_size_, cancel_aiocb(), get_result_status(), LM_DEBUG, LM_ERROR, result_list_, and ACE_OS::strerror().

Referenced by close().

00879 {
00880   if (aiocb_list_ == 0)  // already deleted
00881     return 0;
00882 
00883   size_t ai;
00884 
00885   // Try to cancel all uncomlpeted operarion POSIX systems may have
00886   // hidden system threads that still can work with our aiocb's!
00887   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00888     if (this->aiocb_list_[ai] != 0)  // active operation
00889       this->cancel_aiocb (result_list_[ai]);
00890 
00891   int num_pending = 0;
00892 
00893   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00894     {
00895       if (this->aiocb_list_[ai] == 0 ) //  not active operation
00896         continue;
00897 
00898       // Get the error and return status of the aio_ operation.
00899       int error_status  = 0;
00900       size_t transfer_count = 0;
00901       int flg_completed = this->get_result_status (result_list_[ai],
00902                                                    error_status,
00903                                                    transfer_count);
00904 
00905       //don't delete uncompleted AIOCB's
00906       if (flg_completed == 0)  // not completed !!!
00907         {
00908           num_pending++;
00909 #if 0
00910           char * errtxt = ACE_OS::strerror (error_status);
00911           if (errtxt == 0)
00912               errtxt ="?????????";
00913 
00914           char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00915             "WRITE":"READ" ;
00916 
00917 
00918           ACE_ERROR ((LM_ERROR,
00919                       ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00920                       ai,
00921                       op,
00922                       error_status,
00923                       transfer_count,
00924                       errtxt));
00925 #endif /* 0 */
00926         }
00927       else // completed , OK
00928         {
00929           delete this->result_list_[ai];
00930           this->result_list_[ai] = 0;
00931           this->aiocb_list_[ai] = 0;
00932         }
00933     }
00934 
00935   // If it is not possible cancel some operation (num_pending > 0 ),
00936   // we can do only one thing -report about this
00937   // and complain about POSIX implementation.
00938   // We know that we have memory leaks, but it is better than
00939   // segmentation fault!
00940   ACE_DEBUG
00941     ((LM_DEBUG,
00942       ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00943       ACE_LIB_TEXT(" number pending AIO=%d\n"),
00944       num_pending));
00945 
00946   delete [] this->aiocb_list_;
00947   this->aiocb_list_ = 0;
00948 
00949   delete [] this->result_list_;
00950   this->result_list_ = 0;
00951 
00952   return (num_pending == 0 ? 0 : -1);
00953   // ?? or just always return 0;
00954 }

ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio int &  error_status,
size_t &  transfer_count,
size_t &  index,
size_t &  count
[protected]
 

Extract the results of aio.

Definition at line 1256 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, get_result_status(), num_started_aio_, result_list_, and start_deferred_aio().

Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().

01260 {
01261   // parameter index defines initial slot to scan
01262   // parameter count tells us how many slots should we scan
01263 
01264   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01265 
01266   ACE_POSIX_Asynch_Result *asynch_result = 0;
01267 
01268   if (num_started_aio_ == 0)  // save time
01269     return 0;
01270 
01271   for (; count > 0; index++ , count--)
01272     {
01273       if (index >= aiocb_list_max_size_) // like a wheel
01274         index = 0;
01275 
01276       if (aiocb_list_[index] == 0) // Dont process null blocks.
01277         continue;
01278 
01279       if (0 != this->get_result_status (result_list_[index],
01280                                         error_status,
01281                                         transfer_count))  // completed
01282         break;
01283 
01284     } // end for
01285 
01286   if (count == 0) // all processed , nothing found
01287     return 0;
01288   asynch_result = result_list_[index];
01289 
01290   aiocb_list_[index] = 0;
01291   result_list_[index] = 0;
01292   aiocb_list_cur_size_--;
01293 
01294   num_started_aio_--;  // decrement count active aios
01295   index++;             // for next iteration
01296   count--;             // for next iteration
01297 
01298   this->start_deferred_aio ();
01299   //make attempt to start deferred AIO
01300   //It is safe as we are protected by mutex_
01301 
01302   return asynch_result;
01303 }

ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_AIOCB_Proactor::get_impl_type void   )  [virtual]
 

Reimplemented from ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 833 of file POSIX_Proactor.cpp.

00834 {
00835   return PROACTOR_AIOCB;
00836 }

int ACE_POSIX_AIOCB_Proactor::get_result_status ACE_POSIX_Asynch_Result asynch_result,
int &  error_status,
size_t &  transfer_count
[protected, virtual]
 

Check AIO for completion, error and result status Return: 1 - AIO completed , 0 - not completed yet

Definition at line 1237 of file POSIX_Proactor.cpp.

References EINPROGRESS, and ssize_t.

Referenced by delete_result_aiocb_list(), and find_completed_aio().

01240 {
01241   transfer_count = 0;
01242 
01243   // Get the error status of the aio_ operation.
01244   error_status  = aio_error (asynch_result);
01245   if (error_status == EINPROGRESS)
01246     return 0;  // not completed
01247 
01248   ssize_t op_return = aio_return (asynch_result);
01249   if (op_return > 0)
01250     transfer_count = static_cast<size_t> (op_return);
01251   // else transfer_count is already 0, error_status reports the error.
01252   return 1; // completed
01253 }

ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result void   )  [protected]
 

Get "post_completed" result from the internal queue.

Definition at line 1116 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::dequeue_head().

Referenced by clear_result_queue(), and process_result_queue().

01117 {
01118   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01119 
01120 
01121   ACE_POSIX_Asynch_Result* result = 0;
01122 
01123   if (this->result_queue_.dequeue_head (result) != 0)
01124     return 0;
01125 
01126 //  don;t waste time if queue is empty - it is normal
01127 //  or check queue size before dequeue_head
01128 //    ACE_ERROR_RETURN ((LM_ERROR,
01129 //                       "%N:%l:(%P | %t):%p\n",
01130 //                       "ACE_POSIX_AIOCB_Proactor::getq_result failed"),
01131 //                      0);
01132 
01133   return result;
01134 }

int ACE_POSIX_AIOCB_Proactor::handle_events void   )  [virtual]
 

Block indefinitely until at least one event is dispatched. Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly.

Implements ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1071 of file POSIX_Proactor.cpp.

References ACE_INFINITE, and handle_events_i().

01072 {
01073   return this->handle_events_i (ACE_INFINITE);
01074 }

int ACE_POSIX_AIOCB_Proactor::handle_events ACE_Time_Value wait_time  )  [virtual]
 

Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly.

Implements ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1063 of file POSIX_Proactor.cpp.

References handle_events_i(), and ACE_Time_Value::msec().

01064 {
01065   // Decrement <wait_time> with the amount of time spent in the method
01066   ACE_Countdown_Time countdown (&wait_time);
01067   return this->handle_events_i (wait_time.msec ());
01068 }

int ACE_POSIX_AIOCB_Proactor::handle_events_i u_long  milli_seconds  )  [protected]
 

Dispatch a single set of events. If elapses before any events occur, return 0. Return 1 if a completion dispatched. Return -1 on errors.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1170 of file POSIX_Proactor.cpp.

References ACE_ERROR, ACE_INFINITE, aiocb_list_, aiocb_list_max_size_, ACE_POSIX_Proactor::application_specific_code(), find_completed_aio(), LM_ERROR, process_result_queue(), timespec::tv_nsec, and timespec::tv_sec.

Referenced by handle_events().

01171 {
01172   int result_suspend = 0;
01173   int retval= 0;
01174 
01175   if (milli_seconds == ACE_INFINITE)
01176     // Indefinite blocking.
01177     result_suspend = aio_suspend (aiocb_list_,
01178                                   aiocb_list_max_size_,
01179                                   0);
01180   else
01181     {
01182       // Block on <aio_suspend> for <milli_seconds>
01183       timespec timeout;
01184       timeout.tv_sec = milli_seconds / 1000;
01185       timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
01186       result_suspend = aio_suspend (aiocb_list_,
01187                                     aiocb_list_max_size_,
01188                                     &timeout);
01189     }
01190 
01191   // Check for errors
01192   if (result_suspend == -1)
01193     {
01194       if (errno != EAGAIN &&   // Timeout
01195           errno != EINTR )    // Interrupted call
01196           ACE_ERROR ((LM_ERROR,
01197                       "%N:%l:(%P | %t)::%p\n",
01198                       "ACE_POSIX_AIOCB_Proactor::handle_events:"
01199                       "aio_suspend failed\n"));
01200 
01201       // let continue work
01202       // we should check "post_completed" queue
01203     }
01204   else
01205     {
01206       size_t index = 0;
01207       size_t count = aiocb_list_max_size_;  // max number to iterate
01208       int error_status = 0;
01209       size_t transfer_count = 0;
01210 
01211       for (;; retval++)
01212         {
01213           ACE_POSIX_Asynch_Result *asynch_result =
01214             find_completed_aio (error_status,
01215                                 transfer_count,
01216                                 index,
01217                                 count);
01218 
01219           if (asynch_result == 0)
01220             break;
01221 
01222           // Call the application code.
01223           this->application_specific_code (asynch_result,
01224                                            transfer_count,
01225                                            0,             // No completion key.
01226                                            error_status);
01227         }
01228     }
01229 
01230   // process post_completed results
01231   retval += this->process_result_queue ();
01232 
01233   return retval > 0 ? 1 : 0;
01234 }

int ACE_POSIX_AIOCB_Proactor::notify_completion int  sig_num  )  [protected, virtual]
 

Notify queue of "post_completed" ACE_POSIX_Asynch_Results called from post_completion method

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1077 of file POSIX_Proactor.cpp.

References aiocb_notify_pipe_manager_, and ACE_AIOCB_Notify_Pipe_Manager::notify().

Referenced by putq_result().

01078 {
01079   ACE_UNUSED_ARG (sig_num);
01080 
01081   return this->aiocb_notify_pipe_manager_->notify ();
01082 }

int ACE_POSIX_AIOCB_Proactor::post_completion ACE_POSIX_Asynch_Result result  )  [virtual]
 

Post a result to the completion port of the Proactor.

Implements ACE_POSIX_Proactor.

Definition at line 1085 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, and putq_result().

01086 {
01087   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01088 
01089   int ret_val = this->putq_result (result);
01090 
01091   return ret_val;
01092 }

int ACE_POSIX_AIOCB_Proactor::process_result_queue void   )  [protected]
 

Process the internal results queue.

Definition at line 1150 of file POSIX_Proactor.cpp.

References ACE_POSIX_Asynch_Result::bytes_transferred(), and getq_result().

Referenced by handle_events_i(), and ACE_POSIX_CB_Proactor::handle_events_i().

01151 {
01152   int ret_val = 0;
01153   ACE_POSIX_Asynch_Result* result = 0;
01154 
01155   while ((result = this->getq_result ()) != 0)
01156     {
01157       this->application_specific_code
01158             (result,
01159              result->bytes_transferred(), // 0, No bytes transferred.
01160              0,  // No completion key.
01161              result->error());   //0, No error.
01162 
01163       ret_val++;
01164     }
01165 
01166   return ret_val;
01167 }

int ACE_POSIX_AIOCB_Proactor::putq_result ACE_POSIX_Asynch_Result result  )  [protected]
 

Put "post_completed" result into the internal queue.

Definition at line 1095 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::enqueue_tail(), LM_ERROR, notify_completion(), and ACE_POSIX_Asynch_Result::signal_number().

Referenced by cancel_aio(), post_completion(), and start_deferred_aio().

01096 {
01097   // this protected method should be called with locked mutex_
01098   // we can't use GUARD as Proactor uses non-recursive mutex
01099 
01100   if (!result)
01101     return -1;
01102 
01103   int sig_num = result->signal_number ();
01104   int ret_val = this->result_queue_.enqueue_tail (result);
01105 
01106   if (ret_val == -1)
01107     ACE_ERROR_RETURN ((LM_ERROR,
01108                        "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01109                       -1);
01110 
01111   this->notify_completion (sig_num);
01112 
01113   return 0;
01114 }

void ACE_POSIX_AIOCB_Proactor::set_notify_handle ACE_HANDLE  h  )  [protected]
 

To identify requests from Notify_Pipe_Manager.

Definition at line 852 of file POSIX_Proactor.cpp.

References notify_pipe_read_handle_.

Referenced by ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager().

00853 {
00854   notify_pipe_read_handle_ = h;
00855 }

int ACE_POSIX_AIOCB_Proactor::start_aio ACE_POSIX_Asynch_Result result,
ACE_POSIX_Proactor::Opcode  op
[virtual]
 

Definition at line 1307 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, allocate_aio_slot(), LM_ERROR, num_deferred_aiocb_, result_list_, ssize_t, and start_aio_i().

01309 {
01310   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01311 
01312   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01313 
01314   int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01315 
01316   if (result == 0) // Just check the status of the list
01317     return ret_val;
01318 
01319   // Save operation code in the aiocb
01320   switch (op)
01321     {
01322     case ACE_POSIX_Proactor::ACE_OPCODE_READ:
01323       result->aio_lio_opcode = LIO_READ;
01324       break;
01325 
01326     case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
01327       result->aio_lio_opcode = LIO_WRITE;
01328       break;
01329 
01330     default:
01331       ACE_ERROR_RETURN ((LM_ERROR,
01332                          "%N:%l:(%P | %t)::\n"
01333                          "start_aio: Invalid operation code\n"),
01334                         -1);
01335     }
01336 
01337   if (ret_val != 0)   // No free slot
01338     {
01339       errno = EAGAIN;
01340       return -1;
01341     }
01342 
01343   // Find a free slot and store.
01344 
01345   ssize_t slot = allocate_aio_slot (result);
01346 
01347   if (slot < 0)
01348     return -1;
01349 
01350   size_t index = static_cast<size_t> (slot);
01351 
01352   result_list_[index] = result;   //Store result ptr anyway
01353   aiocb_list_cur_size_++;
01354 
01355   ret_val = start_aio_i (result);
01356   switch (ret_val)
01357     {
01358     case 0:     // started OK
01359       aiocb_list_[index] = result;
01360       return 0;
01361 
01362     case 1:     // OS AIO queue overflow
01363       num_deferred_aiocb_ ++;
01364       return 0;
01365 
01366     default:    // Invalid request, there is no point
01367       break;    // to start it later
01368     }
01369 
01370   result_list_[index] = 0;
01371   aiocb_list_cur_size_--;
01372   return -1;
01373 }

int ACE_POSIX_AIOCB_Proactor::start_aio_i ACE_POSIX_Asynch_Result result  )  [protected, virtual]
 

Initiate an aio operation.

Definition at line 1421 of file POSIX_Proactor.cpp.

References ACE_ERROR, ACE_LIB_TEXT, ACE_TCHAR, ACE_TRACE, LM_ERROR, and num_started_aio_.

Referenced by start_aio(), and start_deferred_aio().

01422 {
01423   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01424 
01425   int ret_val;
01426   const ACE_TCHAR *ptype;
01427 
01428   // Start IO
01429 
01430   switch (result->aio_lio_opcode )
01431     {
01432     case LIO_READ :
01433       ptype = ACE_LIB_TEXT ("read ");
01434       ret_val = aio_read (result);
01435       break;
01436     case LIO_WRITE :
01437       ptype = ACE_LIB_TEXT ("write");
01438       ret_val = aio_write (result);
01439       break;
01440     default:
01441       ptype = ACE_LIB_TEXT ("?????");
01442       ret_val = -1;
01443       break;
01444     }
01445 
01446   if (ret_val == 0)
01447     this->num_started_aio_++;
01448   else // if (ret_val == -1)
01449     {
01450       if (errno == EAGAIN || errno == ENOMEM)  //Ok, it will be deferred AIO
01451         ret_val = 1;
01452       else
01453         ACE_ERROR ((LM_ERROR,
01454                     ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01455                     ptype,
01456                     ACE_LIB_TEXT ("queueing failed\n")));
01457     }
01458 
01459   return ret_val;
01460 }

int ACE_POSIX_AIOCB_Proactor::start_deferred_aio void   )  [protected]
 

Start deferred AIO if necessary.

Definition at line 1464 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, LM_ERROR, num_deferred_aiocb_, putq_result(), result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred(), ACE_POSIX_Asynch_Result::set_error(), and start_aio_i().

Referenced by find_completed_aio().

01465 {
01466   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01467 
01468   // This protected method is called from
01469   // find_completed_aio after any AIO completion
01470   // We should call this method always with locked
01471   // ACE_POSIX_AIOCB_Proactor::mutex_
01472   //
01473   // It tries to start the first deferred AIO
01474   // if such exists
01475 
01476   if (num_deferred_aiocb_ == 0)
01477     return 0;  //  nothing to do
01478 
01479   size_t i = 0;
01480 
01481   for (i= 0; i < this->aiocb_list_max_size_; i++)
01482     if (result_list_[i] !=0       // check for
01483        && aiocb_list_[i]  ==0)     // deferred AIO
01484       break;
01485 
01486   if (i >= this->aiocb_list_max_size_)
01487     ACE_ERROR_RETURN ((LM_ERROR,
01488                  "%N:%l:(%P | %t)::\n"
01489                  "start_deferred_aio:"
01490                  "internal Proactor error 3\n"),
01491                  -1);
01492 
01493   ACE_POSIX_Asynch_Result *result = result_list_[i];
01494 
01495   int ret_val = start_aio_i (result);
01496 
01497   switch (ret_val)
01498     {
01499     case 0 :    //started OK , decrement count of deferred AIOs
01500       aiocb_list_[i] = result;
01501       num_deferred_aiocb_ --;
01502       return 0;
01503 
01504     case 1 :
01505       return 0;  //try again later
01506 
01507     default :     // Invalid Parameters , should never be
01508       break;
01509     }
01510 
01511   //AL notify  user
01512 
01513   result_list_[i] = 0;
01514   aiocb_list_cur_size_--;
01515 
01516   num_deferred_aiocb_ --;
01517 
01518   result->set_error (errno);
01519   result->set_bytes_transferred (0);
01520   this->putq_result (result);  // we are with locked mutex_ here !
01521 
01522   return -1;
01523 }


Friends And Related Function Documentation

friend class ACE_AIOCB_Notify_Pipe_Manager [friend]
 

Handler needs to call application specific code.

Definition at line 331 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Accept [friend]
 

Definition at line 336 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Connect [friend]
 

Definition at line 337 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Operation [friend]
 

This class does the registering of Asynch Operations with the Proactor which is necessary in the AIOCB strategy.

Definition at line 335 of file POSIX_Proactor.h.


Member Data Documentation

aiocb** ACE_POSIX_AIOCB_Proactor::aiocb_list_ [protected]
 

Use a dynamically allocated array to keep track of all the aio's issued currently.

Definition at line 469 of file POSIX_Proactor.h.

Referenced by cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio().

size_t ACE_POSIX_AIOCB_Proactor::aiocb_list_cur_size_ [protected]
 

To maintain the current size of the array (list).

Definition at line 476 of file POSIX_Proactor.h.

Referenced by cancel_aio(), find_completed_aio(), start_aio(), and start_deferred_aio().

size_t ACE_POSIX_AIOCB_Proactor::aiocb_list_max_size_ [protected]
 

To maintain the maximum size of the array (list).

Definition at line 473 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot(), cancel_aio(), check_max_aio_num(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), handle_events_i(), start_aio(), and start_deferred_aio().

ACE_AIOCB_Notify_Pipe_Manager* ACE_POSIX_AIOCB_Proactor::aiocb_notify_pipe_manager_ [protected]
 

This class takes care of doing when we use AIO_CONTROL_BLOCKS strategy.

Definition at line 465 of file POSIX_Proactor.h.

Referenced by create_notify_manager(), delete_notify_manager(), and notify_completion().

ACE_SYNCH_MUTEX ACE_POSIX_AIOCB_Proactor::mutex_ [protected]
 

Mutex to protect work with lists.

Definition at line 479 of file POSIX_Proactor.h.

ACE_HANDLE ACE_POSIX_AIOCB_Proactor::notify_pipe_read_handle_ [protected]
 

The purpose of this member is only to identify asynchronous request from NotifyManager. We will reserve for it always slot 0 in the list of aiocb's to be sure that don't lose notifications.

Definition at line 484 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot(), and set_notify_handle().

size_t ACE_POSIX_AIOCB_Proactor::num_deferred_aiocb_ [protected]
 

Number of ACE_POSIX_Asynch_Result's waiting for start i.e. deferred AIOs

Definition at line 488 of file POSIX_Proactor.h.

Referenced by cancel_aio(), start_aio(), and start_deferred_aio().

size_t ACE_POSIX_AIOCB_Proactor::num_started_aio_ [protected]
 

Number active,i.e. running requests.

Definition at line 491 of file POSIX_Proactor.h.

Referenced by find_completed_aio(), and start_aio_i().

ACE_POSIX_Asynch_Result** ACE_POSIX_AIOCB_Proactor::result_list_ [protected]
 

Definition at line 470 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot(), cancel_aio(), create_result_aiocb_list(), delete_result_aiocb_list(), find_completed_aio(), start_aio(), and start_deferred_aio().

ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> ACE_POSIX_AIOCB_Proactor::result_queue_ [protected]
 

Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's.

Definition at line 494 of file POSIX_Proactor.h.


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 11:26:26 2006 for ACE by doxygen 1.3.6