00001
00002
00003 #include "ace/POSIX_Asynch_IO.h"
00004
00005 #if defined (ACE_HAS_AIO_CALLS)
00006
00007 #include "ace/Flag_Manip.h"
00008 #include "ace/Proactor.h"
00009 #include "ace/Message_Block.h"
00010 #include "ace/INET_Addr.h"
00011 #include "ace/Asynch_Pseudo_Task.h"
00012 #include "ace/POSIX_Proactor.h"
00013 #include "ace/OS_NS_errno.h"
00014 #include "ace/OS_NS_sys_socket.h"
00015 #include "ace/OS_NS_sys_stat.h"
00016
00017 ACE_RCSID (ace,
00018 POSIX_Asynch_IO,
00019 "$Id: POSIX_Asynch_IO.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00020
00021
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 size_t
00025 ACE_POSIX_Asynch_Result::bytes_transferred (void) const
00026 {
00027 return this->bytes_transferred_;
00028 }
00029
00030 void
00031 ACE_POSIX_Asynch_Result::set_bytes_transferred (size_t nbytes)
00032 {
00033 this->bytes_transferred_= nbytes;
00034 }
00035
00036 const void *
00037 ACE_POSIX_Asynch_Result::act (void) const
00038 {
00039 return this->act_;
00040 }
00041
00042 int
00043 ACE_POSIX_Asynch_Result::success (void) const
00044 {
00045 return this->success_;
00046 }
00047
00048 const void *
00049 ACE_POSIX_Asynch_Result::completion_key (void) const
00050 {
00051 return this->completion_key_;
00052 }
00053
00054 u_long
00055 ACE_POSIX_Asynch_Result::error (void) const
00056 {
00057 return this->error_;
00058 }
00059
00060 void
00061 ACE_POSIX_Asynch_Result::set_error (u_long errcode)
00062 {
00063 this->error_=errcode;
00064 }
00065 ACE_HANDLE
00066 ACE_POSIX_Asynch_Result::event (void) const
00067 {
00068 return ACE_INVALID_HANDLE;
00069 }
00070
00071 u_long
00072 ACE_POSIX_Asynch_Result::offset (void) const
00073 {
00074 return this->aio_offset;
00075 }
00076
00077 u_long
00078 ACE_POSIX_Asynch_Result::offset_high (void) const
00079 {
00080
00081
00082
00083 ACE_NOTSUP_RETURN (0);
00084 }
00085
00086 int
00087 ACE_POSIX_Asynch_Result::priority (void) const
00088 {
00089 return this->aio_reqprio;
00090 }
00091
00092 int
00093 ACE_POSIX_Asynch_Result::signal_number (void) const
00094 {
00095 return this->aio_sigevent.sigev_signo;
00096 }
00097
00098 int
00099 ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl)
00100 {
00101
00102 ACE_POSIX_Proactor *posix_proactor = dynamic_cast<ACE_POSIX_Proactor *> (proactor_impl);
00103
00104 if (posix_proactor == 0)
00105 ACE_ERROR_RETURN ((LM_ERROR, "Dynamic cast to POSIX Proactor failed\n"), -1);
00106
00107
00108 return posix_proactor->post_completion (this);
00109 }
00110
00111 ACE_POSIX_Asynch_Result::~ACE_POSIX_Asynch_Result (void)
00112 {
00113 }
00114
00115 ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result
00116 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00117 const void* act,
00118 ACE_HANDLE ,
00119 u_long offset,
00120 u_long offset_high,
00121 int priority,
00122 int signal_number)
00123 : handler_proxy_ (handler_proxy),
00124 act_ (act),
00125 bytes_transferred_ (0),
00126 success_ (0),
00127 completion_key_ (0),
00128 error_ (0)
00129 {
00130 aio_offset = offset;
00131 aio_reqprio = priority;
00132 aio_sigevent.sigev_signo = signal_number;
00133
00134
00135
00136
00137 ACE_UNUSED_ARG (offset_high);
00138
00139
00140
00141 }
00142
00143
00144
00145 int
00146 ACE_POSIX_Asynch_Operation::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
00147 ACE_HANDLE handle,
00148 const void * ,
00149 ACE_Proactor *proactor)
00150 {
00151 this->proactor_ = proactor;
00152 this->handler_proxy_ = handler_proxy;
00153 this->handle_ = handle;
00154
00155
00156 if (this->handle_ == ACE_INVALID_HANDLE)
00157 {
00158 ACE_Handler *handler = handler_proxy.get ()->handler ();
00159 if (handler != 0)
00160 this->handle_ = handler->handle ();
00161 }
00162 if (this->handle_ == ACE_INVALID_HANDLE)
00163 return -1;
00164
00165 #if 0
00166
00167
00168
00169
00170
00171 if (this->proactor_ == 0)
00172 {
00173
00174
00175 this->proactor_ = this->handler_->proactor ();
00176 if (this->proactor_ == 0)
00177 this->proactor_ = ACE_Proactor::instance();
00178 }
00179 #endif
00180
00181 return 0;
00182 }
00183
00184 int
00185 ACE_POSIX_Asynch_Operation::cancel (void)
00186 {
00187 if (!posix_proactor_)
00188 return -1;
00189 return posix_proactor_->cancel_aio (this->handle_);
00190 }
00191
00192 ACE_Proactor *
00193 ACE_POSIX_Asynch_Operation::proactor (void) const
00194 {
00195 return this->proactor_;
00196 }
00197
00198 ACE_POSIX_Proactor *
00199 ACE_POSIX_Asynch_Operation::posix_proactor (void) const
00200 {
00201 return this->posix_proactor_;
00202 }
00203
00204 ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation (void)
00205 {
00206 }
00207
00208 ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (ACE_POSIX_Proactor *posix_proactor)
00209 : posix_proactor_ (posix_proactor),
00210 handle_ (ACE_INVALID_HANDLE)
00211 {
00212 }
00213
00214
00215
00216 size_t
00217 ACE_POSIX_Asynch_Read_Stream_Result::bytes_to_read (void) const
00218 {
00219 return this->aio_nbytes;
00220 }
00221
00222 ACE_Message_Block &
00223 ACE_POSIX_Asynch_Read_Stream_Result::message_block (void) const
00224 {
00225 return this->message_block_;
00226 }
00227
00228 ACE_HANDLE
00229 ACE_POSIX_Asynch_Read_Stream_Result::handle (void) const
00230 {
00231 return this->aio_fildes;
00232 }
00233
00234 ACE_POSIX_Asynch_Read_Stream_Result::ACE_POSIX_Asynch_Read_Stream_Result
00235 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00236 ACE_HANDLE handle,
00237 ACE_Message_Block &message_block,
00238 size_t bytes_to_read,
00239 const void* act,
00240 ACE_HANDLE event,
00241 int priority,
00242 int signal_number)
00243 : ACE_POSIX_Asynch_Result
00244 (handler_proxy, act, event, 0, 0, priority, signal_number),
00245 message_block_ (message_block)
00246 {
00247 this->aio_fildes = handle;
00248 this->aio_buf = message_block.wr_ptr ();
00249 this->aio_nbytes = bytes_to_read;
00250 }
00251
00252 void
00253 ACE_POSIX_Asynch_Read_Stream_Result::complete (size_t bytes_transferred,
00254 int success,
00255 const void *completion_key,
00256 u_long error)
00257 {
00258 this->bytes_transferred_ = bytes_transferred;
00259 this->success_ = success;
00260 this->completion_key_ = completion_key;
00261 this->error_ = error;
00262
00263
00264 ACE_UNUSED_ARG (error);
00265
00266
00267 this->message_block_.wr_ptr (bytes_transferred);
00268
00269
00270 ACE_Asynch_Read_Stream::Result result (this);
00271
00272
00273 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
00274 if (handler != 0)
00275 handler->handle_read_stream (result);
00276 }
00277
00278 ACE_POSIX_Asynch_Read_Stream_Result::~ACE_POSIX_Asynch_Read_Stream_Result (void)
00279 {
00280 }
00281
00282
00283
00284 ACE_POSIX_Asynch_Read_Stream::ACE_POSIX_Asynch_Read_Stream (ACE_POSIX_Proactor *posix_proactor)
00285 : ACE_POSIX_Asynch_Operation (posix_proactor)
00286 {
00287 }
00288
00289 int
00290 ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
00291 size_t bytes_to_read,
00292 const void *act,
00293 int priority,
00294 int signal_number)
00295 {
00296 size_t space = message_block.space ();
00297 if (bytes_to_read > space)
00298 bytes_to_read=space;
00299
00300 if (bytes_to_read == 0)
00301 {
00302 errno = ENOSPC;
00303 return -1;
00304 }
00305
00306
00307 ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
00308 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00309 ACE_NEW_RETURN (result,
00310 ACE_POSIX_Asynch_Read_Stream_Result (this->handler_proxy_,
00311 this->handle_,
00312 message_block,
00313 bytes_to_read,
00314 act,
00315 proactor->get_handle (),
00316 priority,
00317 signal_number),
00318 -1);
00319
00320 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
00321 if (return_val == -1)
00322 delete result;
00323
00324 return return_val;
00325 }
00326
00327 ACE_POSIX_Asynch_Read_Stream::~ACE_POSIX_Asynch_Read_Stream (void)
00328 {
00329 }
00330
00331
00332
00333 size_t
00334 ACE_POSIX_Asynch_Write_Stream_Result::bytes_to_write (void) const
00335 {
00336 return this->aio_nbytes;
00337 }
00338
00339 ACE_Message_Block &
00340 ACE_POSIX_Asynch_Write_Stream_Result::message_block (void) const
00341 {
00342 return this->message_block_;
00343 }
00344
00345 ACE_HANDLE
00346 ACE_POSIX_Asynch_Write_Stream_Result::handle (void) const
00347 {
00348 return this->aio_fildes;
00349 }
00350
00351 ACE_POSIX_Asynch_Write_Stream_Result::ACE_POSIX_Asynch_Write_Stream_Result
00352 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00353 ACE_HANDLE handle,
00354 ACE_Message_Block &message_block,
00355 size_t bytes_to_write,
00356 const void* act,
00357 ACE_HANDLE event,
00358 int priority,
00359 int signal_number)
00360 : ACE_POSIX_Asynch_Result
00361 (handler_proxy, act, event, 0, 0, priority, signal_number),
00362 message_block_ (message_block)
00363 {
00364 this->aio_fildes = handle;
00365 this->aio_buf = message_block.rd_ptr ();
00366 this->aio_nbytes = bytes_to_write;
00367 }
00368
00369 void
00370 ACE_POSIX_Asynch_Write_Stream_Result::complete (size_t bytes_transferred,
00371 int success,
00372 const void *completion_key,
00373 u_long error)
00374 {
00375
00376 this->bytes_transferred_ = bytes_transferred;
00377 this->success_ = success;
00378 this->completion_key_ = completion_key;
00379 this->error_ = error;
00380
00381
00382 ACE_UNUSED_ARG (error);
00383
00384
00385 this->message_block_.rd_ptr (bytes_transferred);
00386
00387
00388 ACE_Asynch_Write_Stream::Result result (this);
00389
00390
00391 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
00392 if (handler != 0)
00393 handler->handle_write_stream (result);
00394 }
00395
00396 ACE_POSIX_Asynch_Write_Stream_Result::~ACE_POSIX_Asynch_Write_Stream_Result (void)
00397 {
00398 }
00399
00400
00401
00402 ACE_POSIX_Asynch_Write_Stream::ACE_POSIX_Asynch_Write_Stream (ACE_POSIX_Proactor *posix_proactor)
00403 : ACE_POSIX_Asynch_Operation (posix_proactor)
00404 {
00405 }
00406
00407 int
00408 ACE_POSIX_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
00409 size_t bytes_to_write,
00410 const void *act,
00411 int priority,
00412 int signal_number)
00413 {
00414 size_t len = message_block.length ();
00415 if (bytes_to_write > len)
00416 bytes_to_write = len;
00417
00418 if (bytes_to_write == 0)
00419 ACE_ERROR_RETURN
00420 ((LM_ERROR,
00421 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
00422 ACE_TEXT ("Attempt to write 0 bytes\n")),
00423 -1);
00424
00425 ACE_POSIX_Asynch_Write_Stream_Result *result = 0;
00426 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00427 ACE_NEW_RETURN (result,
00428 ACE_POSIX_Asynch_Write_Stream_Result (this->handler_proxy_,
00429 this->handle_,
00430 message_block,
00431 bytes_to_write,
00432 act,
00433 proactor->get_handle (),
00434 priority,
00435 signal_number),
00436 -1);
00437
00438 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
00439 if (return_val == -1)
00440 delete result;
00441
00442 return return_val;
00443 }
00444
00445 ACE_POSIX_Asynch_Write_Stream::~ACE_POSIX_Asynch_Write_Stream (void)
00446 {
00447 }
00448
00449
00450
00451 ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result
00452 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00453 ACE_HANDLE handle,
00454 ACE_Message_Block &message_block,
00455 size_t bytes_to_read,
00456 const void* act,
00457 u_long offset,
00458 u_long offset_high,
00459 ACE_HANDLE event,
00460 int priority,
00461 int signal_number)
00462 : ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
00463 handle,
00464 message_block,
00465 bytes_to_read,
00466 act,
00467 event,
00468 priority,
00469 signal_number)
00470 {
00471 this->aio_offset = offset;
00472
00473
00474
00475 ACE_UNUSED_ARG (offset_high);
00476 }
00477
00478 void
00479 ACE_POSIX_Asynch_Read_File_Result::complete (size_t bytes_transferred,
00480 int success,
00481 const void *completion_key,
00482 u_long error)
00483 {
00484
00485 this->bytes_transferred_ = bytes_transferred;
00486 this->success_ = success;
00487 this->completion_key_ = completion_key;
00488 this->error_ = error;
00489
00490
00491 ACE_UNUSED_ARG (error);
00492
00493
00494 this->message_block_.wr_ptr (bytes_transferred);
00495
00496
00497 ACE_Asynch_Read_File::Result result (this);
00498
00499
00500 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
00501 if (handler != 0)
00502 handler->handle_read_file (result);
00503 }
00504
00505 ACE_POSIX_Asynch_Read_File_Result::~ACE_POSIX_Asynch_Read_File_Result (void)
00506 {
00507 }
00508
00509
00510
00511 ACE_POSIX_Asynch_Read_File::ACE_POSIX_Asynch_Read_File (ACE_POSIX_Proactor *posix_proactor)
00512 : ACE_POSIX_Asynch_Read_Stream (posix_proactor)
00513 {
00514 }
00515
00516 int
00517 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
00518 size_t bytes_to_read,
00519 u_long offset,
00520 u_long offset_high,
00521 const void *act,
00522 int priority,
00523 int signal_number)
00524 {
00525 size_t space = message_block.space ();
00526 if ( bytes_to_read > space )
00527 bytes_to_read=space;
00528
00529 if ( bytes_to_read == 0 )
00530 ACE_ERROR_RETURN
00531 ((LM_ERROR,
00532 ACE_TEXT ("ACE_POSIX_Asynch_Read_File::read:")
00533 ACE_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
00534 -1);
00535
00536 ACE_POSIX_Asynch_Read_File_Result *result = 0;
00537 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00538 ACE_NEW_RETURN (result,
00539 ACE_POSIX_Asynch_Read_File_Result (this->handler_proxy_,
00540 this->handle_,
00541 message_block,
00542 bytes_to_read,
00543 act,
00544 offset,
00545 offset_high,
00546 posix_proactor ()->get_handle (),
00547 priority,
00548 signal_number),
00549 -1);
00550
00551 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
00552 if (return_val == -1)
00553 delete result;
00554
00555 return return_val;
00556 }
00557
00558 ACE_POSIX_Asynch_Read_File::~ACE_POSIX_Asynch_Read_File (void)
00559 {
00560 }
00561
00562 int
00563 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
00564 size_t bytes_to_read,
00565 const void *act,
00566 int priority,
00567 int signal_number)
00568 {
00569 return ACE_POSIX_Asynch_Read_Stream::read (message_block,
00570 bytes_to_read,
00571 act,
00572 priority,
00573 signal_number);
00574 }
00575
00576
00577
00578 ACE_POSIX_Asynch_Write_File_Result::ACE_POSIX_Asynch_Write_File_Result
00579 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00580 ACE_HANDLE handle,
00581 ACE_Message_Block &message_block,
00582 size_t bytes_to_write,
00583 const void* act,
00584 u_long offset,
00585 u_long offset_high,
00586 ACE_HANDLE event,
00587 int priority,
00588 int signal_number)
00589 : ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
00590 handle,
00591 message_block,
00592 bytes_to_write,
00593 act,
00594 event,
00595 priority,
00596 signal_number)
00597 {
00598 this->aio_offset = offset;
00599
00600
00601
00602 ACE_UNUSED_ARG (offset_high);
00603 }
00604
00605 void
00606 ACE_POSIX_Asynch_Write_File_Result::complete (size_t bytes_transferred,
00607 int success,
00608 const void *completion_key,
00609 u_long error)
00610 {
00611
00612 this->bytes_transferred_ = bytes_transferred;
00613 this->success_ = success;
00614 this->completion_key_ = completion_key;
00615 this->error_ = error;
00616
00617
00618 ACE_UNUSED_ARG (error);
00619
00620
00621 this->message_block_.rd_ptr (bytes_transferred);
00622
00623
00624 ACE_Asynch_Write_File::Result result (this);
00625
00626
00627 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
00628 if (handler != 0)
00629 handler->handle_write_file (result);
00630 }
00631
00632 ACE_POSIX_Asynch_Write_File_Result::~ACE_POSIX_Asynch_Write_File_Result (void)
00633 {
00634 }
00635
00636
00637
00638 ACE_POSIX_Asynch_Write_File::ACE_POSIX_Asynch_Write_File (ACE_POSIX_Proactor *posix_proactor)
00639 : ACE_POSIX_Asynch_Write_Stream (posix_proactor)
00640 {
00641 }
00642
00643 int
00644 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
00645 size_t bytes_to_write,
00646 u_long offset,
00647 u_long offset_high,
00648 const void *act,
00649 int priority,
00650 int signal_number)
00651 {
00652 size_t len = message_block.length ();
00653 if (bytes_to_write > len)
00654 bytes_to_write = len;
00655
00656 if (bytes_to_write == 0)
00657 ACE_ERROR_RETURN
00658 ((LM_ERROR,
00659 ACE_TEXT ("ACE_POSIX_Asynch_Write_File::write:")
00660 ACE_TEXT ("Attempt to write 0 bytes\n")),
00661 -1);
00662
00663 ACE_POSIX_Asynch_Write_File_Result *result = 0;
00664 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00665 ACE_NEW_RETURN (result,
00666 ACE_POSIX_Asynch_Write_File_Result (this->handler_proxy_,
00667 this->handle_,
00668 message_block,
00669 bytes_to_write,
00670 act,
00671 offset,
00672 offset_high,
00673 proactor->get_handle (),
00674 priority,
00675 signal_number),
00676 -1);
00677
00678 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
00679 if (return_val == -1)
00680 delete result;
00681
00682 return return_val;
00683 }
00684
00685 ACE_POSIX_Asynch_Write_File::~ACE_POSIX_Asynch_Write_File (void)
00686 {
00687 }
00688
00689 int
00690 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
00691 size_t bytes_to_write,
00692 const void *act,
00693 int priority,
00694 int signal_number)
00695 {
00696 return ACE_POSIX_Asynch_Write_Stream::write (message_block,
00697 bytes_to_write,
00698 act,
00699 priority,
00700 signal_number);
00701 }
00702
00703
00704
00705
00706 size_t
00707 ACE_POSIX_Asynch_Accept_Result::bytes_to_read (void) const
00708 {
00709 return this->aio_nbytes;
00710 }
00711
00712 ACE_Message_Block &
00713 ACE_POSIX_Asynch_Accept_Result::message_block (void) const
00714 {
00715 return this->message_block_;
00716 }
00717
00718 ACE_HANDLE
00719 ACE_POSIX_Asynch_Accept_Result::listen_handle (void) const
00720 {
00721 return this->listen_handle_;
00722 }
00723
00724 ACE_HANDLE
00725 ACE_POSIX_Asynch_Accept_Result::accept_handle (void) const
00726 {
00727 return this->aio_fildes;
00728 }
00729
00730 ACE_POSIX_Asynch_Accept_Result::ACE_POSIX_Asynch_Accept_Result
00731 (const ACE_Handler::Proxy_Ptr &handler_proxy,
00732 ACE_HANDLE listen_handle,
00733 ACE_HANDLE accept_handle,
00734 ACE_Message_Block &message_block,
00735 size_t bytes_to_read,
00736 const void* act,
00737 ACE_HANDLE event,
00738 int priority,
00739 int signal_number)
00740
00741 : ACE_POSIX_Asynch_Result
00742 (handler_proxy, act, event, 0, 0, priority, signal_number),
00743 message_block_ (message_block),
00744 listen_handle_ (listen_handle)
00745 {
00746 this->aio_fildes = accept_handle;
00747 this->aio_nbytes = bytes_to_read;
00748 }
00749
00750 void
00751 ACE_POSIX_Asynch_Accept_Result::complete (size_t bytes_transferred,
00752 int success,
00753 const void *completion_key,
00754 u_long error)
00755 {
00756
00757 this->bytes_transferred_ = bytes_transferred;
00758 this->success_ = success;
00759 this->completion_key_ = completion_key;
00760 this->error_ = error;
00761
00762
00763 this->message_block_.wr_ptr (bytes_transferred);
00764
00765
00766 ACE_Asynch_Accept::Result result (this);
00767
00768
00769 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
00770 if (handler != 0)
00771 handler->handle_accept (result);
00772 }
00773
00774 ACE_POSIX_Asynch_Accept_Result::~ACE_POSIX_Asynch_Accept_Result (void)
00775 {
00776 }
00777
00778
00779
00780 ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_proactor)
00781 : ACE_POSIX_Asynch_Operation (posix_proactor),
00782 flg_open_ (false)
00783 {
00784 }
00785
00786 ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void)
00787 {
00788 this->close ();
00789 this->reactor (0);
00790 }
00791
00792 ACE_HANDLE
00793 ACE_POSIX_Asynch_Accept::get_handle (void) const
00794 {
00795 return this->handle_;
00796 }
00797
00798 void
00799 ACE_POSIX_Asynch_Accept::set_handle (ACE_HANDLE handle)
00800 {
00801 ACE_ASSERT (handle_ == ACE_INVALID_HANDLE);
00802 this->handle_ = handle;
00803 }
00804
00805 int
00806 ACE_POSIX_Asynch_Accept::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
00807 ACE_HANDLE handle,
00808 const void *completion_key,
00809 ACE_Proactor *proactor)
00810 {
00811 ACE_TRACE ("ACE_POSIX_Asynch_Accept::open");
00812
00813
00814
00815 if (this->flg_open_)
00816 ACE_ERROR_RETURN ((LM_ERROR,
00817 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
00818 ACE_TEXT("acceptor already open \n")),
00819 -1);
00820
00821 if (-1 == ACE_POSIX_Asynch_Operation::open (handler_proxy,
00822 handle,
00823 completion_key,
00824 proactor))
00825 return -1;
00826
00827 flg_open_ = true;
00828
00829 ACE_Asynch_Pseudo_Task & task =
00830 this->posix_proactor ()->get_asynch_pseudo_task ();
00831
00832 if (-1 == task.register_io_handler (this->get_handle(),
00833 this,
00834 ACE_Event_Handler::ACCEPT_MASK,
00835 1))
00836 {
00837 this->flg_open_= false;
00838 this->handle_ = ACE_INVALID_HANDLE;
00839 return -1 ;
00840 }
00841
00842 return 0;
00843 }
00844
00845 int
00846 ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
00847 size_t bytes_to_read,
00848 ACE_HANDLE accept_handle,
00849 const void *act,
00850 int priority,
00851 int signal_number,
00852 int addr_family)
00853 {
00854 ACE_TRACE ("ACE_POSIX_Asynch_Accept::accept");
00855
00856 if (!this->flg_open_)
00857 ACE_ERROR_RETURN ((LM_ERROR,
00858 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
00859 ACE_TEXT("acceptor was not opened before\n")),
00860 -1);
00861
00862
00863
00864 size_t address_size = sizeof (sockaddr_in);
00865 #if defined (ACE_HAS_IPV6)
00866 if (addr_family == AF_INET6)
00867 address_size = sizeof (sockaddr_in6);
00868 #else
00869 ACE_UNUSED_ARG (addr_family);
00870 #endif
00871 size_t available_space = message_block.space ();
00872 size_t space_needed = bytes_to_read + 2 * address_size;
00873
00874 if (available_space < space_needed)
00875 {
00876 ACE_OS::last_error (ENOBUFS);
00877 return -1;
00878 }
00879
00880
00881
00882 ACE_POSIX_Asynch_Accept_Result *result = 0;
00883 ACE_NEW_RETURN (result,
00884 ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_,
00885 this->handle_,
00886 accept_handle,
00887 message_block,
00888 bytes_to_read,
00889 act,
00890 this->posix_proactor()->get_handle (),
00891 priority,
00892 signal_number),
00893 -1);
00894
00895
00896 {
00897 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00898 if (this->result_queue_.enqueue_tail (result) == -1)
00899 {
00900 ACE_ERROR ((LM_ERROR,
00901 ACE_TEXT ("ACE_POSIX_Asynch_Accept::accept: %p\n")
00902 ACE_TEXT ("enqueue_tail")));
00903 delete result;
00904 return -1;
00905 }
00906
00907 if (this->result_queue_.size () > 1)
00908 return 0;
00909 }
00910
00911
00912
00913
00914 ACE_Asynch_Pseudo_Task & task =
00915 this->posix_proactor ()->get_asynch_pseudo_task ();
00916
00917 return task.resume_io_handler (this->get_handle ());
00918 }
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932 int
00933 ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify)
00934 {
00935 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel_uncompleted");
00936
00937 int retval = 0;
00938
00939 for (; ; retval++)
00940 {
00941 ACE_POSIX_Asynch_Accept_Result* result = 0;
00942
00943 this->result_queue_.dequeue_head (result);
00944
00945 if (result == 0)
00946 break;
00947
00948 if (this->flg_open_ == 0 || flg_notify == 0)
00949 delete result ;
00950 else
00951 {
00952
00953 result->aio_fildes = ACE_INVALID_HANDLE ;
00954 result->set_bytes_transferred (0);
00955 result->set_error (ECANCELED);
00956
00957 if (this->posix_proactor ()->post_completion (result) == -1)
00958 ACE_ERROR ((LM_ERROR,
00959 ACE_TEXT("(%P | %t):%p\n"),
00960 ACE_TEXT("ACE_POSIX_Asynch_Accept::")
00961 ACE_TEXT("cancel_uncompleted")
00962 ));
00963 }
00964 }
00965 return retval;
00966 }
00967
00968 int
00969 ACE_POSIX_Asynch_Accept::cancel (void)
00970 {
00971 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel");
00972
00973
00974
00975
00976
00977 int rc = -1 ;
00978
00979 {
00980 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00981
00982 int num_cancelled = cancel_uncompleted (flg_open_);
00983
00984 if (num_cancelled == 0)
00985 rc = 1 ;
00986 else if (num_cancelled > 0)
00987 rc = 0 ;
00988
00989 if (!this->flg_open_)
00990 return rc ;
00991 }
00992
00993 ACE_Asynch_Pseudo_Task & task =
00994 this->posix_proactor ()->get_asynch_pseudo_task ();
00995
00996 task.suspend_io_handler (this->get_handle());
00997 return 0;
00998 }
00999
01000 int
01001 ACE_POSIX_Asynch_Accept::close ()
01002 {
01003 ACE_TRACE ("ACE_POSIX_Asynch_Accept::close");
01004
01005
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018 {
01019 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01020 this->cancel_uncompleted (flg_open_);
01021 }
01022
01023 if (!this->flg_open_)
01024 {
01025 if (this->handle_ != ACE_INVALID_HANDLE)
01026 {
01027 ACE_OS::closesocket (this->handle_);
01028 this->handle_ = ACE_INVALID_HANDLE;
01029 }
01030 return 0;
01031 }
01032
01033 if (this->handle_ == ACE_INVALID_HANDLE)
01034 return 0;
01035
01036 ACE_Asynch_Pseudo_Task & task =
01037 this->posix_proactor ()->get_asynch_pseudo_task ();
01038
01039 task.remove_io_handler (this->get_handle ());
01040 if (this->handle_ != ACE_INVALID_HANDLE)
01041 {
01042 ACE_OS::closesocket (this->handle_);
01043 this->handle_ = ACE_INVALID_HANDLE;
01044 }
01045
01046 this->flg_open_ = false;
01047
01048 return 0;
01049 }
01050
01051 int
01052 ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
01053 {
01054 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_close");
01055
01056 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01057
01058
01059
01060
01061
01062 this->cancel_uncompleted (0);
01063
01064 this->flg_open_ = false;
01065 this->handle_ = ACE_INVALID_HANDLE;
01066 return 0;
01067 }
01068
01069 int
01070 ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE )
01071 {
01072 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_input");
01073
01074
01075
01076
01077
01078 ACE_POSIX_Asynch_Accept_Result* result = 0;
01079
01080 {
01081 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01082
01083
01084 if (this->result_queue_.dequeue_head (result) != 0)
01085 ACE_ERROR ((LM_ERROR,
01086 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
01087 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
01088 ACE_TEXT( " dequeueing failed")));
01089
01090
01091 if (this->result_queue_.size () == 0)
01092 {
01093 ACE_Asynch_Pseudo_Task & task =
01094 this->posix_proactor ()->get_asynch_pseudo_task ();
01095
01096 task.suspend_io_handler (this->get_handle());
01097 }
01098 }
01099
01100
01101
01102
01103
01104 ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0);
01105
01106 if (result == 0)
01107 {
01108 ACE_OS::closesocket (new_handle);
01109 return 0;
01110 }
01111
01112 if (new_handle == ACE_INVALID_HANDLE)
01113 {
01114 result->set_error (errno);
01115 ACE_ERROR ((LM_ERROR,
01116 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
01117 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
01118 ACE_TEXT("accept")));
01119
01120
01121 }
01122
01123
01124 result->aio_fildes = new_handle;
01125
01126
01127
01128 if (this->posix_proactor ()->post_completion (result) == -1)
01129 ACE_ERROR ((LM_ERROR,
01130 ACE_TEXT("Error:(%P | %t):%p\n"),
01131 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
01132 ACE_TEXT(" <post_completion> failed")));
01133
01134 return 0;
01135 }
01136
01137
01138
01139 ACE_HANDLE
01140 ACE_POSIX_Asynch_Connect_Result::connect_handle (void) const
01141 {
01142 return this->aio_fildes;
01143 }
01144
01145 void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle)
01146 {
01147 this->aio_fildes = handle;
01148 }
01149
01150
01151 ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result
01152 (const ACE_Handler::Proxy_Ptr &handler_proxy,
01153 ACE_HANDLE connect_handle,
01154 const void* act,
01155 ACE_HANDLE event,
01156 int priority,
01157 int signal_number)
01158 : ACE_POSIX_Asynch_Result
01159 (handler_proxy, act, event, 0, 0, priority, signal_number)
01160 {
01161 this->aio_fildes = connect_handle;
01162 this->aio_nbytes = 0;
01163 }
01164
01165 void
01166 ACE_POSIX_Asynch_Connect_Result::complete (size_t bytes_transferred,
01167 int success,
01168 const void *completion_key,
01169 u_long error)
01170 {
01171
01172 this->bytes_transferred_ = bytes_transferred;
01173 this->success_ = success;
01174 this->completion_key_ = completion_key;
01175 this->error_ = error;
01176
01177
01178 ACE_Asynch_Connect::Result result (this);
01179
01180
01181 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
01182 if (handler != 0)
01183 handler->handle_connect (result);
01184 }
01185
01186 ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result (void)
01187 {
01188 }
01189
01190
01191
01192 ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor * posix_proactor)
01193 : ACE_POSIX_Asynch_Operation (posix_proactor),
01194 flg_open_ (false)
01195 {
01196 }
01197
01198 ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void)
01199 {
01200 this->close ();
01201 this->reactor(0);
01202 }
01203
01204 ACE_HANDLE
01205 ACE_POSIX_Asynch_Connect::get_handle (void) const
01206 {
01207 ACE_ASSERT (0);
01208 return ACE_INVALID_HANDLE;
01209 }
01210
01211 void
01212 ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE)
01213 {
01214 ACE_ASSERT (0) ;
01215 }
01216
01217 int
01218 ACE_POSIX_Asynch_Connect::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
01219 ACE_HANDLE handle,
01220 const void *completion_key,
01221 ACE_Proactor *proactor)
01222 {
01223 ACE_TRACE ("ACE_POSIX_Asynch_Connect::open");
01224
01225 if (this->flg_open_)
01226 return -1;
01227
01228
01229 ACE_POSIX_Asynch_Operation::open (handler_proxy,
01230 handle,
01231 completion_key,
01232 proactor);
01233
01234
01235
01236
01237
01238 this->flg_open_ = true;
01239
01240 return 0;
01241 }
01242
01243 int
01244 ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle,
01245 const ACE_Addr & remote_sap,
01246 const ACE_Addr & local_sap,
01247 int reuse_addr,
01248 const void *act,
01249 int priority,
01250 int signal_number)
01251 {
01252 ACE_TRACE ("ACE_POSIX_Asynch_Connect::connect");
01253
01254 if (this->flg_open_ == 0)
01255 ACE_ERROR_RETURN ((LM_ERROR,
01256 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
01257 ACE_TEXT("connector was not opened before\n")),
01258 -1);
01259
01260
01261
01262 ACE_POSIX_Asynch_Connect_Result *result = 0;
01263 ACE_NEW_RETURN (result,
01264 ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_,
01265 connect_handle,
01266 act,
01267 this->posix_proactor ()->get_handle (),
01268 priority,
01269 signal_number),
01270 -1);
01271
01272 int rc = connect_i (result,
01273 remote_sap,
01274 local_sap,
01275 reuse_addr);
01276
01277
01278 connect_handle = result->connect_handle ();
01279
01280 if (rc != 0)
01281 return post_result (result, true);
01282
01283
01284 {
01285 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01286
01287 if (this->result_map_.bind (connect_handle, result) == -1)
01288 {
01289 ACE_ERROR ((LM_ERROR,
01290 ACE_TEXT ("%N:%l:%p\n"),
01291 ACE_TEXT ("ACE_POSIX_Asynch_Connect::connect:")
01292 ACE_TEXT ("bind")));
01293
01294 result->set_error (EFAULT);
01295 return post_result (result, true);
01296 }
01297 }
01298
01299 ACE_Asynch_Pseudo_Task & task =
01300 this->posix_proactor ()->get_asynch_pseudo_task ();
01301
01302 rc = task.register_io_handler (connect_handle,
01303 this,
01304 ACE_Event_Handler::CONNECT_MASK,
01305 0);
01306 if (rc < 0)
01307 {
01308 {
01309 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01310
01311 this->result_map_.unbind (connect_handle, result);
01312 }
01313 if (result != 0)
01314 {
01315 result->set_error (EFAULT);
01316 this->post_result (result, true);
01317 }
01318 return -1;
01319 }
01320 else
01321 result = 0;
01322
01323
01324 return 0;
01325 }
01326
01327 int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result,
01328 bool post_enable)
01329 {
01330 if (this->flg_open_ && post_enable != 0)
01331 {
01332 if (this->posix_proactor ()->post_completion (result) == 0)
01333 return 0;
01334
01335 ACE_ERROR ((LM_ERROR,
01336 ACE_TEXT("Error:(%P | %t):%p\n"),
01337 ACE_TEXT("ACE_POSIX_Asynch_Connect::post_result: ")
01338 ACE_TEXT(" <post_completion> failed")));
01339 }
01340
01341 ACE_HANDLE handle = result->connect_handle ();
01342
01343 if (handle != ACE_INVALID_HANDLE)
01344 ACE_OS::closesocket (handle);
01345
01346 delete result;
01347
01348 return -1;
01349 }
01350
01351
01352
01353
01354
01355
01356
01357 int
01358 ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
01359 const ACE_Addr & remote_sap,
01360 const ACE_Addr & local_sap,
01361 int reuse_addr)
01362 {
01363 result->set_bytes_transferred (0);
01364
01365 ACE_HANDLE handle = result->connect_handle ();
01366
01367 if (handle == ACE_INVALID_HANDLE)
01368 {
01369 int protocol_family = remote_sap.get_type ();
01370
01371 handle = ACE_OS::socket (protocol_family,
01372 SOCK_STREAM,
01373 0);
01374
01375 result->connect_handle (handle);
01376 if (handle == ACE_INVALID_HANDLE)
01377 {
01378 result->set_error (errno);
01379 ACE_ERROR_RETURN
01380 ((LM_ERROR,
01381 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
01382 ACE_TEXT("socket")),
01383 -1);
01384 }
01385
01386
01387 int one = 1;
01388 if (protocol_family != PF_UNIX &&
01389 reuse_addr != 0 &&
01390 ACE_OS::setsockopt (handle,
01391 SOL_SOCKET,
01392 SO_REUSEADDR,
01393 (const char*) &one,
01394 sizeof one) == -1 )
01395 {
01396 result->set_error (errno);
01397 ACE_ERROR_RETURN
01398 ((LM_ERROR,
01399 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
01400 ACE_TEXT("setsockopt")),
01401 -1);
01402 }
01403 }
01404
01405 if (local_sap != ACE_Addr::sap_any)
01406 {
01407 sockaddr * laddr = reinterpret_cast<sockaddr *> (local_sap.get_addr ());
01408 size_t size = local_sap.get_size ();
01409
01410 if (ACE_OS::bind (handle, laddr, size) == -1)
01411 {
01412 result->set_error (errno);
01413 ACE_ERROR_RETURN
01414 ((LM_ERROR,
01415 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
01416 ACE_TEXT("bind")),
01417 -1);
01418 }
01419 }
01420
01421
01422 if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
01423 {
01424 result->set_error (errno);
01425 ACE_ERROR_RETURN
01426 ((LM_ERROR,
01427 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n")
01428 ACE_TEXT("set_flags")),
01429 -1);
01430 }
01431
01432 for (;;)
01433 {
01434 int rc = ACE_OS::connect
01435 (handle,
01436 reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
01437 remote_sap.get_size ());
01438 if (rc < 0)
01439 {
01440 if (errno == EWOULDBLOCK || errno == EINPROGRESS)
01441 return 0;
01442
01443 if (errno == EINTR)
01444 continue;
01445
01446 result->set_error (errno);
01447 }
01448
01449 return 1 ;
01450 }
01451
01452 ACE_NOTREACHED (return 0);
01453 }
01454
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468 int
01469 ACE_POSIX_Asynch_Connect::cancel_uncompleted (bool flg_notify,
01470 ACE_Handle_Set & set)
01471 {
01472 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel_uncompleted");
01473
01474 int retval = 0;
01475
01476 MAP_MANAGER::ITERATOR iter (result_map_);
01477 MAP_MANAGER::ENTRY * me = 0;
01478
01479 set.reset ();
01480
01481 for (; iter.next (me) != 0; retval++ , iter.advance ())
01482 {
01483 ACE_HANDLE handle = me->ext_id_;
01484 ACE_POSIX_Asynch_Connect_Result* result = me->int_id_ ;
01485
01486 set.set_bit (handle);
01487
01488 result->set_bytes_transferred (0);
01489 result->set_error (ECANCELED);
01490 this->post_result (result, flg_notify);
01491 }
01492
01493 result_map_.unbind_all ();
01494
01495 return retval;
01496 }
01497
01498 int
01499 ACE_POSIX_Asynch_Connect::cancel (void)
01500 {
01501 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel");
01502
01503
01504
01505
01506
01507 int rc = -1 ;
01508
01509 ACE_Handle_Set set;
01510 int num_cancelled = 0;
01511 {
01512 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01513 num_cancelled = cancel_uncompleted (flg_open_, set);
01514 }
01515 if (num_cancelled == 0)
01516 rc = 1 ;
01517 else if (num_cancelled > 0)
01518 rc = 0 ;
01519
01520 if (!this->flg_open_)
01521 return rc ;
01522
01523 ACE_Asynch_Pseudo_Task & task =
01524 this->posix_proactor ()->get_asynch_pseudo_task ();
01525
01526 task.remove_io_handler (set);
01527 return rc;
01528 }
01529
01530 int
01531 ACE_POSIX_Asynch_Connect::close (void)
01532 {
01533 ACE_TRACE ("ACE_POSIX_Asynch_Connect::close");
01534
01535 ACE_Handle_Set set ;
01536 int num_cancelled = 0;
01537 {
01538 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01539 num_cancelled = cancel_uncompleted (flg_open_, set);
01540 }
01541
01542 if (num_cancelled == 0 || !this->flg_open_)
01543 {
01544 this->flg_open_ = false;
01545 return 0;
01546 }
01547
01548 ACE_Asynch_Pseudo_Task & task =
01549 this->posix_proactor ()->get_asynch_pseudo_task ();
01550
01551 task.remove_io_handler (set);
01552 this->flg_open_ = false;
01553
01554 return 0;
01555 }
01556
01557 int
01558 ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd)
01559 {
01560 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_output");
01561
01562 ACE_POSIX_Asynch_Connect_Result* result = 0;
01563
01564 {
01565 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01566 if (this->result_map_.unbind (fd, result) != 0)
01567 return -1;
01568 }
01569
01570 int sockerror = 0 ;
01571 int lsockerror = sizeof sockerror;
01572
01573 ACE_OS::getsockopt (fd,
01574 SOL_SOCKET,
01575 SO_ERROR,
01576 (char*) &sockerror,
01577 &lsockerror);
01578
01579 result->set_bytes_transferred (0);
01580 result->set_error (sockerror);
01581
01582
01583
01584
01585
01586
01587 this->posix_proactor ()->get_asynch_pseudo_task ().remove_io_handler (fd);
01588 this->post_result (result, this->flg_open_);
01589 return 0;
01590 }
01591
01592
01593 int
01594 ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
01595 {
01596 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_close");
01597
01598 ACE_Asynch_Pseudo_Task &task =
01599 this->posix_proactor ()->get_asynch_pseudo_task ();
01600
01601 task.remove_io_handler (fd);
01602
01603 ACE_POSIX_Asynch_Connect_Result* result = 0;
01604
01605 {
01606 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01607 if (this->result_map_.unbind (fd, result) != 0)
01608 return -1;
01609 }
01610
01611 result->set_bytes_transferred (0);
01612 result->set_error (ECANCELED);
01613 this->post_result (result, this->flg_open_);
01614
01615 return 0;
01616 }
01617
01618
01619
01620 ACE_HANDLE
01621 ACE_POSIX_Asynch_Transmit_File_Result::socket (void) const
01622 {
01623 return this->socket_;
01624 }
01625
01626 ACE_HANDLE
01627 ACE_POSIX_Asynch_Transmit_File_Result::file (void) const
01628 {
01629 return this->aio_fildes;
01630 }
01631
01632 ACE_Asynch_Transmit_File::Header_And_Trailer *
01633 ACE_POSIX_Asynch_Transmit_File_Result::header_and_trailer (void) const
01634 {
01635 return this->header_and_trailer_;
01636 }
01637
01638 size_t
01639 ACE_POSIX_Asynch_Transmit_File_Result::bytes_to_write (void) const
01640 {
01641 return this->aio_nbytes;
01642 }
01643
01644 size_t
01645 ACE_POSIX_Asynch_Transmit_File_Result::bytes_per_send (void) const
01646 {
01647 return this->bytes_per_send_;
01648 }
01649
01650 u_long
01651 ACE_POSIX_Asynch_Transmit_File_Result::flags (void) const
01652 {
01653 return this->flags_;
01654 }
01655
01656 ACE_POSIX_Asynch_Transmit_File_Result::ACE_POSIX_Asynch_Transmit_File_Result
01657 (const ACE_Handler::Proxy_Ptr &handler_proxy,
01658 ACE_HANDLE socket,
01659 ACE_HANDLE file,
01660 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
01661 size_t bytes_to_write,
01662 u_long offset,
01663 u_long offset_high,
01664 size_t bytes_per_send,
01665 u_long flags,
01666 const void *act,
01667 ACE_HANDLE event,
01668 int priority,
01669 int signal_number)
01670 : ACE_POSIX_Asynch_Result
01671 (handler_proxy, act, event, offset, offset_high, priority, signal_number),
01672 socket_ (socket),
01673 header_and_trailer_ (header_and_trailer),
01674 bytes_per_send_ (bytes_per_send),
01675 flags_ (flags)
01676 {
01677 this->aio_fildes = file;
01678 this->aio_nbytes = bytes_to_write;
01679 }
01680
01681 void
01682 ACE_POSIX_Asynch_Transmit_File_Result::complete (size_t bytes_transferred,
01683 int success,
01684 const void *completion_key,
01685 u_long error)
01686 {
01687
01688 this->bytes_transferred_ = bytes_transferred;
01689 this->success_ = success;
01690 this->completion_key_ = completion_key;
01691 this->error_ = error;
01692
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710 ACE_Asynch_Transmit_File::Result result (this);
01711
01712
01713 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
01714 if (handler != 0)
01715 handler->handle_transmit_file (result);
01716 }
01717
01718 ACE_POSIX_Asynch_Transmit_File_Result::~ACE_POSIX_Asynch_Transmit_File_Result (void)
01719 {
01720 }
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732
01733
01734 class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
01735 {
01736 public:
01737
01738
01739
01740 ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Proactor *posix_proactor,
01741 ACE_POSIX_Asynch_Transmit_File_Result *result);
01742
01743
01744 virtual ~ACE_POSIX_Asynch_Transmit_Handler (void);
01745
01746
01747
01748 int transmit (void);
01749
01750 protected:
01751
01752
01753
01754 ACE_POSIX_Asynch_Transmit_File_Result *result_;
01755
01756
01757 ACE_Message_Block *mb_;
01758
01759 enum ACT
01760 {
01761 HEADER_ACT = 1,
01762 DATA_ACT = 2,
01763 TRAILER_ACT = 3
01764 };
01765
01766
01767 ACT header_act_;
01768
01769
01770 ACT data_act_;
01771
01772
01773 ACT trailer_act_;
01774
01775
01776 size_t file_offset_;
01777
01778
01779 size_t file_size_;
01780
01781
01782 size_t bytes_transferred_;
01783
01784
01785 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
01786
01787
01788 virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
01789
01790
01791 int initiate_read_file (void);
01792
01793
01794 ACE_POSIX_Asynch_Read_File rf_;
01795
01796
01797 ACE_POSIX_Asynch_Write_Stream ws_;
01798 };
01799
01800
01801
01802
01803 ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler
01804 (ACE_POSIX_Proactor *posix_proactor,
01805 ACE_POSIX_Asynch_Transmit_File_Result *result)
01806 : result_ (result),
01807 mb_ (0),
01808 header_act_ (this->HEADER_ACT),
01809 data_act_ (this->DATA_ACT),
01810 trailer_act_ (this->TRAILER_ACT),
01811 file_offset_ (result->offset ()),
01812 file_size_ (0),
01813 bytes_transferred_ (0),
01814 rf_ (posix_proactor),
01815 ws_ (posix_proactor)
01816 {
01817
01818 ACE_NEW (this->mb_,
01819 ACE_Message_Block (this->result_->bytes_per_send ()
01820 + 1));
01821
01822 file_size_ = ACE_OS::filesize (this->result_->file ());
01823 }
01824
01825
01826 ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler (void)
01827 {
01828 delete result_;
01829 mb_->release ();
01830 }
01831
01832
01833
01834
01835
01836 int
01837 ACE_POSIX_Asynch_Transmit_Handler::transmit (void)
01838 {
01839
01840
01841
01842
01843
01844
01845 if (this->rf_.open (this->proxy (),
01846 this->result_->file (),
01847 0,
01848 0) == -1)
01849 ACE_ERROR_RETURN ((LM_ERROR,
01850 "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
01851 -1);
01852
01853
01854 if (this->ws_.open (this->proxy (),
01855 this->result_->socket (),
01856 0,
01857 0) == -1)
01858 ACE_ERROR_RETURN ((LM_ERROR,
01859 "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
01860 -1);
01861
01862
01863 if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
01864 this->result_->header_and_trailer ()->header_bytes (),
01865 reinterpret_cast<void *> (&this->header_act_),
01866 0) == -1)
01867 ACE_ERROR_RETURN ((LM_ERROR,
01868 "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
01869 -1);
01870 return 0;
01871 }
01872
01873 void
01874 ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
01875 {
01876
01877 this->bytes_transferred_ += result.bytes_transferred ();
01878
01879
01880 if (result.success () == 0)
01881 {
01882
01883
01884 ACE_ERROR ((LM_ERROR,
01885 "Asynch_Transmit_File failed.\n"));
01886
01887 ACE_SEH_TRY
01888 {
01889 this->result_->complete (this->bytes_transferred_,
01890 0,
01891 0,
01892 0);
01893 }
01894 ACE_SEH_FINALLY
01895 {
01896
01897
01898 delete this;
01899 }
01900 }
01901
01902
01903
01904
01905 size_t unsent_data = result.bytes_to_write () - result.bytes_transferred ();
01906 if (unsent_data != 0)
01907 {
01908 ACE_DEBUG ((LM_DEBUG,
01909 "%N:%l:Partial write to socket: Asynch_write called again\n"));
01910
01911
01912 if (this->ws_.write (*result.message_block ().duplicate (),
01913 unsent_data,
01914 result.act (),
01915 this->result_->priority (),
01916 this->result_->signal_number ()) == -1)
01917 {
01918
01919 ACE_ERROR ((LM_ERROR,
01920 "Asynch_Transmit_Handler:write_stream failed\n"));
01921 return;
01922 }
01923
01924
01925
01926
01927
01928
01929 return;
01930 }
01931
01932
01933
01934
01935 ACT act = * (ACT *) result.act ();
01936
01937 switch (act)
01938 {
01939 case TRAILER_ACT:
01940
01941
01942
01943 ACE_SEH_TRY
01944 {
01945 this->result_->complete (this->bytes_transferred_,
01946 1,
01947 0,
01948 0);
01949 }
01950 ACE_SEH_FINALLY
01951 {
01952 delete this;
01953 }
01954 break;
01955
01956 case HEADER_ACT:
01957 case DATA_ACT:
01958
01959 if (this->initiate_read_file () == -1)
01960
01961 ACE_ERROR ((LM_ERROR,
01962 "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
01963 break;
01964
01965 default:
01966
01967 ACE_ERROR ((LM_ERROR,
01968 "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
01969 }
01970 }
01971
01972 void
01973 ACE_POSIX_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
01974 {
01975
01976 if (result.success () == 0)
01977 {
01978
01979 ACE_SEH_TRY
01980 {
01981 this->result_->complete (this->bytes_transferred_,
01982 0,
01983 0,
01984 errno);
01985 }
01986 ACE_SEH_FINALLY
01987 {
01988 delete this;
01989 }
01990 return;
01991 }
01992
01993
01994 if (result.bytes_transferred () == 0)
01995 return;
01996
01997
01998 this->file_offset_ += result.bytes_transferred ();
01999
02000
02001 if (this->ws_.write (result.message_block (),
02002 result.bytes_transferred (),
02003 (void *)&this->data_act_,
02004 this->result_->priority (),
02005 this->result_->signal_number ()) == -1)
02006 {
02007
02008 ACE_ERROR ((LM_ERROR,
02009 "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
02010 return;
02011 }
02012 }
02013
02014 int
02015 ACE_POSIX_Asynch_Transmit_Handler::initiate_read_file (void)
02016 {
02017
02018 if (this->file_offset_ >= this->file_size_)
02019 {
02020
02021 if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
02022 this->result_->header_and_trailer ()->trailer_bytes (),
02023 (void *)&this->trailer_act_,
02024 this->result_->priority (),
02025 this->result_->signal_number ()) == -1)
02026 ACE_ERROR_RETURN ((LM_ERROR,
02027 "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
02028 -1);
02029 return 0;
02030 }
02031 else
02032 {
02033
02034
02035
02036 this->mb_->rd_ptr (this->mb_->base ());
02037 this->mb_->wr_ptr (this->mb_->base ());
02038
02039
02040 if (this->rf_.read (*this->mb_,
02041 this->mb_->size () - 1,
02042 this->file_offset_,
02043 0,
02044 0,
02045 this->result_->priority (),
02046 this->result_->signal_number ()) == -1)
02047 ACE_ERROR_RETURN ((LM_ERROR,
02048 "Error:Asynch_Transmit_Handler::read from file failed\n"),
02049 -1);
02050 return 0;
02051 }
02052 }
02053
02054
02055
02056 ACE_POSIX_Asynch_Transmit_File::ACE_POSIX_Asynch_Transmit_File (ACE_POSIX_Proactor *posix_proactor)
02057 : ACE_POSIX_Asynch_Operation (posix_proactor)
02058 {
02059 }
02060
02061 int
02062 ACE_POSIX_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
02063 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
02064 size_t bytes_to_write,
02065 u_long offset,
02066 u_long offset_high,
02067 size_t bytes_per_send,
02068 u_long flags,
02069 const void *act,
02070 int priority,
02071 int signal_number)
02072 {
02073
02074 ssize_t file_size = ACE_OS::filesize (file);
02075
02076 if (file_size == -1)
02077 ACE_ERROR_RETURN ((LM_ERROR,
02078 "Error:%N:%l:%p\n",
02079 "POSIX_Asynch_Transmit_File:filesize failed"),
02080 -1);
02081
02082 if (bytes_to_write == 0)
02083 bytes_to_write = file_size;
02084
02085 if (offset > (size_t) file_size)
02086 ACE_ERROR_RETURN ((LM_ERROR,
02087 "Error:%p\n",
02088 "Asynch_Transmit_File:File size is less than offset"),
02089 -1);
02090
02091 if (offset != 0)
02092 bytes_to_write = file_size - offset + 1;
02093
02094 if (bytes_per_send == 0)
02095 bytes_per_send = bytes_to_write;
02096
02097
02098 ACE_POSIX_Asynch_Transmit_File_Result *result = 0;
02099
02100 ACE_NEW_RETURN (result,
02101 ACE_POSIX_Asynch_Transmit_File_Result (this->handler_proxy_,
02102 this->handle_,
02103 file,
02104 header_and_trailer,
02105 bytes_to_write,
02106 offset,
02107 offset_high,
02108 bytes_per_send,
02109 flags,
02110 act,
02111 this->posix_proactor ()->get_handle (),
02112 priority,
02113 signal_number),
02114 -1);
02115
02116
02117 ACE_POSIX_Asynch_Transmit_Handler *transmit_handler = 0;
02118
02119 ACE_NEW_RETURN (transmit_handler,
02120 ACE_POSIX_Asynch_Transmit_Handler (this->posix_proactor (),
02121 result),
02122 -1);
02123
02124 ssize_t return_val = transmit_handler->transmit ();
02125
02126 if (return_val == -1)
02127
02128 delete transmit_handler;
02129
02130 return 0;
02131 }
02132
02133 ACE_POSIX_Asynch_Transmit_File::~ACE_POSIX_Asynch_Transmit_File (void)
02134 {
02135 }
02136
02137
02138 size_t
02139 ACE_POSIX_Asynch_Read_Dgram_Result::bytes_to_read (void) const
02140 {
02141 return this->bytes_to_read_;
02142 }
02143
02144 int
02145 ACE_POSIX_Asynch_Read_Dgram_Result::remote_address (ACE_Addr& addr) const
02146 {
02147 int retVal = -1;
02148
02149
02150 if (addr.get_type () == this->remote_address_->get_type ())
02151 {
02152 addr.set_addr (this->remote_address_->get_addr (),
02153 this->remote_address_->get_size ());
02154 retVal = 0;
02155 }
02156
02157 return retVal;
02158 }
02159
02160 sockaddr *
02161 ACE_POSIX_Asynch_Read_Dgram_Result::saddr () const
02162 {
02163 return (sockaddr *) this->remote_address_->get_addr ();
02164 }
02165
02166
02167 int
02168 ACE_POSIX_Asynch_Read_Dgram_Result::flags (void) const
02169 {
02170 return this->flags_;
02171 }
02172
02173 ACE_HANDLE
02174 ACE_POSIX_Asynch_Read_Dgram_Result::handle (void) const
02175 {
02176 return this->handle_;
02177 }
02178
02179 ACE_Message_Block*
02180 ACE_POSIX_Asynch_Read_Dgram_Result::message_block () const
02181 {
02182 return this->message_block_;
02183 }
02184
02185 ACE_POSIX_Asynch_Read_Dgram_Result::ACE_POSIX_Asynch_Read_Dgram_Result
02186 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02187 ACE_HANDLE handle,
02188 ACE_Message_Block *message_block,
02189 size_t bytes_to_read,
02190 int flags,
02191 int protocol_family,
02192 const void* act,
02193 ACE_HANDLE event,
02194 int priority,
02195 int signal_number)
02196 : ACE_POSIX_Asynch_Result
02197 (handler_proxy, act, event, 0, 0, priority, signal_number),
02198 bytes_to_read_ (bytes_to_read),
02199 message_block_ (message_block),
02200 remote_address_ (0),
02201 addr_len_ (0),
02202 flags_ (flags),
02203 handle_ (handle)
02204 {
02205 ACE_UNUSED_ARG (protocol_family);
02206 this->aio_fildes = handle;
02207 this->aio_nbytes = bytes_to_read;
02208 ACE_NEW (this->remote_address_, ACE_INET_Addr);
02209 }
02210
02211 void
02212 ACE_POSIX_Asynch_Read_Dgram_Result::complete (size_t bytes_transferred,
02213 int success,
02214 const void *completion_key,
02215 u_long error)
02216 {
02217
02218 this->bytes_transferred_ = bytes_transferred;
02219 this->success_ = success;
02220 this->completion_key_ = completion_key;
02221 this->error_ = error;
02222
02223
02224 ACE_UNUSED_ARG (error);
02225
02226 this->remote_address_->set_size(this->addr_len_);
02227
02228
02229 ACE_Asynch_Read_Dgram::Result result (this);
02230
02231
02232 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02233 if (handler != 0)
02234 handler->handle_read_dgram (result);
02235 }
02236
02237 ACE_POSIX_Asynch_Read_Dgram_Result::~ACE_POSIX_Asynch_Read_Dgram_Result (void)
02238 {
02239 delete this->remote_address_;
02240 }
02241
02242
02243 size_t
02244 ACE_POSIX_Asynch_Write_Dgram_Result::bytes_to_write (void) const
02245 {
02246 return this->bytes_to_write_;
02247 }
02248
02249 int
02250 ACE_POSIX_Asynch_Write_Dgram_Result::flags (void) const
02251 {
02252 return this->flags_;
02253 }
02254
02255 ACE_HANDLE
02256 ACE_POSIX_Asynch_Write_Dgram_Result::handle (void) const
02257 {
02258 return this->handle_;
02259 }
02260
02261
02262 ACE_Message_Block*
02263 ACE_POSIX_Asynch_Write_Dgram_Result::message_block () const
02264 {
02265 return this->message_block_;
02266 }
02267
02268 ACE_POSIX_Asynch_Write_Dgram_Result::ACE_POSIX_Asynch_Write_Dgram_Result
02269 (const ACE_Handler::Proxy_Ptr &handler_proxy,
02270 ACE_HANDLE handle,
02271 ACE_Message_Block *message_block,
02272 size_t bytes_to_write,
02273 int flags,
02274 const void* act,
02275 ACE_HANDLE event,
02276 int priority,
02277 int signal_number)
02278 : ACE_POSIX_Asynch_Result
02279 (handler_proxy, act, event, 0, 0, priority, signal_number),
02280 bytes_to_write_ (bytes_to_write),
02281 message_block_ (message_block),
02282 flags_ (flags),
02283 handle_ (handle)
02284
02285 {
02286 this->aio_fildes = handle;
02287 this->aio_nbytes = bytes_to_write;
02288 }
02289
02290 void
02291 ACE_POSIX_Asynch_Write_Dgram_Result::complete (size_t bytes_transferred,
02292 int success,
02293 const void *completion_key,
02294 u_long error)
02295 {
02296
02297 this->bytes_transferred_ = bytes_transferred;
02298 this->success_ = success;
02299 this->completion_key_ = completion_key;
02300 this->error_ = error;
02301
02302
02303 ACE_UNUSED_ARG (error);
02304
02305
02306
02307
02308
02309 ACE_Asynch_Write_Dgram::Result result (this);
02310
02311
02312 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
02313 if (handler != 0)
02314 handler->handle_write_dgram (result);
02315 }
02316
02317 ACE_POSIX_Asynch_Write_Dgram_Result::~ACE_POSIX_Asynch_Write_Dgram_Result (void)
02318 {
02319 }
02320
02321
02322 ACE_POSIX_Asynch_Read_Dgram::~ACE_POSIX_Asynch_Read_Dgram (void)
02323 {
02324 }
02325
02326 ssize_t
02327 ACE_POSIX_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block,
02328 size_t & ,
02329 int flags,
02330 int protocol_family,
02331 const void *act,
02332 int priority,
02333 int signal_number)
02334 {
02335 size_t space = message_block->space ();
02336
02337 ACE_POSIX_Asynch_Read_Dgram_Result *result = 0;
02338 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
02339 ACE_NEW_RETURN (result,
02340 ACE_POSIX_Asynch_Read_Dgram_Result (this->handler_proxy_,
02341 this->handle_,
02342 message_block,
02343 space,
02344 flags,
02345 protocol_family,
02346 act,
02347 proactor->get_handle (),
02348 priority,
02349 signal_number),
02350 -1);
02351
02352 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
02353 if (return_val == -1)
02354 delete result;
02355
02356 return return_val;
02357 }
02358
02359 ACE_POSIX_Asynch_Read_Dgram::ACE_POSIX_Asynch_Read_Dgram (ACE_POSIX_Proactor *posix_proactor)
02360 : ACE_POSIX_Asynch_Operation (posix_proactor)
02361 {
02362 }
02363
02364
02365
02366 ACE_POSIX_Asynch_Write_Dgram::~ACE_POSIX_Asynch_Write_Dgram (void)
02367 {
02368 }
02369
02370 ssize_t
02371 ACE_POSIX_Asynch_Write_Dgram::send (ACE_Message_Block *message_block,
02372 size_t &,
02373 int flags,
02374 const ACE_Addr &,
02375 const void *act,
02376 int priority,
02377 int signal_number)
02378 {
02379 size_t len = message_block->length ();
02380 if (len == 0)
02381 ACE_ERROR_RETURN
02382 ((LM_ERROR,
02383 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
02384 ACE_TEXT ("Attempt to write 0 bytes\n")),
02385 -1);
02386
02387 ACE_POSIX_Asynch_Write_Dgram_Result *result = 0;
02388 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
02389 ACE_NEW_RETURN (result,
02390 ACE_POSIX_Asynch_Write_Dgram_Result (this->handler_proxy_,
02391 this->handle_,
02392 message_block,
02393 len,
02394 flags,
02395 act,
02396 proactor->get_handle (),
02397 priority,
02398 signal_number),
02399 -1);
02400
02401 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
02402 if (return_val == -1)
02403 delete result;
02404
02405 return return_val;
02406 }
02407
02408 ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram
02409 (ACE_POSIX_Proactor *posix_proactor)
02410 : ACE_POSIX_Asynch_Operation (posix_proactor)
02411 {
02412 }
02413
02414 ACE_END_VERSIONED_NAMESPACE_DECL
02415
02416 #endif