00001
00002
00003 #include "ace/WFMO_Reactor.h"
00004
00005 #if defined (ACE_WIN32)
00006
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif
00016
00017 ACE_RCSID(ace, WFMO_Reactor, "WFMO_Reactor.cpp,v 4.110 2006/06/07 03:03:54 schmidt Exp")
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024 : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031 if (size > MAXIMUM_WAIT_OBJECTS)
00032 ACE_ERROR_RETURN ((LM_ERROR,
00033 ACE_LIB_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034 size,
00035 MAXIMUM_WAIT_OBJECTS),
00036 -1);
00037
00038
00039 ACE_NEW_RETURN (this->current_handles_,
00040 ACE_HANDLE[size],
00041 -1);
00042 ACE_NEW_RETURN (this->current_info_,
00043 Current_Info[size],
00044 -1);
00045 ACE_NEW_RETURN (this->current_suspended_info_,
00046 Suspended_Info[size],
00047 -1);
00048 ACE_NEW_RETURN (this->to_be_added_info_,
00049 To_Be_Added_Info[size],
00050 -1);
00051
00052
00053 this->max_size_ = size;
00054 this->max_handlep1_ = 0;
00055 this->suspended_handles_ = 0;
00056 this->handles_to_be_added_ = 0;
00057 this->handles_to_be_deleted_ = 0;
00058 this->handles_to_be_suspended_ = 0;
00059 this->handles_to_be_resumed_ = 0;
00060
00061 for (size_t i = 0; i < size; ++i)
00062 this->current_handles_[i] = ACE_INVALID_HANDLE;
00063
00064 return 0;
00065 }
00066
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069
00070 delete [] this->current_handles_;
00071 delete [] this->current_info_;
00072 delete [] this->current_suspended_info_;
00073 delete [] this->to_be_added_info_;
00074 }
00075
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078 ACE_Reactor_Mask change_masks,
00079 int operation)
00080 {
00081
00082
00083
00084 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085
00086 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089
00090 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092
00093 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095
00096 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098
00099 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101
00102 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104
00105 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107
00108 switch (operation)
00109 {
00110 case ACE_Reactor::CLR_MASK:
00111
00112
00113 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114 {
00115 ACE_CLR_BITS (existing_masks, FD_READ);
00116 ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117 }
00118
00119 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120 ACE_CLR_BITS (existing_masks, FD_WRITE);
00121
00122 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123 ACE_CLR_BITS (existing_masks, FD_OOB);
00124
00125 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127
00128 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129 ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130
00131 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132 ACE_CLR_BITS (existing_masks, FD_QOS);
00133
00134 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136
00137 break;
00138
00139 case ACE_Reactor::SET_MASK:
00140
00141
00142 existing_masks = 0;
00143
00144
00145 case ACE_Reactor::ADD_MASK:
00146
00147
00148
00149 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150 {
00151 ACE_SET_BITS (existing_masks, FD_READ);
00152 ACE_SET_BITS (existing_masks, FD_CLOSE);
00153 }
00154
00155 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156 ACE_SET_BITS (existing_masks, FD_WRITE);
00157
00158 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159 ACE_SET_BITS (existing_masks, FD_OOB);
00160
00161 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162 ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163
00164 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165 ACE_SET_BITS (existing_masks, FD_CONNECT);
00166
00167 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168 ACE_SET_BITS (existing_masks, FD_QOS);
00169
00170 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172
00173 break;
00174
00175 case ACE_Reactor::GET_MASK:
00176
00177
00178
00179
00180 ACE_UNUSED_ARG (change_masks);
00181
00182 break;
00183 }
00184
00185 return old_masks;
00186 }
00187
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190 ACE_Reactor_Mask mask,
00191 int &changes_required)
00192 {
00193 int error = 0;
00194
00195
00196
00197 size_t const original_handle_count = this->handles_to_be_deleted_;
00198 int result = 0;
00199 size_t i;
00200
00201
00202
00203
00204
00205
00206 for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00207
00208
00209 if ((this->current_handles_[i] == handle
00210 || this->current_info_[i].io_handle_ == handle)
00211 &&
00212 !this->current_info_[i].delete_entry_)
00213 {
00214 result = this->remove_handler_i (i,
00215 mask);
00216 if (result == -1)
00217 error = 1;
00218 }
00219
00220
00221 for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00222
00223
00224 if ((this->current_suspended_info_[i].io_handle_ == handle
00225 || this->current_suspended_info_[i].event_handle_ == handle)
00226 &&
00227
00228 !this->current_suspended_info_[i].delete_entry_)
00229 {
00230 result = this->remove_suspended_handler_i (i,
00231 mask);
00232 if (result == -1)
00233 error = 1;
00234 }
00235
00236
00237 for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00238
00239
00240 if ((this->to_be_added_info_[i].io_handle_ == handle
00241 || this->to_be_added_info_[i].event_handle_ == handle)
00242 &&
00243
00244 !this->to_be_added_info_[i].delete_entry_)
00245 {
00246 result = this->remove_to_be_added_handler_i (i,
00247 mask);
00248 if (result == -1)
00249 error = 1;
00250 }
00251
00252
00253
00254 if (original_handle_count < this->handles_to_be_deleted_)
00255 changes_required = 1;
00256
00257 return error ? -1 : 0;
00258 }
00259
00260 int
00261 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00262 ACE_Reactor_Mask to_be_removed_masks)
00263 {
00264
00265 if (this->current_info_[slot].io_entry_)
00266 {
00267
00268
00269 this->bit_ops (this->current_info_[slot].network_events_,
00270 to_be_removed_masks,
00271 ACE_Reactor::CLR_MASK);
00272
00273
00274
00275
00276
00277
00278 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00279 this->current_handles_[slot],
00280 this->current_info_[slot].network_events_);
00281 }
00282
00283 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00284
00285 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00286 else
00287
00288 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00289
00290
00291
00292 if (this->current_info_[slot].suspend_entry_)
00293 {
00294
00295 this->current_info_[slot].suspend_entry_ = 0;
00296
00297 --this->handles_to_be_suspended_;
00298 }
00299
00300
00301
00302
00303 if (this->current_info_[slot].network_events_ == 0)
00304 {
00305
00306 this->current_info_[slot].delete_entry_ = 1;
00307
00308 this->current_info_[slot].close_masks_ = to_be_removed_masks;
00309
00310 ++this->handles_to_be_deleted_;
00311 }
00312
00313
00314
00315
00316
00317
00318 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00319 {
00320 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00321 this->current_info_[slot].event_handler_->handle_close (handle,
00322 to_be_removed_masks);
00323 }
00324
00325 return 0;
00326 }
00327
00328 int
00329 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00330 ACE_Reactor_Mask to_be_removed_masks)
00331 {
00332
00333 if (this->current_suspended_info_[slot].io_entry_)
00334 {
00335
00336
00337 this->bit_ops (this->current_suspended_info_[slot].network_events_,
00338 to_be_removed_masks,
00339 ACE_Reactor::CLR_MASK);
00340
00341
00342
00343
00344
00345
00346 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00347 this->current_suspended_info_[slot].event_handle_,
00348 this->current_suspended_info_[slot].network_events_);
00349 }
00350
00351 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00352
00353 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00354 else
00355
00356 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00357
00358
00359
00360 if (this->current_suspended_info_[slot].resume_entry_)
00361 {
00362
00363 this->current_suspended_info_[slot].resume_entry_ = 0;
00364
00365 --this->handles_to_be_resumed_;
00366 }
00367
00368
00369
00370
00371 if (this->current_suspended_info_[slot].network_events_ == 0)
00372 {
00373
00374 this->current_suspended_info_[slot].delete_entry_ = 1;
00375
00376 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00377
00378 ++this->handles_to_be_deleted_;
00379 }
00380
00381
00382
00383
00384
00385 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00386 {
00387 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00388 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00389 to_be_removed_masks);
00390 }
00391
00392 return 0;
00393 }
00394
00395 int
00396 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00397 ACE_Reactor_Mask to_be_removed_masks)
00398 {
00399
00400 if (this->to_be_added_info_[slot].io_entry_)
00401 {
00402
00403
00404 this->bit_ops (this->to_be_added_info_[slot].network_events_,
00405 to_be_removed_masks,
00406 ACE_Reactor::CLR_MASK);
00407
00408
00409
00410
00411
00412
00413 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00414 this->to_be_added_info_[slot].event_handle_,
00415 this->to_be_added_info_[slot].network_events_);
00416 }
00417
00418 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00419
00420 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00421 else
00422
00423 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00424
00425
00426
00427 if (this->to_be_added_info_[slot].suspend_entry_)
00428 {
00429
00430 this->to_be_added_info_[slot].suspend_entry_ = 0;
00431
00432 --this->handles_to_be_suspended_;
00433 }
00434
00435
00436
00437
00438 if (this->to_be_added_info_[slot].network_events_ == 0)
00439 {
00440
00441 this->to_be_added_info_[slot].delete_entry_ = 1;
00442
00443 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00444
00445 ++this->handles_to_be_deleted_;
00446 }
00447
00448
00449
00450
00451
00452 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00453 {
00454 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00455 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00456 to_be_removed_masks);
00457 }
00458
00459 return 0;
00460 }
00461
00462 int
00463 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00464 int &changes_required)
00465 {
00466 size_t i = 0;
00467
00468
00469
00470
00471
00472
00473 for (i = 0; i < this->max_handlep1_; ++i)
00474
00475
00476 if ((this->current_handles_[i] == handle ||
00477 this->current_info_[i].io_handle_ == handle) &&
00478
00479 !this->current_info_[i].suspend_entry_)
00480 {
00481
00482 this->current_info_[i].suspend_entry_ = 1;
00483
00484 ++this->handles_to_be_suspended_;
00485
00486 changes_required = 1;
00487 }
00488
00489
00490 for (i = 0; i < this->suspended_handles_; ++i)
00491
00492
00493 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00494 this->current_suspended_info_[i].io_handle_ == handle) &&
00495
00496 this->current_suspended_info_[i].resume_entry_)
00497 {
00498
00499 this->current_suspended_info_[i].resume_entry_ = 0;
00500
00501 --this->handles_to_be_resumed_;
00502
00503 changes_required = 1;
00504 }
00505
00506
00507 for (i = 0; i < this->handles_to_be_added_; ++i)
00508
00509
00510 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00511 this->to_be_added_info_[i].event_handle_ == handle) &&
00512
00513 !this->to_be_added_info_[i].suspend_entry_)
00514 {
00515
00516 this->to_be_added_info_[i].suspend_entry_ = 1;
00517
00518 ++this->handles_to_be_suspended_;
00519
00520 changes_required = 1;
00521 }
00522
00523 return 0;
00524 }
00525
00526 int
00527 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00528 int &changes_required)
00529 {
00530 size_t i = 0;
00531
00532
00533
00534
00535
00536
00537 for (i = 0; i < this->max_handlep1_; ++i)
00538
00539
00540 if ((this->current_handles_[i] == handle ||
00541 this->current_info_[i].io_handle_ == handle) &&
00542
00543 this->current_info_[i].suspend_entry_)
00544 {
00545
00546 this->current_info_[i].suspend_entry_ = 0;
00547
00548 --this->handles_to_be_suspended_;
00549
00550 changes_required = 1;
00551 }
00552
00553
00554 for (i = 0; i < this->suspended_handles_; ++i)
00555
00556
00557 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00558 this->current_suspended_info_[i].io_handle_ == handle) &&
00559
00560 !this->current_suspended_info_[i].resume_entry_)
00561 {
00562
00563 this->current_suspended_info_[i].resume_entry_ = 1;
00564
00565 ++this->handles_to_be_resumed_;
00566
00567 changes_required = 1;
00568 }
00569
00570
00571 for (i = 0; i < this->handles_to_be_added_; ++i)
00572
00573
00574 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00575 this->to_be_added_info_[i].event_handle_ == handle) &&
00576
00577 this->to_be_added_info_[i].suspend_entry_)
00578 {
00579
00580 this->to_be_added_info_[i].suspend_entry_ = 0;
00581
00582 --this->handles_to_be_suspended_;
00583
00584 changes_required = 1;
00585 }
00586
00587 return 0;
00588 }
00589
00590 void
00591 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00592 {
00593 {
00594 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00595
00596 int dummy;
00597 size_t i;
00598
00599
00600 for (i = 0; i < this->max_handlep1_; ++i)
00601 this->unbind_i (this->current_handles_[i],
00602 ACE_Event_Handler::ALL_EVENTS_MASK,
00603 dummy);
00604
00605
00606 for (i = 0; i < this->suspended_handles_; ++i)
00607 this->unbind_i (this->current_suspended_info_[i].event_handle_,
00608 ACE_Event_Handler::ALL_EVENTS_MASK,
00609 dummy);
00610
00611
00612 for (i = 0; i < this->handles_to_be_added_; ++i)
00613 this->unbind_i (this->to_be_added_info_[i].event_handle_,
00614 ACE_Event_Handler::ALL_EVENTS_MASK,
00615 dummy);
00616
00617 }
00618
00619
00620
00621
00622
00623 this->wfmo_reactor_.wakeup_all_threads ();
00624 }
00625
00626 int
00627 ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
00628 ACE_Event_Handler *event_handler,
00629 long network_events,
00630 ACE_HANDLE io_handle,
00631 ACE_HANDLE event_handle,
00632 int delete_event)
00633 {
00634
00635 if (event_handle == ACE_INVALID_HANDLE)
00636 event_handle = event_handler->get_handle ();
00637 if (this->invalid_handle (event_handle))
00638 return -1;
00639
00640 size_t current_size = this->max_handlep1_ +
00641 this->handles_to_be_added_ -
00642 this->handles_to_be_deleted_ +
00643 this->suspended_handles_;
00644
00645
00646
00647
00648 if (current_size < this->max_size_ &&
00649 this->handles_to_be_added_ < this->max_size_)
00650 {
00651
00652
00653 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00654 io_entry,
00655 event_handler,
00656 io_handle,
00657 network_events,
00658 delete_event);
00659
00660 ++this->handles_to_be_added_;
00661
00662 event_handler->add_reference ();
00663
00664
00665
00666 this->wfmo_reactor_.wakeup_all_threads ();
00667 }
00668 else
00669 {
00670 errno = EMFILE;
00671 return -1;
00672 }
00673
00674 return 0;
00675 }
00676
00677 int
00678 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00679 {
00680
00681
00682 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00683 {
00684 size_t i = 0;
00685 while (i < this->max_handlep1_)
00686 {
00687
00688
00689
00690
00691 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00692 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00693 ACE_Event_Handler *event_handler = 0;
00694
00695
00696 if (this->current_info_[i].delete_entry_)
00697 {
00698
00699
00700
00701
00702
00703
00704
00705 masks = this->current_info_[i].close_masks_;
00706 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00707 {
00708
00709 if (this->current_info_[i].io_entry_)
00710 handle = this->current_info_[i].io_handle_;
00711 else
00712 handle = this->current_handles_[i];
00713
00714
00715 event_handler = this->current_info_[i].event_handler_;
00716 }
00717
00718
00719 if (this->current_info_[i].delete_event_)
00720 ACE_OS::event_destroy (&this->current_handles_[i]);
00721
00722
00723 --this->handles_to_be_deleted_;
00724 }
00725
00726
00727 else if (this->current_info_[i].suspend_entry_)
00728 {
00729 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00730 this->current_info_[i]);
00731
00732 ++this->suspended_handles_;
00733
00734
00735 --this->handles_to_be_suspended_;
00736 }
00737
00738
00739
00740 if (this->current_info_[i].delete_entry_ ||
00741 this->current_info_[i].suspend_entry_ )
00742 {
00743 size_t last_valid_slot = this->max_handlep1_ - 1;
00744
00745
00746 if (i < last_valid_slot)
00747
00748 {
00749
00750 this->current_info_[i] =
00751 this->current_info_[last_valid_slot];
00752 this->current_handles_[i] =
00753 this->current_handles_[last_valid_slot];
00754 }
00755
00756 this->current_info_[last_valid_slot].reset ();
00757 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00758 --this->max_handlep1_;
00759 }
00760 else
00761 {
00762
00763
00764
00765 ++i;
00766 }
00767
00768
00769
00770 if (event_handler != 0)
00771 {
00772 int requires_reference_counting =
00773 event_handler->reference_counting_policy ().value () ==
00774 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00775
00776 event_handler->handle_close (handle, masks);
00777
00778 if (requires_reference_counting)
00779 {
00780 event_handler->remove_reference ();
00781 }
00782 }
00783 }
00784 }
00785
00786 return 0;
00787 }
00788
00789 int
00790 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00791 {
00792
00793 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00794 {
00795 size_t i = 0;
00796 while (i < this->suspended_handles_)
00797 {
00798
00799
00800
00801
00802 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00803 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00804 ACE_Event_Handler *event_handler = 0;
00805
00806
00807 if (this->current_suspended_info_[i].delete_entry_)
00808 {
00809
00810
00811
00812
00813
00814
00815
00816 masks = this->current_suspended_info_[i].close_masks_;
00817 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00818 {
00819
00820 if (this->current_suspended_info_[i].io_entry_)
00821 handle = this->current_suspended_info_[i].io_handle_;
00822 else
00823 handle = this->current_suspended_info_[i].event_handle_;
00824
00825
00826 event_handler = this->current_suspended_info_[i].event_handler_;
00827 }
00828
00829
00830 if (this->current_suspended_info_[i].delete_event_)
00831 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00832
00833
00834 --this->handles_to_be_deleted_;
00835 }
00836
00837 else if (this->current_suspended_info_[i].resume_entry_)
00838 {
00839
00840 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00841
00842 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00843 ++this->max_handlep1_;
00844
00845
00846 --this->handles_to_be_resumed_;
00847 }
00848
00849
00850
00851
00852 if (this->current_suspended_info_[i].resume_entry_ ||
00853 this->current_suspended_info_[i].delete_entry_)
00854 {
00855 size_t last_valid_slot = this->suspended_handles_ - 1;
00856
00857
00858
00859
00860
00861 if (i < last_valid_slot)
00862
00863 this->current_suspended_info_[i] =
00864 this->current_suspended_info_[last_valid_slot];
00865 this->current_suspended_info_[last_valid_slot].reset ();
00866 --this->suspended_handles_;
00867 }
00868 else
00869 {
00870
00871
00872
00873 ++i;
00874 }
00875
00876
00877
00878 if (event_handler != 0)
00879 {
00880 int requires_reference_counting =
00881 event_handler->reference_counting_policy ().value () ==
00882 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00883
00884 event_handler->handle_close (handle, masks);
00885
00886 if (requires_reference_counting)
00887 {
00888 event_handler->remove_reference ();
00889 }
00890 }
00891 }
00892 }
00893
00894 return 0;
00895 }
00896
00897 int
00898 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00899 {
00900
00901 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00902 {
00903
00904
00905
00906
00907 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00908 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00909 ACE_Event_Handler *event_handler = 0;
00910
00911
00912 if (this->to_be_added_info_[i].delete_entry_)
00913 {
00914
00915
00916
00917
00918
00919
00920
00921 masks = this->to_be_added_info_[i].close_masks_;
00922 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00923 {
00924
00925 if (this->to_be_added_info_[i].io_entry_)
00926 handle = this->to_be_added_info_[i].io_handle_;
00927 else
00928 handle = this->to_be_added_info_[i].event_handle_;
00929
00930
00931 event_handler = this->to_be_added_info_[i].event_handler_;
00932 }
00933
00934
00935 if (this->to_be_added_info_[i].delete_event_)
00936 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00937
00938
00939 --this->handles_to_be_deleted_;
00940 }
00941
00942
00943 else if (this->to_be_added_info_[i].suspend_entry_)
00944 {
00945 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00946 this->to_be_added_info_[i]);
00947
00948 ++this->suspended_handles_;
00949
00950
00951 --this->handles_to_be_suspended_;
00952 }
00953
00954
00955 else
00956 {
00957
00958 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00959
00960 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00961 ++this->max_handlep1_;
00962 }
00963
00964
00965 this->to_be_added_info_[i].reset ();
00966
00967
00968
00969 if (event_handler != 0)
00970 {
00971 int requires_reference_counting =
00972 event_handler->reference_counting_policy ().value () ==
00973 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00974
00975 event_handler->handle_close (handle, masks);
00976
00977 if (requires_reference_counting)
00978 {
00979 event_handler->remove_reference ();
00980 }
00981 }
00982 }
00983
00984
00985
00986 this->handles_to_be_added_ = 0;
00987
00988 return 0;
00989 }
00990
00991 void
00992 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00993 {
00994 #if defined (ACE_HAS_DUMP)
00995 size_t i = 0;
00996
00997 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00998
00999 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01000
01001 ACE_DEBUG ((LM_DEBUG,
01002 ACE_LIB_TEXT ("Max size = %d\n"),
01003 this->max_size_));
01004
01005 ACE_DEBUG ((LM_DEBUG,
01006 ACE_LIB_TEXT ("Current info table\n\n")));
01007 ACE_DEBUG ((LM_DEBUG,
01008 ACE_LIB_TEXT ("\tSize = %d\n"),
01009 this->max_handlep1_));
01010 ACE_DEBUG ((LM_DEBUG,
01011 ACE_LIB_TEXT ("\tHandles to be suspended = %d\n"),
01012 this->handles_to_be_suspended_));
01013
01014 for (i = 0; i < this->max_handlep1_; ++i)
01015 this->current_info_[i].dump (this->current_handles_[i]);
01016
01017 ACE_DEBUG ((LM_DEBUG,
01018 ACE_LIB_TEXT ("\n")));
01019
01020 ACE_DEBUG ((LM_DEBUG,
01021 ACE_LIB_TEXT ("To-be-added info table\n\n")));
01022 ACE_DEBUG ((LM_DEBUG,
01023 ACE_LIB_TEXT ("\tSize = %d\n"),
01024 this->handles_to_be_added_));
01025
01026 for (i = 0; i < this->handles_to_be_added_; ++i)
01027 this->to_be_added_info_[i].dump ();
01028
01029 ACE_DEBUG ((LM_DEBUG,
01030 ACE_LIB_TEXT ("\n")));
01031
01032 ACE_DEBUG ((LM_DEBUG,
01033 ACE_LIB_TEXT ("Suspended info table\n\n")));
01034 ACE_DEBUG ((LM_DEBUG,
01035 ACE_LIB_TEXT ("\tSize = %d\n"),
01036 this->suspended_handles_));
01037 ACE_DEBUG ((LM_DEBUG,
01038 ACE_LIB_TEXT ("\tHandles to be resumed = %d\n"),
01039 this->handles_to_be_resumed_));
01040
01041 for (i = 0; i < this->suspended_handles_; ++i)
01042 this->current_suspended_info_[i].dump ();
01043
01044 ACE_DEBUG ((LM_DEBUG,
01045 ACE_LIB_TEXT ("\n")));
01046
01047 ACE_DEBUG ((LM_DEBUG,
01048 ACE_LIB_TEXT ("Total handles to be deleted = %d\n"),
01049 this->handles_to_be_deleted_));
01050
01051 ACE_DEBUG ((LM_DEBUG,
01052 ACE_END_DUMP));
01053 #endif
01054 }
01055
01056
01057
01058 int
01059 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01060 {
01061 ACE_NOTSUP_RETURN (-1);
01062 }
01063
01064 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01065 ACE_Timer_Queue *tq,
01066 ACE_Reactor_Notify *notify)
01067 : signal_handler_ (0),
01068 delete_signal_handler_ (0),
01069 timer_queue_ (0),
01070 delete_timer_queue_ (0),
01071 delete_handler_rep_ (0),
01072 delete_notify_handler_ (0),
01073 lock_adapter_ (lock_),
01074 handler_rep_ (*this),
01075
01076 ok_to_wait_ (1),
01077
01078 wakeup_all_threads_ (0),
01079
01080 waiting_to_change_state_ (0),
01081 active_threads_ (0),
01082 owner_ (ACE_Thread::self ()),
01083 new_owner_ (0),
01084 change_state_thread_ (0),
01085 open_for_business_ (0),
01086 deactivated_ (0)
01087 {
01088 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01089 ACE_ERROR ((LM_ERROR,
01090 ACE_LIB_TEXT ("%p\n"),
01091 ACE_LIB_TEXT ("WFMO_Reactor")));
01092 }
01093
01094 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01095 int unused,
01096 ACE_Sig_Handler *sh,
01097 ACE_Timer_Queue *tq,
01098 ACE_Reactor_Notify *notify)
01099 : signal_handler_ (0),
01100 delete_signal_handler_ (0),
01101 timer_queue_ (0),
01102 delete_timer_queue_ (0),
01103 delete_handler_rep_ (0),
01104 delete_notify_handler_ (0),
01105 lock_adapter_ (lock_),
01106 handler_rep_ (*this),
01107
01108 ok_to_wait_ (1),
01109
01110 wakeup_all_threads_ (0),
01111
01112 waiting_to_change_state_ (0),
01113 active_threads_ (0),
01114 owner_ (ACE_Thread::self ()),
01115 new_owner_ (0),
01116 change_state_thread_ (0),
01117 open_for_business_ (0),
01118 deactivated_ (0)
01119 {
01120 ACE_UNUSED_ARG (unused);
01121
01122 if (this->open (size, 0, sh, tq, 0, notify) == -1)
01123 ACE_ERROR ((LM_ERROR,
01124 ACE_LIB_TEXT ("%p\n"),
01125 ACE_LIB_TEXT ("WFMO_Reactor")));
01126 }
01127
01128 int
01129 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01130 {
01131 return -1;
01132 }
01133
01134 int
01135 ACE_WFMO_Reactor::open (size_t size,
01136 int unused,
01137 ACE_Sig_Handler *sh,
01138 ACE_Timer_Queue *tq,
01139 int disable_notify_pipe,
01140 ACE_Reactor_Notify *notify)
01141 {
01142 ACE_UNUSED_ARG (unused);
01143 ACE_UNUSED_ARG (disable_notify_pipe);
01144
01145
01146 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01147
01148
01149 if (this->open_for_business_)
01150 return -1;
01151
01152
01153 if (this->delete_timer_queue_)
01154 delete this->timer_queue_;
01155
01156 if (tq == 0)
01157 {
01158 ACE_NEW_RETURN (this->timer_queue_,
01159 ACE_Timer_Heap,
01160 -1);
01161 this->delete_timer_queue_ = 1;
01162 }
01163 else
01164 {
01165 this->timer_queue_ = tq;
01166 this->delete_timer_queue_ = 0;
01167 }
01168
01169
01170 if (this->delete_signal_handler_)
01171 delete this->signal_handler_;
01172
01173 if (sh == 0)
01174 {
01175 ACE_NEW_RETURN (this->signal_handler_,
01176 ACE_Sig_Handler,
01177 -1);
01178 this->delete_signal_handler_ = 1;
01179 }
01180 else
01181 {
01182 this->signal_handler_ = sh;
01183 this->delete_signal_handler_ = 0;
01184 }
01185
01186
01187 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01188 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01189
01190
01191 if (this->delete_handler_rep_)
01192 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01193
01194
01195
01196 if (this->handler_rep_.open (size + 2) == -1)
01197 ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("%p\n"),
01198 ACE_LIB_TEXT ("opening handler repository")),
01199 -1);
01200 else
01201 this->delete_handler_rep_ = 1;
01202
01203 if (this->notify_handler_ != 0
01204 && this->delete_notify_handler_ != 0)
01205 delete this->notify_handler_;
01206
01207 this->notify_handler_ = notify;
01208
01209 if (this->notify_handler_ == 0)
01210 {
01211 ACE_NEW_RETURN (this->notify_handler_,
01212 ACE_WFMO_Reactor_Notify,
01213 -1);
01214
01215 if (this->notify_handler_ == 0)
01216 return -1;
01217 else
01218 this->delete_notify_handler_ = 1;
01219 }
01220
01221
01222
01223
01224
01225 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01226 ACE_ERROR_RETURN ((LM_ERROR,
01227 ACE_LIB_TEXT ("%p\n"),
01228 ACE_LIB_TEXT ("opening notify handler ")),
01229 -1);
01230
01231
01232 if (this->register_handler (&this->wakeup_all_threads_handler_,
01233 this->wakeup_all_threads_.handle ()) == -1)
01234 ACE_ERROR_RETURN ((LM_ERROR,
01235 ACE_LIB_TEXT ("%p\n"),
01236 ACE_LIB_TEXT ("registering thread wakeup handler")),
01237 -1);
01238
01239
01240
01241 if (this->handler_rep_.changes_required ())
01242 {
01243
01244 this->handler_rep_.make_changes ();
01245
01246
01247 this->wakeup_all_threads_.reset ();
01248 }
01249
01250
01251 this->open_for_business_ = 1;
01252
01253 return 0;
01254 }
01255
01256 int
01257 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01258 {
01259 if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0)
01260 delete this->signal_handler_;
01261 this->signal_handler_ = signal_handler;
01262 this->delete_signal_handler_ = 0;
01263 return 0;
01264 }
01265
01266 ACE_Timer_Queue *
01267 ACE_WFMO_Reactor::timer_queue (void) const
01268 {
01269 return this->timer_queue_;
01270 }
01271
01272 int
01273 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01274 {
01275 if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0)
01276 delete this->timer_queue_;
01277 this->timer_queue_ = tq;
01278 this->delete_timer_queue_ = 0;
01279 return 0;
01280 }
01281
01282 int
01283 ACE_WFMO_Reactor::close (void)
01284 {
01285
01286 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01287
01288
01289 if (!this->open_for_business_)
01290 return -1;
01291
01292
01293 this->open_for_business_ = 0;
01294
01295 this->handler_rep_.close ();
01296
01297 return 0;
01298 }
01299
01300 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01301 {
01302
01303
01304
01305
01306 this->close ();
01307
01308
01309
01310 this->handler_rep_.make_changes ();
01311
01312 if (this->delete_timer_queue_)
01313 {
01314 delete this->timer_queue_;
01315 this->timer_queue_ = 0;
01316 this->delete_timer_queue_ = 0;
01317 }
01318
01319 if (this->delete_signal_handler_)
01320 {
01321 delete this->signal_handler_;
01322 this->signal_handler_ = 0;
01323 this->delete_signal_handler_ = 0;
01324 }
01325
01326 if (this->delete_notify_handler_)
01327 {
01328 delete this->notify_handler_;
01329 this->notify_handler_ = 0;
01330 this->delete_notify_handler_ = 0;
01331 }
01332 }
01333
01334 int
01335 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01336 ACE_HANDLE io_handle,
01337 ACE_Event_Handler *event_handler,
01338 ACE_Reactor_Mask new_masks)
01339 {
01340
01341
01342
01343
01344 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01345
01346 ACE_UNUSED_ARG (event_handle);
01347 ACE_UNUSED_ARG (io_handle);
01348 ACE_UNUSED_ARG (event_handler);
01349 ACE_UNUSED_ARG (new_masks);
01350 ACE_NOTSUP_RETURN (-1);
01351
01352 #else
01353
01354
01355 if (io_handle == ACE_INVALID_HANDLE)
01356 io_handle = event_handler->get_handle ();
01357
01358 if (this->handler_rep_.invalid_handle (io_handle))
01359 {
01360 errno = ERROR_INVALID_HANDLE;
01361 return -1;
01362 }
01363
01364 long new_network_events = 0;
01365 int delete_event = 0;
01366 auto_ptr <ACE_Auto_Event> event;
01367
01368
01369
01370 ACE_Reactor_Mask old_masks;
01371 int found = this->handler_rep_.modify_network_events_i (io_handle,
01372 new_masks,
01373 old_masks,
01374 new_network_events,
01375 event_handle,
01376 delete_event,
01377 ACE_Reactor::ADD_MASK);
01378
01379
01380
01381 if (event_handle == ACE_INVALID_HANDLE)
01382 {
01383
01384
01385 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01386 event = tmp;
01387 event_handle = event->handle ();
01388 delete_event = 1;
01389 }
01390
01391 int result = ::WSAEventSelect ((SOCKET) io_handle,
01392 event_handle,
01393 new_network_events);
01394
01395 if (found)
01396 return result;
01397 else if (result != SOCKET_ERROR &&
01398 this->handler_rep_.bind_i (1,
01399 event_handler,
01400 new_network_events,
01401 io_handle,
01402 event_handle,
01403 delete_event) != -1)
01404 {
01405
01406
01407 if (delete_event)
01408 {
01409
01410
01411
01412
01413
01414
01415
01416
01417 ACE_Errno_Guard guard (errno);
01418 event->handle (ACE_INVALID_HANDLE);
01419 event->remove ();
01420 }
01421 return 0;
01422 }
01423 else
01424 return -1;
01425
01426 #endif
01427
01428 }
01429
01430 int
01431 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01432 ACE_Reactor_Mask new_masks,
01433 int operation)
01434 {
01435
01436 if (this->handler_rep_.invalid_handle (io_handle))
01437 return -1;
01438
01439 long new_network_events = 0;
01440 int delete_event = 0;
01441 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01442
01443
01444
01445 ACE_Reactor_Mask old_masks;
01446 int found = this->handler_rep_.modify_network_events_i (io_handle,
01447 new_masks,
01448 old_masks,
01449 new_network_events,
01450 event_handle,
01451 delete_event,
01452 operation);
01453 if (found)
01454 {
01455 int result = ::WSAEventSelect ((SOCKET) io_handle,
01456 event_handle,
01457 new_network_events);
01458 if (result == 0)
01459 return old_masks;
01460 else
01461 return result;
01462 }
01463 else
01464 return -1;
01465 }
01466
01467
01468
01469 int
01470 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01471 ACE_Reactor_Mask new_masks,
01472 ACE_Reactor_Mask &old_masks,
01473 long &new_network_events,
01474 ACE_HANDLE &event_handle,
01475 int &delete_event,
01476 int operation)
01477 {
01478 long *modified_network_events = &new_network_events;
01479 int found = 0;
01480 size_t i;
01481
01482
01483
01484
01485
01486 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01487 if (io_handle == this->current_info_[i].io_handle_ &&
01488 !this->current_info_[i].delete_entry_)
01489 {
01490 found = 1;
01491 modified_network_events = &this->current_info_[i].network_events_;
01492 delete_event = this->current_info_[i].delete_event_;
01493 event_handle = this->current_handles_[i];
01494 }
01495
01496
01497
01498
01499
01500 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01501 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01502 !this->current_suspended_info_[i].delete_entry_)
01503 {
01504 found = 1;
01505 modified_network_events = &this->current_suspended_info_[i].network_events_;
01506 delete_event = this->current_suspended_info_[i].delete_event_;
01507 event_handle = this->current_suspended_info_[i].event_handle_;
01508 }
01509
01510
01511
01512
01513
01514 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01515 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01516 !this->to_be_added_info_[i].delete_entry_)
01517 {
01518 found = 1;
01519 modified_network_events = &this->to_be_added_info_[i].network_events_;
01520 delete_event = this->to_be_added_info_[i].delete_event_;
01521 event_handle = this->to_be_added_info_[i].event_handle_;
01522 }
01523
01524 old_masks = this->bit_ops (*modified_network_events,
01525 new_masks,
01526 operation);
01527
01528 new_network_events = *modified_network_events;
01529
01530 return found;
01531 }
01532
01533 ACE_Event_Handler *
01534 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01535 {
01536 long existing_masks_ignored = 0;
01537 return this->handler (handle,
01538 existing_masks_ignored);
01539 }
01540
01541 ACE_Event_Handler *
01542 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01543 long &existing_masks)
01544 {
01545 int found = 0;
01546 size_t i = 0;
01547 ACE_Event_Handler *event_handler = 0;
01548 existing_masks = 0;
01549
01550
01551
01552
01553
01554
01555
01556 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01557 if ((handle == this->current_info_[i].io_handle_ ||
01558 handle == this->current_handles_[i]) &&
01559 !this->current_info_[i].delete_entry_)
01560 {
01561 found = 1;
01562 event_handler = this->current_info_[i].event_handler_;
01563 existing_masks = this->current_info_[i].network_events_;
01564 }
01565
01566
01567
01568
01569
01570 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01571 if ((handle == this->current_suspended_info_[i].io_handle_ ||
01572 handle == this->current_suspended_info_[i].event_handle_) &&
01573 !this->current_suspended_info_[i].delete_entry_)
01574 {
01575 found = 1;
01576 event_handler = this->current_suspended_info_[i].event_handler_;
01577 existing_masks = this->current_suspended_info_[i].network_events_;
01578 }
01579
01580
01581
01582
01583
01584 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01585 if ((handle == this->to_be_added_info_[i].io_handle_ ||
01586 handle == this->to_be_added_info_[i].event_handle_) &&
01587 !this->to_be_added_info_[i].delete_entry_)
01588 {
01589 found = 1;
01590 event_handler = this->to_be_added_info_[i].event_handler_;
01591 existing_masks = this->to_be_added_info_[i].network_events_;
01592 }
01593
01594 if (event_handler)
01595 event_handler->add_reference ();
01596
01597 return event_handler;
01598 }
01599
01600 int
01601 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01602 ACE_Reactor_Mask user_masks,
01603 ACE_Event_Handler **user_event_handler)
01604 {
01605 long existing_masks = 0;
01606 int found = 0;
01607
01608 ACE_Event_Handler_var safe_event_handler =
01609 this->handler (handle,
01610 existing_masks);
01611
01612 if (safe_event_handler.handler ())
01613 found = 1;
01614
01615 if (!found)
01616 return -1;
01617
01618
01619
01620 if (found &&
01621 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01622 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01623 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01624 found = 0;
01625
01626 if (found &&
01627 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01628 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01629 found = 0;
01630
01631 if (found &&
01632 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01633 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01634 found = 0;
01635
01636 if (found &&
01637 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01638 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01639 found = 0;
01640
01641 if (found &&
01642 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01643 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01644 found = 0;
01645
01646 if (found &&
01647 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01648 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01649 found = 0;
01650
01651 if (found &&
01652 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01653 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01654 found = 0;
01655
01656 if (found &&
01657 user_event_handler)
01658 *user_event_handler = safe_event_handler.release ();
01659
01660 if (found)
01661 return 0;
01662 else
01663 return -1;
01664 }
01665
01666
01667
01668 int
01669 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01670 int alertable)
01671 {
01672 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01673
01674
01675 if (!this->open_for_business_ || this->deactivated_)
01676 return -1;
01677
01678
01679
01680
01681 ACE_Countdown_Time countdown (max_wait_time);
01682
01683 int result;
01684 do
01685 {
01686
01687
01688
01689 result = this->ok_to_wait (max_wait_time, alertable);
01690 if (result != 1)
01691 return result;
01692
01693
01694 ++this->active_threads_;
01695
01696
01697 this->lock_.release ();
01698
01699
01700
01701 countdown.update ();
01702
01703
01704 int timeout = this->calculate_timeout (max_wait_time);
01705
01706
01707 DWORD wait_status = this->wait_for_multiple_events (timeout,
01708 alertable);
01709
01710
01711 result = this->safe_dispatch (wait_status);
01712 if (0 == result)
01713 {
01714
01715
01716
01717
01718
01719
01720
01721
01722 countdown.update ();
01723 if (0 == max_wait_time || max_wait_time->usec () == 0)
01724 break;
01725 }
01726 }
01727 while (result == 0);
01728
01729 return result;
01730 }
01731
01732 int
01733 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01734 int alertable)
01735 {
01736
01737
01738
01739
01740
01741 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01742
01743
01744 DWORD result = 0;
01745 while (1)
01746 {
01747 #if defined (ACE_HAS_PHARLAP)
01748
01749
01750 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01751 this->atomic_wait_array_,
01752 TRUE,
01753 timeout);
01754
01755 if (result != WAIT_IO_COMPLETION)
01756 break;
01757
01758 #elif defined (ACE_HAS_WINCE)
01759 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01760 this->atomic_wait_array_,
01761 TRUE,
01762 timeout);
01763 break;
01764 #else
01765 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01766 this->atomic_wait_array_,
01767 TRUE,
01768 timeout,
01769 alertable);
01770
01771 if (result != WAIT_IO_COMPLETION)
01772 break;
01773
01774 #endif
01775 }
01776
01777 switch (result)
01778 {
01779 case WAIT_TIMEOUT:
01780 errno = ETIME;
01781 return 0;
01782 case WAIT_FAILED:
01783 case WAIT_ABANDONED_0:
01784 ACE_OS::set_errno_to_last_error ();
01785 return -1;
01786 default:
01787 break;
01788 }
01789
01790
01791 return 1;
01792 }
01793
01794 DWORD
01795 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01796 int alertable)
01797 {
01798
01799
01800
01801
01802 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01803
01804
01805 ACE_UNUSED_ARG (alertable);
01806 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01807 this->handler_rep_.handles (),
01808 FALSE,
01809 timeout);
01810 #else
01811 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01812 this->handler_rep_.handles (),
01813 FALSE,
01814 timeout,
01815 alertable);
01816 #endif
01817 }
01818
01819 DWORD
01820 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01821 {
01822 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01823 this->handler_rep_.handles () + slot,
01824 FALSE,
01825 0);
01826 }
01827
01828 int
01829 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01830 {
01831 ACE_Time_Value *time = 0;
01832 if (this->owner_ == ACE_Thread::self ())
01833 time = this->timer_queue_->calculate_timeout (max_wait_time);
01834 else
01835 time = max_wait_time;
01836
01837 if (time == 0)
01838 return INFINITE;
01839 else
01840 return time->msec ();
01841 }
01842
01843
01844 int
01845 ACE_WFMO_Reactor::expire_timers (void)
01846 {
01847
01848 if (ACE_Thread::self () == this->owner_)
01849
01850 return this->timer_queue_->expire ();
01851
01852 else
01853
01854 return 0;
01855 }
01856
01857 int
01858 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01859 {
01860 int handlers_dispatched = 0;
01861
01862
01863 handlers_dispatched += this->expire_timers ();
01864
01865 switch (wait_status)
01866 {
01867 case WAIT_FAILED:
01868 ACE_OS::set_errno_to_last_error ();
01869 return -1;
01870
01871 case WAIT_TIMEOUT:
01872 errno = ETIME;
01873 return handlers_dispatched;
01874
01875 #ifndef ACE_HAS_WINCE
01876 case WAIT_IO_COMPLETION:
01877 return handlers_dispatched;
01878 #endif // ACE_HAS_WINCE
01879
01880 default:
01881
01882 handlers_dispatched += this->dispatch_handles (wait_status);
01883 return handlers_dispatched;
01884 }
01885 }
01886
01887
01888
01889
01890 int
01891 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01892 {
01893
01894
01895 DWORD dispatch_slot = 0;
01896
01897
01898 DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01899
01900
01901
01902 DWORD nCount = max_handlep1;
01903
01904 for (int number_of_handlers_dispatched = 1;
01905 ;
01906 ++number_of_handlers_dispatched)
01907 {
01908 const bool ok = (
01909 #if ! (defined(__BORLANDC__) && (__BORLANDC__ >= 0x0530)) \
01910 && !defined (ghs) \
01911 && !defined (__MINGW32__) \
01912 && !(defined (_MSC_VER) && _MSC_VER >= 1300)
01913
01914
01915
01916 wait_status >= WAIT_OBJECT_0 &&
01917 #endif
01918 wait_status <= (WAIT_OBJECT_0 + nCount));
01919
01920 if (ok)
01921 dispatch_slot += wait_status - WAIT_OBJECT_0;
01922 else
01923
01924 dispatch_slot += wait_status - WAIT_ABANDONED_0;
01925
01926
01927 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01928 return -1;
01929
01930
01931 ++dispatch_slot;
01932
01933
01934 if (dispatch_slot >= max_handlep1)
01935 return number_of_handlers_dispatched;
01936
01937
01938 nCount = max_handlep1 - dispatch_slot;
01939
01940
01941 wait_status = this->poll_remaining_handles (dispatch_slot);
01942 switch (wait_status)
01943 {
01944 case WAIT_FAILED:
01945 ACE_OS::set_errno_to_last_error ();
01946
01947 case WAIT_TIMEOUT:
01948
01949 return number_of_handlers_dispatched;
01950 }
01951 }
01952 }
01953
01954 int
01955 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
01956 DWORD max_handlep1)
01957 {
01958
01959 if (slot == max_handlep1)
01960 return this->dispatch_window_messages ();
01961
01962
01963
01964
01965
01966
01967
01968 else if (!this->handler_rep_.scheduled_for_deletion (slot))
01969 {
01970 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
01971
01972 if (this->handler_rep_.current_info ()[slot].io_entry_)
01973 return this->complex_dispatch_handler (slot,
01974 event_handle);
01975 else
01976 return this->simple_dispatch_handler (slot,
01977 event_handle);
01978 }
01979 else
01980
01981 return 0;
01982 }
01983
01984 int
01985 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
01986 ACE_HANDLE event_handle)
01987 {
01988
01989
01990
01991
01992 siginfo_t sig (event_handle);
01993
01994 ACE_Event_Handler *event_handler =
01995 this->handler_rep_.current_info ()[slot].event_handler_;
01996
01997 int requires_reference_counting =
01998 event_handler->reference_counting_policy ().value () ==
01999 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02000
02001 if (requires_reference_counting)
02002 {
02003 event_handler->add_reference ();
02004 }
02005
02006
02007 if (event_handler->handle_signal (0, &sig) == -1)
02008 this->handler_rep_.unbind (event_handle,
02009 ACE_Event_Handler::NULL_MASK);
02010
02011
02012 if (requires_reference_counting)
02013 {
02014 event_handler->remove_reference ();
02015 }
02016
02017 return 0;
02018 }
02019
02020 int
02021 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02022 ACE_HANDLE event_handle)
02023 {
02024
02025
02026 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02027 this->handler_rep_.current_info ()[slot];
02028
02029 WSANETWORKEVENTS events;
02030 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02031 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02032 event_handle,
02033 &events) == SOCKET_ERROR)
02034 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02035 else
02036 {
02037
02038
02039
02040
02041
02042
02043
02044
02045
02046
02047
02048 events.lNetworkEvents &= current_info.network_events_;
02049 while (events.lNetworkEvents != 0)
02050 {
02051 ACE_Event_Handler *event_handler =
02052 current_info.event_handler_;
02053
02054 int reference_counting_required =
02055 event_handler->reference_counting_policy ().value () ==
02056 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02057
02058
02059 if (reference_counting_required)
02060 {
02061 event_handler->add_reference ();
02062 }
02063
02064
02065 problems |= this->upcall (current_info.event_handler_,
02066 current_info.io_handle_,
02067 events);
02068
02069
02070 if (reference_counting_required)
02071 {
02072 event_handler->remove_reference ();
02073 }
02074
02075 if (this->handler_rep_.scheduled_for_deletion (slot))
02076 break;
02077 }
02078 }
02079
02080 if (problems != ACE_Event_Handler::NULL_MASK
02081 && !this->handler_rep_.scheduled_for_deletion (slot) )
02082 this->handler_rep_.unbind (event_handle, problems);
02083
02084 return 0;
02085 }
02086
02087 ACE_Reactor_Mask
02088 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02089 ACE_HANDLE io_handle,
02090 WSANETWORKEVENTS &events)
02091 {
02092
02093
02094 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02095
02096
02097
02098
02099
02100 long actual_events = events.lNetworkEvents;
02101 int action;
02102
02103 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02104 {
02105 action = event_handler->handle_output (io_handle);
02106 if (action <= 0)
02107 {
02108 ACE_CLR_BITS (actual_events, FD_WRITE);
02109 if (action == -1)
02110 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02111 }
02112 }
02113
02114 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02115 {
02116 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02117 {
02118
02119 action = event_handler->handle_output (io_handle);
02120 if (action <= 0)
02121 {
02122 ACE_CLR_BITS (actual_events, FD_CONNECT);
02123 if (action == -1)
02124 ACE_SET_BITS (problems,
02125 ACE_Event_Handler::CONNECT_MASK);
02126 }
02127 }
02128
02129 else
02130 {
02131 action = event_handler->handle_input (io_handle);
02132 if (action <= 0)
02133 {
02134 ACE_CLR_BITS (actual_events, FD_CONNECT);
02135 if (action == -1)
02136 ACE_SET_BITS (problems,
02137 ACE_Event_Handler::CONNECT_MASK);
02138 }
02139 }
02140 }
02141
02142 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02143 {
02144 action = event_handler->handle_exception (io_handle);
02145 if (action <= 0)
02146 {
02147 ACE_CLR_BITS (actual_events, FD_OOB);
02148 if (action == -1)
02149 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02150 }
02151 }
02152
02153 if (ACE_BIT_ENABLED (actual_events, FD_READ))
02154 {
02155 action = event_handler->handle_input (io_handle);
02156 if (action <= 0)
02157 {
02158 ACE_CLR_BITS (actual_events, FD_READ);
02159 if (action == -1)
02160 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02161 }
02162 }
02163
02164 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02165 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02166 {
02167 action = event_handler->handle_input (io_handle);
02168 if (action <= 0)
02169 {
02170 ACE_CLR_BITS (actual_events, FD_CLOSE);
02171 if (action == -1)
02172 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02173 }
02174 }
02175
02176 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02177 {
02178 action = event_handler->handle_input (io_handle);
02179 if (action <= 0)
02180 {
02181 ACE_CLR_BITS (actual_events, FD_ACCEPT);
02182 if (action == -1)
02183 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02184 }
02185 }
02186
02187 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02188 {
02189 action = event_handler->handle_qos (io_handle);
02190 if (action <= 0)
02191 {
02192 ACE_CLR_BITS (actual_events, FD_QOS);
02193 if (action == -1)
02194 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02195 }
02196 }
02197
02198 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02199 {
02200 action = event_handler->handle_group_qos (io_handle);
02201 if (action <= 0)
02202 {
02203 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02204 if (action == -1)
02205 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02206 }
02207 }
02208
02209 events.lNetworkEvents = actual_events;
02210 return problems;
02211 }
02212
02213
02214 int
02215 ACE_WFMO_Reactor::update_state (void)
02216 {
02217
02218 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02219
02220
02221 --this->active_threads_;
02222
02223
02224
02225 if (this->handler_rep_.changes_required () || this->new_owner ())
02226 {
02227 if (this->change_state_thread_ == 0)
02228
02229
02230 {
02231 this->change_state_thread_ = ACE_Thread::self ();
02232
02233 this->ok_to_wait_.reset ();
02234
02235 if (this->active_threads_ > 0)
02236
02237 {
02238
02239 this->wakeup_all_threads_.signal ();
02240
02241 monitor.release ();
02242
02243 this->waiting_to_change_state_.wait ();
02244
02245 monitor.acquire ();
02246 }
02247
02248
02249
02250
02251 while (this->handler_rep_.changes_required ())
02252
02253 this->handler_rep_.make_changes ();
02254 if (this->new_owner ())
02255
02256 this->change_owner ();
02257
02258 this->wakeup_all_threads_.reset ();
02259
02260 this->ok_to_wait_.signal ();
02261
02262 this->change_state_thread_ = 0;
02263 }
02264 else if (this->active_threads_ == 0)
02265
02266
02267
02268 this->waiting_to_change_state_.signal ();
02269 }
02270
02271
02272 else if (this->active_threads_ == 0)
02273
02274 this->wakeup_all_threads_.reset ();
02275
02276 return 0;
02277 }
02278
02279 void
02280 ACE_WFMO_Reactor::dump (void) const
02281 {
02282 #if defined (ACE_HAS_DUMP)
02283 ACE_TRACE ("ACE_WFMO_Reactor::dump");
02284
02285 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02286
02287 ACE_DEBUG ((LM_DEBUG,
02288 ACE_LIB_TEXT ("Count of currently active threads = %d\n"),
02289 this->active_threads_));
02290
02291 ACE_DEBUG ((LM_DEBUG,
02292 ACE_LIB_TEXT ("ID of owner thread = %d\n"),
02293 this->owner_));
02294
02295 this->handler_rep_.dump ();
02296 this->signal_handler_->dump ();
02297 this->timer_queue_->dump ();
02298
02299 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02300 #endif
02301 }
02302
02303 int
02304 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & ,
02305 ACE_Handle_Set & )
02306 {
02307 return -1;
02308 }
02309
02310 int
02311 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & )
02312 {
02313 return 0;
02314 }
02315
02316 ACE_HANDLE
02317 ACE_WFMO_Reactor_Notify::notify_handle (void)
02318 {
02319 return ACE_INVALID_HANDLE;
02320 }
02321
02322 int
02323 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02324 ACE_Notification_Buffer &)
02325 {
02326 return 0;
02327 }
02328
02329 int
02330 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02331 {
02332 return 0;
02333 }
02334
02335 int
02336 ACE_WFMO_Reactor_Notify::close (void)
02337 {
02338 return -1;
02339 }
02340
02341 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02342 : timer_queue_ (0),
02343 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02344 max_notifies * sizeof (ACE_Notification_Buffer)),
02345 max_notify_iterations_ (-1)
02346 {
02347 }
02348
02349 int
02350 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02351 ACE_Timer_Queue *timer_queue,
02352 int ignore_notify)
02353 {
02354 ACE_UNUSED_ARG (ignore_notify);
02355 timer_queue_ = timer_queue;
02356 return wfmo_reactor->register_handler (this);
02357 }
02358
02359 ACE_HANDLE
02360 ACE_WFMO_Reactor_Notify::get_handle (void) const
02361 {
02362 return this->wakeup_one_thread_.handle ();
02363 }
02364
02365
02366
02367 int
02368 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02369 siginfo_t *siginfo,
02370 ucontext_t *)
02371 {
02372 ACE_UNUSED_ARG (signum);
02373
02374
02375 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02376 return -1;
02377
02378
02379
02380
02381
02382
02383 for (int i = 1; ; ++i)
02384 {
02385 ACE_Message_Block *mb = 0;
02386
02387 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02388 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02389 {
02390 if (errno == EWOULDBLOCK)
02391
02392
02393 return 0;
02394 else
02395 return -1;
02396 }
02397 else
02398 {
02399 ACE_Notification_Buffer *buffer =
02400 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02401
02402
02403
02404
02405
02406 if (buffer->eh_ != 0)
02407 {
02408 ACE_Event_Handler *event_handler =
02409 buffer->eh_;
02410
02411 int requires_reference_counting =
02412 event_handler->reference_counting_policy ().value () ==
02413 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02414
02415 int result = 0;
02416
02417 switch (buffer->mask_)
02418 {
02419 case ACE_Event_Handler::READ_MASK:
02420 case ACE_Event_Handler::ACCEPT_MASK:
02421 result = event_handler->handle_input (ACE_INVALID_HANDLE);
02422 break;
02423 case ACE_Event_Handler::WRITE_MASK:
02424 result = event_handler->handle_output (ACE_INVALID_HANDLE);
02425 break;
02426 case ACE_Event_Handler::EXCEPT_MASK:
02427 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02428 break;
02429 case ACE_Event_Handler::QOS_MASK:
02430 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02431 break;
02432 case ACE_Event_Handler::GROUP_QOS_MASK:
02433 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02434 break;
02435 default:
02436 ACE_ERROR ((LM_ERROR,
02437 ACE_LIB_TEXT ("invalid mask = %d\n"),
02438 buffer->mask_));
02439 break;
02440 }
02441
02442 if (result == -1)
02443 event_handler->handle_close (ACE_INVALID_HANDLE,
02444 ACE_Event_Handler::EXCEPT_MASK);
02445
02446 if (requires_reference_counting)
02447 {
02448 event_handler->remove_reference ();
02449 }
02450 }
02451
02452
02453
02454 mb->release ();
02455
02456
02457
02458
02459 if (i == this->max_notify_iterations_)
02460 {
02461
02462
02463 if (!this->message_queue_.is_empty ())
02464 this->wakeup_one_thread_.signal ();
02465
02466
02467 return 0;
02468 }
02469 }
02470 }
02471 }
02472
02473
02474
02475
02476
02477 int
02478 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02479 ACE_Reactor_Mask mask,
02480 ACE_Time_Value *timeout)
02481 {
02482 if (event_handler != 0)
02483 {
02484 ACE_Message_Block *mb = 0;
02485 ACE_NEW_RETURN (mb,
02486 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02487 -1);
02488
02489 ACE_Notification_Buffer *buffer =
02490 (ACE_Notification_Buffer *) mb->base ();
02491 buffer->eh_ = event_handler;
02492 buffer->mask_ = mask;
02493
02494
02495
02496
02497 if (timeout != 0)
02498 *timeout += timer_queue_->gettimeofday ();
02499
02500 if (this->message_queue_.enqueue_tail
02501 (mb, timeout) == -1)
02502 {
02503 mb->release ();
02504 return -1;
02505 }
02506
02507 event_handler->add_reference ();
02508 }
02509
02510 return this->wakeup_one_thread_.signal ();
02511 }
02512
02513 void
02514 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02515 {
02516 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02517
02518 if (iterations == 0)
02519 iterations = 1;
02520
02521 this->max_notify_iterations_ = iterations;
02522 }
02523
02524 int
02525 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02526 {
02527 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02528 return this->max_notify_iterations_;
02529 }
02530
02531 int
02532 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02533 ACE_Reactor_Mask mask)
02534 {
02535 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02536
02537
02538
02539
02540
02541
02542 if (this->message_queue_.is_empty ())
02543 return 0;
02544
02545
02546
02547
02548
02549
02550
02551 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02552
02553
02554
02555 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02556
02557 size_t queue_size = this->message_queue_.message_count ();
02558 int number_purged = 0;
02559
02560 size_t index;
02561
02562 for (index = 0; index < queue_size; ++index)
02563 {
02564 ACE_Message_Block *mb = 0;
02565 if (-1 == this->message_queue_.dequeue_head (mb))
02566 return -1;
02567
02568 ACE_Notification_Buffer *buffer =
02569 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02570
02571
02572
02573
02574
02575 if ((0 != buffer->eh_) &&
02576 (0 == eh || eh == buffer->eh_) &&
02577 ACE_BIT_DISABLED (buffer->mask_, ~mask))
02578
02579
02580 {
02581 ACE_Event_Handler *event_handler = buffer->eh_;
02582
02583 event_handler->remove_reference ();
02584
02585 mb->release ();
02586 ++number_purged;
02587 }
02588 else
02589 {
02590
02591
02592
02593
02594 if ((0 != buffer->eh_) &&
02595 (0 == eh || eh == buffer->eh_))
02596 ACE_CLR_BITS(buffer->mask_, mask);
02597 if (-1 == local_queue.enqueue_head (mb))
02598 return -1;
02599 }
02600 }
02601
02602 if (this->message_queue_.message_count ())
02603 {
02604 ACE_ASSERT (0);
02605 return -1;
02606 }
02607
02608
02609
02610 queue_size = local_queue.message_count ();
02611 for (index = 0; index < queue_size; ++index)
02612 {
02613 ACE_Message_Block *mb = 0;
02614 if (-1 == local_queue.dequeue_head (mb))
02615 {
02616 ACE_ASSERT (0);
02617 return -1;
02618 }
02619
02620 if (-1 == this->message_queue_.enqueue_head (mb))
02621 {
02622 ACE_ASSERT (0);
02623 return -1;
02624 }
02625 }
02626
02627 return number_purged;
02628 }
02629
02630 void
02631 ACE_WFMO_Reactor_Notify::dump (void) const
02632 {
02633 #if defined (ACE_HAS_DUMP)
02634 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02635 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02636 this->timer_queue_->dump ();
02637 ACE_DEBUG ((LM_DEBUG,
02638 ACE_LIB_TEXT ("Max. iteration: %d\n"),
02639 this->max_notify_iterations_));
02640 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02641 #endif
02642 }
02643
02644 void
02645 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02646 {
02647 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02648 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02649
02650
02651 this->notify_handler_->max_notify_iterations (iterations);
02652 }
02653
02654 int
02655 ACE_WFMO_Reactor::max_notify_iterations (void)
02656 {
02657 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02658 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02659
02660 return this->notify_handler_->max_notify_iterations ();
02661 }
02662
02663 int
02664 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02665 ACE_Reactor_Mask mask)
02666 {
02667 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02668 if (this->notify_handler_ == 0)
02669 return 0;
02670 else
02671 return this->notify_handler_->purge_pending_notifications (eh, mask);
02672 }
02673
02674 int
02675 ACE_WFMO_Reactor::resumable_handler (void)
02676 {
02677 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02678 return 0;
02679 }
02680
02681
02682
02683 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02684 int
02685 WSAEventSelect (SOCKET ,
02686 WSAEVENT ,
02687 long )
02688 {
02689 return -1;
02690 }
02691
02692 int
02693 WSAEnumNetworkEvents (SOCKET ,
02694 WSAEVENT ,
02695 LPWSANETWORKEVENTS )
02696 {
02697 return -1;
02698 }
02699 #endif
02700
02701 ACE_END_VERSIONED_NAMESPACE_DECL
02702
02703 #endif