00001
00002
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif
00016
00017 #ifndef ACE_WIN32
00018 # include <algorithm>
00019 #endif
00020
00021 ACE_RCSID (ace,
00022 Select_Reactor_Base,
00023 "$Id: Select_Reactor_Base.cpp 90989 2010-07-05 11:22:50Z johnnyw $")
00024
00025
00026 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 template<typename iterator>
00029 inline ACE_Event_Handler *
00030 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
00031 {
00032 #ifdef ACE_WIN32
00033 return (*i).item ();
00034 #else
00035 return (*i);
00036 #endif
00037 }
00038
00039
00040
00041 bool
00042 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00043 {
00044 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00045 #if defined (ACE_WIN32)
00046
00047
00048 if (handle == ACE_INVALID_HANDLE)
00049 #else
00050 if (handle < 0
00051 || static_cast<size_type> (handle) >= this->event_handlers_.size ())
00052 #endif
00053 {
00054 errno = EINVAL;
00055 return true;
00056 }
00057
00058 return false;
00059 }
00060
00061
00062
00063 bool
00064 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00065 {
00066 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00067 #if defined (ACE_WIN32)
00068
00069
00070 if (handle != ACE_INVALID_HANDLE)
00071 #else
00072 if (handle >= 0 && handle < this->max_handlep1_)
00073 #endif
00074 {
00075 return true;
00076 }
00077
00078
00079
00080
00081
00082 return false;
00083 }
00084
00085 int
00086 ACE_Select_Reactor_Handler_Repository::open (size_type size)
00087 {
00088 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00089
00090 #if defined (ACE_WIN32)
00091 if (this->event_handlers_.open (size) == -1)
00092 return -1;
00093 #else
00094 if (this->event_handlers_.size (size) == -1)
00095 return -1;
00096
00097
00098 std::fill (this->event_handlers_.begin (),
00099 this->event_handlers_.end (),
00100 static_cast<ACE_Event_Handler *> (0));
00101
00102 this->max_handlep1_ = 0;
00103 #endif
00104
00105
00106
00107 return ACE::set_handle_limit (static_cast<int> (size), 1);
00108 }
00109
00110
00111
00112 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00113 : select_reactor_ (select_reactor),
00114 #ifndef ACE_WIN32
00115 max_handlep1_ (0),
00116 #endif
00117 event_handlers_ ()
00118 {
00119 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00120 }
00121
00122 int
00123 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00124 {
00125
00126 #ifdef ACE_WIN32
00127 map_type::iterator const end = this->event_handlers_.end ();
00128 for (map_type::iterator pos = this->event_handlers_.begin ();
00129 pos != end;
00130 )
00131 {
00132
00133
00134
00135 map_type::iterator const the_pos (pos++);
00136
00137 ACE_HANDLE const handle = (*the_pos).key ();
00138 (void) this->unbind (handle,
00139 the_pos,
00140 ACE_Event_Handler::ALL_EVENTS_MASK);
00141 }
00142 #else
00143
00144
00145
00146 map_type::iterator pos =
00147 this->event_handlers_.begin ();
00148
00149 for (ACE_HANDLE handle = 0;
00150 handle < this->max_handlep1_;
00151 ++handle)
00152 {
00153 (void) this->unbind (handle,
00154 pos,
00155 ACE_Event_Handler::ALL_EVENTS_MASK);
00156 ++pos;
00157 }
00158 #endif
00159
00160 return 0;
00161 }
00162
00163 int
00164 ACE_Select_Reactor_Handler_Repository::close (void)
00165 {
00166 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00167
00168 return this->unbind_all ();
00169 }
00170
00171 ACE_Select_Reactor_Handler_Repository::map_type::iterator
00172 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
00173 {
00174 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
00175
00176 map_type::iterator pos (this->event_handlers_.end ());
00177
00178
00179 #if defined (ACE_WIN32)
00180 this->event_handlers_.find (handle, pos);
00181 #else
00182 map_type::iterator const tmp = &this->event_handlers_[handle];
00183
00184 if (*tmp != 0)
00185 pos = tmp;
00186 #endif
00187
00188 return pos;
00189 }
00190
00191
00192 int
00193 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00194 ACE_Event_Handler *event_handler,
00195 ACE_Reactor_Mask mask)
00196 {
00197 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00198
00199 if (event_handler == 0)
00200 return -1;
00201
00202 if (handle == ACE_INVALID_HANDLE)
00203 handle = event_handler->get_handle ();
00204
00205 if (this->invalid_handle (handle))
00206 return -1;
00207
00208
00209 bool existing_handle = false;
00210
00211 #if defined (ACE_WIN32)
00212
00213 map_type::ENTRY * entry = 0;
00214
00215 int const result =
00216 this->event_handlers_.bind (handle, event_handler, entry);
00217
00218 if (result == -1)
00219 {
00220 return -1;
00221 }
00222 else if (result == 1)
00223 {
00224
00225 if (event_handler != entry->item ())
00226 {
00227 return -1;
00228 }
00229 else
00230 {
00231
00232
00233 existing_handle = true;
00234 }
00235 }
00236
00237 #else
00238
00239
00240 ACE_Event_Handler * const current_handler =
00241 this->event_handlers_[handle];
00242
00243 if (current_handler)
00244 {
00245
00246 if (current_handler != event_handler)
00247 return -1;
00248
00249
00250
00251 existing_handle = true;
00252 }
00253
00254 this->event_handlers_[handle] = event_handler;
00255
00256 if (this->max_handlep1_ < handle + 1)
00257 this->max_handlep1_ = handle + 1;
00258
00259 #endif
00260
00261 if (this->select_reactor_.is_suspended_i (handle))
00262 {
00263 this->select_reactor_.bit_ops (handle,
00264 mask,
00265 this->select_reactor_.suspend_set_,
00266 ACE_Reactor::ADD_MASK);
00267 }
00268 else
00269 {
00270 this->select_reactor_.bit_ops (handle,
00271 mask,
00272 this->select_reactor_.wait_set_,
00273 ACE_Reactor::ADD_MASK);
00274
00275
00276
00277
00278
00279 }
00280
00281
00282 if (!existing_handle)
00283 event_handler->add_reference ();
00284
00285 return 0;
00286 }
00287
00288
00289
00290 int
00291 ACE_Select_Reactor_Handler_Repository::unbind (
00292 ACE_HANDLE handle,
00293 map_type::iterator pos,
00294 ACE_Reactor_Mask mask)
00295 {
00296 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00297
00298
00299
00300
00301 ACE_Event_Handler * const event_handler =
00302 (pos == this->event_handlers_.end ()
00303 ? 0
00304 : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
00305
00306
00307 this->select_reactor_.bit_ops (handle,
00308 mask,
00309 this->select_reactor_.wait_set_,
00310 ACE_Reactor::CLR_MASK);
00311
00312
00313 this->select_reactor_.bit_ops (handle,
00314 mask,
00315 this->select_reactor_.suspend_set_,
00316 ACE_Reactor::CLR_MASK);
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 bool const has_any_wait_mask =
00327 (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328 || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329 || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330 bool const has_any_suspend_mask =
00331 (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332 || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333 || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334
00335 bool complete_removal = false;
00336
00337 if (!has_any_wait_mask && !has_any_suspend_mask)
00338 {
00339 #if defined (ACE_WIN32)
00340 if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
00341 return -1;
00342 #else
00343 this->event_handlers_[handle] = 0;
00344
00345 if (this->max_handlep1_ == handle + 1)
00346 {
00347
00348
00349
00350 ACE_HANDLE const wait_rd_max =
00351 this->select_reactor_.wait_set_.rd_mask_.max_set ();
00352 ACE_HANDLE const wait_wr_max =
00353 this->select_reactor_.wait_set_.wr_mask_.max_set ();
00354 ACE_HANDLE const wait_ex_max =
00355 this->select_reactor_.wait_set_.ex_mask_.max_set ();
00356
00357 ACE_HANDLE const suspend_rd_max =
00358 this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00359 ACE_HANDLE const suspend_wr_max =
00360 this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00361 ACE_HANDLE const suspend_ex_max =
00362 this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00363
00364
00365 this->max_handlep1_ = wait_rd_max;
00366 if (this->max_handlep1_ < wait_wr_max)
00367 this->max_handlep1_ = wait_wr_max;
00368 if (this->max_handlep1_ < wait_ex_max)
00369 this->max_handlep1_ = wait_ex_max;
00370
00371 if (this->max_handlep1_ < suspend_rd_max)
00372 this->max_handlep1_ = suspend_rd_max;
00373 if (this->max_handlep1_ < suspend_wr_max)
00374 this->max_handlep1_ = suspend_wr_max;
00375 if (this->max_handlep1_ < suspend_ex_max)
00376 this->max_handlep1_ = suspend_ex_max;
00377
00378 ++this->max_handlep1_;
00379 }
00380
00381 #endif
00382
00383
00384 complete_removal = true;
00385 }
00386
00387 if (event_handler == 0)
00388 return -1;
00389
00390 bool const requires_reference_counting =
00391 event_handler->reference_counting_policy ().value () ==
00392 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00393
00394
00395
00396 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00397 (void) event_handler->handle_close (handle, mask);
00398
00399
00400
00401 if (complete_removal && requires_reference_counting)
00402 {
00403 (void) event_handler->remove_reference ();
00404 }
00405
00406 return 0;
00407 }
00408
00409 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00410 (ACE_Select_Reactor_Handler_Repository const * s)
00411 : rep_ (s),
00412 current_ (s->event_handlers_.begin ())
00413 {
00414 #ifndef ACE_WIN32
00415
00416
00417 const_base_iterator const end =
00418 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00419
00420
00421
00422
00423 while (this->current_ != end && (*(this->current_) == 0))
00424 ++this->current_;
00425 #endif
00426 }
00427
00428
00429
00430
00431 bool
00432 ACE_Select_Reactor_Handler_Repository_Iterator::next (
00433 ACE_Event_Handler *&next_item)
00434 {
00435 bool result = true;
00436
00437 if (this->done ())
00438 result = false;
00439 else
00440 next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
00441
00442 return result;
00443 }
00444
00445
00446
00447 bool
00448 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00449 {
00450 #ifdef ACE_WIN32
00451
00452
00453 const_base_iterator const end = this->rep_->event_handlers_.end ();
00454 #else
00455
00456
00457 const_base_iterator const end =
00458 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
00459 #endif
00460
00461 if (this->current_ != end)
00462 ++this->current_;
00463
00464 #ifndef ACE_WIN32
00465
00466
00467
00468 while (this->current_ != end && (*(this->current_) == 0))
00469 ++this->current_;
00470 #endif
00471
00472 return this->current_ != end;
00473 }
00474
00475
00476
00477 void
00478 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00479 {
00480 #if defined (ACE_HAS_DUMP)
00481 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00482
00483 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00484 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
00485 # ifdef ACE_WIN32
00486 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
00487 this->current_.dump ();
00488 # else
00489 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
00490 # endif
00491 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00492 #endif
00493 }
00494
00495 void
00496 ACE_Select_Reactor_Handler_Repository::dump (void) const
00497 {
00498 #if defined (ACE_HAS_DUMP)
00499 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00500
00501 # ifdef ACE_WIN32
00502 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
00503 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
00504 # else
00505 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
00506 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
00507 # endif
00508
00509 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00510 ACE_DEBUG ((LM_DEBUG,
00511 ACE_TEXT ("max_handlep1_ = ")
00512 ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
00513 ACE_TEXT ("\n"),
00514 this->max_handlep1 ()));
00515 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("[")));
00516
00517 ACE_Event_Handler *event_handler = 0;
00518
00519 for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00520 iter.next (event_handler) != 0;
00521 iter.advance ())
00522 ACE_DEBUG ((LM_DEBUG,
00523 ACE_TEXT (" (event_handler = %@,")
00524 ACE_TEXT (" event_handler->handle_ = ")
00525 ACE_HANDLE_FORMAT_SPECIFIER
00526 ACE_TEXT ("\n"),
00527 event_handler,
00528 event_handler->get_handle ()));
00529
00530 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
00531 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00532 #endif
00533 }
00534
00535 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00536
00537 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00538 : max_notify_iterations_ (-1)
00539 {
00540 }
00541
00542 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00543 {
00544 }
00545
00546 void
00547 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00548 {
00549
00550 if (iterations == 0)
00551 iterations = 1;
00552
00553 this->max_notify_iterations_ = iterations;
00554 }
00555
00556 int
00557 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00558 {
00559 return this->max_notify_iterations_;
00560 }
00561
00562
00563
00564
00565
00566
00567
00568 int
00569 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00570 ACE_Reactor_Mask mask )
00571 {
00572 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00573
00574 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00575
00576 return notification_queue_.purge_pending_notifications(eh, mask);
00577
00578 #else
00579 ACE_UNUSED_ARG (eh);
00580 ACE_UNUSED_ARG (mask);
00581 ACE_NOTSUP_RETURN (-1);
00582 #endif
00583 }
00584
00585 void
00586 ACE_Select_Reactor_Notify::dump (void) const
00587 {
00588 #if defined (ACE_HAS_DUMP)
00589 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00590
00591 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00592 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00593 this->notification_pipe_.dump ();
00594 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00595 #endif
00596 }
00597
00598 int
00599 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00600 ACE_Timer_Queue *,
00601 int disable_notify_pipe)
00602 {
00603 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00604
00605 if (disable_notify_pipe == 0)
00606 {
00607 this->select_reactor_ =
00608 dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00609
00610 if (select_reactor_ == 0)
00611 {
00612 errno = EINVAL;
00613 return -1;
00614 }
00615
00616 if (this->notification_pipe_.open () == -1)
00617 return -1;
00618 #if defined (F_SETFD)
00619 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00620 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00621 #endif
00622
00623 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00624 if (notification_queue_.open() == -1)
00625 {
00626 return -1;
00627 }
00628 #endif
00629
00630
00631
00632 if (ACE::set_flags (this->notification_pipe_.read_handle (),
00633 ACE_NONBLOCK) == -1)
00634 return -1;
00635 else
00636 return this->select_reactor_->register_handler
00637 (this->notification_pipe_.read_handle (),
00638 this,
00639 ACE_Event_Handler::READ_MASK);
00640 }
00641 else
00642 {
00643 this->select_reactor_ = 0;
00644 return 0;
00645 }
00646 }
00647
00648 int
00649 ACE_Select_Reactor_Notify::close (void)
00650 {
00651 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00652
00653 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00654 notification_queue_.reset();
00655 #else
00656 if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
00657 {
00658
00659
00660
00661
00662
00663 ACE_Notification_Buffer b;
00664 for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
00665 r > 0;
00666 r = read_notify_pipe(notification_pipe_.read_handle(), b))
00667 {
00668 if (b.eh_ != 0)
00669 {
00670 b.eh_->remove_reference();
00671 }
00672 }
00673 }
00674 #endif
00675
00676 return this->notification_pipe_.close ();
00677 }
00678
00679 int
00680 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00681 ACE_Reactor_Mask mask,
00682 ACE_Time_Value *timeout)
00683 {
00684 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00685
00686
00687
00688 if (this->select_reactor_ == 0)
00689 return 0;
00690
00691 ACE_Event_Handler_var safe_handler (event_handler);
00692
00693 if (event_handler)
00694 {
00695 event_handler->add_reference ();
00696 }
00697
00698 ACE_Notification_Buffer buffer (event_handler, mask);
00699
00700 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00701 int const notification_required =
00702 notification_queue_.push_new_notification(buffer);
00703
00704 if (notification_required == -1)
00705 {
00706 return -1;
00707 }
00708
00709 if (notification_required == 0)
00710 {
00711
00712 safe_handler.release ();
00713
00714 return 0;
00715 }
00716 #endif
00717
00718 ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
00719 (char *) &buffer,
00720 sizeof buffer,
00721 timeout);
00722 if (n == -1)
00723 {
00724 return -1;
00725 }
00726
00727
00728 safe_handler.release ();
00729
00730 return 0;
00731 }
00732
00733
00734
00735
00736 int
00737 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00738 ACE_Handle_Set &rd_mask)
00739 {
00740 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00741
00742 ACE_HANDLE const read_handle =
00743 this->notification_pipe_.read_handle ();
00744
00745 if (read_handle != ACE_INVALID_HANDLE
00746 && rd_mask.is_set (read_handle))
00747 {
00748 --number_of_active_handles;
00749 rd_mask.clr_bit (read_handle);
00750 return this->handle_input (read_handle);
00751 }
00752 else
00753 return 0;
00754 }
00755
00756
00757 ACE_HANDLE
00758 ACE_Select_Reactor_Notify::notify_handle (void)
00759 {
00760 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00761
00762 return this->notification_pipe_.read_handle ();
00763 }
00764
00765
00766 int
00767 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00768 {
00769 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00770 ACE_UNUSED_ARG(buffer);
00771 return 1;
00772 #else
00773
00774
00775
00776
00777
00778 if (buffer.eh_ != 0)
00779 {
00780 return 1;
00781 }
00782 else
00783 {
00784
00785 return 0;
00786 }
00787 #endif
00788 }
00789
00790 int
00791 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00792 {
00793 int result = 0;
00794
00795 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00796
00797
00798
00799
00800 bool more_messages_queued = false;
00801 ACE_Notification_Buffer next;
00802
00803 result = notification_queue_.pop_next_notification(buffer,
00804 more_messages_queued,
00805 next);
00806
00807 if (result == 0 || result == -1)
00808 {
00809 return result;
00810 }
00811
00812 if(more_messages_queued)
00813 {
00814 (void) ACE::send(this->notification_pipe_.write_handle(),
00815 (char *)&next, sizeof(ACE_Notification_Buffer));
00816 }
00817 #endif
00818
00819
00820
00821
00822
00823
00824 if (buffer.eh_ != 0)
00825 {
00826 ACE_Event_Handler *event_handler = buffer.eh_;
00827
00828 bool const requires_reference_counting =
00829 event_handler->reference_counting_policy ().value () ==
00830 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00831
00832 switch (buffer.mask_)
00833 {
00834 case ACE_Event_Handler::READ_MASK:
00835 case ACE_Event_Handler::ACCEPT_MASK:
00836 result = event_handler->handle_input (ACE_INVALID_HANDLE);
00837 break;
00838 case ACE_Event_Handler::WRITE_MASK:
00839 result = event_handler->handle_output (ACE_INVALID_HANDLE);
00840 break;
00841 case ACE_Event_Handler::EXCEPT_MASK:
00842 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00843 break;
00844 case ACE_Event_Handler::QOS_MASK:
00845 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00846 break;
00847 case ACE_Event_Handler::GROUP_QOS_MASK:
00848 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00849 break;
00850 default:
00851
00852 ACE_ERROR ((LM_ERROR,
00853 ACE_TEXT ("invalid mask = %d\n"),
00854 buffer.mask_));
00855 }
00856
00857 if (result == -1)
00858 event_handler->handle_close (ACE_INVALID_HANDLE,
00859 ACE_Event_Handler::EXCEPT_MASK);
00860
00861 if (requires_reference_counting)
00862 {
00863 event_handler->remove_reference ();
00864 }
00865 }
00866
00867 return 1;
00868 }
00869
00870 int
00871 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00872 ACE_Notification_Buffer &buffer)
00873 {
00874 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895 ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00896
00897 if (n > 0)
00898 {
00899
00900 if (n != sizeof buffer)
00901 {
00902 ssize_t const remainder = sizeof buffer - n;
00903
00904
00905
00906
00907
00908 if (ACE::recv_n (handle,
00909 ((char *) &buffer) + n,
00910 remainder) != remainder)
00911 return -1;
00912 }
00913
00914
00915 return 1;
00916 }
00917
00918
00919 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00920 return -1;
00921
00922 return 0;
00923 }
00924
00925
00926 int
00927 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
00928 {
00929 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
00930
00931
00932
00933 int number_dispatched = 0;
00934 int result = 0;
00935 ACE_Notification_Buffer buffer;
00936
00937
00938
00939
00940 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00941 {
00942
00943
00944 if (this->dispatch_notify (buffer) > 0)
00945 ++number_dispatched;
00946
00947
00948
00949
00950 if (number_dispatched == this->max_notify_iterations_)
00951 break;
00952 }
00953
00954
00955
00956 if (result < 0)
00957 number_dispatched = -1;
00958
00959
00960
00961
00962
00963 this->select_reactor_->renew ();
00964 return number_dispatched;
00965 }
00966
00967
00968
00969 int
00970 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
00971 ACE_Reactor_Mask mask)
00972 {
00973 if (this->notify_handler_ == 0)
00974 return 0;
00975 else
00976 return this->notify_handler_->purge_pending_notifications (eh, mask);
00977 }
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988
00989
00990 int
00991 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
00992 ACE_Reactor_Mask mask,
00993 ACE_Select_Reactor_Handle_Set &handle_set,
00994 int ops)
00995 {
00996 ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
00997 if (this->handler_rep_.handle_in_range (handle) == 0)
00998 return -1;
00999
01000 #if !defined (ACE_WIN32)
01001 ACE_Sig_Guard sb (0,
01002 this->mask_signals_);
01003 #endif
01004
01005 ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
01006 u_long omask = ACE_Event_Handler::NULL_MASK;
01007
01008
01009
01010 if (handle_set.rd_mask_.is_set (handle))
01011 ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01012 if (handle_set.wr_mask_.is_set (handle))
01013 ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01014 if (handle_set.ex_mask_.is_set (handle))
01015 ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01016
01017 switch (ops)
01018 {
01019 case ACE_Reactor::GET_MASK:
01020
01021
01022 break;
01023 case ACE_Reactor::CLR_MASK:
01024 ptmf = &ACE_Handle_Set::clr_bit;
01025
01026
01027
01028
01029 this->clear_dispatch_mask (handle, mask);
01030
01031 case ACE_Reactor::SET_MASK:
01032
01033 case ACE_Reactor::ADD_MASK:
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01045 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01046 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01047 {
01048 (handle_set.rd_mask_.*ptmf) (handle);
01049 }
01050 else if (ops == ACE_Reactor::SET_MASK)
01051 handle_set.rd_mask_.clr_bit (handle);
01052
01053
01054 if (ACE_BIT_ENABLED (mask,
01055 ACE_Event_Handler::WRITE_MASK)
01056 || ACE_BIT_ENABLED (mask,
01057 ACE_Event_Handler::CONNECT_MASK))
01058 {
01059 (handle_set.wr_mask_.*ptmf) (handle);
01060 }
01061 else if (ops == ACE_Reactor::SET_MASK)
01062 handle_set.wr_mask_.clr_bit (handle);
01063
01064
01065
01066 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01067 #if defined (ACE_WIN32)
01068 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01069 #endif
01070 )
01071 {
01072 (handle_set.ex_mask_.*ptmf) (handle);
01073 }
01074 else if (ops == ACE_Reactor::SET_MASK)
01075 handle_set.ex_mask_.clr_bit (handle);
01076 break;
01077 default:
01078 return -1;
01079 }
01080 return omask;
01081 }
01082
01083 void
01084 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01085 ACE_Reactor_Mask mask)
01086 {
01087 ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01109 ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01110 {
01111 this->dispatch_set_.rd_mask_.clr_bit (handle);
01112 }
01113 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01114 {
01115 this->dispatch_set_.wr_mask_.clr_bit (handle);
01116 }
01117 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01118 {
01119 this->dispatch_set_.ex_mask_.clr_bit (handle);
01120 }
01121
01122
01123
01124 this->state_changed_ = true;
01125 }
01126
01127
01128 int
01129 ACE_Select_Reactor_Impl::resumable_handler (void)
01130 {
01131
01132
01133
01134 return 0;
01135 }
01136
01137 ACE_END_VERSIONED_NAMESPACE_DECL