00001
00002
00003 #include "ace/WFMO_Reactor.h"
00004
00005 #if defined (ACE_WIN32)
00006
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif
00016
00017 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp 81138 2008-03-28 09:18:15Z johnnyw $")
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024 : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031 if (size > MAXIMUM_WAIT_OBJECTS)
00032 ACE_ERROR_RETURN ((LM_ERROR,
00033 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034 size,
00035 MAXIMUM_WAIT_OBJECTS),
00036 -1);
00037
00038
00039 ACE_NEW_RETURN (this->current_handles_,
00040 ACE_HANDLE[size],
00041 -1);
00042 ACE_NEW_RETURN (this->current_info_,
00043 Current_Info[size],
00044 -1);
00045 ACE_NEW_RETURN (this->current_suspended_info_,
00046 Suspended_Info[size],
00047 -1);
00048 ACE_NEW_RETURN (this->to_be_added_info_,
00049 To_Be_Added_Info[size],
00050 -1);
00051
00052
00053 this->max_size_ = size;
00054 this->max_handlep1_ = 0;
00055 this->suspended_handles_ = 0;
00056 this->handles_to_be_added_ = 0;
00057 this->handles_to_be_deleted_ = 0;
00058 this->handles_to_be_suspended_ = 0;
00059 this->handles_to_be_resumed_ = 0;
00060
00061 for (size_t i = 0; i < size; ++i)
00062 this->current_handles_[i] = ACE_INVALID_HANDLE;
00063
00064 return 0;
00065 }
00066
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069
00070 delete [] this->current_handles_;
00071 delete [] this->current_info_;
00072 delete [] this->current_suspended_info_;
00073 delete [] this->to_be_added_info_;
00074 }
00075
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078 ACE_Reactor_Mask change_masks,
00079 int operation)
00080 {
00081
00082
00083
00084 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085
00086 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089
00090 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092
00093 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095
00096 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098
00099 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101
00102 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104
00105 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107
00108 switch (operation)
00109 {
00110 case ACE_Reactor::CLR_MASK:
00111
00112
00113 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114 {
00115 ACE_CLR_BITS (existing_masks, FD_READ);
00116 ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117 }
00118
00119 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120 ACE_CLR_BITS (existing_masks, FD_WRITE);
00121
00122 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123 ACE_CLR_BITS (existing_masks, FD_OOB);
00124
00125 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127
00128 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129 ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130
00131 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132 ACE_CLR_BITS (existing_masks, FD_QOS);
00133
00134 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136
00137 break;
00138
00139 case ACE_Reactor::SET_MASK:
00140
00141
00142 existing_masks = 0;
00143
00144
00145 case ACE_Reactor::ADD_MASK:
00146
00147
00148
00149 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150 {
00151 ACE_SET_BITS (existing_masks, FD_READ);
00152 ACE_SET_BITS (existing_masks, FD_CLOSE);
00153 }
00154
00155 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156 ACE_SET_BITS (existing_masks, FD_WRITE);
00157
00158 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159 ACE_SET_BITS (existing_masks, FD_OOB);
00160
00161 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162 ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163
00164 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165 ACE_SET_BITS (existing_masks, FD_CONNECT);
00166
00167 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168 ACE_SET_BITS (existing_masks, FD_QOS);
00169
00170 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172
00173 break;
00174
00175 case ACE_Reactor::GET_MASK:
00176
00177
00178
00179
00180 ACE_UNUSED_ARG (change_masks);
00181
00182 break;
00183 }
00184
00185 return old_masks;
00186 }
00187
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190 ACE_Reactor_Mask mask,
00191 bool &changes_required)
00192 {
00193 int error = 0;
00194
00195
00196
00197 size_t const original_handle_count = this->handles_to_be_deleted_;
00198 size_t i;
00199
00200
00201
00202
00203
00204
00205 for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00206
00207
00208 if ((this->current_handles_[i] == handle
00209 || this->current_info_[i].io_handle_ == handle)
00210 &&
00211 !this->current_info_[i].delete_entry_)
00212 {
00213 if (this->remove_handler_i (i, mask) == -1)
00214 error = 1;
00215 }
00216
00217
00218 for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00219
00220
00221 if ((this->current_suspended_info_[i].io_handle_ == handle
00222 || this->current_suspended_info_[i].event_handle_ == handle)
00223 &&
00224
00225 !this->current_suspended_info_[i].delete_entry_)
00226 {
00227 if (this->remove_suspended_handler_i (i, mask) == -1)
00228 error = 1;
00229 }
00230
00231
00232 for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00233
00234
00235 if ((this->to_be_added_info_[i].io_handle_ == handle
00236 || this->to_be_added_info_[i].event_handle_ == handle)
00237 &&
00238
00239 !this->to_be_added_info_[i].delete_entry_)
00240 {
00241 if (this->remove_to_be_added_handler_i (i, mask) == -1)
00242 error = 1;
00243 }
00244
00245
00246
00247 if (original_handle_count < this->handles_to_be_deleted_)
00248 changes_required = true;
00249
00250 return error ? -1 : 0;
00251 }
00252
00253 int
00254 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00255 ACE_Reactor_Mask to_be_removed_masks)
00256 {
00257
00258 if (this->current_info_[slot].io_entry_)
00259 {
00260
00261
00262 this->bit_ops (this->current_info_[slot].network_events_,
00263 to_be_removed_masks,
00264 ACE_Reactor::CLR_MASK);
00265
00266
00267
00268
00269
00270
00271 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00272 this->current_handles_[slot],
00273 this->current_info_[slot].network_events_);
00274 }
00275
00276 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00277
00278 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00279 else
00280
00281 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00282
00283
00284
00285 if (this->current_info_[slot].suspend_entry_)
00286 {
00287
00288 this->current_info_[slot].suspend_entry_ = false;
00289
00290 --this->handles_to_be_suspended_;
00291 }
00292
00293
00294
00295
00296 if (this->current_info_[slot].network_events_ == 0)
00297 {
00298
00299 this->current_info_[slot].delete_entry_ = true;
00300
00301 this->current_info_[slot].close_masks_ = to_be_removed_masks;
00302
00303 ++this->handles_to_be_deleted_;
00304 }
00305
00306
00307
00308
00309
00310
00311 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00312 {
00313 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00314 this->current_info_[slot].event_handler_->handle_close (handle,
00315 to_be_removed_masks);
00316 }
00317
00318 return 0;
00319 }
00320
00321 int
00322 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00323 ACE_Reactor_Mask to_be_removed_masks)
00324 {
00325
00326 if (this->current_suspended_info_[slot].io_entry_)
00327 {
00328
00329
00330 this->bit_ops (this->current_suspended_info_[slot].network_events_,
00331 to_be_removed_masks,
00332 ACE_Reactor::CLR_MASK);
00333
00334
00335
00336
00337
00338
00339 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00340 this->current_suspended_info_[slot].event_handle_,
00341 this->current_suspended_info_[slot].network_events_);
00342 }
00343
00344 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00345
00346 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00347 else
00348
00349 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00350
00351
00352
00353 if (this->current_suspended_info_[slot].resume_entry_)
00354 {
00355
00356 this->current_suspended_info_[slot].resume_entry_ = false;
00357
00358 --this->handles_to_be_resumed_;
00359 }
00360
00361
00362
00363
00364 if (this->current_suspended_info_[slot].network_events_ == 0)
00365 {
00366
00367 this->current_suspended_info_[slot].delete_entry_ = true;
00368
00369 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00370
00371 ++this->handles_to_be_deleted_;
00372 }
00373
00374
00375
00376
00377
00378 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00379 {
00380 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00381 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00382 to_be_removed_masks);
00383 }
00384
00385 return 0;
00386 }
00387
00388 int
00389 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00390 ACE_Reactor_Mask to_be_removed_masks)
00391 {
00392
00393 if (this->to_be_added_info_[slot].io_entry_)
00394 {
00395
00396
00397 this->bit_ops (this->to_be_added_info_[slot].network_events_,
00398 to_be_removed_masks,
00399 ACE_Reactor::CLR_MASK);
00400
00401
00402
00403
00404
00405
00406 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00407 this->to_be_added_info_[slot].event_handle_,
00408 this->to_be_added_info_[slot].network_events_);
00409 }
00410
00411 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00412
00413 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00414 else
00415
00416 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00417
00418
00419
00420 if (this->to_be_added_info_[slot].suspend_entry_)
00421 {
00422
00423 this->to_be_added_info_[slot].suspend_entry_ = false;
00424
00425 --this->handles_to_be_suspended_;
00426 }
00427
00428
00429
00430
00431 if (this->to_be_added_info_[slot].network_events_ == 0)
00432 {
00433
00434 this->to_be_added_info_[slot].delete_entry_ = true;
00435
00436 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00437
00438 ++this->handles_to_be_deleted_;
00439 }
00440
00441
00442
00443
00444
00445 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00446 {
00447 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00448 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00449 to_be_removed_masks);
00450 }
00451
00452 return 0;
00453 }
00454
00455 int
00456 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00457 bool &changes_required)
00458 {
00459 size_t i = 0;
00460
00461
00462
00463
00464
00465
00466 for (i = 0; i < this->max_handlep1_; ++i)
00467
00468
00469 if ((this->current_handles_[i] == handle ||
00470 this->current_info_[i].io_handle_ == handle) &&
00471
00472 !this->current_info_[i].suspend_entry_)
00473 {
00474
00475 this->current_info_[i].suspend_entry_ = true;
00476
00477 ++this->handles_to_be_suspended_;
00478
00479 changes_required = true;
00480 }
00481
00482
00483 for (i = 0; i < this->suspended_handles_; ++i)
00484
00485
00486 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00487 this->current_suspended_info_[i].io_handle_ == handle) &&
00488
00489 this->current_suspended_info_[i].resume_entry_)
00490 {
00491
00492 this->current_suspended_info_[i].resume_entry_ = false;
00493
00494 --this->handles_to_be_resumed_;
00495
00496 changes_required = true;
00497 }
00498
00499
00500 for (i = 0; i < this->handles_to_be_added_; ++i)
00501
00502
00503 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00504 this->to_be_added_info_[i].event_handle_ == handle) &&
00505
00506 !this->to_be_added_info_[i].suspend_entry_)
00507 {
00508
00509 this->to_be_added_info_[i].suspend_entry_ = true;
00510
00511 ++this->handles_to_be_suspended_;
00512
00513 changes_required = true;
00514 }
00515
00516 return 0;
00517 }
00518
00519 int
00520 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00521 bool &changes_required)
00522 {
00523 size_t i = 0;
00524
00525
00526
00527
00528
00529
00530 for (i = 0; i < this->max_handlep1_; ++i)
00531
00532
00533 if ((this->current_handles_[i] == handle ||
00534 this->current_info_[i].io_handle_ == handle) &&
00535
00536 this->current_info_[i].suspend_entry_)
00537 {
00538
00539 this->current_info_[i].suspend_entry_ = false;
00540
00541 --this->handles_to_be_suspended_;
00542
00543 changes_required = true;
00544 }
00545
00546
00547 for (i = 0; i < this->suspended_handles_; ++i)
00548
00549
00550 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00551 this->current_suspended_info_[i].io_handle_ == handle) &&
00552
00553 !this->current_suspended_info_[i].resume_entry_)
00554 {
00555
00556 this->current_suspended_info_[i].resume_entry_ = true;
00557
00558 ++this->handles_to_be_resumed_;
00559
00560 changes_required = true;
00561 }
00562
00563
00564 for (i = 0; i < this->handles_to_be_added_; ++i)
00565
00566
00567 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00568 this->to_be_added_info_[i].event_handle_ == handle) &&
00569
00570 this->to_be_added_info_[i].suspend_entry_)
00571 {
00572
00573 this->to_be_added_info_[i].suspend_entry_ = false;
00574
00575 --this->handles_to_be_suspended_;
00576
00577 changes_required = true;
00578 }
00579
00580 return 0;
00581 }
00582
00583 void
00584 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00585 {
00586 {
00587 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00588
00589 bool dummy;
00590 size_t i;
00591
00592
00593 for (i = 0; i < this->max_handlep1_; ++i)
00594 this->unbind_i (this->current_handles_[i],
00595 ACE_Event_Handler::ALL_EVENTS_MASK,
00596 dummy);
00597
00598
00599 for (i = 0; i < this->suspended_handles_; ++i)
00600 this->unbind_i (this->current_suspended_info_[i].event_handle_,
00601 ACE_Event_Handler::ALL_EVENTS_MASK,
00602 dummy);
00603
00604
00605 for (i = 0; i < this->handles_to_be_added_; ++i)
00606 this->unbind_i (this->to_be_added_info_[i].event_handle_,
00607 ACE_Event_Handler::ALL_EVENTS_MASK,
00608 dummy);
00609 }
00610
00611
00612
00613
00614
00615 this->wfmo_reactor_.wakeup_all_threads ();
00616 }
00617
00618 int
00619 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry,
00620 ACE_Event_Handler *event_handler,
00621 long network_events,
00622 ACE_HANDLE io_handle,
00623 ACE_HANDLE event_handle,
00624 bool delete_event)
00625 {
00626 if (event_handler == 0)
00627 return -1;
00628
00629
00630 if (event_handle == ACE_INVALID_HANDLE)
00631 event_handle = event_handler->get_handle ();
00632 if (this->invalid_handle (event_handle))
00633 return -1;
00634
00635 size_t current_size = this->max_handlep1_ +
00636 this->handles_to_be_added_ -
00637 this->handles_to_be_deleted_ +
00638 this->suspended_handles_;
00639
00640
00641
00642
00643 if (current_size < this->max_size_ &&
00644 this->handles_to_be_added_ < this->max_size_)
00645 {
00646
00647
00648 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00649 io_entry,
00650 event_handler,
00651 io_handle,
00652 network_events,
00653 delete_event);
00654
00655 ++this->handles_to_be_added_;
00656
00657 event_handler->add_reference ();
00658
00659
00660
00661 this->wfmo_reactor_.wakeup_all_threads ();
00662 }
00663 else
00664 {
00665 errno = EMFILE;
00666 return -1;
00667 }
00668
00669 return 0;
00670 }
00671
00672 int
00673 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00674 {
00675
00676
00677 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00678 {
00679 size_t i = 0;
00680 while (i < this->max_handlep1_)
00681 {
00682
00683
00684
00685
00686 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00687 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00688 ACE_Event_Handler *event_handler = 0;
00689
00690
00691 if (this->current_info_[i].delete_entry_)
00692 {
00693
00694
00695
00696
00697
00698
00699
00700 masks = this->current_info_[i].close_masks_;
00701 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00702 {
00703
00704 if (this->current_info_[i].io_entry_)
00705 handle = this->current_info_[i].io_handle_;
00706 else
00707 handle = this->current_handles_[i];
00708
00709
00710 event_handler = this->current_info_[i].event_handler_;
00711 }
00712
00713
00714 if (this->current_info_[i].delete_event_)
00715 ACE_OS::event_destroy (&this->current_handles_[i]);
00716
00717
00718 --this->handles_to_be_deleted_;
00719 }
00720
00721
00722 else if (this->current_info_[i].suspend_entry_)
00723 {
00724 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00725 this->current_info_[i]);
00726
00727 ++this->suspended_handles_;
00728
00729
00730 --this->handles_to_be_suspended_;
00731 }
00732
00733
00734
00735 if (this->current_info_[i].delete_entry_ ||
00736 this->current_info_[i].suspend_entry_ )
00737 {
00738 size_t last_valid_slot = this->max_handlep1_ - 1;
00739
00740
00741 if (i < last_valid_slot)
00742
00743 {
00744
00745 this->current_info_[i] =
00746 this->current_info_[last_valid_slot];
00747 this->current_handles_[i] =
00748 this->current_handles_[last_valid_slot];
00749 }
00750
00751 this->current_info_[last_valid_slot].reset ();
00752 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00753 --this->max_handlep1_;
00754 }
00755 else
00756 {
00757
00758
00759
00760 ++i;
00761 }
00762
00763
00764
00765 if (event_handler != 0)
00766 {
00767 bool const requires_reference_counting =
00768 event_handler->reference_counting_policy ().value () ==
00769 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00770
00771 event_handler->handle_close (handle, masks);
00772
00773 if (requires_reference_counting)
00774 {
00775 event_handler->remove_reference ();
00776 }
00777 }
00778 }
00779 }
00780
00781 return 0;
00782 }
00783
00784 int
00785 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00786 {
00787
00788 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00789 {
00790 size_t i = 0;
00791 while (i < this->suspended_handles_)
00792 {
00793
00794
00795
00796
00797 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00798 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00799 ACE_Event_Handler *event_handler = 0;
00800
00801
00802 if (this->current_suspended_info_[i].delete_entry_)
00803 {
00804
00805
00806
00807
00808
00809
00810
00811 masks = this->current_suspended_info_[i].close_masks_;
00812 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00813 {
00814
00815 if (this->current_suspended_info_[i].io_entry_)
00816 handle = this->current_suspended_info_[i].io_handle_;
00817 else
00818 handle = this->current_suspended_info_[i].event_handle_;
00819
00820
00821 event_handler = this->current_suspended_info_[i].event_handler_;
00822 }
00823
00824
00825 if (this->current_suspended_info_[i].delete_event_)
00826 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00827
00828
00829 --this->handles_to_be_deleted_;
00830 }
00831
00832 else if (this->current_suspended_info_[i].resume_entry_)
00833 {
00834
00835 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00836
00837 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00838 ++this->max_handlep1_;
00839
00840
00841 --this->handles_to_be_resumed_;
00842 }
00843
00844
00845
00846
00847 if (this->current_suspended_info_[i].resume_entry_ ||
00848 this->current_suspended_info_[i].delete_entry_)
00849 {
00850 size_t last_valid_slot = this->suspended_handles_ - 1;
00851
00852
00853
00854
00855
00856 if (i < last_valid_slot)
00857
00858 this->current_suspended_info_[i] =
00859 this->current_suspended_info_[last_valid_slot];
00860 this->current_suspended_info_[last_valid_slot].reset ();
00861 --this->suspended_handles_;
00862 }
00863 else
00864 {
00865
00866
00867
00868 ++i;
00869 }
00870
00871
00872
00873 if (event_handler != 0)
00874 {
00875 int requires_reference_counting =
00876 event_handler->reference_counting_policy ().value () ==
00877 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00878
00879 event_handler->handle_close (handle, masks);
00880
00881 if (requires_reference_counting)
00882 {
00883 event_handler->remove_reference ();
00884 }
00885 }
00886 }
00887 }
00888
00889 return 0;
00890 }
00891
00892 int
00893 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00894 {
00895
00896 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00897 {
00898
00899
00900
00901
00902 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00903 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00904 ACE_Event_Handler *event_handler = 0;
00905
00906
00907 if (this->to_be_added_info_[i].delete_entry_)
00908 {
00909
00910
00911
00912
00913
00914
00915
00916 masks = this->to_be_added_info_[i].close_masks_;
00917 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00918 {
00919
00920 if (this->to_be_added_info_[i].io_entry_)
00921 handle = this->to_be_added_info_[i].io_handle_;
00922 else
00923 handle = this->to_be_added_info_[i].event_handle_;
00924
00925
00926 event_handler = this->to_be_added_info_[i].event_handler_;
00927 }
00928
00929
00930 if (this->to_be_added_info_[i].delete_event_)
00931 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00932
00933
00934 --this->handles_to_be_deleted_;
00935 }
00936
00937
00938 else if (this->to_be_added_info_[i].suspend_entry_)
00939 {
00940 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00941 this->to_be_added_info_[i]);
00942
00943 ++this->suspended_handles_;
00944
00945
00946 --this->handles_to_be_suspended_;
00947 }
00948
00949
00950 else
00951 {
00952
00953 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00954
00955 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00956 ++this->max_handlep1_;
00957 }
00958
00959
00960 this->to_be_added_info_[i].reset ();
00961
00962
00963
00964 if (event_handler != 0)
00965 {
00966 int requires_reference_counting =
00967 event_handler->reference_counting_policy ().value () ==
00968 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00969
00970 event_handler->handle_close (handle, masks);
00971
00972 if (requires_reference_counting)
00973 {
00974 event_handler->remove_reference ();
00975 }
00976 }
00977 }
00978
00979
00980
00981 this->handles_to_be_added_ = 0;
00982
00983 return 0;
00984 }
00985
00986 void
00987 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00988 {
00989 #if defined (ACE_HAS_DUMP)
00990 size_t i = 0;
00991
00992 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00993
00994 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00995
00996 ACE_DEBUG ((LM_DEBUG,
00997 ACE_TEXT ("Max size = %d\n"),
00998 this->max_size_));
00999
01000 ACE_DEBUG ((LM_DEBUG,
01001 ACE_TEXT ("Current info table\n\n")));
01002 ACE_DEBUG ((LM_DEBUG,
01003 ACE_TEXT ("\tSize = %d\n"),
01004 this->max_handlep1_));
01005 ACE_DEBUG ((LM_DEBUG,
01006 ACE_TEXT ("\tHandles to be suspended = %d\n"),
01007 this->handles_to_be_suspended_));
01008
01009 for (i = 0; i < this->max_handlep1_; ++i)
01010 this->current_info_[i].dump (this->current_handles_[i]);
01011
01012 ACE_DEBUG ((LM_DEBUG,
01013 ACE_TEXT ("\n")));
01014
01015 ACE_DEBUG ((LM_DEBUG,
01016 ACE_TEXT ("To-be-added info table\n\n")));
01017 ACE_DEBUG ((LM_DEBUG,
01018 ACE_TEXT ("\tSize = %d\n"),
01019 this->handles_to_be_added_));
01020
01021 for (i = 0; i < this->handles_to_be_added_; ++i)
01022 this->to_be_added_info_[i].dump ();
01023
01024 ACE_DEBUG ((LM_DEBUG,
01025 ACE_TEXT ("\n")));
01026
01027 ACE_DEBUG ((LM_DEBUG,
01028 ACE_TEXT ("Suspended info table\n\n")));
01029 ACE_DEBUG ((LM_DEBUG,
01030 ACE_TEXT ("\tSize = %d\n"),
01031 this->suspended_handles_));
01032 ACE_DEBUG ((LM_DEBUG,
01033 ACE_TEXT ("\tHandles to be resumed = %d\n"),
01034 this->handles_to_be_resumed_));
01035
01036 for (i = 0; i < this->suspended_handles_; ++i)
01037 this->current_suspended_info_[i].dump ();
01038
01039 ACE_DEBUG ((LM_DEBUG,
01040 ACE_TEXT ("\n")));
01041
01042 ACE_DEBUG ((LM_DEBUG,
01043 ACE_TEXT ("Total handles to be deleted = %d\n"),
01044 this->handles_to_be_deleted_));
01045
01046 ACE_DEBUG ((LM_DEBUG,
01047 ACE_END_DUMP));
01048 #endif
01049 }
01050
01051
01052
01053 int
01054 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01055 {
01056 ACE_NOTSUP_RETURN (-1);
01057 }
01058
01059 #if defined (ACE_WIN32_VC8)
01060 # pragma warning (push)
01061 # pragma warning (disable:4355)
01062 # endif
01063 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01064 ACE_Timer_Queue *tq,
01065 ACE_Reactor_Notify *notify)
01066 : signal_handler_ (0),
01067 delete_signal_handler_ (false),
01068 timer_queue_ (0),
01069 delete_timer_queue_ (false),
01070 delete_handler_rep_ (false),
01071 notify_handler_ (0),
01072 delete_notify_handler_ (false),
01073 lock_adapter_ (lock_),
01074 handler_rep_ (*this),
01075
01076 ok_to_wait_ (1),
01077
01078 wakeup_all_threads_ (0),
01079
01080 waiting_to_change_state_ (0),
01081 active_threads_ (0),
01082 owner_ (ACE_Thread::self ()),
01083 new_owner_ (0),
01084 change_state_thread_ (0),
01085 open_for_business_ (false),
01086 deactivated_ (0)
01087 {
01088 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01089 ACE_ERROR ((LM_ERROR,
01090 ACE_TEXT ("%p\n"),
01091 ACE_TEXT ("WFMO_Reactor")));
01092 }
01093
01094 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01095 int unused,
01096 ACE_Sig_Handler *sh,
01097 ACE_Timer_Queue *tq,
01098 ACE_Reactor_Notify *notify)
01099 : signal_handler_ (0),
01100 delete_signal_handler_ (false),
01101 timer_queue_ (0),
01102 delete_timer_queue_ (false),
01103 delete_handler_rep_ (false),
01104 notify_handler_ (0),
01105 delete_notify_handler_ (false),
01106 lock_adapter_ (lock_),
01107 handler_rep_ (*this),
01108
01109 ok_to_wait_ (1),
01110
01111 wakeup_all_threads_ (0),
01112
01113 waiting_to_change_state_ (0),
01114 active_threads_ (0),
01115 owner_ (ACE_Thread::self ()),
01116 new_owner_ (0),
01117 change_state_thread_ (0),
01118 open_for_business_ (false),
01119 deactivated_ (0)
01120 {
01121 ACE_UNUSED_ARG (unused);
01122
01123 if (this->open (size, 0, sh, tq, 0, notify) == -1)
01124 ACE_ERROR ((LM_ERROR,
01125 ACE_TEXT ("%p\n"),
01126 ACE_TEXT ("WFMO_Reactor")));
01127 }
01128 #if defined (ACE_WIN32_VC8)
01129 # pragma warning (pop)
01130 #endif
01131
01132 int
01133 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01134 {
01135 return -1;
01136 }
01137
01138 int
01139 ACE_WFMO_Reactor::open (size_t size,
01140 int unused,
01141 ACE_Sig_Handler *sh,
01142 ACE_Timer_Queue *tq,
01143 int disable_notify_pipe,
01144 ACE_Reactor_Notify *notify)
01145 {
01146 ACE_UNUSED_ARG (unused);
01147 ACE_UNUSED_ARG (disable_notify_pipe);
01148
01149
01150 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01151
01152
01153 if (this->open_for_business_)
01154 return -1;
01155
01156
01157 if (this->delete_timer_queue_)
01158 delete this->timer_queue_;
01159
01160 if (tq == 0)
01161 {
01162 ACE_NEW_RETURN (this->timer_queue_,
01163 ACE_Timer_Heap,
01164 -1);
01165 this->delete_timer_queue_ = true;
01166 }
01167 else
01168 {
01169 this->timer_queue_ = tq;
01170 this->delete_timer_queue_ = false;
01171 }
01172
01173
01174 if (this->delete_signal_handler_)
01175 delete this->signal_handler_;
01176
01177 if (sh == 0)
01178 {
01179 ACE_NEW_RETURN (this->signal_handler_,
01180 ACE_Sig_Handler,
01181 -1);
01182 this->delete_signal_handler_ = true;
01183 }
01184 else
01185 {
01186 this->signal_handler_ = sh;
01187 this->delete_signal_handler_ = false;
01188 }
01189
01190
01191 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01192 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01193
01194
01195 if (this->delete_handler_rep_)
01196 {
01197 if (this->handler_rep_.changes_required ())
01198 {
01199
01200 this->handler_rep_.make_changes ();
01201
01202
01203 this->wakeup_all_threads_.reset ();
01204 }
01205
01206 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01207 }
01208
01209
01210
01211 if (this->handler_rep_.open (size + 2) == -1)
01212 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
01213 ACE_TEXT ("opening handler repository")),
01214 -1);
01215 else
01216 this->delete_handler_rep_ = true;
01217
01218 if (this->notify_handler_ != 0 && this->delete_notify_handler_)
01219 delete this->notify_handler_;
01220
01221 this->notify_handler_ = notify;
01222
01223 if (this->notify_handler_ == 0)
01224 {
01225 ACE_NEW_RETURN (this->notify_handler_,
01226 ACE_WFMO_Reactor_Notify,
01227 -1);
01228
01229 if (this->notify_handler_ == 0)
01230 return -1;
01231 else
01232 this->delete_notify_handler_ = true;
01233 }
01234
01235
01236
01237
01238
01239 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01240 ACE_ERROR_RETURN ((LM_ERROR,
01241 ACE_TEXT ("%p\n"),
01242 ACE_TEXT ("opening notify handler ")),
01243 -1);
01244
01245
01246 if (this->register_handler (&this->wakeup_all_threads_handler_,
01247 this->wakeup_all_threads_.handle ()) == -1)
01248 ACE_ERROR_RETURN ((LM_ERROR,
01249 ACE_TEXT ("%p\n"),
01250 ACE_TEXT ("registering thread wakeup handler")),
01251 -1);
01252
01253
01254
01255 if (this->handler_rep_.changes_required ())
01256 {
01257
01258 this->handler_rep_.make_changes ();
01259
01260
01261 this->wakeup_all_threads_.reset ();
01262 }
01263
01264
01265 this->open_for_business_ = true;
01266
01267 return 0;
01268 }
01269
01270 int
01271 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01272 {
01273 if (this->signal_handler_ != 0 && this->delete_signal_handler_)
01274 delete this->signal_handler_;
01275 this->signal_handler_ = signal_handler;
01276 this->delete_signal_handler_ = false;
01277 return 0;
01278 }
01279
01280 ACE_Timer_Queue *
01281 ACE_WFMO_Reactor::timer_queue (void) const
01282 {
01283 return this->timer_queue_;
01284 }
01285
01286 int
01287 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01288 {
01289 if (this->timer_queue_ != 0 && this->delete_timer_queue_)
01290 delete this->timer_queue_;
01291 this->timer_queue_ = tq;
01292 this->delete_timer_queue_ = false;
01293 return 0;
01294 }
01295
01296 int
01297 ACE_WFMO_Reactor::close (void)
01298 {
01299
01300 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01301
01302
01303 if (!this->open_for_business_)
01304 return -1;
01305
01306
01307 this->open_for_business_ = false;
01308
01309 this->handler_rep_.close ();
01310
01311 return 0;
01312 }
01313
01314 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01315 {
01316
01317
01318
01319
01320 this->close ();
01321
01322
01323
01324 this->handler_rep_.make_changes ();
01325
01326 if (this->delete_timer_queue_)
01327 {
01328 delete this->timer_queue_;
01329 this->timer_queue_ = 0;
01330 this->delete_timer_queue_ = false;
01331 }
01332
01333 if (this->delete_signal_handler_)
01334 {
01335 delete this->signal_handler_;
01336 this->signal_handler_ = 0;
01337 this->delete_signal_handler_ = false;
01338 }
01339
01340 if (this->delete_notify_handler_)
01341 {
01342 delete this->notify_handler_;
01343 this->notify_handler_ = 0;
01344 this->delete_notify_handler_ = false;
01345 }
01346 }
01347
01348 int
01349 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01350 ACE_HANDLE io_handle,
01351 ACE_Event_Handler *event_handler,
01352 ACE_Reactor_Mask new_masks)
01353 {
01354
01355
01356
01357
01358 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01359
01360 ACE_UNUSED_ARG (event_handle);
01361 ACE_UNUSED_ARG (io_handle);
01362 ACE_UNUSED_ARG (event_handler);
01363 ACE_UNUSED_ARG (new_masks);
01364 ACE_NOTSUP_RETURN (-1);
01365
01366 #else
01367
01368
01369 if (io_handle == ACE_INVALID_HANDLE)
01370 io_handle = event_handler->get_handle ();
01371
01372 if (this->handler_rep_.invalid_handle (io_handle))
01373 {
01374 errno = ERROR_INVALID_HANDLE;
01375 return -1;
01376 }
01377
01378 long new_network_events = 0;
01379 bool delete_event = false;
01380 auto_ptr <ACE_Auto_Event> event;
01381
01382
01383
01384 ACE_Reactor_Mask old_masks;
01385 int found = this->handler_rep_.modify_network_events_i (io_handle,
01386 new_masks,
01387 old_masks,
01388 new_network_events,
01389 event_handle,
01390 delete_event,
01391 ACE_Reactor::ADD_MASK);
01392
01393
01394
01395 if (event_handle == ACE_INVALID_HANDLE)
01396 {
01397
01398
01399 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01400 event = tmp;
01401 event_handle = event->handle ();
01402 delete_event = true;
01403 }
01404
01405 int result = ::WSAEventSelect ((SOCKET) io_handle,
01406 event_handle,
01407 new_network_events);
01408
01409 if (found)
01410 return result;
01411 else if (result != SOCKET_ERROR &&
01412 this->handler_rep_.bind_i (1,
01413 event_handler,
01414 new_network_events,
01415 io_handle,
01416 event_handle,
01417 delete_event) != -1)
01418 {
01419
01420
01421 if (delete_event)
01422 {
01423
01424
01425
01426
01427
01428
01429
01430
01431 ACE_Errno_Guard guard (errno);
01432 event->handle (ACE_INVALID_HANDLE);
01433 event->remove ();
01434 }
01435 return 0;
01436 }
01437 else
01438 return -1;
01439
01440 #endif
01441
01442 }
01443
01444 int
01445 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01446 ACE_Reactor_Mask new_masks,
01447 int operation)
01448 {
01449
01450 if (this->handler_rep_.invalid_handle (io_handle))
01451 return -1;
01452
01453 long new_network_events = 0;
01454 bool delete_event = false;
01455 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01456
01457
01458
01459 ACE_Reactor_Mask old_masks;
01460 int found = this->handler_rep_.modify_network_events_i (io_handle,
01461 new_masks,
01462 old_masks,
01463 new_network_events,
01464 event_handle,
01465 delete_event,
01466 operation);
01467 if (found)
01468 {
01469 int result = ::WSAEventSelect ((SOCKET) io_handle,
01470 event_handle,
01471 new_network_events);
01472 if (result == 0)
01473 return old_masks;
01474 else
01475 return result;
01476 }
01477 else
01478 return -1;
01479 }
01480
01481
01482
01483 int
01484 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01485 ACE_Reactor_Mask new_masks,
01486 ACE_Reactor_Mask &old_masks,
01487 long &new_network_events,
01488 ACE_HANDLE &event_handle,
01489 bool &delete_event,
01490 int operation)
01491 {
01492 long *modified_network_events = &new_network_events;
01493 int found = 0;
01494 size_t i;
01495
01496
01497
01498
01499
01500 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01501 if (io_handle == this->current_info_[i].io_handle_ &&
01502 !this->current_info_[i].delete_entry_)
01503 {
01504 found = 1;
01505 modified_network_events = &this->current_info_[i].network_events_;
01506 delete_event = this->current_info_[i].delete_event_;
01507 event_handle = this->current_handles_[i];
01508 }
01509
01510
01511
01512
01513
01514 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01515 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01516 !this->current_suspended_info_[i].delete_entry_)
01517 {
01518 found = 1;
01519 modified_network_events = &this->current_suspended_info_[i].network_events_;
01520 delete_event = this->current_suspended_info_[i].delete_event_;
01521 event_handle = this->current_suspended_info_[i].event_handle_;
01522 }
01523
01524
01525
01526
01527
01528 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01529 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01530 !this->to_be_added_info_[i].delete_entry_)
01531 {
01532 found = 1;
01533 modified_network_events = &this->to_be_added_info_[i].network_events_;
01534 delete_event = this->to_be_added_info_[i].delete_event_;
01535 event_handle = this->to_be_added_info_[i].event_handle_;
01536 }
01537
01538 old_masks = this->bit_ops (*modified_network_events,
01539 new_masks,
01540 operation);
01541
01542 new_network_events = *modified_network_events;
01543
01544 return found;
01545 }
01546
01547 ACE_Event_Handler *
01548 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01549 {
01550 long existing_masks_ignored = 0;
01551 return this->handler (handle, existing_masks_ignored);
01552 }
01553
01554 ACE_Event_Handler *
01555 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01556 long &existing_masks)
01557 {
01558 int found = 0;
01559 size_t i = 0;
01560 ACE_Event_Handler *event_handler = 0;
01561 existing_masks = 0;
01562
01563
01564
01565
01566
01567
01568
01569 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01570 if ((handle == this->current_info_[i].io_handle_ ||
01571 handle == this->current_handles_[i]) &&
01572 !this->current_info_[i].delete_entry_)
01573 {
01574 found = 1;
01575 event_handler = this->current_info_[i].event_handler_;
01576 existing_masks = this->current_info_[i].network_events_;
01577 }
01578
01579
01580
01581
01582
01583 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01584 if ((handle == this->current_suspended_info_[i].io_handle_ ||
01585 handle == this->current_suspended_info_[i].event_handle_) &&
01586 !this->current_suspended_info_[i].delete_entry_)
01587 {
01588 found = 1;
01589 event_handler = this->current_suspended_info_[i].event_handler_;
01590 existing_masks = this->current_suspended_info_[i].network_events_;
01591 }
01592
01593
01594
01595
01596
01597 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01598 if ((handle == this->to_be_added_info_[i].io_handle_ ||
01599 handle == this->to_be_added_info_[i].event_handle_) &&
01600 !this->to_be_added_info_[i].delete_entry_)
01601 {
01602 found = 1;
01603 event_handler = this->to_be_added_info_[i].event_handler_;
01604 existing_masks = this->to_be_added_info_[i].network_events_;
01605 }
01606
01607 if (event_handler)
01608 event_handler->add_reference ();
01609
01610 return event_handler;
01611 }
01612
01613 int
01614 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01615 ACE_Reactor_Mask user_masks,
01616 ACE_Event_Handler **user_event_handler)
01617 {
01618 long existing_masks = 0;
01619 int found = 0;
01620
01621 ACE_Event_Handler_var safe_event_handler =
01622 this->handler (handle,
01623 existing_masks);
01624
01625 if (safe_event_handler.handler ())
01626 found = 1;
01627
01628 if (!found)
01629 return -1;
01630
01631
01632
01633 if (found &&
01634 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01635 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01636 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01637 found = 0;
01638
01639 if (found &&
01640 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01641 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01642 found = 0;
01643
01644 if (found &&
01645 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01646 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01647 found = 0;
01648
01649 if (found &&
01650 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01651 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01652 found = 0;
01653
01654 if (found &&
01655 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01656 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01657 found = 0;
01658
01659 if (found &&
01660 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01661 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01662 found = 0;
01663
01664 if (found &&
01665 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01666 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01667 found = 0;
01668
01669 if (found &&
01670 user_event_handler)
01671 *user_event_handler = safe_event_handler.release ();
01672
01673 if (found)
01674 return 0;
01675 else
01676 return -1;
01677 }
01678
01679
01680
01681 int
01682 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01683 int alertable)
01684 {
01685 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01686
01687
01688 if (!this->open_for_business_ || this->deactivated_)
01689 return -1;
01690
01691
01692
01693
01694 ACE_Countdown_Time countdown (max_wait_time);
01695
01696 int result;
01697 do
01698 {
01699
01700
01701
01702 result = this->ok_to_wait (max_wait_time, alertable);
01703 if (result != 1)
01704 return result;
01705
01706
01707 ++this->active_threads_;
01708
01709
01710 this->lock_.release ();
01711
01712
01713
01714 countdown.update ();
01715
01716
01717 int timeout = this->calculate_timeout (max_wait_time);
01718
01719
01720 DWORD wait_status = this->wait_for_multiple_events (timeout,
01721 alertable);
01722
01723
01724 result = this->safe_dispatch (wait_status);
01725 if (0 == result)
01726 {
01727
01728
01729
01730
01731
01732
01733
01734
01735 countdown.update ();
01736 if (0 == max_wait_time || max_wait_time->usec () == 0)
01737 break;
01738 }
01739 }
01740 while (result == 0);
01741
01742 return result;
01743 }
01744
01745 int
01746 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01747 int alertable)
01748 {
01749
01750
01751
01752
01753
01754
01755
01756
01757
01758
01759
01760
01761 #if defined (ACE_HAS_WINCE)
01762 ACE_Time_Value timeout = ACE_OS::gettimeofday ();
01763 if (max_wait_time != 0)
01764 timeout += *max_wait_time;
01765 while (1)
01766 {
01767 int status;
01768 if (max_wait_time == 0)
01769 status = this->ok_to_wait_.wait ();
01770 else
01771 status = this->ok_to_wait_.wait (&timeout);
01772 if (status == -1)
01773 return -1;
01774
01775
01776 if (max_wait_time == 0)
01777 status = this->lock_.acquire ();
01778 else
01779 status = this->lock_.acquire (timeout);
01780 if (status == -1)
01781 return -1;
01782
01783
01784
01785 ACE_Time_Value poll_it = ACE_OS::gettimeofday ();
01786 if (this->ok_to_wait_.wait (&poll_it) == 0)
01787 break;
01788 this->lock_.release ();
01789 }
01790 return 1;
01791
01792 #else
01793 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01794 DWORD result = 0;
01795 while (1)
01796 {
01797 # if defined (ACE_HAS_PHARLAP)
01798
01799
01800 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01801 this->atomic_wait_array_,
01802 TRUE,
01803 timeout);
01804
01805 if (result != WAIT_IO_COMPLETION)
01806 break;
01807
01808 # else
01809 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01810 this->atomic_wait_array_,
01811 TRUE,
01812 timeout,
01813 alertable);
01814
01815 if (result != WAIT_IO_COMPLETION)
01816 break;
01817
01818 # endif
01819 }
01820
01821 switch (result)
01822 {
01823 case WAIT_TIMEOUT:
01824 errno = ETIME;
01825 return 0;
01826 case WAIT_FAILED:
01827 case WAIT_ABANDONED_0:
01828 ACE_OS::set_errno_to_last_error ();
01829 return -1;
01830 default:
01831 break;
01832 }
01833
01834
01835 return 1;
01836 #endif
01837 }
01838
01839 DWORD
01840 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01841 int alertable)
01842 {
01843
01844
01845
01846
01847 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01848
01849
01850 ACE_UNUSED_ARG (alertable);
01851 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01852 this->handler_rep_.handles (),
01853 FALSE,
01854 timeout);
01855 #else
01856 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01857 this->handler_rep_.handles (),
01858 FALSE,
01859 timeout,
01860 alertable);
01861 #endif
01862 }
01863
01864 DWORD
01865 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01866 {
01867 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01868 this->handler_rep_.handles () + slot,
01869 FALSE,
01870 0);
01871 }
01872
01873 int
01874 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01875 {
01876 ACE_Time_Value *time = 0;
01877 if (this->owner_ == ACE_Thread::self ())
01878 time = this->timer_queue_->calculate_timeout (max_wait_time);
01879 else
01880 time = max_wait_time;
01881
01882 if (time == 0)
01883 return INFINITE;
01884 else
01885 return time->msec ();
01886 }
01887
01888
01889 int
01890 ACE_WFMO_Reactor::expire_timers (void)
01891 {
01892
01893 if (ACE_Thread::self () == this->owner_)
01894
01895 return this->timer_queue_->expire ();
01896
01897 else
01898
01899 return 0;
01900 }
01901
01902 int
01903 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01904 {
01905 int handlers_dispatched = 0;
01906
01907
01908 handlers_dispatched += this->expire_timers ();
01909
01910 switch (wait_status)
01911 {
01912 case WAIT_FAILED:
01913 ACE_OS::set_errno_to_last_error ();
01914 return -1;
01915
01916 case WAIT_TIMEOUT:
01917 errno = ETIME;
01918 return handlers_dispatched;
01919
01920 #ifndef ACE_HAS_WINCE
01921 case WAIT_IO_COMPLETION:
01922 return handlers_dispatched;
01923 #endif // ACE_HAS_WINCE
01924
01925 default:
01926
01927 handlers_dispatched += this->dispatch_handles (wait_status);
01928 return handlers_dispatched;
01929 }
01930 }
01931
01932
01933
01934
01935 int
01936 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01937 {
01938
01939
01940 DWORD dispatch_slot = 0;
01941
01942
01943 DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01944
01945
01946
01947 DWORD nCount = max_handlep1;
01948
01949 for (int number_of_handlers_dispatched = 1;
01950 ;
01951 ++number_of_handlers_dispatched)
01952 {
01953 const bool ok = (
01954 #if ! defined(__BORLANDC__) \
01955 && !defined (ghs) \
01956 && !defined (__MINGW32__) \
01957 && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01958
01959
01960
01961 wait_status >= WAIT_OBJECT_0 &&
01962 #endif
01963 wait_status <= (WAIT_OBJECT_0 + nCount));
01964
01965 if (ok)
01966 dispatch_slot += wait_status - WAIT_OBJECT_0;
01967 else
01968
01969 dispatch_slot += wait_status - WAIT_ABANDONED_0;
01970
01971
01972 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01973 return -1;
01974
01975
01976 ++dispatch_slot;
01977
01978
01979 if (dispatch_slot >= max_handlep1)
01980 return number_of_handlers_dispatched;
01981
01982
01983 nCount = max_handlep1 - dispatch_slot;
01984
01985
01986 wait_status = this->poll_remaining_handles (dispatch_slot);
01987 switch (wait_status)
01988 {
01989 case WAIT_FAILED:
01990 ACE_OS::set_errno_to_last_error ();
01991
01992 case WAIT_TIMEOUT:
01993
01994 return number_of_handlers_dispatched;
01995 }
01996 }
01997 }
01998
01999 int
02000 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
02001 DWORD max_handlep1)
02002 {
02003
02004 if (slot == max_handlep1)
02005 return this->dispatch_window_messages ();
02006
02007
02008
02009
02010
02011
02012
02013 else if (!this->handler_rep_.scheduled_for_deletion (slot))
02014 {
02015 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
02016
02017 if (this->handler_rep_.current_info ()[slot].io_entry_)
02018 return this->complex_dispatch_handler (slot,
02019 event_handle);
02020 else
02021 return this->simple_dispatch_handler (slot,
02022 event_handle);
02023 }
02024 else
02025
02026 return 0;
02027 }
02028
02029 int
02030 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
02031 ACE_HANDLE event_handle)
02032 {
02033
02034
02035
02036
02037 siginfo_t sig (event_handle);
02038
02039 ACE_Event_Handler *event_handler =
02040 this->handler_rep_.current_info ()[slot].event_handler_;
02041
02042 int requires_reference_counting =
02043 event_handler->reference_counting_policy ().value () ==
02044 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02045
02046 if (requires_reference_counting)
02047 {
02048 event_handler->add_reference ();
02049 }
02050
02051
02052 if (event_handler->handle_signal (0, &sig) == -1)
02053 this->handler_rep_.unbind (event_handle,
02054 ACE_Event_Handler::NULL_MASK);
02055
02056
02057 if (requires_reference_counting)
02058 {
02059 event_handler->remove_reference ();
02060 }
02061
02062 return 0;
02063 }
02064
02065 int
02066 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02067 ACE_HANDLE event_handle)
02068 {
02069
02070
02071 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02072 this->handler_rep_.current_info ()[slot];
02073
02074 WSANETWORKEVENTS events;
02075 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02076 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02077 event_handle,
02078 &events) == SOCKET_ERROR)
02079 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02080 else
02081 {
02082
02083
02084
02085
02086
02087
02088
02089
02090
02091
02092
02093 events.lNetworkEvents &= current_info.network_events_;
02094 while (events.lNetworkEvents != 0)
02095 {
02096 ACE_Event_Handler *event_handler =
02097 current_info.event_handler_;
02098
02099 int reference_counting_required =
02100 event_handler->reference_counting_policy ().value () ==
02101 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02102
02103
02104 if (reference_counting_required)
02105 {
02106 event_handler->add_reference ();
02107 }
02108
02109
02110 problems |= this->upcall (current_info.event_handler_,
02111 current_info.io_handle_,
02112 events);
02113
02114
02115 if (reference_counting_required)
02116 {
02117 event_handler->remove_reference ();
02118 }
02119
02120 if (this->handler_rep_.scheduled_for_deletion (slot))
02121 break;
02122 }
02123 }
02124
02125 if (problems != ACE_Event_Handler::NULL_MASK
02126 && !this->handler_rep_.scheduled_for_deletion (slot) )
02127 this->handler_rep_.unbind (event_handle, problems);
02128
02129 return 0;
02130 }
02131
02132 ACE_Reactor_Mask
02133 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02134 ACE_HANDLE io_handle,
02135 WSANETWORKEVENTS &events)
02136 {
02137
02138
02139 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02140
02141
02142
02143
02144
02145 long actual_events = events.lNetworkEvents;
02146 int action;
02147
02148 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02149 {
02150 action = event_handler->handle_output (io_handle);
02151 if (action <= 0)
02152 {
02153 ACE_CLR_BITS (actual_events, FD_WRITE);
02154 if (action == -1)
02155 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02156 }
02157 }
02158
02159 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02160 {
02161 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02162 {
02163
02164 action = event_handler->handle_output (io_handle);
02165 if (action <= 0)
02166 {
02167 ACE_CLR_BITS (actual_events, FD_CONNECT);
02168 if (action == -1)
02169 ACE_SET_BITS (problems,
02170 ACE_Event_Handler::CONNECT_MASK);
02171 }
02172 }
02173
02174 else
02175 {
02176 action = event_handler->handle_input (io_handle);
02177 if (action <= 0)
02178 {
02179 ACE_CLR_BITS (actual_events, FD_CONNECT);
02180 if (action == -1)
02181 ACE_SET_BITS (problems,
02182 ACE_Event_Handler::CONNECT_MASK);
02183 }
02184 }
02185 }
02186
02187 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02188 {
02189 action = event_handler->handle_exception (io_handle);
02190 if (action <= 0)
02191 {
02192 ACE_CLR_BITS (actual_events, FD_OOB);
02193 if (action == -1)
02194 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02195 }
02196 }
02197
02198 if (ACE_BIT_ENABLED (actual_events, FD_READ))
02199 {
02200 action = event_handler->handle_input (io_handle);
02201 if (action <= 0)
02202 {
02203 ACE_CLR_BITS (actual_events, FD_READ);
02204 if (action == -1)
02205 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02206 }
02207 }
02208
02209 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02210 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02211 {
02212 action = event_handler->handle_input (io_handle);
02213 if (action <= 0)
02214 {
02215 ACE_CLR_BITS (actual_events, FD_CLOSE);
02216 if (action == -1)
02217 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02218 }
02219 }
02220
02221 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02222 {
02223 action = event_handler->handle_input (io_handle);
02224 if (action <= 0)
02225 {
02226 ACE_CLR_BITS (actual_events, FD_ACCEPT);
02227 if (action == -1)
02228 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02229 }
02230 }
02231
02232 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02233 {
02234 action = event_handler->handle_qos (io_handle);
02235 if (action <= 0)
02236 {
02237 ACE_CLR_BITS (actual_events, FD_QOS);
02238 if (action == -1)
02239 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02240 }
02241 }
02242
02243 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02244 {
02245 action = event_handler->handle_group_qos (io_handle);
02246 if (action <= 0)
02247 {
02248 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02249 if (action == -1)
02250 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02251 }
02252 }
02253
02254 events.lNetworkEvents = actual_events;
02255 return problems;
02256 }
02257
02258
02259 int
02260 ACE_WFMO_Reactor::update_state (void)
02261 {
02262
02263 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02264
02265
02266 --this->active_threads_;
02267
02268
02269
02270 if (this->handler_rep_.changes_required () || this->new_owner ())
02271 {
02272 if (this->change_state_thread_ == 0)
02273
02274
02275 {
02276 this->change_state_thread_ = ACE_Thread::self ();
02277
02278 this->ok_to_wait_.reset ();
02279
02280 if (this->active_threads_ > 0)
02281
02282 {
02283
02284 this->wakeup_all_threads_.signal ();
02285
02286 monitor.release ();
02287
02288 this->waiting_to_change_state_.wait ();
02289
02290 monitor.acquire ();
02291 }
02292
02293
02294
02295
02296 while (this->handler_rep_.changes_required ())
02297
02298 this->handler_rep_.make_changes ();
02299 if (this->new_owner ())
02300
02301 this->change_owner ();
02302
02303 this->wakeup_all_threads_.reset ();
02304
02305 this->ok_to_wait_.signal ();
02306
02307 this->change_state_thread_ = 0;
02308 }
02309 else if (this->active_threads_ == 0)
02310
02311
02312
02313 this->waiting_to_change_state_.signal ();
02314 }
02315
02316
02317 else if (this->active_threads_ == 0)
02318
02319 this->wakeup_all_threads_.reset ();
02320
02321 return 0;
02322 }
02323
02324 void
02325 ACE_WFMO_Reactor::dump (void) const
02326 {
02327 #if defined (ACE_HAS_DUMP)
02328 ACE_TRACE ("ACE_WFMO_Reactor::dump");
02329
02330 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02331
02332 ACE_DEBUG ((LM_DEBUG,
02333 ACE_TEXT ("Count of currently active threads = %d\n"),
02334 this->active_threads_));
02335
02336 ACE_DEBUG ((LM_DEBUG,
02337 ACE_TEXT ("ID of owner thread = %d\n"),
02338 this->owner_));
02339
02340 this->handler_rep_.dump ();
02341 this->signal_handler_->dump ();
02342 this->timer_queue_->dump ();
02343
02344 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02345 #endif
02346 }
02347
02348 int
02349 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & ,
02350 ACE_Handle_Set & )
02351 {
02352 return -1;
02353 }
02354
02355 int
02356 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & )
02357 {
02358 return 0;
02359 }
02360
02361 ACE_HANDLE
02362 ACE_WFMO_Reactor_Notify::notify_handle (void)
02363 {
02364 return ACE_INVALID_HANDLE;
02365 }
02366
02367 int
02368 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02369 ACE_Notification_Buffer &)
02370 {
02371 return 0;
02372 }
02373
02374 int
02375 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02376 {
02377 return 0;
02378 }
02379
02380 int
02381 ACE_WFMO_Reactor_Notify::close (void)
02382 {
02383 return -1;
02384 }
02385
02386 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02387 : timer_queue_ (0),
02388 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02389 max_notifies * sizeof (ACE_Notification_Buffer)),
02390 max_notify_iterations_ (-1)
02391 {
02392 }
02393
02394 int
02395 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02396 ACE_Timer_Queue *timer_queue,
02397 int ignore_notify)
02398 {
02399 ACE_UNUSED_ARG (ignore_notify);
02400 timer_queue_ = timer_queue;
02401 return wfmo_reactor->register_handler (this);
02402 }
02403
02404 ACE_HANDLE
02405 ACE_WFMO_Reactor_Notify::get_handle (void) const
02406 {
02407 return this->wakeup_one_thread_.handle ();
02408 }
02409
02410
02411
02412 int
02413 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02414 siginfo_t *siginfo,
02415 ucontext_t *)
02416 {
02417 ACE_UNUSED_ARG (signum);
02418
02419
02420 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02421 return -1;
02422
02423
02424
02425
02426
02427
02428 for (int i = 1; ; ++i)
02429 {
02430 ACE_Message_Block *mb = 0;
02431
02432 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02433 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02434 {
02435 if (errno == EWOULDBLOCK)
02436
02437
02438 return 0;
02439 else
02440 return -1;
02441 }
02442 else
02443 {
02444 ACE_Notification_Buffer *buffer =
02445 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02446
02447
02448
02449
02450
02451 if (buffer->eh_ != 0)
02452 {
02453 ACE_Event_Handler *event_handler =
02454 buffer->eh_;
02455
02456 bool const requires_reference_counting =
02457 event_handler->reference_counting_policy ().value () ==
02458 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02459
02460 int result = 0;
02461
02462 switch (buffer->mask_)
02463 {
02464 case ACE_Event_Handler::READ_MASK:
02465 case ACE_Event_Handler::ACCEPT_MASK:
02466 result = event_handler->handle_input (ACE_INVALID_HANDLE);
02467 break;
02468 case ACE_Event_Handler::WRITE_MASK:
02469 result = event_handler->handle_output (ACE_INVALID_HANDLE);
02470 break;
02471 case ACE_Event_Handler::EXCEPT_MASK:
02472 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02473 break;
02474 case ACE_Event_Handler::QOS_MASK:
02475 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02476 break;
02477 case ACE_Event_Handler::GROUP_QOS_MASK:
02478 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02479 break;
02480 default:
02481 ACE_ERROR ((LM_ERROR,
02482 ACE_TEXT ("invalid mask = %d\n"),
02483 buffer->mask_));
02484 break;
02485 }
02486
02487 if (result == -1)
02488 event_handler->handle_close (ACE_INVALID_HANDLE,
02489 ACE_Event_Handler::EXCEPT_MASK);
02490
02491 if (requires_reference_counting)
02492 {
02493 event_handler->remove_reference ();
02494 }
02495 }
02496
02497
02498
02499 mb->release ();
02500
02501
02502
02503
02504 if (i == this->max_notify_iterations_)
02505 {
02506
02507
02508 if (!this->message_queue_.is_empty ())
02509 this->wakeup_one_thread_.signal ();
02510
02511
02512 return 0;
02513 }
02514 }
02515 }
02516 }
02517
02518
02519
02520
02521
02522 int
02523 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02524 ACE_Reactor_Mask mask,
02525 ACE_Time_Value *timeout)
02526 {
02527 if (event_handler != 0)
02528 {
02529 ACE_Message_Block *mb = 0;
02530 ACE_NEW_RETURN (mb,
02531 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02532 -1);
02533
02534 ACE_Notification_Buffer *buffer =
02535 (ACE_Notification_Buffer *) mb->base ();
02536 buffer->eh_ = event_handler;
02537 buffer->mask_ = mask;
02538
02539
02540
02541
02542 if (timeout != 0)
02543 *timeout += timer_queue_->gettimeofday ();
02544
02545 if (this->message_queue_.enqueue_tail
02546 (mb, timeout) == -1)
02547 {
02548 mb->release ();
02549 return -1;
02550 }
02551
02552 event_handler->add_reference ();
02553 }
02554
02555 return this->wakeup_one_thread_.signal ();
02556 }
02557
02558 void
02559 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02560 {
02561 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02562
02563 if (iterations == 0)
02564 iterations = 1;
02565
02566 this->max_notify_iterations_ = iterations;
02567 }
02568
02569 int
02570 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02571 {
02572 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02573 return this->max_notify_iterations_;
02574 }
02575
02576 int
02577 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02578 ACE_Reactor_Mask mask)
02579 {
02580 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02581
02582
02583
02584
02585
02586
02587 if (this->message_queue_.is_empty ())
02588 return 0;
02589
02590
02591
02592
02593
02594
02595
02596 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02597
02598
02599
02600 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02601
02602 size_t queue_size = this->message_queue_.message_count ();
02603 int number_purged = 0;
02604
02605 size_t index;
02606
02607 for (index = 0; index < queue_size; ++index)
02608 {
02609 ACE_Message_Block *mb = 0;
02610 if (-1 == this->message_queue_.dequeue_head (mb))
02611 return -1;
02612
02613 ACE_Notification_Buffer *buffer =
02614 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02615
02616
02617
02618
02619
02620 if ((0 != buffer->eh_) &&
02621 (0 == eh || eh == buffer->eh_) &&
02622 ACE_BIT_DISABLED (buffer->mask_, ~mask))
02623
02624
02625 {
02626 ACE_Event_Handler *event_handler = buffer->eh_;
02627
02628 event_handler->remove_reference ();
02629
02630 mb->release ();
02631 ++number_purged;
02632 }
02633 else
02634 {
02635
02636
02637
02638
02639 if ((0 != buffer->eh_) &&
02640 (0 == eh || eh == buffer->eh_))
02641 ACE_CLR_BITS(buffer->mask_, mask);
02642 if (-1 == local_queue.enqueue_head (mb))
02643 return -1;
02644 }
02645 }
02646
02647 if (this->message_queue_.message_count ())
02648 {
02649 ACE_ASSERT (0);
02650 return -1;
02651 }
02652
02653
02654
02655 queue_size = local_queue.message_count ();
02656 for (index = 0; index < queue_size; ++index)
02657 {
02658 ACE_Message_Block *mb = 0;
02659 if (-1 == local_queue.dequeue_head (mb))
02660 {
02661 ACE_ASSERT (0);
02662 return -1;
02663 }
02664
02665 if (-1 == this->message_queue_.enqueue_head (mb))
02666 {
02667 ACE_ASSERT (0);
02668 return -1;
02669 }
02670 }
02671
02672 return number_purged;
02673 }
02674
02675 void
02676 ACE_WFMO_Reactor_Notify::dump (void) const
02677 {
02678 #if defined (ACE_HAS_DUMP)
02679 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02680 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02681 this->timer_queue_->dump ();
02682 ACE_DEBUG ((LM_DEBUG,
02683 ACE_TEXT ("Max. iteration: %d\n"),
02684 this->max_notify_iterations_));
02685 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02686 #endif
02687 }
02688
02689 void
02690 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02691 {
02692 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02693 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02694
02695
02696 this->notify_handler_->max_notify_iterations (iterations);
02697 }
02698
02699 int
02700 ACE_WFMO_Reactor::max_notify_iterations (void)
02701 {
02702 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02703 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02704
02705 return this->notify_handler_->max_notify_iterations ();
02706 }
02707
02708 int
02709 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02710 ACE_Reactor_Mask mask)
02711 {
02712 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02713 if (this->notify_handler_ == 0)
02714 return 0;
02715 else
02716 return this->notify_handler_->purge_pending_notifications (eh, mask);
02717 }
02718
02719 int
02720 ACE_WFMO_Reactor::resumable_handler (void)
02721 {
02722 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02723 return 0;
02724 }
02725
02726
02727
02728 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02729 int
02730 WSAEventSelect (SOCKET ,
02731 WSAEVENT ,
02732 long )
02733 {
02734 return -1;
02735 }
02736
02737 int
02738 WSAEnumNetworkEvents (SOCKET ,
02739 WSAEVENT ,
02740 LPWSANETWORKEVENTS )
02741 {
02742 return -1;
02743 }
02744 #endif
02745
02746 ACE_END_VERSIONED_NAMESPACE_DECL
02747
02748 #endif