00001
00002
00003 #include "ace/WFMO_Reactor.h"
00004
00005 #if defined (ACE_WIN32)
00006
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif
00016
00017 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024 : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031 if (size > MAXIMUM_WAIT_OBJECTS)
00032 ACE_ERROR_RETURN ((LM_ERROR,
00033 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034 size,
00035 MAXIMUM_WAIT_OBJECTS),
00036 -1);
00037
00038
00039 ACE_NEW_RETURN (this->current_handles_,
00040 ACE_HANDLE[size],
00041 -1);
00042 ACE_NEW_RETURN (this->current_info_,
00043 Current_Info[size],
00044 -1);
00045 ACE_NEW_RETURN (this->current_suspended_info_,
00046 Suspended_Info[size],
00047 -1);
00048 ACE_NEW_RETURN (this->to_be_added_info_,
00049 To_Be_Added_Info[size],
00050 -1);
00051
00052
00053 this->max_size_ = size;
00054 this->max_handlep1_ = 0;
00055 this->suspended_handles_ = 0;
00056 this->handles_to_be_added_ = 0;
00057 this->handles_to_be_deleted_ = 0;
00058 this->handles_to_be_suspended_ = 0;
00059 this->handles_to_be_resumed_ = 0;
00060
00061 for (size_t i = 0; i < size; ++i)
00062 this->current_handles_[i] = ACE_INVALID_HANDLE;
00063
00064 return 0;
00065 }
00066
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069
00070 delete [] this->current_handles_;
00071 delete [] this->current_info_;
00072 delete [] this->current_suspended_info_;
00073 delete [] this->to_be_added_info_;
00074 }
00075
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078 ACE_Reactor_Mask change_masks,
00079 int operation)
00080 {
00081
00082
00083
00084 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085
00086 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089
00090 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092
00093 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095
00096 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098
00099 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101
00102 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104
00105 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107
00108 switch (operation)
00109 {
00110 case ACE_Reactor::CLR_MASK:
00111
00112
00113 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114 {
00115 ACE_CLR_BITS (existing_masks, FD_READ);
00116 ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117 }
00118
00119 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120 ACE_CLR_BITS (existing_masks, FD_WRITE);
00121
00122 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123 ACE_CLR_BITS (existing_masks, FD_OOB);
00124
00125 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127
00128 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129 ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130
00131 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132 ACE_CLR_BITS (existing_masks, FD_QOS);
00133
00134 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136
00137 break;
00138
00139 case ACE_Reactor::SET_MASK:
00140
00141
00142 existing_masks = 0;
00143
00144
00145 case ACE_Reactor::ADD_MASK:
00146
00147
00148
00149 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150 {
00151 ACE_SET_BITS (existing_masks, FD_READ);
00152 ACE_SET_BITS (existing_masks, FD_CLOSE);
00153 }
00154
00155 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156 ACE_SET_BITS (existing_masks, FD_WRITE);
00157
00158 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159 ACE_SET_BITS (existing_masks, FD_OOB);
00160
00161 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162 ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163
00164 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165 ACE_SET_BITS (existing_masks, FD_CONNECT);
00166
00167 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168 ACE_SET_BITS (existing_masks, FD_QOS);
00169
00170 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172
00173 break;
00174
00175 case ACE_Reactor::GET_MASK:
00176
00177
00178
00179
00180 ACE_UNUSED_ARG (change_masks);
00181
00182 break;
00183 }
00184
00185 return old_masks;
00186 }
00187
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190 ACE_Reactor_Mask mask,
00191 int &changes_required)
00192 {
00193 int error = 0;
00194
00195
00196
00197 size_t const original_handle_count = this->handles_to_be_deleted_;
00198 int result = 0;
00199 size_t i;
00200
00201
00202
00203
00204
00205
00206 for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00207
00208
00209 if ((this->current_handles_[i] == handle
00210 || this->current_info_[i].io_handle_ == handle)
00211 &&
00212 !this->current_info_[i].delete_entry_)
00213 {
00214 result = this->remove_handler_i (i,
00215 mask);
00216 if (result == -1)
00217 error = 1;
00218 }
00219
00220
00221 for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00222
00223
00224 if ((this->current_suspended_info_[i].io_handle_ == handle
00225 || this->current_suspended_info_[i].event_handle_ == handle)
00226 &&
00227
00228 !this->current_suspended_info_[i].delete_entry_)
00229 {
00230 result = this->remove_suspended_handler_i (i,
00231 mask);
00232 if (result == -1)
00233 error = 1;
00234 }
00235
00236
00237 for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00238
00239
00240 if ((this->to_be_added_info_[i].io_handle_ == handle
00241 || this->to_be_added_info_[i].event_handle_ == handle)
00242 &&
00243
00244 !this->to_be_added_info_[i].delete_entry_)
00245 {
00246 result = this->remove_to_be_added_handler_i (i,
00247 mask);
00248 if (result == -1)
00249 error = 1;
00250 }
00251
00252
00253
00254 if (original_handle_count < this->handles_to_be_deleted_)
00255 changes_required = 1;
00256
00257 return error ? -1 : 0;
00258 }
00259
00260 int
00261 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00262 ACE_Reactor_Mask to_be_removed_masks)
00263 {
00264
00265 if (this->current_info_[slot].io_entry_)
00266 {
00267
00268
00269 this->bit_ops (this->current_info_[slot].network_events_,
00270 to_be_removed_masks,
00271 ACE_Reactor::CLR_MASK);
00272
00273
00274
00275
00276
00277
00278 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00279 this->current_handles_[slot],
00280 this->current_info_[slot].network_events_);
00281 }
00282
00283 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00284
00285 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00286 else
00287
00288 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00289
00290
00291
00292 if (this->current_info_[slot].suspend_entry_)
00293 {
00294
00295 this->current_info_[slot].suspend_entry_ = 0;
00296
00297 --this->handles_to_be_suspended_;
00298 }
00299
00300
00301
00302
00303 if (this->current_info_[slot].network_events_ == 0)
00304 {
00305
00306 this->current_info_[slot].delete_entry_ = 1;
00307
00308 this->current_info_[slot].close_masks_ = to_be_removed_masks;
00309
00310 ++this->handles_to_be_deleted_;
00311 }
00312
00313
00314
00315
00316
00317
00318 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00319 {
00320 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00321 this->current_info_[slot].event_handler_->handle_close (handle,
00322 to_be_removed_masks);
00323 }
00324
00325 return 0;
00326 }
00327
00328 int
00329 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00330 ACE_Reactor_Mask to_be_removed_masks)
00331 {
00332
00333 if (this->current_suspended_info_[slot].io_entry_)
00334 {
00335
00336
00337 this->bit_ops (this->current_suspended_info_[slot].network_events_,
00338 to_be_removed_masks,
00339 ACE_Reactor::CLR_MASK);
00340
00341
00342
00343
00344
00345
00346 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00347 this->current_suspended_info_[slot].event_handle_,
00348 this->current_suspended_info_[slot].network_events_);
00349 }
00350
00351 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00352
00353 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00354 else
00355
00356 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00357
00358
00359
00360 if (this->current_suspended_info_[slot].resume_entry_)
00361 {
00362
00363 this->current_suspended_info_[slot].resume_entry_ = 0;
00364
00365 --this->handles_to_be_resumed_;
00366 }
00367
00368
00369
00370
00371 if (this->current_suspended_info_[slot].network_events_ == 0)
00372 {
00373
00374 this->current_suspended_info_[slot].delete_entry_ = 1;
00375
00376 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00377
00378 ++this->handles_to_be_deleted_;
00379 }
00380
00381
00382
00383
00384
00385 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00386 {
00387 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00388 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00389 to_be_removed_masks);
00390 }
00391
00392 return 0;
00393 }
00394
00395 int
00396 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00397 ACE_Reactor_Mask to_be_removed_masks)
00398 {
00399
00400 if (this->to_be_added_info_[slot].io_entry_)
00401 {
00402
00403
00404 this->bit_ops (this->to_be_added_info_[slot].network_events_,
00405 to_be_removed_masks,
00406 ACE_Reactor::CLR_MASK);
00407
00408
00409
00410
00411
00412
00413 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00414 this->to_be_added_info_[slot].event_handle_,
00415 this->to_be_added_info_[slot].network_events_);
00416 }
00417
00418 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00419
00420 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00421 else
00422
00423 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00424
00425
00426
00427 if (this->to_be_added_info_[slot].suspend_entry_)
00428 {
00429
00430 this->to_be_added_info_[slot].suspend_entry_ = 0;
00431
00432 --this->handles_to_be_suspended_;
00433 }
00434
00435
00436
00437
00438 if (this->to_be_added_info_[slot].network_events_ == 0)
00439 {
00440
00441 this->to_be_added_info_[slot].delete_entry_ = 1;
00442
00443 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00444
00445 ++this->handles_to_be_deleted_;
00446 }
00447
00448
00449
00450
00451
00452 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00453 {
00454 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00455 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00456 to_be_removed_masks);
00457 }
00458
00459 return 0;
00460 }
00461
00462 int
00463 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00464 int &changes_required)
00465 {
00466 size_t i = 0;
00467
00468
00469
00470
00471
00472
00473 for (i = 0; i < this->max_handlep1_; ++i)
00474
00475
00476 if ((this->current_handles_[i] == handle ||
00477 this->current_info_[i].io_handle_ == handle) &&
00478
00479 !this->current_info_[i].suspend_entry_)
00480 {
00481
00482 this->current_info_[i].suspend_entry_ = 1;
00483
00484 ++this->handles_to_be_suspended_;
00485
00486 changes_required = 1;
00487 }
00488
00489
00490 for (i = 0; i < this->suspended_handles_; ++i)
00491
00492
00493 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00494 this->current_suspended_info_[i].io_handle_ == handle) &&
00495
00496 this->current_suspended_info_[i].resume_entry_)
00497 {
00498
00499 this->current_suspended_info_[i].resume_entry_ = 0;
00500
00501 --this->handles_to_be_resumed_;
00502
00503 changes_required = 1;
00504 }
00505
00506
00507 for (i = 0; i < this->handles_to_be_added_; ++i)
00508
00509
00510 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00511 this->to_be_added_info_[i].event_handle_ == handle) &&
00512
00513 !this->to_be_added_info_[i].suspend_entry_)
00514 {
00515
00516 this->to_be_added_info_[i].suspend_entry_ = 1;
00517
00518 ++this->handles_to_be_suspended_;
00519
00520 changes_required = 1;
00521 }
00522
00523 return 0;
00524 }
00525
00526 int
00527 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00528 int &changes_required)
00529 {
00530 size_t i = 0;
00531
00532
00533
00534
00535
00536
00537 for (i = 0; i < this->max_handlep1_; ++i)
00538
00539
00540 if ((this->current_handles_[i] == handle ||
00541 this->current_info_[i].io_handle_ == handle) &&
00542
00543 this->current_info_[i].suspend_entry_)
00544 {
00545
00546 this->current_info_[i].suspend_entry_ = 0;
00547
00548 --this->handles_to_be_suspended_;
00549
00550 changes_required = 1;
00551 }
00552
00553
00554 for (i = 0; i < this->suspended_handles_; ++i)
00555
00556
00557 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00558 this->current_suspended_info_[i].io_handle_ == handle) &&
00559
00560 !this->current_suspended_info_[i].resume_entry_)
00561 {
00562
00563 this->current_suspended_info_[i].resume_entry_ = 1;
00564
00565 ++this->handles_to_be_resumed_;
00566
00567 changes_required = 1;
00568 }
00569
00570
00571 for (i = 0; i < this->handles_to_be_added_; ++i)
00572
00573
00574 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00575 this->to_be_added_info_[i].event_handle_ == handle) &&
00576
00577 this->to_be_added_info_[i].suspend_entry_)
00578 {
00579
00580 this->to_be_added_info_[i].suspend_entry_ = 0;
00581
00582 --this->handles_to_be_suspended_;
00583
00584 changes_required = 1;
00585 }
00586
00587 return 0;
00588 }
00589
00590 void
00591 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00592 {
00593 {
00594 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00595
00596 int dummy;
00597 size_t i;
00598
00599
00600 for (i = 0; i < this->max_handlep1_; ++i)
00601 this->unbind_i (this->current_handles_[i],
00602 ACE_Event_Handler::ALL_EVENTS_MASK,
00603 dummy);
00604
00605
00606 for (i = 0; i < this->suspended_handles_; ++i)
00607 this->unbind_i (this->current_suspended_info_[i].event_handle_,
00608 ACE_Event_Handler::ALL_EVENTS_MASK,
00609 dummy);
00610
00611
00612 for (i = 0; i < this->handles_to_be_added_; ++i)
00613 this->unbind_i (this->to_be_added_info_[i].event_handle_,
00614 ACE_Event_Handler::ALL_EVENTS_MASK,
00615 dummy);
00616
00617 }
00618
00619
00620
00621
00622
00623 this->wfmo_reactor_.wakeup_all_threads ();
00624 }
00625
00626 int
00627 ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
00628 ACE_Event_Handler *event_handler,
00629 long network_events,
00630 ACE_HANDLE io_handle,
00631 ACE_HANDLE event_handle,
00632 int delete_event)
00633 {
00634 if (event_handler == 0)
00635 return -1;
00636
00637
00638 if (event_handle == ACE_INVALID_HANDLE)
00639 event_handle = event_handler->get_handle ();
00640 if (this->invalid_handle (event_handle))
00641 return -1;
00642
00643 size_t current_size = this->max_handlep1_ +
00644 this->handles_to_be_added_ -
00645 this->handles_to_be_deleted_ +
00646 this->suspended_handles_;
00647
00648
00649
00650
00651 if (current_size < this->max_size_ &&
00652 this->handles_to_be_added_ < this->max_size_)
00653 {
00654
00655
00656 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00657 io_entry,
00658 event_handler,
00659 io_handle,
00660 network_events,
00661 delete_event);
00662
00663 ++this->handles_to_be_added_;
00664
00665 event_handler->add_reference ();
00666
00667
00668
00669 this->wfmo_reactor_.wakeup_all_threads ();
00670 }
00671 else
00672 {
00673 errno = EMFILE;
00674 return -1;
00675 }
00676
00677 return 0;
00678 }
00679
00680 int
00681 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00682 {
00683
00684
00685 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00686 {
00687 size_t i = 0;
00688 while (i < this->max_handlep1_)
00689 {
00690
00691
00692
00693
00694 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00695 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00696 ACE_Event_Handler *event_handler = 0;
00697
00698
00699 if (this->current_info_[i].delete_entry_)
00700 {
00701
00702
00703
00704
00705
00706
00707
00708 masks = this->current_info_[i].close_masks_;
00709 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00710 {
00711
00712 if (this->current_info_[i].io_entry_)
00713 handle = this->current_info_[i].io_handle_;
00714 else
00715 handle = this->current_handles_[i];
00716
00717
00718 event_handler = this->current_info_[i].event_handler_;
00719 }
00720
00721
00722 if (this->current_info_[i].delete_event_)
00723 ACE_OS::event_destroy (&this->current_handles_[i]);
00724
00725
00726 --this->handles_to_be_deleted_;
00727 }
00728
00729
00730 else if (this->current_info_[i].suspend_entry_)
00731 {
00732 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00733 this->current_info_[i]);
00734
00735 ++this->suspended_handles_;
00736
00737
00738 --this->handles_to_be_suspended_;
00739 }
00740
00741
00742
00743 if (this->current_info_[i].delete_entry_ ||
00744 this->current_info_[i].suspend_entry_ )
00745 {
00746 size_t last_valid_slot = this->max_handlep1_ - 1;
00747
00748
00749 if (i < last_valid_slot)
00750
00751 {
00752
00753 this->current_info_[i] =
00754 this->current_info_[last_valid_slot];
00755 this->current_handles_[i] =
00756 this->current_handles_[last_valid_slot];
00757 }
00758
00759 this->current_info_[last_valid_slot].reset ();
00760 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00761 --this->max_handlep1_;
00762 }
00763 else
00764 {
00765
00766
00767
00768 ++i;
00769 }
00770
00771
00772
00773 if (event_handler != 0)
00774 {
00775 bool const requires_reference_counting =
00776 event_handler->reference_counting_policy ().value () ==
00777 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00778
00779 event_handler->handle_close (handle, masks);
00780
00781 if (requires_reference_counting)
00782 {
00783 event_handler->remove_reference ();
00784 }
00785 }
00786 }
00787 }
00788
00789 return 0;
00790 }
00791
00792 int
00793 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00794 {
00795
00796 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00797 {
00798 size_t i = 0;
00799 while (i < this->suspended_handles_)
00800 {
00801
00802
00803
00804
00805 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00806 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00807 ACE_Event_Handler *event_handler = 0;
00808
00809
00810 if (this->current_suspended_info_[i].delete_entry_)
00811 {
00812
00813
00814
00815
00816
00817
00818
00819 masks = this->current_suspended_info_[i].close_masks_;
00820 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00821 {
00822
00823 if (this->current_suspended_info_[i].io_entry_)
00824 handle = this->current_suspended_info_[i].io_handle_;
00825 else
00826 handle = this->current_suspended_info_[i].event_handle_;
00827
00828
00829 event_handler = this->current_suspended_info_[i].event_handler_;
00830 }
00831
00832
00833 if (this->current_suspended_info_[i].delete_event_)
00834 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00835
00836
00837 --this->handles_to_be_deleted_;
00838 }
00839
00840 else if (this->current_suspended_info_[i].resume_entry_)
00841 {
00842
00843 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00844
00845 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00846 ++this->max_handlep1_;
00847
00848
00849 --this->handles_to_be_resumed_;
00850 }
00851
00852
00853
00854
00855 if (this->current_suspended_info_[i].resume_entry_ ||
00856 this->current_suspended_info_[i].delete_entry_)
00857 {
00858 size_t last_valid_slot = this->suspended_handles_ - 1;
00859
00860
00861
00862
00863
00864 if (i < last_valid_slot)
00865
00866 this->current_suspended_info_[i] =
00867 this->current_suspended_info_[last_valid_slot];
00868 this->current_suspended_info_[last_valid_slot].reset ();
00869 --this->suspended_handles_;
00870 }
00871 else
00872 {
00873
00874
00875
00876 ++i;
00877 }
00878
00879
00880
00881 if (event_handler != 0)
00882 {
00883 int requires_reference_counting =
00884 event_handler->reference_counting_policy ().value () ==
00885 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00886
00887 event_handler->handle_close (handle, masks);
00888
00889 if (requires_reference_counting)
00890 {
00891 event_handler->remove_reference ();
00892 }
00893 }
00894 }
00895 }
00896
00897 return 0;
00898 }
00899
00900 int
00901 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00902 {
00903
00904 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00905 {
00906
00907
00908
00909
00910 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00911 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00912 ACE_Event_Handler *event_handler = 0;
00913
00914
00915 if (this->to_be_added_info_[i].delete_entry_)
00916 {
00917
00918
00919
00920
00921
00922
00923
00924 masks = this->to_be_added_info_[i].close_masks_;
00925 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00926 {
00927
00928 if (this->to_be_added_info_[i].io_entry_)
00929 handle = this->to_be_added_info_[i].io_handle_;
00930 else
00931 handle = this->to_be_added_info_[i].event_handle_;
00932
00933
00934 event_handler = this->to_be_added_info_[i].event_handler_;
00935 }
00936
00937
00938 if (this->to_be_added_info_[i].delete_event_)
00939 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00940
00941
00942 --this->handles_to_be_deleted_;
00943 }
00944
00945
00946 else if (this->to_be_added_info_[i].suspend_entry_)
00947 {
00948 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00949 this->to_be_added_info_[i]);
00950
00951 ++this->suspended_handles_;
00952
00953
00954 --this->handles_to_be_suspended_;
00955 }
00956
00957
00958 else
00959 {
00960
00961 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00962
00963 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00964 ++this->max_handlep1_;
00965 }
00966
00967
00968 this->to_be_added_info_[i].reset ();
00969
00970
00971
00972 if (event_handler != 0)
00973 {
00974 int requires_reference_counting =
00975 event_handler->reference_counting_policy ().value () ==
00976 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00977
00978 event_handler->handle_close (handle, masks);
00979
00980 if (requires_reference_counting)
00981 {
00982 event_handler->remove_reference ();
00983 }
00984 }
00985 }
00986
00987
00988
00989 this->handles_to_be_added_ = 0;
00990
00991 return 0;
00992 }
00993
00994 void
00995 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00996 {
00997 #if defined (ACE_HAS_DUMP)
00998 size_t i = 0;
00999
01000 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
01001
01002 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01003
01004 ACE_DEBUG ((LM_DEBUG,
01005 ACE_TEXT ("Max size = %d\n"),
01006 this->max_size_));
01007
01008 ACE_DEBUG ((LM_DEBUG,
01009 ACE_TEXT ("Current info table\n\n")));
01010 ACE_DEBUG ((LM_DEBUG,
01011 ACE_TEXT ("\tSize = %d\n"),
01012 this->max_handlep1_));
01013 ACE_DEBUG ((LM_DEBUG,
01014 ACE_TEXT ("\tHandles to be suspended = %d\n"),
01015 this->handles_to_be_suspended_));
01016
01017 for (i = 0; i < this->max_handlep1_; ++i)
01018 this->current_info_[i].dump (this->current_handles_[i]);
01019
01020 ACE_DEBUG ((LM_DEBUG,
01021 ACE_TEXT ("\n")));
01022
01023 ACE_DEBUG ((LM_DEBUG,
01024 ACE_TEXT ("To-be-added info table\n\n")));
01025 ACE_DEBUG ((LM_DEBUG,
01026 ACE_TEXT ("\tSize = %d\n"),
01027 this->handles_to_be_added_));
01028
01029 for (i = 0; i < this->handles_to_be_added_; ++i)
01030 this->to_be_added_info_[i].dump ();
01031
01032 ACE_DEBUG ((LM_DEBUG,
01033 ACE_TEXT ("\n")));
01034
01035 ACE_DEBUG ((LM_DEBUG,
01036 ACE_TEXT ("Suspended info table\n\n")));
01037 ACE_DEBUG ((LM_DEBUG,
01038 ACE_TEXT ("\tSize = %d\n"),
01039 this->suspended_handles_));
01040 ACE_DEBUG ((LM_DEBUG,
01041 ACE_TEXT ("\tHandles to be resumed = %d\n"),
01042 this->handles_to_be_resumed_));
01043
01044 for (i = 0; i < this->suspended_handles_; ++i)
01045 this->current_suspended_info_[i].dump ();
01046
01047 ACE_DEBUG ((LM_DEBUG,
01048 ACE_TEXT ("\n")));
01049
01050 ACE_DEBUG ((LM_DEBUG,
01051 ACE_TEXT ("Total handles to be deleted = %d\n"),
01052 this->handles_to_be_deleted_));
01053
01054 ACE_DEBUG ((LM_DEBUG,
01055 ACE_END_DUMP));
01056 #endif
01057 }
01058
01059
01060
01061 int
01062 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01063 {
01064 ACE_NOTSUP_RETURN (-1);
01065 }
01066
01067 #if defined (ACE_WIN32_VC8)
01068 # pragma warning (push)
01069 # pragma warning (disable:4355)
01070 # endif
01071 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01072 ACE_Timer_Queue *tq,
01073 ACE_Reactor_Notify *notify)
01074 : signal_handler_ (0),
01075 delete_signal_handler_ (false),
01076 timer_queue_ (0),
01077 delete_timer_queue_ (false),
01078 delete_handler_rep_ (false),
01079 notify_handler_ (0),
01080 delete_notify_handler_ (false),
01081 lock_adapter_ (lock_),
01082 handler_rep_ (*this),
01083
01084 ok_to_wait_ (1),
01085
01086 wakeup_all_threads_ (0),
01087
01088 waiting_to_change_state_ (0),
01089 active_threads_ (0),
01090 owner_ (ACE_Thread::self ()),
01091 new_owner_ (0),
01092 change_state_thread_ (0),
01093 open_for_business_ (false),
01094 deactivated_ (0)
01095 {
01096 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01097 ACE_ERROR ((LM_ERROR,
01098 ACE_TEXT ("%p\n"),
01099 ACE_TEXT ("WFMO_Reactor")));
01100 }
01101
01102 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01103 int unused,
01104 ACE_Sig_Handler *sh,
01105 ACE_Timer_Queue *tq,
01106 ACE_Reactor_Notify *notify)
01107 : signal_handler_ (0),
01108 delete_signal_handler_ (false),
01109 timer_queue_ (0),
01110 delete_timer_queue_ (false),
01111 delete_handler_rep_ (false),
01112 notify_handler_ (0),
01113 delete_notify_handler_ (false),
01114 lock_adapter_ (lock_),
01115 handler_rep_ (*this),
01116
01117 ok_to_wait_ (1),
01118
01119 wakeup_all_threads_ (0),
01120
01121 waiting_to_change_state_ (0),
01122 active_threads_ (0),
01123 owner_ (ACE_Thread::self ()),
01124 new_owner_ (0),
01125 change_state_thread_ (0),
01126 open_for_business_ (false),
01127 deactivated_ (0)
01128 {
01129 ACE_UNUSED_ARG (unused);
01130
01131 if (this->open (size, 0, sh, tq, 0, notify) == -1)
01132 ACE_ERROR ((LM_ERROR,
01133 ACE_TEXT ("%p\n"),
01134 ACE_TEXT ("WFMO_Reactor")));
01135 }
01136 #if defined (ACE_WIN32_VC8)
01137 # pragma warning (pop)
01138 #endif
01139
01140 int
01141 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01142 {
01143 return -1;
01144 }
01145
01146 int
01147 ACE_WFMO_Reactor::open (size_t size,
01148 int unused,
01149 ACE_Sig_Handler *sh,
01150 ACE_Timer_Queue *tq,
01151 int disable_notify_pipe,
01152 ACE_Reactor_Notify *notify)
01153 {
01154 ACE_UNUSED_ARG (unused);
01155 ACE_UNUSED_ARG (disable_notify_pipe);
01156
01157
01158 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01159
01160
01161 if (this->open_for_business_)
01162 return -1;
01163
01164
01165 if (this->delete_timer_queue_)
01166 delete this->timer_queue_;
01167
01168 if (tq == 0)
01169 {
01170 ACE_NEW_RETURN (this->timer_queue_,
01171 ACE_Timer_Heap,
01172 -1);
01173 this->delete_timer_queue_ = true;
01174 }
01175 else
01176 {
01177 this->timer_queue_ = tq;
01178 this->delete_timer_queue_ = false;
01179 }
01180
01181
01182 if (this->delete_signal_handler_)
01183 delete this->signal_handler_;
01184
01185 if (sh == 0)
01186 {
01187 ACE_NEW_RETURN (this->signal_handler_,
01188 ACE_Sig_Handler,
01189 -1);
01190 this->delete_signal_handler_ = true;
01191 }
01192 else
01193 {
01194 this->signal_handler_ = sh;
01195 this->delete_signal_handler_ = false;
01196 }
01197
01198
01199 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01200 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01201
01202
01203 if (this->delete_handler_rep_)
01204 {
01205 if (this->handler_rep_.changes_required ())
01206 {
01207
01208 this->handler_rep_.make_changes ();
01209
01210
01211 this->wakeup_all_threads_.reset ();
01212 }
01213
01214 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01215 }
01216
01217
01218
01219 if (this->handler_rep_.open (size + 2) == -1)
01220 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
01221 ACE_TEXT ("opening handler repository")),
01222 -1);
01223 else
01224 this->delete_handler_rep_ = true;
01225
01226 if (this->notify_handler_ != 0 && this->delete_notify_handler_)
01227 delete this->notify_handler_;
01228
01229 this->notify_handler_ = notify;
01230
01231 if (this->notify_handler_ == 0)
01232 {
01233 ACE_NEW_RETURN (this->notify_handler_,
01234 ACE_WFMO_Reactor_Notify,
01235 -1);
01236
01237 if (this->notify_handler_ == 0)
01238 return -1;
01239 else
01240 this->delete_notify_handler_ = true;
01241 }
01242
01243
01244
01245
01246
01247 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01248 ACE_ERROR_RETURN ((LM_ERROR,
01249 ACE_TEXT ("%p\n"),
01250 ACE_TEXT ("opening notify handler ")),
01251 -1);
01252
01253
01254 if (this->register_handler (&this->wakeup_all_threads_handler_,
01255 this->wakeup_all_threads_.handle ()) == -1)
01256 ACE_ERROR_RETURN ((LM_ERROR,
01257 ACE_TEXT ("%p\n"),
01258 ACE_TEXT ("registering thread wakeup handler")),
01259 -1);
01260
01261
01262
01263 if (this->handler_rep_.changes_required ())
01264 {
01265
01266 this->handler_rep_.make_changes ();
01267
01268
01269 this->wakeup_all_threads_.reset ();
01270 }
01271
01272
01273 this->open_for_business_ = true;
01274
01275 return 0;
01276 }
01277
01278 int
01279 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01280 {
01281 if (this->signal_handler_ != 0 && this->delete_signal_handler_)
01282 delete this->signal_handler_;
01283 this->signal_handler_ = signal_handler;
01284 this->delete_signal_handler_ = false;
01285 return 0;
01286 }
01287
01288 ACE_Timer_Queue *
01289 ACE_WFMO_Reactor::timer_queue (void) const
01290 {
01291 return this->timer_queue_;
01292 }
01293
01294 int
01295 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01296 {
01297 if (this->timer_queue_ != 0 && this->delete_timer_queue_)
01298 delete this->timer_queue_;
01299 this->timer_queue_ = tq;
01300 this->delete_timer_queue_ = false;
01301 return 0;
01302 }
01303
01304 int
01305 ACE_WFMO_Reactor::close (void)
01306 {
01307
01308 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01309
01310
01311 if (!this->open_for_business_)
01312 return -1;
01313
01314
01315 this->open_for_business_ = false;
01316
01317 this->handler_rep_.close ();
01318
01319 return 0;
01320 }
01321
01322 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01323 {
01324
01325
01326
01327
01328 this->close ();
01329
01330
01331
01332 this->handler_rep_.make_changes ();
01333
01334 if (this->delete_timer_queue_)
01335 {
01336 delete this->timer_queue_;
01337 this->timer_queue_ = 0;
01338 this->delete_timer_queue_ = false;
01339 }
01340
01341 if (this->delete_signal_handler_)
01342 {
01343 delete this->signal_handler_;
01344 this->signal_handler_ = 0;
01345 this->delete_signal_handler_ = false;
01346 }
01347
01348 if (this->delete_notify_handler_)
01349 {
01350 delete this->notify_handler_;
01351 this->notify_handler_ = 0;
01352 this->delete_notify_handler_ = false;
01353 }
01354 }
01355
01356 int
01357 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01358 ACE_HANDLE io_handle,
01359 ACE_Event_Handler *event_handler,
01360 ACE_Reactor_Mask new_masks)
01361 {
01362
01363
01364
01365
01366 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01367
01368 ACE_UNUSED_ARG (event_handle);
01369 ACE_UNUSED_ARG (io_handle);
01370 ACE_UNUSED_ARG (event_handler);
01371 ACE_UNUSED_ARG (new_masks);
01372 ACE_NOTSUP_RETURN (-1);
01373
01374 #else
01375
01376
01377 if (io_handle == ACE_INVALID_HANDLE)
01378 io_handle = event_handler->get_handle ();
01379
01380 if (this->handler_rep_.invalid_handle (io_handle))
01381 {
01382 errno = ERROR_INVALID_HANDLE;
01383 return -1;
01384 }
01385
01386 long new_network_events = 0;
01387 int delete_event = 0;
01388 auto_ptr <ACE_Auto_Event> event;
01389
01390
01391
01392 ACE_Reactor_Mask old_masks;
01393 int found = this->handler_rep_.modify_network_events_i (io_handle,
01394 new_masks,
01395 old_masks,
01396 new_network_events,
01397 event_handle,
01398 delete_event,
01399 ACE_Reactor::ADD_MASK);
01400
01401
01402
01403 if (event_handle == ACE_INVALID_HANDLE)
01404 {
01405
01406
01407 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01408 event = tmp;
01409 event_handle = event->handle ();
01410 delete_event = 1;
01411 }
01412
01413 int result = ::WSAEventSelect ((SOCKET) io_handle,
01414 event_handle,
01415 new_network_events);
01416
01417 if (found)
01418 return result;
01419 else if (result != SOCKET_ERROR &&
01420 this->handler_rep_.bind_i (1,
01421 event_handler,
01422 new_network_events,
01423 io_handle,
01424 event_handle,
01425 delete_event) != -1)
01426 {
01427
01428
01429 if (delete_event)
01430 {
01431
01432
01433
01434
01435
01436
01437
01438
01439 ACE_Errno_Guard guard (errno);
01440 event->handle (ACE_INVALID_HANDLE);
01441 event->remove ();
01442 }
01443 return 0;
01444 }
01445 else
01446 return -1;
01447
01448 #endif
01449
01450 }
01451
01452 int
01453 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01454 ACE_Reactor_Mask new_masks,
01455 int operation)
01456 {
01457
01458 if (this->handler_rep_.invalid_handle (io_handle))
01459 return -1;
01460
01461 long new_network_events = 0;
01462 int delete_event = 0;
01463 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01464
01465
01466
01467 ACE_Reactor_Mask old_masks;
01468 int found = this->handler_rep_.modify_network_events_i (io_handle,
01469 new_masks,
01470 old_masks,
01471 new_network_events,
01472 event_handle,
01473 delete_event,
01474 operation);
01475 if (found)
01476 {
01477 int result = ::WSAEventSelect ((SOCKET) io_handle,
01478 event_handle,
01479 new_network_events);
01480 if (result == 0)
01481 return old_masks;
01482 else
01483 return result;
01484 }
01485 else
01486 return -1;
01487 }
01488
01489
01490
01491 int
01492 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01493 ACE_Reactor_Mask new_masks,
01494 ACE_Reactor_Mask &old_masks,
01495 long &new_network_events,
01496 ACE_HANDLE &event_handle,
01497 int &delete_event,
01498 int operation)
01499 {
01500 long *modified_network_events = &new_network_events;
01501 int found = 0;
01502 size_t i;
01503
01504
01505
01506
01507
01508 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01509 if (io_handle == this->current_info_[i].io_handle_ &&
01510 !this->current_info_[i].delete_entry_)
01511 {
01512 found = 1;
01513 modified_network_events = &this->current_info_[i].network_events_;
01514 delete_event = this->current_info_[i].delete_event_;
01515 event_handle = this->current_handles_[i];
01516 }
01517
01518
01519
01520
01521
01522 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01523 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01524 !this->current_suspended_info_[i].delete_entry_)
01525 {
01526 found = 1;
01527 modified_network_events = &this->current_suspended_info_[i].network_events_;
01528 delete_event = this->current_suspended_info_[i].delete_event_;
01529 event_handle = this->current_suspended_info_[i].event_handle_;
01530 }
01531
01532
01533
01534
01535
01536 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01537 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01538 !this->to_be_added_info_[i].delete_entry_)
01539 {
01540 found = 1;
01541 modified_network_events = &this->to_be_added_info_[i].network_events_;
01542 delete_event = this->to_be_added_info_[i].delete_event_;
01543 event_handle = this->to_be_added_info_[i].event_handle_;
01544 }
01545
01546 old_masks = this->bit_ops (*modified_network_events,
01547 new_masks,
01548 operation);
01549
01550 new_network_events = *modified_network_events;
01551
01552 return found;
01553 }
01554
01555 ACE_Event_Handler *
01556 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01557 {
01558 long existing_masks_ignored = 0;
01559 return this->handler (handle,
01560 existing_masks_ignored);
01561 }
01562
01563 ACE_Event_Handler *
01564 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01565 long &existing_masks)
01566 {
01567 int found = 0;
01568 size_t i = 0;
01569 ACE_Event_Handler *event_handler = 0;
01570 existing_masks = 0;
01571
01572
01573
01574
01575
01576
01577
01578 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01579 if ((handle == this->current_info_[i].io_handle_ ||
01580 handle == this->current_handles_[i]) &&
01581 !this->current_info_[i].delete_entry_)
01582 {
01583 found = 1;
01584 event_handler = this->current_info_[i].event_handler_;
01585 existing_masks = this->current_info_[i].network_events_;
01586 }
01587
01588
01589
01590
01591
01592 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01593 if ((handle == this->current_suspended_info_[i].io_handle_ ||
01594 handle == this->current_suspended_info_[i].event_handle_) &&
01595 !this->current_suspended_info_[i].delete_entry_)
01596 {
01597 found = 1;
01598 event_handler = this->current_suspended_info_[i].event_handler_;
01599 existing_masks = this->current_suspended_info_[i].network_events_;
01600 }
01601
01602
01603
01604
01605
01606 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01607 if ((handle == this->to_be_added_info_[i].io_handle_ ||
01608 handle == this->to_be_added_info_[i].event_handle_) &&
01609 !this->to_be_added_info_[i].delete_entry_)
01610 {
01611 found = 1;
01612 event_handler = this->to_be_added_info_[i].event_handler_;
01613 existing_masks = this->to_be_added_info_[i].network_events_;
01614 }
01615
01616 if (event_handler)
01617 event_handler->add_reference ();
01618
01619 return event_handler;
01620 }
01621
01622 int
01623 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01624 ACE_Reactor_Mask user_masks,
01625 ACE_Event_Handler **user_event_handler)
01626 {
01627 long existing_masks = 0;
01628 int found = 0;
01629
01630 ACE_Event_Handler_var safe_event_handler =
01631 this->handler (handle,
01632 existing_masks);
01633
01634 if (safe_event_handler.handler ())
01635 found = 1;
01636
01637 if (!found)
01638 return -1;
01639
01640
01641
01642 if (found &&
01643 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01644 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01645 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01646 found = 0;
01647
01648 if (found &&
01649 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01650 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01651 found = 0;
01652
01653 if (found &&
01654 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01655 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01656 found = 0;
01657
01658 if (found &&
01659 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01660 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01661 found = 0;
01662
01663 if (found &&
01664 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01665 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01666 found = 0;
01667
01668 if (found &&
01669 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01670 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01671 found = 0;
01672
01673 if (found &&
01674 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01675 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01676 found = 0;
01677
01678 if (found &&
01679 user_event_handler)
01680 *user_event_handler = safe_event_handler.release ();
01681
01682 if (found)
01683 return 0;
01684 else
01685 return -1;
01686 }
01687
01688
01689
01690 int
01691 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01692 int alertable)
01693 {
01694 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01695
01696
01697 if (!this->open_for_business_ || this->deactivated_)
01698 return -1;
01699
01700
01701
01702
01703 ACE_Countdown_Time countdown (max_wait_time);
01704
01705 int result;
01706 do
01707 {
01708
01709
01710
01711 result = this->ok_to_wait (max_wait_time, alertable);
01712 if (result != 1)
01713 return result;
01714
01715
01716 ++this->active_threads_;
01717
01718
01719 this->lock_.release ();
01720
01721
01722
01723 countdown.update ();
01724
01725
01726 int timeout = this->calculate_timeout (max_wait_time);
01727
01728
01729 DWORD wait_status = this->wait_for_multiple_events (timeout,
01730 alertable);
01731
01732
01733 result = this->safe_dispatch (wait_status);
01734 if (0 == result)
01735 {
01736
01737
01738
01739
01740
01741
01742
01743
01744 countdown.update ();
01745 if (0 == max_wait_time || max_wait_time->usec () == 0)
01746 break;
01747 }
01748 }
01749 while (result == 0);
01750
01751 return result;
01752 }
01753
01754 int
01755 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01756 int alertable)
01757 {
01758
01759
01760
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770 #if defined (ACE_HAS_WINCE)
01771 ACE_Time_Value timeout = ACE_OS::gettimeofday ();
01772 if (max_wait_time != 0)
01773 timeout += *max_wait_time;
01774 while (1)
01775 {
01776 int status;
01777 if (max_wait_time == 0)
01778 status = this->ok_to_wait_.wait ();
01779 else
01780 status = this->ok_to_wait_.wait (&timeout);
01781 if (status == -1)
01782 return -1;
01783
01784
01785 if (max_wait_time == 0)
01786 status = this->lock_.acquire ();
01787 else
01788 status = this->lock_.acquire (timeout);
01789 if (status == -1)
01790 return -1;
01791
01792
01793
01794 ACE_Time_Value poll_it = ACE_OS::gettimeofday ();
01795 if (this->ok_to_wait_.wait (&poll_it) == 0)
01796 break;
01797 this->lock_.release ();
01798 }
01799 return 1;
01800
01801 #else
01802 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01803 DWORD result = 0;
01804 while (1)
01805 {
01806 # if defined (ACE_HAS_PHARLAP)
01807
01808
01809 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01810 this->atomic_wait_array_,
01811 TRUE,
01812 timeout);
01813
01814 if (result != WAIT_IO_COMPLETION)
01815 break;
01816
01817 # else
01818 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01819 this->atomic_wait_array_,
01820 TRUE,
01821 timeout,
01822 alertable);
01823
01824 if (result != WAIT_IO_COMPLETION)
01825 break;
01826
01827 # endif
01828 }
01829
01830 switch (result)
01831 {
01832 case WAIT_TIMEOUT:
01833 errno = ETIME;
01834 return 0;
01835 case WAIT_FAILED:
01836 case WAIT_ABANDONED_0:
01837 ACE_OS::set_errno_to_last_error ();
01838 return -1;
01839 default:
01840 break;
01841 }
01842
01843
01844 return 1;
01845 #endif
01846 }
01847
01848 DWORD
01849 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01850 int alertable)
01851 {
01852
01853
01854
01855
01856 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01857
01858
01859 ACE_UNUSED_ARG (alertable);
01860 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01861 this->handler_rep_.handles (),
01862 FALSE,
01863 timeout);
01864 #else
01865 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01866 this->handler_rep_.handles (),
01867 FALSE,
01868 timeout,
01869 alertable);
01870 #endif
01871 }
01872
01873 DWORD
01874 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01875 {
01876 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01877 this->handler_rep_.handles () + slot,
01878 FALSE,
01879 0);
01880 }
01881
01882 int
01883 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01884 {
01885 ACE_Time_Value *time = 0;
01886 if (this->owner_ == ACE_Thread::self ())
01887 time = this->timer_queue_->calculate_timeout (max_wait_time);
01888 else
01889 time = max_wait_time;
01890
01891 if (time == 0)
01892 return INFINITE;
01893 else
01894 return time->msec ();
01895 }
01896
01897
01898 int
01899 ACE_WFMO_Reactor::expire_timers (void)
01900 {
01901
01902 if (ACE_Thread::self () == this->owner_)
01903
01904 return this->timer_queue_->expire ();
01905
01906 else
01907
01908 return 0;
01909 }
01910
01911 int
01912 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01913 {
01914 int handlers_dispatched = 0;
01915
01916
01917 handlers_dispatched += this->expire_timers ();
01918
01919 switch (wait_status)
01920 {
01921 case WAIT_FAILED:
01922 ACE_OS::set_errno_to_last_error ();
01923 return -1;
01924
01925 case WAIT_TIMEOUT:
01926 errno = ETIME;
01927 return handlers_dispatched;
01928
01929 #ifndef ACE_HAS_WINCE
01930 case WAIT_IO_COMPLETION:
01931 return handlers_dispatched;
01932 #endif // ACE_HAS_WINCE
01933
01934 default:
01935
01936 handlers_dispatched += this->dispatch_handles (wait_status);
01937 return handlers_dispatched;
01938 }
01939 }
01940
01941
01942
01943
01944 int
01945 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01946 {
01947
01948
01949 DWORD dispatch_slot = 0;
01950
01951
01952 DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01953
01954
01955
01956 DWORD nCount = max_handlep1;
01957
01958 for (int number_of_handlers_dispatched = 1;
01959 ;
01960 ++number_of_handlers_dispatched)
01961 {
01962 const bool ok = (
01963 #if ! defined(__BORLANDC__) \
01964 && !defined (ghs) \
01965 && !defined (__MINGW32__) \
01966 && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01967
01968
01969
01970 wait_status >= WAIT_OBJECT_0 &&
01971 #endif
01972 wait_status <= (WAIT_OBJECT_0 + nCount));
01973
01974 if (ok)
01975 dispatch_slot += wait_status - WAIT_OBJECT_0;
01976 else
01977
01978 dispatch_slot += wait_status - WAIT_ABANDONED_0;
01979
01980
01981 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01982 return -1;
01983
01984
01985 ++dispatch_slot;
01986
01987
01988 if (dispatch_slot >= max_handlep1)
01989 return number_of_handlers_dispatched;
01990
01991
01992 nCount = max_handlep1 - dispatch_slot;
01993
01994
01995 wait_status = this->poll_remaining_handles (dispatch_slot);
01996 switch (wait_status)
01997 {
01998 case WAIT_FAILED:
01999 ACE_OS::set_errno_to_last_error ();
02000
02001 case WAIT_TIMEOUT:
02002
02003 return number_of_handlers_dispatched;
02004 }
02005 }
02006 }
02007
02008 int
02009 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
02010 DWORD max_handlep1)
02011 {
02012
02013 if (slot == max_handlep1)
02014 return this->dispatch_window_messages ();
02015
02016
02017
02018
02019
02020
02021
02022 else if (!this->handler_rep_.scheduled_for_deletion (slot))
02023 {
02024 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
02025
02026 if (this->handler_rep_.current_info ()[slot].io_entry_)
02027 return this->complex_dispatch_handler (slot,
02028 event_handle);
02029 else
02030 return this->simple_dispatch_handler (slot,
02031 event_handle);
02032 }
02033 else
02034
02035 return 0;
02036 }
02037
02038 int
02039 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
02040 ACE_HANDLE event_handle)
02041 {
02042
02043
02044
02045
02046 siginfo_t sig (event_handle);
02047
02048 ACE_Event_Handler *event_handler =
02049 this->handler_rep_.current_info ()[slot].event_handler_;
02050
02051 int requires_reference_counting =
02052 event_handler->reference_counting_policy ().value () ==
02053 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02054
02055 if (requires_reference_counting)
02056 {
02057 event_handler->add_reference ();
02058 }
02059
02060
02061 if (event_handler->handle_signal (0, &sig) == -1)
02062 this->handler_rep_.unbind (event_handle,
02063 ACE_Event_Handler::NULL_MASK);
02064
02065
02066 if (requires_reference_counting)
02067 {
02068 event_handler->remove_reference ();
02069 }
02070
02071 return 0;
02072 }
02073
02074 int
02075 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02076 ACE_HANDLE event_handle)
02077 {
02078
02079
02080 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02081 this->handler_rep_.current_info ()[slot];
02082
02083 WSANETWORKEVENTS events;
02084 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02085 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02086 event_handle,
02087 &events) == SOCKET_ERROR)
02088 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02089 else
02090 {
02091
02092
02093
02094
02095
02096
02097
02098
02099
02100
02101
02102 events.lNetworkEvents &= current_info.network_events_;
02103 while (events.lNetworkEvents != 0)
02104 {
02105 ACE_Event_Handler *event_handler =
02106 current_info.event_handler_;
02107
02108 int reference_counting_required =
02109 event_handler->reference_counting_policy ().value () ==
02110 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02111
02112
02113 if (reference_counting_required)
02114 {
02115 event_handler->add_reference ();
02116 }
02117
02118
02119 problems |= this->upcall (current_info.event_handler_,
02120 current_info.io_handle_,
02121 events);
02122
02123
02124 if (reference_counting_required)
02125 {
02126 event_handler->remove_reference ();
02127 }
02128
02129 if (this->handler_rep_.scheduled_for_deletion (slot))
02130 break;
02131 }
02132 }
02133
02134 if (problems != ACE_Event_Handler::NULL_MASK
02135 && !this->handler_rep_.scheduled_for_deletion (slot) )
02136 this->handler_rep_.unbind (event_handle, problems);
02137
02138 return 0;
02139 }
02140
02141 ACE_Reactor_Mask
02142 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02143 ACE_HANDLE io_handle,
02144 WSANETWORKEVENTS &events)
02145 {
02146
02147
02148 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02149
02150
02151
02152
02153
02154 long actual_events = events.lNetworkEvents;
02155 int action;
02156
02157 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02158 {
02159 action = event_handler->handle_output (io_handle);
02160 if (action <= 0)
02161 {
02162 ACE_CLR_BITS (actual_events, FD_WRITE);
02163 if (action == -1)
02164 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02165 }
02166 }
02167
02168 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02169 {
02170 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02171 {
02172
02173 action = event_handler->handle_output (io_handle);
02174 if (action <= 0)
02175 {
02176 ACE_CLR_BITS (actual_events, FD_CONNECT);
02177 if (action == -1)
02178 ACE_SET_BITS (problems,
02179 ACE_Event_Handler::CONNECT_MASK);
02180 }
02181 }
02182
02183 else
02184 {
02185 action = event_handler->handle_input (io_handle);
02186 if (action <= 0)
02187 {
02188 ACE_CLR_BITS (actual_events, FD_CONNECT);
02189 if (action == -1)
02190 ACE_SET_BITS (problems,
02191 ACE_Event_Handler::CONNECT_MASK);
02192 }
02193 }
02194 }
02195
02196 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02197 {
02198 action = event_handler->handle_exception (io_handle);
02199 if (action <= 0)
02200 {
02201 ACE_CLR_BITS (actual_events, FD_OOB);
02202 if (action == -1)
02203 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02204 }
02205 }
02206
02207 if (ACE_BIT_ENABLED (actual_events, FD_READ))
02208 {
02209 action = event_handler->handle_input (io_handle);
02210 if (action <= 0)
02211 {
02212 ACE_CLR_BITS (actual_events, FD_READ);
02213 if (action == -1)
02214 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02215 }
02216 }
02217
02218 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02219 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02220 {
02221 action = event_handler->handle_input (io_handle);
02222 if (action <= 0)
02223 {
02224 ACE_CLR_BITS (actual_events, FD_CLOSE);
02225 if (action == -1)
02226 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02227 }
02228 }
02229
02230 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02231 {
02232 action = event_handler->handle_input (io_handle);
02233 if (action <= 0)
02234 {
02235 ACE_CLR_BITS (actual_events, FD_ACCEPT);
02236 if (action == -1)
02237 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02238 }
02239 }
02240
02241 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02242 {
02243 action = event_handler->handle_qos (io_handle);
02244 if (action <= 0)
02245 {
02246 ACE_CLR_BITS (actual_events, FD_QOS);
02247 if (action == -1)
02248 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02249 }
02250 }
02251
02252 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02253 {
02254 action = event_handler->handle_group_qos (io_handle);
02255 if (action <= 0)
02256 {
02257 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02258 if (action == -1)
02259 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02260 }
02261 }
02262
02263 events.lNetworkEvents = actual_events;
02264 return problems;
02265 }
02266
02267
02268 int
02269 ACE_WFMO_Reactor::update_state (void)
02270 {
02271
02272 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02273
02274
02275 --this->active_threads_;
02276
02277
02278
02279 if (this->handler_rep_.changes_required () || this->new_owner ())
02280 {
02281 if (this->change_state_thread_ == 0)
02282
02283
02284 {
02285 this->change_state_thread_ = ACE_Thread::self ();
02286
02287 this->ok_to_wait_.reset ();
02288
02289 if (this->active_threads_ > 0)
02290
02291 {
02292
02293 this->wakeup_all_threads_.signal ();
02294
02295 monitor.release ();
02296
02297 this->waiting_to_change_state_.wait ();
02298
02299 monitor.acquire ();
02300 }
02301
02302
02303
02304
02305 while (this->handler_rep_.changes_required ())
02306
02307 this->handler_rep_.make_changes ();
02308 if (this->new_owner ())
02309
02310 this->change_owner ();
02311
02312 this->wakeup_all_threads_.reset ();
02313
02314 this->ok_to_wait_.signal ();
02315
02316 this->change_state_thread_ = 0;
02317 }
02318 else if (this->active_threads_ == 0)
02319
02320
02321
02322 this->waiting_to_change_state_.signal ();
02323 }
02324
02325
02326 else if (this->active_threads_ == 0)
02327
02328 this->wakeup_all_threads_.reset ();
02329
02330 return 0;
02331 }
02332
02333 void
02334 ACE_WFMO_Reactor::dump (void) const
02335 {
02336 #if defined (ACE_HAS_DUMP)
02337 ACE_TRACE ("ACE_WFMO_Reactor::dump");
02338
02339 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02340
02341 ACE_DEBUG ((LM_DEBUG,
02342 ACE_TEXT ("Count of currently active threads = %d\n"),
02343 this->active_threads_));
02344
02345 ACE_DEBUG ((LM_DEBUG,
02346 ACE_TEXT ("ID of owner thread = %d\n"),
02347 this->owner_));
02348
02349 this->handler_rep_.dump ();
02350 this->signal_handler_->dump ();
02351 this->timer_queue_->dump ();
02352
02353 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02354 #endif
02355 }
02356
02357 int
02358 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & ,
02359 ACE_Handle_Set & )
02360 {
02361 return -1;
02362 }
02363
02364 int
02365 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & )
02366 {
02367 return 0;
02368 }
02369
02370 ACE_HANDLE
02371 ACE_WFMO_Reactor_Notify::notify_handle (void)
02372 {
02373 return ACE_INVALID_HANDLE;
02374 }
02375
02376 int
02377 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02378 ACE_Notification_Buffer &)
02379 {
02380 return 0;
02381 }
02382
02383 int
02384 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02385 {
02386 return 0;
02387 }
02388
02389 int
02390 ACE_WFMO_Reactor_Notify::close (void)
02391 {
02392 return -1;
02393 }
02394
02395 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02396 : timer_queue_ (0),
02397 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02398 max_notifies * sizeof (ACE_Notification_Buffer)),
02399 max_notify_iterations_ (-1)
02400 {
02401 }
02402
02403 int
02404 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02405 ACE_Timer_Queue *timer_queue,
02406 int ignore_notify)
02407 {
02408 ACE_UNUSED_ARG (ignore_notify);
02409 timer_queue_ = timer_queue;
02410 return wfmo_reactor->register_handler (this);
02411 }
02412
02413 ACE_HANDLE
02414 ACE_WFMO_Reactor_Notify::get_handle (void) const
02415 {
02416 return this->wakeup_one_thread_.handle ();
02417 }
02418
02419
02420
02421 int
02422 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02423 siginfo_t *siginfo,
02424 ucontext_t *)
02425 {
02426 ACE_UNUSED_ARG (signum);
02427
02428
02429 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02430 return -1;
02431
02432
02433
02434
02435
02436
02437 for (int i = 1; ; ++i)
02438 {
02439 ACE_Message_Block *mb = 0;
02440
02441 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02442 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02443 {
02444 if (errno == EWOULDBLOCK)
02445
02446
02447 return 0;
02448 else
02449 return -1;
02450 }
02451 else
02452 {
02453 ACE_Notification_Buffer *buffer =
02454 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02455
02456
02457
02458
02459
02460 if (buffer->eh_ != 0)
02461 {
02462 ACE_Event_Handler *event_handler =
02463 buffer->eh_;
02464
02465 bool const requires_reference_counting =
02466 event_handler->reference_counting_policy ().value () ==
02467 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02468
02469 int result = 0;
02470
02471 switch (buffer->mask_)
02472 {
02473 case ACE_Event_Handler::READ_MASK:
02474 case ACE_Event_Handler::ACCEPT_MASK:
02475 result = event_handler->handle_input (ACE_INVALID_HANDLE);
02476 break;
02477 case ACE_Event_Handler::WRITE_MASK:
02478 result = event_handler->handle_output (ACE_INVALID_HANDLE);
02479 break;
02480 case ACE_Event_Handler::EXCEPT_MASK:
02481 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02482 break;
02483 case ACE_Event_Handler::QOS_MASK:
02484 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02485 break;
02486 case ACE_Event_Handler::GROUP_QOS_MASK:
02487 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02488 break;
02489 default:
02490 ACE_ERROR ((LM_ERROR,
02491 ACE_TEXT ("invalid mask = %d\n"),
02492 buffer->mask_));
02493 break;
02494 }
02495
02496 if (result == -1)
02497 event_handler->handle_close (ACE_INVALID_HANDLE,
02498 ACE_Event_Handler::EXCEPT_MASK);
02499
02500 if (requires_reference_counting)
02501 {
02502 event_handler->remove_reference ();
02503 }
02504 }
02505
02506
02507
02508 mb->release ();
02509
02510
02511
02512
02513 if (i == this->max_notify_iterations_)
02514 {
02515
02516
02517 if (!this->message_queue_.is_empty ())
02518 this->wakeup_one_thread_.signal ();
02519
02520
02521 return 0;
02522 }
02523 }
02524 }
02525 }
02526
02527
02528
02529
02530
02531 int
02532 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02533 ACE_Reactor_Mask mask,
02534 ACE_Time_Value *timeout)
02535 {
02536 if (event_handler != 0)
02537 {
02538 ACE_Message_Block *mb = 0;
02539 ACE_NEW_RETURN (mb,
02540 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02541 -1);
02542
02543 ACE_Notification_Buffer *buffer =
02544 (ACE_Notification_Buffer *) mb->base ();
02545 buffer->eh_ = event_handler;
02546 buffer->mask_ = mask;
02547
02548
02549
02550
02551 if (timeout != 0)
02552 *timeout += timer_queue_->gettimeofday ();
02553
02554 if (this->message_queue_.enqueue_tail
02555 (mb, timeout) == -1)
02556 {
02557 mb->release ();
02558 return -1;
02559 }
02560
02561 event_handler->add_reference ();
02562 }
02563
02564 return this->wakeup_one_thread_.signal ();
02565 }
02566
02567 void
02568 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02569 {
02570 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02571
02572 if (iterations == 0)
02573 iterations = 1;
02574
02575 this->max_notify_iterations_ = iterations;
02576 }
02577
02578 int
02579 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02580 {
02581 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02582 return this->max_notify_iterations_;
02583 }
02584
02585 int
02586 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02587 ACE_Reactor_Mask mask)
02588 {
02589 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02590
02591
02592
02593
02594
02595
02596 if (this->message_queue_.is_empty ())
02597 return 0;
02598
02599
02600
02601
02602
02603
02604
02605 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02606
02607
02608
02609 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02610
02611 size_t queue_size = this->message_queue_.message_count ();
02612 int number_purged = 0;
02613
02614 size_t index;
02615
02616 for (index = 0; index < queue_size; ++index)
02617 {
02618 ACE_Message_Block *mb = 0;
02619 if (-1 == this->message_queue_.dequeue_head (mb))
02620 return -1;
02621
02622 ACE_Notification_Buffer *buffer =
02623 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02624
02625
02626
02627
02628
02629 if ((0 != buffer->eh_) &&
02630 (0 == eh || eh == buffer->eh_) &&
02631 ACE_BIT_DISABLED (buffer->mask_, ~mask))
02632
02633
02634 {
02635 ACE_Event_Handler *event_handler = buffer->eh_;
02636
02637 event_handler->remove_reference ();
02638
02639 mb->release ();
02640 ++number_purged;
02641 }
02642 else
02643 {
02644
02645
02646
02647
02648 if ((0 != buffer->eh_) &&
02649 (0 == eh || eh == buffer->eh_))
02650 ACE_CLR_BITS(buffer->mask_, mask);
02651 if (-1 == local_queue.enqueue_head (mb))
02652 return -1;
02653 }
02654 }
02655
02656 if (this->message_queue_.message_count ())
02657 {
02658 ACE_ASSERT (0);
02659 return -1;
02660 }
02661
02662
02663
02664 queue_size = local_queue.message_count ();
02665 for (index = 0; index < queue_size; ++index)
02666 {
02667 ACE_Message_Block *mb = 0;
02668 if (-1 == local_queue.dequeue_head (mb))
02669 {
02670 ACE_ASSERT (0);
02671 return -1;
02672 }
02673
02674 if (-1 == this->message_queue_.enqueue_head (mb))
02675 {
02676 ACE_ASSERT (0);
02677 return -1;
02678 }
02679 }
02680
02681 return number_purged;
02682 }
02683
02684 void
02685 ACE_WFMO_Reactor_Notify::dump (void) const
02686 {
02687 #if defined (ACE_HAS_DUMP)
02688 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02689 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02690 this->timer_queue_->dump ();
02691 ACE_DEBUG ((LM_DEBUG,
02692 ACE_TEXT ("Max. iteration: %d\n"),
02693 this->max_notify_iterations_));
02694 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02695 #endif
02696 }
02697
02698 void
02699 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02700 {
02701 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02702 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02703
02704
02705 this->notify_handler_->max_notify_iterations (iterations);
02706 }
02707
02708 int
02709 ACE_WFMO_Reactor::max_notify_iterations (void)
02710 {
02711 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02712 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02713
02714 return this->notify_handler_->max_notify_iterations ();
02715 }
02716
02717 int
02718 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02719 ACE_Reactor_Mask mask)
02720 {
02721 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02722 if (this->notify_handler_ == 0)
02723 return 0;
02724 else
02725 return this->notify_handler_->purge_pending_notifications (eh, mask);
02726 }
02727
02728 int
02729 ACE_WFMO_Reactor::resumable_handler (void)
02730 {
02731 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02732 return 0;
02733 }
02734
02735
02736
02737 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02738 int
02739 WSAEventSelect (SOCKET ,
02740 WSAEVENT ,
02741 long )
02742 {
02743 return -1;
02744 }
02745
02746 int
02747 WSAEnumNetworkEvents (SOCKET ,
02748 WSAEVENT ,
02749 LPWSANETWORKEVENTS )
02750 {
02751 return -1;
02752 }
02753 #endif
02754
02755 ACE_END_VERSIONED_NAMESPACE_DECL
02756
02757 #endif