00001
00002
00003 #include "ace/SUN_Proactor.h"
00004
00005 #if defined (ACE_HAS_AIO_CALLS) && defined (sun)
00006
00007 #include "ace/Task_T.h"
00008 #include "ace/Log_Msg.h"
00009 #include "ace/Object_Manager.h"
00010
00011
00012 ACE_RCSID (ace,
00013 POSIX_CB_Proactor,
00014 "$Id: SUN_Proactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00015
00016
00017 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
00020 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
00021 ACE_POSIX_Proactor::PROACTOR_SUN),
00022 condition_ (mutex_)
00023 {
00024
00025 create_notify_manager ();
00026
00027
00028
00029
00030 this->get_asynch_pseudo_task ().start ();
00031 }
00032
00033
00034 ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
00035 {
00036 this->close ();
00037 }
00038
00039 int
00040 ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time)
00041 {
00042
00043 ACE_Countdown_Time countdown (&wait_time);
00044 return this->handle_events_i (&wait_time);
00045 }
00046
00047 int
00048 ACE_SUN_Proactor::handle_events (void)
00049 {
00050 return this->handle_events_i (0);
00051 }
00052
00053 int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime)
00054 {
00055 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
00056
00057 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1));
00058
00059 if (this->num_started_aio_ != 0)
00060 return 0;
00061
00062 return this->condition_.wait (abstime);
00063
00064 #else
00065
00066 return 0;
00067
00068 #endif
00069 }
00070
00071 int
00072 ACE_SUN_Proactor::handle_events_i (ACE_Time_Value *delta)
00073 {
00074 int retval = 0;
00075 aio_result_t *result = 0;
00076
00077 if (0 == delta)
00078 {
00079 if (this->num_started_aio_ == 0)
00080 this->wait_for_start (0);
00081
00082 result = aiowait (0);
00083 }
00084 else
00085 {
00086 if (this->num_started_aio_ == 0)
00087 {
00088
00089 ACE_Countdown_Time countdown (delta);
00090 ACE_Time_Value tv (*delta);
00091 tv += ACE_OS::gettimeofday ();
00092 if (this->wait_for_start (&tv) == -1)
00093 return -1;
00094 }
00095 struct timeval delta_tv = *delta;
00096 result = aiowait (&delta_tv);
00097 }
00098
00099 if (result == 0)
00100 {
00101
00102
00103 }
00104 else if (reinterpret_cast<long> (result) == -1)
00105 {
00106
00107 switch (errno)
00108 {
00109 case EINTR :
00110 case EINVAL:
00111 break;
00112
00113 default:
00114 ACE_ERROR_RETURN ((LM_ERROR,
00115 "%N:%l:(%P | %t)::%p \nNumAIO=%d\n",
00116 "ACE_SUN_Proactor::handle_events: aiowait failed",
00117 num_started_aio_),
00118 -1);
00119 }
00120 }
00121 else
00122 {
00123 int error_status = 0;
00124 size_t transfer_count = 0;
00125
00126 ACE_POSIX_Asynch_Result *asynch_result =
00127 find_completed_aio (result,
00128 error_status,
00129 transfer_count);
00130
00131 if (asynch_result != 0)
00132 {
00133
00134 this->application_specific_code (asynch_result,
00135 transfer_count,
00136 0,
00137 error_status);
00138 retval++;
00139 }
00140 }
00141
00142
00143 retval += this->process_result_queue ();
00144
00145 return retval > 0 ? 1 : 0 ;
00146
00147 }
00148
00149 int
00150 ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
00151 int &error_status,
00152 size_t &transfer_count)
00153 {
00154
00155
00156 error_status = asynch_result->aio_resultp.aio_errno;
00157 ssize_t op_return = asynch_result->aio_resultp.aio_return;
00158
00159
00160
00161
00162
00163
00164
00165
00166 if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS)
00167 return 0;
00168
00169 if (op_return < 0)
00170 transfer_count = 0;
00171 else
00172 transfer_count = static_cast<size_t> (op_return);
00173
00174 return 1;
00175 }
00176
00177 ACE_POSIX_Asynch_Result *
00178 ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
00179 int &error_status,
00180 size_t &transfer_count)
00181 {
00182 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
00183
00184 size_t ai;
00185 error_status = -1;
00186 transfer_count = 0;
00187
00188
00189
00190 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00191 if (aiocb_list_[ai] != 0 &&
00192 result == &aiocb_list_[ai]->aio_resultp)
00193 break;
00194
00195 if (ai >= aiocb_list_max_size_)
00196 return 0;
00197
00198 ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
00199
00200 if (this->get_result_status (asynch_result,
00201 error_status,
00202 transfer_count) == 0)
00203 {
00204 ACE_ERROR ((LM_ERROR,
00205 "%N:%l:(%P | %t)::%p\n",
00206 "ACE_SUN_Proactor::find_completed_aio:"
00207 "should never be !!!\n"));
00208 return 0;
00209 }
00210
00211 aiocb_list_[ai] = 0;
00212 result_list_[ai] = 0;
00213 aiocb_list_cur_size_--;
00214
00215 num_started_aio_--;
00216
00217 start_deferred_aio ();
00218
00219
00220
00221 return asynch_result;
00222 }
00223
00224
00225
00226
00227
00228
00229 int
00230 ACE_SUN_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
00231 {
00232 ACE_TRACE ("ACE_SUN_Proactor::start_aio_i");
00233
00234 int ret_val;
00235 const ACE_TCHAR *ptype;
00236
00237
00238
00239
00240
00241
00242
00243 result->aio_resultp.aio_return = AIO_INPROGRESS;
00244 result->aio_resultp.aio_errno = EINPROGRESS;
00245
00246
00247 switch (result->aio_lio_opcode)
00248 {
00249 case LIO_READ :
00250 ptype = ACE_TEXT ("read");
00251 ret_val = aioread (result->aio_fildes,
00252 (char *) result->aio_buf,
00253 result->aio_nbytes,
00254 result->aio_offset,
00255 SEEK_SET,
00256 &result->aio_resultp);
00257 break;
00258
00259 case LIO_WRITE :
00260 ptype = ACE_TEXT ("write");
00261 ret_val = aiowrite (result->aio_fildes,
00262 (char *) result->aio_buf,
00263 result->aio_nbytes,
00264 result->aio_offset,
00265 SEEK_SET,
00266 &result->aio_resultp);
00267 break;
00268
00269 default:
00270 ptype = ACE_TEXT ("?????");
00271 ret_val = -1;
00272 break;
00273 }
00274
00275 if (ret_val == 0)
00276 {
00277 this->num_started_aio_++;
00278 if (this->num_started_aio_ == 1)
00279 this->condition_.broadcast ();
00280 }
00281 else
00282 {
00283 if (errno == EAGAIN || errno == ENOMEM)
00284 ret_val = 1;
00285 else
00286 ACE_ERROR ((LM_ERROR,
00287 ACE_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"),
00288 ptype,
00289 ACE_TEXT ("queueing failed\n")));
00290 }
00291
00292 return ret_val;
00293 }
00294
00295 int
00296 ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result)
00297 {
00298 ACE_TRACE ("ACE_SUN_Proactor::cancel_aiocb");
00299 int rc = ::aiocancel (&result->aio_resultp);
00300 if (rc == 0)
00301 {
00302
00303
00304
00305
00306
00307 result->set_error (ECANCELED);
00308 result->set_bytes_transferred (0);
00309 this->putq_result (result);
00310 return 0;
00311 }
00312
00313 return 2;
00314 }
00315
00316 ACE_POSIX_Proactor::Proactor_Type
00317 ACE_SUN_Proactor::get_impl_type (void)
00318 {
00319 return PROACTOR_SUN;
00320 }
00321
00322 ACE_END_VERSIONED_NAMESPACE_DECL
00323
00324 #endif