00001
00002
00003 #include "ace/Select_Reactor_Base.h"
00004 #include "ace/Reactor.h"
00005 #include "ace/Thread.h"
00006 #include "ace/SOCK_Acceptor.h"
00007 #include "ace/SOCK_Connector.h"
00008 #include "ace/Timer_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 #include "ace/Signal.h"
00011 #include "ace/OS_NS_fcntl.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.inl"
00015 #endif
00016
00017 ACE_RCSID (ace,
00018 Select_Reactor_Base,
00019 "Select_Reactor_Base.cpp,v 4.75 2006/04/19 19:13:09 jwillemsen Exp")
00020
00021
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 #if defined (ACE_WIN32)
00025 #define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
00026 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
00027 #else
00028 #define ACE_SELECT_REACTOR_HANDLE(H) (H)
00029 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
00030 #endif
00031
00032
00033
00034 int
00035 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00036 {
00037 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00038 #if defined (ACE_WIN32)
00039
00040
00041 if (handle == ACE_INVALID_HANDLE)
00042 #else
00043 if (handle < 0 || handle >= this->max_size_)
00044 #endif
00045 {
00046 errno = EINVAL;
00047 return 1;
00048 }
00049 else
00050 return 0;
00051 }
00052
00053
00054
00055 int
00056 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00057 {
00058 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00059 #if defined (ACE_WIN32)
00060
00061
00062 if (handle != ACE_INVALID_HANDLE)
00063 #else
00064 if (handle >= 0 && handle < this->max_handlep1_)
00065 #endif
00066 return 1;
00067 else
00068 {
00069 errno = EINVAL;
00070 return 0;
00071 }
00072 }
00073
00074 size_t
00075 ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
00076 {
00077 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
00078
00079 return this->max_handlep1_;
00080 }
00081
00082 int
00083 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00084 {
00085 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00086 this->max_size_ = size;
00087 this->max_handlep1_ = 0;
00088
00089 #if defined (ACE_WIN32)
00090
00091 ACE_NEW_RETURN (this->event_handlers_,
00092 ACE_Event_Tuple[size],
00093 -1);
00094
00095
00096 for (size_t h = 0; h < size; ++h)
00097 {
00098 ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
00099 ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00100 }
00101 #else
00102
00103 ACE_NEW_RETURN (this->event_handlers_,
00104 ACE_Event_Handler *[size],
00105 -1);
00106
00107
00108 for (size_t h = 0; h < size; ++h)
00109 ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00110 #endif
00111
00112
00113
00114 return ACE::set_handle_limit (static_cast<int> (size), 1);
00115 }
00116
00117
00118
00119 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00120 : select_reactor_ (select_reactor),
00121 max_size_ (0),
00122 max_handlep1_ (0),
00123 event_handlers_ (0)
00124 {
00125 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00126 }
00127
00128 int
00129 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00130 {
00131
00132 for (int slot = 0;
00133 slot < this->max_handlep1_;
00134 ++slot)
00135 this->unbind (ACE_SELECT_REACTOR_HANDLE (slot),
00136 ACE_Event_Handler::ALL_EVENTS_MASK);
00137
00138 return 0;
00139 }
00140
00141 int
00142 ACE_Select_Reactor_Handler_Repository::close (void)
00143 {
00144 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00145
00146 if (this->event_handlers_ != 0)
00147 {
00148 this->unbind_all ();
00149
00150 delete [] this->event_handlers_;
00151 this->event_handlers_ = 0;
00152 }
00153 return 0;
00154 }
00155
00156
00157
00158 ACE_Event_Handler *
00159 ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00160 size_t *index_p)
00161 {
00162 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
00163
00164 ACE_Event_Handler *eh = 0;
00165 ssize_t i;
00166
00167
00168 if (this->handle_in_range (handle))
00169 {
00170 #if defined (ACE_WIN32)
00171 i = 0;
00172
00173 for (; i < this->max_handlep1_; ++i)
00174 if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00175 {
00176 eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
00177 break;
00178 }
00179 #else
00180 i = handle;
00181
00182 eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00183 #endif
00184 }
00185 else
00186
00187
00188 i = 0;
00189
00190 if (eh != 0)
00191 {
00192 if (index_p != 0)
00193 *index_p = i;
00194 }
00195 else
00196 errno = ENOENT;
00197
00198 return eh;
00199 }
00200
00201
00202
00203 int
00204 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00205 ACE_Event_Handler *event_handler,
00206 ACE_Reactor_Mask mask)
00207 {
00208 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00209
00210 if (handle == ACE_INVALID_HANDLE)
00211 handle = event_handler->get_handle ();
00212
00213 if (this->invalid_handle (handle))
00214 return -1;
00215
00216
00217 int existing_handle = 0;
00218
00219 #if defined (ACE_WIN32)
00220
00221 ssize_t assigned_slot = -1;
00222
00223 for (ssize_t i = 0; i < this->max_handlep1_; ++i)
00224 {
00225
00226 if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00227 {
00228
00229 if (ACE_SELECT_REACTOR_EVENT_HANDLER (this, i) !=
00230 event_handler)
00231 return -1;
00232
00233
00234 assigned_slot = i;
00235
00236
00237
00238 existing_handle = 1;
00239
00240
00241 break;
00242 }
00243 else
00244
00245 if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE &&
00246 assigned_slot == -1)
00247 {
00248 assigned_slot = i;
00249 }
00250 }
00251
00252 if (assigned_slot > -1)
00253
00254 {
00255 ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
00256 ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
00257 }
00258 else if (this->max_handlep1_ < this->max_size_)
00259 {
00260
00261 ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
00262 ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
00263 ++this->max_handlep1_;
00264 }
00265 else
00266 {
00267
00268 errno = ENOMEM;
00269 return -1;
00270 }
00271
00272 #else
00273
00274
00275 ACE_Event_Handler *current_handler =
00276 ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00277
00278 if (current_handler)
00279 {
00280
00281 if (current_handler != event_handler)
00282 return -1;
00283
00284
00285
00286 existing_handle = 1;
00287 }
00288
00289 ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
00290
00291 if (this->max_handlep1_ < handle + 1)
00292 this->max_handlep1_ = handle + 1;
00293
00294 #endif
00295
00296 if (this->select_reactor_.is_suspended_i (handle))
00297 {
00298 this->select_reactor_.bit_ops (handle,
00299 mask,
00300 this->select_reactor_.suspend_set_,
00301 ACE_Reactor::ADD_MASK);
00302 }
00303 else
00304 {
00305 this->select_reactor_.bit_ops (handle,
00306 mask,
00307 this->select_reactor_.wait_set_,
00308 ACE_Reactor::ADD_MASK);
00309
00310
00311
00312
00313
00314 }
00315
00316
00317 if (!existing_handle)
00318 event_handler->add_reference ();
00319
00320 return 0;
00321 }
00322
00323
00324
00325 int
00326 ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00327 ACE_Reactor_Mask mask)
00328 {
00329 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00330
00331 size_t slot = 0;
00332 ACE_Event_Handler *event_handler = this->find (handle, &slot);
00333
00334 if (event_handler == 0)
00335 return -1;
00336
00337
00338 this->select_reactor_.bit_ops (handle,
00339 mask,
00340 this->select_reactor_.wait_set_,
00341 ACE_Reactor::CLR_MASK);
00342
00343
00344 this->select_reactor_.bit_ops (handle,
00345 mask,
00346 this->select_reactor_.suspend_set_,
00347 ACE_Reactor::CLR_MASK);
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 int has_any_wait_mask =
00358 (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00359 || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00360 || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00361 int has_any_suspend_mask =
00362 (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00363 || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00364 || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00365
00366 int complete_removal = 0;
00367
00368 if (!has_any_wait_mask && !has_any_suspend_mask)
00369 {
00370
00371 complete_removal = 1;
00372
00373 ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
00374
00375 #if defined (ACE_WIN32)
00376
00377 ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
00378
00379 if (this->max_handlep1_ == (int) slot + 1)
00380 {
00381
00382
00383
00384
00385
00386 int i;
00387
00388 for (i = this->max_handlep1_ - 1;
00389 i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
00390 --i)
00391 continue;
00392
00393 this->max_handlep1_ = i + 1;
00394 }
00395
00396 #else
00397
00398 if (this->max_handlep1_ == handle + 1)
00399 {
00400
00401
00402
00403 ACE_HANDLE wait_rd_max =
00404 this->select_reactor_.wait_set_.rd_mask_.max_set ();
00405 ACE_HANDLE wait_wr_max =
00406 this->select_reactor_.wait_set_.wr_mask_.max_set ();
00407 ACE_HANDLE wait_ex_max =
00408 this->select_reactor_.wait_set_.ex_mask_.max_set ();
00409
00410 ACE_HANDLE suspend_rd_max =
00411 this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00412 ACE_HANDLE suspend_wr_max =
00413 this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00414 ACE_HANDLE suspend_ex_max =
00415 this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00416
00417
00418 this->max_handlep1_ = wait_rd_max;
00419 if (this->max_handlep1_ < wait_wr_max)
00420 this->max_handlep1_ = wait_wr_max;
00421 if (this->max_handlep1_ < wait_ex_max)
00422 this->max_handlep1_ = wait_ex_max;
00423
00424 if (this->max_handlep1_ < suspend_rd_max)
00425 this->max_handlep1_ = suspend_rd_max;
00426 if (this->max_handlep1_ < suspend_wr_max)
00427 this->max_handlep1_ = suspend_wr_max;
00428 if (this->max_handlep1_ < suspend_ex_max)
00429 this->max_handlep1_ = suspend_ex_max;
00430
00431 ++this->max_handlep1_;
00432 }
00433
00434 #endif
00435
00436 }
00437
00438 int requires_reference_counting =
00439 event_handler->reference_counting_policy ().value () ==
00440 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00441
00442
00443
00444 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00445 event_handler->handle_close (handle, mask);
00446
00447
00448
00449 if (complete_removal && requires_reference_counting)
00450 {
00451 event_handler->remove_reference ();
00452 }
00453
00454 return 0;
00455 }
00456
00457 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00458 (const ACE_Select_Reactor_Handler_Repository *s)
00459 : rep_ (s),
00460 current_ (-1)
00461 {
00462 this->advance ();
00463 }
00464
00465
00466
00467
00468 int
00469 ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
00470 {
00471 int result = 1;
00472
00473 if (this->current_ >= this->rep_->max_handlep1_)
00474 result = 0;
00475 else
00476 next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
00477 this->current_);
00478 return result;
00479 }
00480
00481 int
00482 ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
00483 {
00484 return this->current_ >= this->rep_->max_handlep1_;
00485 }
00486
00487
00488
00489 int
00490 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00491 {
00492 if (this->current_ < this->rep_->max_handlep1_)
00493 ++this->current_;
00494
00495 while (this->current_ < this->rep_->max_handlep1_)
00496 if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
00497 return 1;
00498 else
00499 ++this->current_;
00500
00501 return this->current_ < this->rep_->max_handlep1_;
00502 }
00503
00504
00505
00506 void
00507 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00508 {
00509 #if defined (ACE_HAS_DUMP)
00510 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00511
00512 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00513 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
00514 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
00515 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00516 #endif
00517 }
00518
00519 void
00520 ACE_Select_Reactor_Handler_Repository::dump (void) const
00521 {
00522 #if defined (ACE_HAS_DUMP)
00523 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00524
00525 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00526 ACE_DEBUG ((LM_DEBUG,
00527 ACE_LIB_TEXT ("max_handlep1_ = %d, max_size_ = %d\n"),
00528 this->max_handlep1_, this->max_size_));
00529 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("[")));
00530
00531 ACE_Event_Handler *event_handler = 0;
00532
00533 for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00534 iter.next (event_handler) != 0;
00535 iter.advance ())
00536 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (event_handler = %x, event_handler->handle_ = %d)\n"),
00537 event_handler, event_handler->get_handle ()));
00538
00539 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]\n")));
00540 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00541 #endif
00542 }
00543
00544 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00545
00546 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00547 : max_notify_iterations_ (-1)
00548 {
00549 }
00550
00551 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify (void)
00552 {
00553 }
00554
00555 void
00556 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00557 {
00558
00559 if (iterations == 0)
00560 iterations = 1;
00561
00562 this->max_notify_iterations_ = iterations;
00563 }
00564
00565 int
00566 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00567 {
00568 return this->max_notify_iterations_;
00569 }
00570
00571
00572
00573
00574
00575
00576
00577 int
00578 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00579 ACE_Reactor_Mask mask )
00580 {
00581 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00582
00583 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00584
00585 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00586
00587 if (this->notify_queue_.is_empty ())
00588 return 0;
00589
00590 ACE_Notification_Buffer *temp = 0;
00591 ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00592
00593 size_t queue_size = this->notify_queue_.size ();
00594 int number_purged = 0;
00595 size_t i;
00596 for (i = 0; i < queue_size; ++i)
00597 {
00598 if (-1 == this->notify_queue_.dequeue_head (temp))
00599 ACE_ERROR_RETURN ((LM_ERROR,
00600 ACE_LIB_TEXT ("%p\n"),
00601 ACE_LIB_TEXT ("dequeue_head")),
00602 -1);
00603
00604
00605
00606
00607
00608 if ((0 != temp->eh_) &&
00609 (0 == eh || eh == temp->eh_) &&
00610 ACE_BIT_DISABLED (temp->mask_, ~mask))
00611
00612
00613 {
00614 if (-1 == this->free_queue_.enqueue_head (temp))
00615 ACE_ERROR_RETURN ((LM_ERROR,
00616 ACE_LIB_TEXT ("%p\n"),
00617 ACE_LIB_TEXT ("enqueue_head")),
00618 -1);
00619
00620 ACE_Event_Handler *event_handler = temp->eh_;
00621 event_handler->remove_reference ();
00622
00623 ++number_purged;
00624 }
00625 else
00626 {
00627
00628
00629
00630
00631 if ((0 != temp->eh_) &&
00632 (0 == eh || eh == temp->eh_))
00633 ACE_CLR_BITS(temp->mask_, mask);
00634 if (-1 == local_queue.enqueue_head (temp))
00635 return -1;
00636 }
00637 }
00638
00639 if (this->notify_queue_.size ())
00640 {
00641 ACE_ASSERT (0);
00642 return -1;
00643 }
00644
00645
00646 queue_size = local_queue.size ();
00647 for (i = 0; i < queue_size; ++i)
00648 {
00649 if (-1 == local_queue.dequeue_head (temp))
00650 ACE_ERROR_RETURN ((LM_ERROR,
00651 ACE_LIB_TEXT ("%p\n"),
00652 ACE_LIB_TEXT ("dequeue_head")),
00653 -1);
00654
00655 if (-1 == this->notify_queue_.enqueue_head (temp))
00656 ACE_ERROR_RETURN ((LM_ERROR,
00657 ACE_LIB_TEXT ("%p\n"),
00658 ACE_LIB_TEXT ("enqueue_head")),
00659 -1);
00660 }
00661
00662 return number_purged;
00663
00664 #else
00665 ACE_UNUSED_ARG (eh);
00666 ACE_UNUSED_ARG (mask);
00667 ACE_NOTSUP_RETURN (-1);
00668 #endif
00669 }
00670
00671 void
00672 ACE_Select_Reactor_Notify::dump (void) const
00673 {
00674 #if defined (ACE_HAS_DUMP)
00675 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00676
00677 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00678 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00679 this->notification_pipe_.dump ();
00680 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00681 #endif
00682 }
00683
00684 int
00685 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00686 ACE_Timer_Queue *,
00687 int disable_notify_pipe)
00688 {
00689 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00690
00691 if (disable_notify_pipe == 0)
00692 {
00693 this->select_reactor_ =
00694 dynamic_cast<ACE_Select_Reactor_Impl *> (r);
00695
00696 if (select_reactor_ == 0)
00697 {
00698 errno = EINVAL;
00699 return -1;
00700 }
00701
00702 if (this->notification_pipe_.open () == -1)
00703 return -1;
00704 #if defined (F_SETFD)
00705 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00706 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00707 #endif
00708
00709 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00710 ACE_Notification_Buffer *temp = 0;
00711
00712 ACE_NEW_RETURN (temp,
00713 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00714 -1);
00715
00716 if (this->alloc_queue_.enqueue_head (temp) == -1)
00717 {
00718 delete [] temp;
00719 return -1;
00720 }
00721
00722 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00723 if (free_queue_.enqueue_head (temp + i) == -1)
00724 return -1;
00725
00726 #endif
00727
00728
00729
00730 if (ACE::set_flags (this->notification_pipe_.read_handle (),
00731 ACE_NONBLOCK) == -1)
00732 return -1;
00733 else
00734 return this->select_reactor_->register_handler
00735 (this->notification_pipe_.read_handle (),
00736 this,
00737 ACE_Event_Handler::READ_MASK);
00738 }
00739 else
00740 {
00741 this->select_reactor_ = 0;
00742 return 0;
00743 }
00744 }
00745
00746 int
00747 ACE_Select_Reactor_Notify::close (void)
00748 {
00749 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00750
00751 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00752
00753 ACE_Notification_Buffer **b = 0;
00754
00755 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00756 alloc_iter.next (b) != 0;
00757 alloc_iter.advance ())
00758 {
00759 delete [] *b;
00760 *b = 0;
00761 }
00762
00763 this->alloc_queue_.reset ();
00764 this->notify_queue_.reset ();
00765 this->free_queue_.reset ();
00766 #endif
00767
00768 return this->notification_pipe_.close ();
00769 }
00770
00771 int
00772 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
00773 ACE_Reactor_Mask mask,
00774 ACE_Time_Value *timeout)
00775 {
00776 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00777
00778
00779
00780 if (this->select_reactor_ == 0)
00781 return 0;
00782
00783 ACE_Event_Handler_var safe_handler (event_handler);
00784
00785 if (event_handler)
00786 event_handler->add_reference ();
00787
00788 ACE_Notification_Buffer buffer (event_handler, mask);
00789
00790 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00791
00792 {
00793 bool notification_required = false;
00794
00795 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00796
00797
00798 if (this->notify_queue_.is_empty ())
00799 notification_required = true;
00800
00801 ACE_Notification_Buffer *temp = 0;
00802
00803 if (free_queue_.dequeue_head (temp) == -1)
00804 {
00805
00806 ACE_Notification_Buffer *temp1 = 0;
00807
00808 ACE_NEW_RETURN (temp1,
00809 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00810 -1);
00811
00812 if (this->alloc_queue_.enqueue_head (temp1) == -1)
00813 {
00814 delete [] temp1;
00815 return -1;
00816 }
00817
00818
00819
00820
00821 for (size_t i = 1;
00822 i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00823 ++i)
00824 this->free_queue_.enqueue_head (temp1 + i);
00825
00826 temp = temp1;
00827 }
00828
00829 ACE_ASSERT (temp != 0);
00830 *temp = buffer;
00831
00832 if (notify_queue_.enqueue_tail (temp) == -1)
00833 return -1;
00834
00835 if (!notification_required)
00836 {
00837
00838 safe_handler.release ();
00839
00840 return 0;
00841 }
00842 }
00843 #endif
00844
00845 ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00846 (char *) &buffer,
00847 sizeof buffer,
00848 timeout);
00849 if (n == -1)
00850 return -1;
00851
00852
00853 safe_handler.release ();
00854
00855 return 0;
00856 }
00857
00858
00859
00860
00861 int
00862 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00863 ACE_Handle_Set &rd_mask)
00864 {
00865 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00866
00867 ACE_HANDLE read_handle =
00868 this->notification_pipe_.read_handle ();
00869
00870 if (read_handle != ACE_INVALID_HANDLE
00871 && rd_mask.is_set (read_handle))
00872 {
00873 --number_of_active_handles;
00874 rd_mask.clr_bit (read_handle);
00875 return this->handle_input (read_handle);
00876 }
00877 else
00878 return 0;
00879 }
00880
00881
00882 ACE_HANDLE
00883 ACE_Select_Reactor_Notify::notify_handle (void)
00884 {
00885 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00886
00887 return this->notification_pipe_.read_handle ();
00888 }
00889
00890
00891 int
00892 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00893 {
00894 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00895 ACE_UNUSED_ARG(buffer);
00896 return 1;
00897 #else
00898
00899
00900
00901
00902
00903 if (buffer.eh_ != 0)
00904 return 1;
00905
00906 #endif
00907
00908
00909 return 0;
00910 }
00911
00912 int
00913 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00914 {
00915 int result = 0;
00916
00917 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00918
00919
00920
00921 {
00922
00923
00924 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00925
00926 ACE_Notification_Buffer *temp = 0;
00927
00928 if (notify_queue_.is_empty ())
00929 return 0;
00930 else if (notify_queue_.dequeue_head (temp) == -1)
00931 ACE_ERROR_RETURN ((LM_ERROR,
00932 ACE_LIB_TEXT ("%p\n"),
00933 ACE_LIB_TEXT ("dequeue_head")),
00934 -1);
00935 buffer = *temp;
00936 if (free_queue_.enqueue_head (temp) == -1)
00937 ACE_ERROR_RETURN ((LM_ERROR,
00938 ACE_LIB_TEXT ("%p\n"),
00939 ACE_LIB_TEXT ("enqueue_head")),
00940 -1);
00941
00942 bool write_next_buffer = false;
00943 ACE_Notification_Buffer ** next = 0;
00944
00945 if(!this->notify_queue_.is_empty())
00946 {
00947
00948 this->notify_queue_.get (next, 0);
00949 write_next_buffer = true;
00950 }
00951
00952 if(write_next_buffer)
00953 {
00954 (void) ACE::send(
00955 this->notification_pipe_.write_handle(),
00956 (char *)*next, sizeof(ACE_Notification_Buffer));
00957 }
00958 }
00959
00960 #endif
00961
00962
00963
00964
00965
00966
00967 if (buffer.eh_ != 0)
00968 {
00969 ACE_Event_Handler *event_handler =
00970 buffer.eh_;
00971
00972 int requires_reference_counting =
00973 event_handler->reference_counting_policy ().value () ==
00974 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00975
00976 switch (buffer.mask_)
00977 {
00978 case ACE_Event_Handler::READ_MASK:
00979 case ACE_Event_Handler::ACCEPT_MASK:
00980 result = event_handler->handle_input (ACE_INVALID_HANDLE);
00981 break;
00982 case ACE_Event_Handler::WRITE_MASK:
00983 result = event_handler->handle_output (ACE_INVALID_HANDLE);
00984 break;
00985 case ACE_Event_Handler::EXCEPT_MASK:
00986 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
00987 break;
00988 case ACE_Event_Handler::QOS_MASK:
00989 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
00990 break;
00991 case ACE_Event_Handler::GROUP_QOS_MASK:
00992 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
00993 break;
00994 default:
00995
00996 ACE_ERROR ((LM_ERROR,
00997 ACE_LIB_TEXT ("invalid mask = %d\n"),
00998 buffer.mask_));
00999 }
01000
01001 if (result == -1)
01002 event_handler->handle_close (ACE_INVALID_HANDLE,
01003 ACE_Event_Handler::EXCEPT_MASK);
01004
01005 if (requires_reference_counting)
01006 {
01007 event_handler->remove_reference ();
01008 }
01009 }
01010
01011 return 1;
01012 }
01013
01014 int
01015 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
01016 ACE_Notification_Buffer &buffer)
01017 {
01018 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
01019
01020 ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
01021
01022 if (n > 0)
01023 {
01024
01025 if (n != sizeof buffer)
01026 {
01027 ssize_t remainder = sizeof buffer - n;
01028
01029
01030
01031
01032
01033 if (ACE::recv (handle,
01034 ((char *) &buffer) + n,
01035 remainder) != remainder)
01036 return -1;
01037 }
01038
01039
01040 return 1;
01041 }
01042
01043
01044 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
01045 return -1;
01046
01047 return 0;
01048 }
01049
01050
01051 int
01052 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
01053 {
01054 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01055
01056
01057
01058 int number_dispatched = 0;
01059 int result = 0;
01060 ACE_Notification_Buffer buffer;
01061
01062 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01063 {
01064
01065
01066 if (this->dispatch_notify (buffer) > 0)
01067 ++number_dispatched;
01068
01069
01070
01071
01072 if (number_dispatched == this->max_notify_iterations_)
01073 break;
01074 }
01075
01076
01077
01078 if (result < 0)
01079 number_dispatched = -1;
01080
01081
01082
01083
01084
01085 this->select_reactor_->renew ();
01086 return number_dispatched;
01087 }
01088
01089
01090
01091 int
01092 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
01093 ACE_Reactor_Mask mask)
01094 {
01095 if (this->notify_handler_ == 0)
01096 return 0;
01097 else
01098 return this->notify_handler_->purge_pending_notifications (eh, mask);
01099 }
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112 int
01113 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
01114 ACE_Reactor_Mask mask,
01115 ACE_Select_Reactor_Handle_Set &handle_set,
01116 int ops)
01117 {
01118 ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
01119 if (this->handler_rep_.handle_in_range (handle) == 0)
01120 return -1;
01121
01122 #if !defined (ACE_WIN32)
01123 ACE_Sig_Guard sb (0,
01124 this->mask_signals_);
01125 #endif
01126
01127 ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
01128 u_long omask = ACE_Event_Handler::NULL_MASK;
01129
01130
01131
01132 if (handle_set.rd_mask_.is_set (handle))
01133 ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01134 if (handle_set.wr_mask_.is_set (handle))
01135 ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01136 if (handle_set.ex_mask_.is_set (handle))
01137 ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01138
01139 switch (ops)
01140 {
01141 case ACE_Reactor::GET_MASK:
01142
01143
01144 break;
01145 case ACE_Reactor::CLR_MASK:
01146 ptmf = &ACE_Handle_Set::clr_bit;
01147
01148
01149
01150
01151 this->clear_dispatch_mask (handle, mask);
01152
01153 case ACE_Reactor::SET_MASK:
01154
01155 case ACE_Reactor::ADD_MASK:
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01167 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01168 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01169 {
01170 (handle_set.rd_mask_.*ptmf) (handle);
01171 }
01172 else if (ops == ACE_Reactor::SET_MASK)
01173 handle_set.rd_mask_.clr_bit (handle);
01174
01175
01176 if (ACE_BIT_ENABLED (mask,
01177 ACE_Event_Handler::WRITE_MASK)
01178 || ACE_BIT_ENABLED (mask,
01179 ACE_Event_Handler::CONNECT_MASK))
01180 {
01181 (handle_set.wr_mask_.*ptmf) (handle);
01182 }
01183 else if (ops == ACE_Reactor::SET_MASK)
01184 handle_set.wr_mask_.clr_bit (handle);
01185
01186
01187
01188 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01189 #if defined (ACE_WIN32)
01190 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01191 #endif
01192 )
01193 {
01194 (handle_set.ex_mask_.*ptmf) (handle);
01195 }
01196 else if (ops == ACE_Reactor::SET_MASK)
01197 handle_set.ex_mask_.clr_bit (handle);
01198 break;
01199 default:
01200 return -1;
01201 }
01202 return omask;
01203 }
01204
01205 void
01206 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
01207 ACE_Reactor_Mask mask)
01208 {
01209 ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
01210
01211
01212
01213
01214
01215
01216
01217
01218
01219
01220
01221
01222
01223
01224
01225
01226
01227
01228
01229
01230 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
01231 ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
01232 {
01233 this->dispatch_set_.rd_mask_.clr_bit (handle);
01234 }
01235 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
01236 {
01237 this->dispatch_set_.wr_mask_.clr_bit (handle);
01238 }
01239 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
01240 {
01241 this->dispatch_set_.ex_mask_.clr_bit (handle);
01242 }
01243
01244
01245
01246 this->state_changed_ = true;
01247 }
01248
01249
01250 int
01251 ACE_Select_Reactor_Impl::resumable_handler (void)
01252 {
01253
01254
01255
01256 return 0;
01257 }
01258
01259 ACE_END_VERSIONED_NAMESPACE_DECL