SUN_Proactor.cpp

Go to the documentation of this file.
00001 // $Id: SUN_Proactor.cpp 80826 2008-03-04 14:51:23Z wotte $
00002 
00003 #include "ace/SUN_Proactor.h"
00004 
00005 #if defined (ACE_HAS_AIO_CALLS) && defined (sun)
00006 
00007 #include "ace/Task_T.h"
00008 #include "ace/Log_Msg.h"
00009 #include "ace/Object_Manager.h"
00010 
00011 
00012 ACE_RCSID (ace,
00013            POSIX_CB_Proactor,
00014            "$Id: SUN_Proactor.cpp 80826 2008-03-04 14:51:23Z wotte $")
00015 
00016 
00017 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
00020   : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
00021                               ACE_POSIX_Proactor::PROACTOR_SUN),
00022     condition_ (mutex_)
00023 {
00024   // To provide correct virtual calls.
00025   create_notify_manager ();
00026 
00027   // we should start pseudo-asynchronous accept task
00028   // one per all future acceptors
00029 
00030   this->get_asynch_pseudo_task ().start ();
00031 }
00032 
00033 // Destructor.
00034 ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
00035 {
00036   this->close ();
00037 }
00038 
00039 int
00040 ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time)
00041 {
00042   // Decrement <wait_time> with the amount of time spent in the method
00043   ACE_Countdown_Time countdown (&wait_time);
00044   return this->handle_events_i (&wait_time);
00045 }
00046 
00047 int
00048 ACE_SUN_Proactor::handle_events (void)
00049 {
00050   return this->handle_events_i (0);
00051 }
00052 
00053 int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime)
00054 {
00055 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
00056 
00057   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1));
00058 
00059   if (this->num_started_aio_ != 0)  // double check
00060     return 0;
00061 
00062   return this->condition_.wait (abstime);
00063 
00064 #else
00065 
00066   return 0;  // or -1 ???
00067 
00068 #endif /* ACE_MT_SAFE */
00069 }
00070 
00071 int
00072 ACE_SUN_Proactor::handle_events_i (ACE_Time_Value *delta)
00073 {
00074   int retval = 0;
00075   aio_result_t *result = 0;
00076 
00077   if (0 == delta)
00078     {
00079       if (this->num_started_aio_ == 0)
00080         this->wait_for_start (0);
00081 
00082       result = aiowait (0);
00083     }
00084   else
00085     {
00086       if (this->num_started_aio_ == 0)
00087         {
00088           // Decrement delta with the amount of time spent waiting
00089           ACE_Countdown_Time countdown (delta);
00090           ACE_Time_Value tv (*delta);
00091           tv += ACE_OS::gettimeofday ();
00092           if (this->wait_for_start (&tv) == -1)
00093             return -1;
00094         }
00095       struct timeval delta_tv = *delta;
00096       result = aiowait (&delta_tv);
00097     }
00098 
00099   if (result == 0)
00100     {
00101       // timeout, do nothing,
00102       // we should process "post_completed" queue
00103     }
00104   else if (reinterpret_cast<long> (result) == -1)
00105     {
00106       // Check errno  for  EINVAL,EAGAIN,EINTR ??
00107       switch (errno)
00108        {
00109        case EINTR :     // aiowait() was interrupted by a signal.
00110        case EINVAL:     // there are no outstanding asynchronous I/O requests.
00111          break;         // we should process "post_completed" queue
00112 
00113        default:         // EFAULT
00114          ACE_ERROR_RETURN ((LM_ERROR,
00115                        "%N:%l:(%P | %t)::%p \nNumAIO=%d\n",
00116                        "ACE_SUN_Proactor::handle_events: aiowait failed",
00117                         num_started_aio_),
00118                       -1);
00119        }
00120     }
00121   else
00122     {
00123       int error_status = 0;
00124       size_t transfer_count = 0;
00125 
00126       ACE_POSIX_Asynch_Result *asynch_result =
00127         find_completed_aio (result,
00128                             error_status,
00129                             transfer_count);
00130 
00131       if (asynch_result != 0)
00132         {
00133           // Call the application code.
00134           this->application_specific_code (asynch_result,
00135                                            transfer_count,
00136                                            0,             // No completion key.
00137                                            error_status); // Error
00138           retval++;
00139         }
00140     }
00141 
00142   // process post_completed results
00143   retval += this->process_result_queue ();
00144 
00145   return retval > 0 ? 1 : 0 ;
00146 
00147 }
00148 
00149 int
00150 ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
00151                                      int &error_status,
00152                                      size_t &transfer_count)
00153 {
00154 
00155    // Get the error status of the aio_ operation.
00156    error_status  = asynch_result->aio_resultp.aio_errno;
00157    ssize_t op_return = asynch_result->aio_resultp.aio_return;
00158 
00159    // ****** from Sun man pages *********************
00160    // Upon completion of the operation both aio_return and aio_errno
00161    // are set to reflect the result of the operation.
00162    // AIO_INPROGRESS is not a value used by the system
00163    // so the client may detect a change in state
00164    // by initializing aio_return to this value.
00165 
00166    if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS)
00167      return 0;  // not completed
00168 
00169    if (op_return < 0)
00170      transfer_count = 0; // zero bytes transferred
00171    else
00172      transfer_count = static_cast<size_t> (op_return);
00173 
00174    return 1; // completed
00175 }
00176 
00177 ACE_POSIX_Asynch_Result *
00178 ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
00179                                       int &error_status,
00180                                       size_t &transfer_count)
00181 {
00182   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
00183 
00184   size_t ai;
00185   error_status = -1;
00186   transfer_count = 0;
00187 
00188   // we call find_completed_aio always with result != 0
00189 
00190   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00191     if (aiocb_list_[ai] != 0 &&                 //check for non zero
00192         result == &aiocb_list_[ai]->aio_resultp)
00193       break;
00194 
00195   if (ai >= aiocb_list_max_size_)   // not found
00196     return 0;                       // means somebody else uses aio directly!!!
00197 
00198   ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
00199 
00200   if (this->get_result_status (asynch_result,
00201                                error_status,
00202                                transfer_count) == 0)
00203     { // should never be
00204       ACE_ERROR ((LM_ERROR,
00205                   "%N:%l:(%P | %t)::%p\n",
00206                   "ACE_SUN_Proactor::find_completed_aio:"
00207                   "should never be !!!\n"));
00208       return 0;
00209     }
00210 
00211   aiocb_list_[ai] = 0;
00212   result_list_[ai] = 0;
00213   aiocb_list_cur_size_--;
00214 
00215   num_started_aio_--;
00216 
00217   start_deferred_aio ();
00218   //make attempt to start deferred AIO
00219   //It is safe as we are protected by mutex_
00220 
00221   return asynch_result;
00222 }
00223 
00224 // start_aio_i has new return codes
00225 // 0  successful start
00226 // 1  try later, OS queue overflow
00227 // -1 invalid request and other errors
00228 
00229 int
00230 ACE_SUN_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
00231 {
00232   ACE_TRACE ("ACE_SUN_Proactor::start_aio_i");
00233 
00234   int ret_val;
00235   const ACE_TCHAR *ptype;
00236 
00237   // ****** from Sun man pages *********************
00238   // Upon completion of the operation both aio_return and aio_errno
00239   // are set to reflect the result of the operation.
00240   // AIO_INPROGRESS is not a value used by the system
00241   // so the client may detect a change in state
00242   // by initializing aio_return to this value.
00243   result->aio_resultp.aio_return = AIO_INPROGRESS;
00244   result->aio_resultp.aio_errno  = EINPROGRESS;
00245 
00246   // Start IO
00247   switch (result->aio_lio_opcode)
00248     {
00249     case LIO_READ :
00250       ptype = ACE_TEXT ("read");
00251       ret_val = aioread (result->aio_fildes,
00252                          (char *) result->aio_buf,
00253                          result->aio_nbytes,
00254                          result->aio_offset,
00255                          SEEK_SET,
00256                          &result->aio_resultp);
00257       break;
00258 
00259     case LIO_WRITE :
00260       ptype = ACE_TEXT ("write");
00261       ret_val = aiowrite (result->aio_fildes,
00262                           (char *) result->aio_buf,
00263                           result->aio_nbytes,
00264                           result->aio_offset,
00265                           SEEK_SET,
00266                           &result->aio_resultp);
00267       break;
00268 
00269     default:
00270       ptype = ACE_TEXT ("?????");
00271       ret_val = -1;
00272       break;
00273     }
00274 
00275   if (ret_val == 0)
00276     {
00277       this->num_started_aio_++;
00278       if (this->num_started_aio_ == 1)  // wake up condition
00279         this->condition_.broadcast ();
00280     }
00281   else // if (ret_val == -1)
00282     {
00283       if (errno == EAGAIN || errno == ENOMEM) // Defer - retry this later.
00284         ret_val = 1;
00285       else
00286         ACE_ERROR ((LM_ERROR,
00287                     ACE_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"),
00288                     ptype,
00289                     ACE_TEXT ("queueing failed\n")));
00290     }
00291 
00292   return ret_val;
00293 }
00294 
00295 int
00296 ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result)
00297 {
00298   ACE_TRACE ("ACE_SUN_Proactor::cancel_aiocb");
00299   int rc = ::aiocancel (&result->aio_resultp);
00300   if (rc == 0)    //  AIO_CANCELED
00301     {
00302       // after aiocancel Sun does not notify us
00303       // so we should send notification
00304       // to save POSIX behavoir.
00305       // Also we should do this for deffered aio's
00306 
00307       result->set_error (ECANCELED);
00308       result->set_bytes_transferred (0);
00309       this->putq_result (result);
00310       return 0;
00311     }
00312 
00313   return 2;
00314 }
00315 
00316 ACE_POSIX_Proactor::Proactor_Type
00317 ACE_SUN_Proactor::get_impl_type (void)
00318 {
00319   return PROACTOR_SUN;
00320 }
00321 
00322 ACE_END_VERSIONED_NAMESPACE_DECL
00323 
00324 #endif /* ACE_HAS_AIO_CALLS && sun */

Generated on Tue Feb 2 17:18:43 2010 for ACE by  doxygen 1.4.7