00001
00002
00003 #include "ace/WFMO_Reactor.h"
00004
00005 #if defined (ACE_WIN32)
00006
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Timer_Heap.h"
00009 #include "ace/Thread.h"
00010 #include "ace/OS_NS_errno.h"
00011 #include "ace/Null_Condition.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/WFMO_Reactor.inl"
00015 #endif
00016
00017 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp 85125 2009-04-20 16:47:38Z johnnyw $")
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00024 : wfmo_reactor_ (wfmo_reactor)
00025 {
00026 }
00027
00028 int
00029 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00030 {
00031 if (size > MAXIMUM_WAIT_OBJECTS)
00032 ACE_ERROR_RETURN ((LM_ERROR,
00033 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00034 size,
00035 MAXIMUM_WAIT_OBJECTS),
00036 -1);
00037
00038
00039 ACE_NEW_RETURN (this->current_handles_,
00040 ACE_HANDLE[size],
00041 -1);
00042 ACE_NEW_RETURN (this->current_info_,
00043 Current_Info[size],
00044 -1);
00045 ACE_NEW_RETURN (this->current_suspended_info_,
00046 Suspended_Info[size],
00047 -1);
00048 ACE_NEW_RETURN (this->to_be_added_info_,
00049 To_Be_Added_Info[size],
00050 -1);
00051
00052
00053 this->max_size_ = size;
00054 this->max_handlep1_ = 0;
00055 this->suspended_handles_ = 0;
00056 this->handles_to_be_added_ = 0;
00057 this->handles_to_be_deleted_ = 0;
00058 this->handles_to_be_suspended_ = 0;
00059 this->handles_to_be_resumed_ = 0;
00060
00061 for (size_t i = 0; i < size; ++i)
00062 this->current_handles_[i] = ACE_INVALID_HANDLE;
00063
00064 return 0;
00065 }
00066
00067 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00068 {
00069
00070 delete [] this->current_handles_;
00071 delete [] this->current_info_;
00072 delete [] this->current_suspended_info_;
00073 delete [] this->to_be_added_info_;
00074 }
00075
00076 ACE_Reactor_Mask
00077 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00078 ACE_Reactor_Mask change_masks,
00079 int operation)
00080 {
00081
00082
00083
00084 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00085
00086 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00087 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00088 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00089
00090 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00091 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00092
00093 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00094 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00095
00096 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00097 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00098
00099 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00100 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00101
00102 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00103 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00104
00105 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00106 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00107
00108 switch (operation)
00109 {
00110 case ACE_Reactor::CLR_MASK:
00111
00112
00113 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00114 {
00115 ACE_CLR_BITS (existing_masks, FD_READ);
00116 ACE_CLR_BITS (existing_masks, FD_CLOSE);
00117 }
00118
00119 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00120 ACE_CLR_BITS (existing_masks, FD_WRITE);
00121
00122 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00123 ACE_CLR_BITS (existing_masks, FD_OOB);
00124
00125 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00126 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00127
00128 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00129 ACE_CLR_BITS (existing_masks, FD_CONNECT);
00130
00131 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00132 ACE_CLR_BITS (existing_masks, FD_QOS);
00133
00134 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00135 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00136
00137 break;
00138
00139 case ACE_Reactor::SET_MASK:
00140
00141
00142 existing_masks = 0;
00143
00144
00145 case ACE_Reactor::ADD_MASK:
00146
00147
00148
00149 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00150 {
00151 ACE_SET_BITS (existing_masks, FD_READ);
00152 ACE_SET_BITS (existing_masks, FD_CLOSE);
00153 }
00154
00155 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00156 ACE_SET_BITS (existing_masks, FD_WRITE);
00157
00158 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00159 ACE_SET_BITS (existing_masks, FD_OOB);
00160
00161 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00162 ACE_SET_BITS (existing_masks, FD_ACCEPT);
00163
00164 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00165 ACE_SET_BITS (existing_masks, FD_CONNECT);
00166
00167 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00168 ACE_SET_BITS (existing_masks, FD_QOS);
00169
00170 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00171 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00172
00173 break;
00174
00175 case ACE_Reactor::GET_MASK:
00176
00177
00178
00179
00180 ACE_UNUSED_ARG (change_masks);
00181
00182 break;
00183 }
00184
00185 return old_masks;
00186 }
00187
00188 int
00189 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00190 ACE_Reactor_Mask mask,
00191 bool &changes_required)
00192 {
00193 int error = 0;
00194
00195
00196
00197 size_t const original_handle_count = this->handles_to_be_deleted_;
00198 size_t i;
00199
00200
00201
00202
00203
00204
00205 for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
00206
00207
00208 if ((this->current_handles_[i] == handle
00209 || this->current_info_[i].io_handle_ == handle)
00210 &&
00211 !this->current_info_[i].delete_entry_)
00212 {
00213 if (this->remove_handler_i (i, mask) == -1)
00214 error = 1;
00215 }
00216
00217
00218 for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
00219
00220
00221 if ((this->current_suspended_info_[i].io_handle_ == handle
00222 || this->current_suspended_info_[i].event_handle_ == handle)
00223 &&
00224
00225 !this->current_suspended_info_[i].delete_entry_)
00226 {
00227 if (this->remove_suspended_handler_i (i, mask) == -1)
00228 error = 1;
00229 }
00230
00231
00232 for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
00233
00234
00235 if ((this->to_be_added_info_[i].io_handle_ == handle
00236 || this->to_be_added_info_[i].event_handle_ == handle)
00237 &&
00238
00239 !this->to_be_added_info_[i].delete_entry_)
00240 {
00241 if (this->remove_to_be_added_handler_i (i, mask) == -1)
00242 error = 1;
00243 }
00244
00245
00246
00247 if (original_handle_count < this->handles_to_be_deleted_)
00248 changes_required = true;
00249
00250 return error ? -1 : 0;
00251 }
00252
00253 int
00254 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00255 ACE_Reactor_Mask to_be_removed_masks)
00256 {
00257
00258 if (this->current_info_[slot].io_entry_)
00259 {
00260
00261
00262 this->bit_ops (this->current_info_[slot].network_events_,
00263 to_be_removed_masks,
00264 ACE_Reactor::CLR_MASK);
00265
00266
00267
00268
00269
00270
00271 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00272 this->current_handles_[slot],
00273 this->current_info_[slot].network_events_);
00274 }
00275
00276 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00277
00278 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00279 else
00280
00281 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00282
00283
00284
00285 if (this->current_info_[slot].suspend_entry_)
00286 {
00287
00288 this->current_info_[slot].suspend_entry_ = false;
00289
00290 --this->handles_to_be_suspended_;
00291 }
00292
00293
00294
00295
00296 if (this->current_info_[slot].network_events_ == 0)
00297 {
00298
00299 this->current_info_[slot].delete_entry_ = true;
00300
00301 this->current_info_[slot].close_masks_ = to_be_removed_masks;
00302
00303 ++this->handles_to_be_deleted_;
00304 }
00305
00306
00307
00308
00309
00310
00311 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00312 {
00313 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00314 this->current_info_[slot].event_handler_->handle_close (handle,
00315 to_be_removed_masks);
00316 }
00317
00318 return 0;
00319 }
00320
00321 int
00322 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00323 ACE_Reactor_Mask to_be_removed_masks)
00324 {
00325
00326 if (this->current_suspended_info_[slot].io_entry_)
00327 {
00328
00329
00330 this->bit_ops (this->current_suspended_info_[slot].network_events_,
00331 to_be_removed_masks,
00332 ACE_Reactor::CLR_MASK);
00333
00334
00335
00336
00337
00338
00339 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00340 this->current_suspended_info_[slot].event_handle_,
00341 this->current_suspended_info_[slot].network_events_);
00342 }
00343
00344 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00345
00346 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00347 else
00348
00349 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00350
00351
00352
00353 if (this->current_suspended_info_[slot].resume_entry_)
00354 {
00355
00356 this->current_suspended_info_[slot].resume_entry_ = false;
00357
00358 --this->handles_to_be_resumed_;
00359 }
00360
00361
00362
00363
00364 if (this->current_suspended_info_[slot].network_events_ == 0)
00365 {
00366
00367 this->current_suspended_info_[slot].delete_entry_ = true;
00368
00369 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00370
00371 ++this->handles_to_be_deleted_;
00372 }
00373
00374
00375
00376
00377
00378 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00379 {
00380 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00381 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00382 to_be_removed_masks);
00383 }
00384
00385 return 0;
00386 }
00387
00388 int
00389 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00390 ACE_Reactor_Mask to_be_removed_masks)
00391 {
00392
00393 if (this->to_be_added_info_[slot].io_entry_)
00394 {
00395
00396
00397 this->bit_ops (this->to_be_added_info_[slot].network_events_,
00398 to_be_removed_masks,
00399 ACE_Reactor::CLR_MASK);
00400
00401
00402
00403
00404
00405
00406 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00407 this->to_be_added_info_[slot].event_handle_,
00408 this->to_be_added_info_[slot].network_events_);
00409 }
00410
00411 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00412
00413 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00414 else
00415
00416 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00417
00418
00419
00420 if (this->to_be_added_info_[slot].suspend_entry_)
00421 {
00422
00423 this->to_be_added_info_[slot].suspend_entry_ = false;
00424
00425 --this->handles_to_be_suspended_;
00426 }
00427
00428
00429
00430
00431 if (this->to_be_added_info_[slot].network_events_ == 0)
00432 {
00433
00434 this->to_be_added_info_[slot].delete_entry_ = true;
00435
00436 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00437
00438 ++this->handles_to_be_deleted_;
00439 }
00440
00441
00442
00443
00444
00445 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00446 {
00447 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00448 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00449 to_be_removed_masks);
00450 }
00451
00452 return 0;
00453 }
00454
00455 int
00456 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00457 bool &changes_required)
00458 {
00459 size_t i = 0;
00460
00461
00462
00463
00464
00465
00466 for (i = 0; i < this->max_handlep1_; ++i)
00467
00468
00469 if ((this->current_handles_[i] == handle ||
00470 this->current_info_[i].io_handle_ == handle) &&
00471
00472 !this->current_info_[i].suspend_entry_)
00473 {
00474
00475 this->current_info_[i].suspend_entry_ = true;
00476
00477 ++this->handles_to_be_suspended_;
00478
00479 changes_required = true;
00480 }
00481
00482
00483 for (i = 0; i < this->suspended_handles_; ++i)
00484
00485
00486 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00487 this->current_suspended_info_[i].io_handle_ == handle) &&
00488
00489 this->current_suspended_info_[i].resume_entry_)
00490 {
00491
00492 this->current_suspended_info_[i].resume_entry_ = false;
00493
00494 --this->handles_to_be_resumed_;
00495
00496 changes_required = true;
00497 }
00498
00499
00500 for (i = 0; i < this->handles_to_be_added_; ++i)
00501
00502
00503 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00504 this->to_be_added_info_[i].event_handle_ == handle) &&
00505
00506 !this->to_be_added_info_[i].suspend_entry_)
00507 {
00508
00509 this->to_be_added_info_[i].suspend_entry_ = true;
00510
00511 ++this->handles_to_be_suspended_;
00512
00513 changes_required = true;
00514 }
00515
00516 return 0;
00517 }
00518
00519 int
00520 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00521 bool &changes_required)
00522 {
00523 size_t i = 0;
00524
00525
00526
00527
00528
00529
00530 for (i = 0; i < this->max_handlep1_; ++i)
00531
00532
00533 if ((this->current_handles_[i] == handle ||
00534 this->current_info_[i].io_handle_ == handle) &&
00535
00536 this->current_info_[i].suspend_entry_)
00537 {
00538
00539 this->current_info_[i].suspend_entry_ = false;
00540
00541 --this->handles_to_be_suspended_;
00542
00543 changes_required = true;
00544 }
00545
00546
00547 for (i = 0; i < this->suspended_handles_; ++i)
00548
00549
00550 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00551 this->current_suspended_info_[i].io_handle_ == handle) &&
00552
00553 !this->current_suspended_info_[i].resume_entry_)
00554 {
00555
00556 this->current_suspended_info_[i].resume_entry_ = true;
00557
00558 ++this->handles_to_be_resumed_;
00559
00560 changes_required = true;
00561 }
00562
00563
00564 for (i = 0; i < this->handles_to_be_added_; ++i)
00565
00566
00567 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00568 this->to_be_added_info_[i].event_handle_ == handle) &&
00569
00570 this->to_be_added_info_[i].suspend_entry_)
00571 {
00572
00573 this->to_be_added_info_[i].suspend_entry_ = false;
00574
00575 --this->handles_to_be_suspended_;
00576
00577 changes_required = true;
00578 }
00579
00580 return 0;
00581 }
00582
00583 void
00584 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00585 {
00586 {
00587 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00588
00589 bool dummy;
00590 size_t i;
00591
00592
00593 for (i = 0; i < this->max_handlep1_; ++i)
00594 this->unbind_i (this->current_handles_[i],
00595 ACE_Event_Handler::ALL_EVENTS_MASK,
00596 dummy);
00597
00598
00599 for (i = 0; i < this->suspended_handles_; ++i)
00600 this->unbind_i (this->current_suspended_info_[i].event_handle_,
00601 ACE_Event_Handler::ALL_EVENTS_MASK,
00602 dummy);
00603
00604
00605 for (i = 0; i < this->handles_to_be_added_; ++i)
00606 this->unbind_i (this->to_be_added_info_[i].event_handle_,
00607 ACE_Event_Handler::ALL_EVENTS_MASK,
00608 dummy);
00609 }
00610
00611
00612
00613
00614
00615 this->wfmo_reactor_.wakeup_all_threads ();
00616 }
00617
00618 int
00619 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry,
00620 ACE_Event_Handler *event_handler,
00621 long network_events,
00622 ACE_HANDLE io_handle,
00623 ACE_HANDLE event_handle,
00624 bool delete_event)
00625 {
00626 if (event_handler == 0)
00627 return -1;
00628
00629
00630 if (event_handle == ACE_INVALID_HANDLE)
00631 event_handle = event_handler->get_handle ();
00632 if (this->invalid_handle (event_handle))
00633 return -1;
00634
00635 size_t current_size = this->max_handlep1_ +
00636 this->handles_to_be_added_ -
00637 this->handles_to_be_deleted_ +
00638 this->suspended_handles_;
00639
00640
00641
00642
00643 if (current_size < this->max_size_ &&
00644 this->handles_to_be_added_ < this->max_size_)
00645 {
00646
00647
00648 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00649 io_entry,
00650 event_handler,
00651 io_handle,
00652 network_events,
00653 delete_event);
00654
00655 ++this->handles_to_be_added_;
00656
00657 event_handler->add_reference ();
00658
00659
00660
00661 this->wfmo_reactor_.wakeup_all_threads ();
00662 }
00663 else
00664 {
00665 errno = EMFILE;
00666 return -1;
00667 }
00668
00669 return 0;
00670 }
00671
00672 int
00673 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00674 {
00675
00676
00677 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00678 {
00679 size_t i = 0;
00680 while (i < this->max_handlep1_)
00681 {
00682
00683
00684
00685
00686 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00687 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00688 ACE_Event_Handler *event_handler = 0;
00689
00690
00691 if (this->current_info_[i].delete_entry_)
00692 {
00693
00694
00695
00696
00697
00698
00699
00700 masks = this->current_info_[i].close_masks_;
00701 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00702 {
00703
00704 if (this->current_info_[i].io_entry_)
00705 handle = this->current_info_[i].io_handle_;
00706 else
00707 handle = this->current_handles_[i];
00708
00709
00710 event_handler = this->current_info_[i].event_handler_;
00711 }
00712
00713
00714 if (this->current_info_[i].delete_event_)
00715 ACE_OS::event_destroy (&this->current_handles_[i]);
00716
00717
00718 --this->handles_to_be_deleted_;
00719 }
00720
00721
00722 else if (this->current_info_[i].suspend_entry_)
00723 {
00724 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00725 this->current_info_[i]);
00726
00727 ++this->suspended_handles_;
00728
00729
00730 --this->handles_to_be_suspended_;
00731 }
00732
00733
00734
00735 if (this->current_info_[i].delete_entry_ ||
00736 this->current_info_[i].suspend_entry_ )
00737 {
00738 size_t last_valid_slot = this->max_handlep1_ - 1;
00739
00740
00741 if (i < last_valid_slot)
00742
00743 {
00744
00745 this->current_info_[i] =
00746 this->current_info_[last_valid_slot];
00747 this->current_handles_[i] =
00748 this->current_handles_[last_valid_slot];
00749 }
00750
00751 this->current_info_[last_valid_slot].reset ();
00752 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00753 --this->max_handlep1_;
00754 }
00755 else
00756 {
00757
00758
00759
00760 ++i;
00761 }
00762
00763
00764
00765 if (event_handler != 0)
00766 {
00767 bool const requires_reference_counting =
00768 event_handler->reference_counting_policy ().value () ==
00769 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00770
00771 event_handler->handle_close (handle, masks);
00772
00773 if (requires_reference_counting)
00774 {
00775 event_handler->remove_reference ();
00776 }
00777 }
00778 }
00779 }
00780
00781 return 0;
00782 }
00783
00784 int
00785 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00786 {
00787
00788 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00789 {
00790 size_t i = 0;
00791 while (i < this->suspended_handles_)
00792 {
00793
00794
00795
00796
00797 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00798 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00799 ACE_Event_Handler *event_handler = 0;
00800
00801
00802 if (this->current_suspended_info_[i].delete_entry_)
00803 {
00804
00805
00806
00807
00808
00809
00810
00811 masks = this->current_suspended_info_[i].close_masks_;
00812 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00813 {
00814
00815 if (this->current_suspended_info_[i].io_entry_)
00816 handle = this->current_suspended_info_[i].io_handle_;
00817 else
00818 handle = this->current_suspended_info_[i].event_handle_;
00819
00820
00821 event_handler = this->current_suspended_info_[i].event_handler_;
00822 }
00823
00824
00825 if (this->current_suspended_info_[i].delete_event_)
00826 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00827
00828
00829 --this->handles_to_be_deleted_;
00830 }
00831
00832 else if (this->current_suspended_info_[i].resume_entry_)
00833 {
00834
00835 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00836
00837 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00838 ++this->max_handlep1_;
00839
00840
00841 --this->handles_to_be_resumed_;
00842 }
00843
00844
00845
00846
00847 if (this->current_suspended_info_[i].resume_entry_ ||
00848 this->current_suspended_info_[i].delete_entry_)
00849 {
00850 size_t last_valid_slot = this->suspended_handles_ - 1;
00851
00852
00853
00854
00855
00856 if (i < last_valid_slot)
00857
00858 this->current_suspended_info_[i] =
00859 this->current_suspended_info_[last_valid_slot];
00860 this->current_suspended_info_[last_valid_slot].reset ();
00861 --this->suspended_handles_;
00862 }
00863 else
00864 {
00865
00866
00867
00868 ++i;
00869 }
00870
00871
00872
00873 if (event_handler != 0)
00874 {
00875 int requires_reference_counting =
00876 event_handler->reference_counting_policy ().value () ==
00877 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00878
00879 event_handler->handle_close (handle, masks);
00880
00881 if (requires_reference_counting)
00882 {
00883 event_handler->remove_reference ();
00884 }
00885 }
00886 }
00887 }
00888
00889 return 0;
00890 }
00891
00892 int
00893 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00894 {
00895
00896 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
00897 {
00898
00899
00900
00901
00902 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00903 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00904 ACE_Event_Handler *event_handler = 0;
00905
00906
00907 if (this->to_be_added_info_[i].delete_entry_)
00908 {
00909
00910
00911
00912
00913
00914
00915
00916 masks = this->to_be_added_info_[i].close_masks_;
00917 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00918 {
00919
00920 if (this->to_be_added_info_[i].io_entry_)
00921 handle = this->to_be_added_info_[i].io_handle_;
00922 else
00923 handle = this->to_be_added_info_[i].event_handle_;
00924
00925
00926 event_handler = this->to_be_added_info_[i].event_handler_;
00927 }
00928
00929
00930 if (this->to_be_added_info_[i].delete_event_)
00931 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00932
00933
00934 --this->handles_to_be_deleted_;
00935 }
00936
00937
00938 else if (this->to_be_added_info_[i].suspend_entry_)
00939 {
00940 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00941 this->to_be_added_info_[i]);
00942
00943 ++this->suspended_handles_;
00944
00945
00946 --this->handles_to_be_suspended_;
00947 }
00948
00949
00950 else
00951 {
00952
00953 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00954
00955 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00956 ++this->max_handlep1_;
00957 }
00958
00959
00960 this->to_be_added_info_[i].reset ();
00961
00962
00963
00964 if (event_handler != 0)
00965 {
00966 int requires_reference_counting =
00967 event_handler->reference_counting_policy ().value () ==
00968 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
00969
00970 event_handler->handle_close (handle, masks);
00971
00972 if (requires_reference_counting)
00973 {
00974 event_handler->remove_reference ();
00975 }
00976 }
00977 }
00978
00979
00980
00981 this->handles_to_be_added_ = 0;
00982
00983 return 0;
00984 }
00985
00986 void
00987 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00988 {
00989 #if defined (ACE_HAS_DUMP)
00990 size_t i = 0;
00991
00992 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00993
00994 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00995
00996 ACE_DEBUG ((LM_DEBUG,
00997 ACE_TEXT ("Max size = %d\n"),
00998 this->max_size_));
00999
01000 ACE_DEBUG ((LM_DEBUG,
01001 ACE_TEXT ("Current info table\n\n")));
01002 ACE_DEBUG ((LM_DEBUG,
01003 ACE_TEXT ("\tSize = %d\n"),
01004 this->max_handlep1_));
01005 ACE_DEBUG ((LM_DEBUG,
01006 ACE_TEXT ("\tHandles to be suspended = %d\n"),
01007 this->handles_to_be_suspended_));
01008
01009 for (i = 0; i < this->max_handlep1_; ++i)
01010 this->current_info_[i].dump (this->current_handles_[i]);
01011
01012 ACE_DEBUG ((LM_DEBUG,
01013 ACE_TEXT ("\n")));
01014
01015 ACE_DEBUG ((LM_DEBUG,
01016 ACE_TEXT ("To-be-added info table\n\n")));
01017 ACE_DEBUG ((LM_DEBUG,
01018 ACE_TEXT ("\tSize = %d\n"),
01019 this->handles_to_be_added_));
01020
01021 for (i = 0; i < this->handles_to_be_added_; ++i)
01022 this->to_be_added_info_[i].dump ();
01023
01024 ACE_DEBUG ((LM_DEBUG,
01025 ACE_TEXT ("\n")));
01026
01027 ACE_DEBUG ((LM_DEBUG,
01028 ACE_TEXT ("Suspended info table\n\n")));
01029 ACE_DEBUG ((LM_DEBUG,
01030 ACE_TEXT ("\tSize = %d\n"),
01031 this->suspended_handles_));
01032 ACE_DEBUG ((LM_DEBUG,
01033 ACE_TEXT ("\tHandles to be resumed = %d\n"),
01034 this->handles_to_be_resumed_));
01035
01036 for (i = 0; i < this->suspended_handles_; ++i)
01037 this->current_suspended_info_[i].dump ();
01038
01039 ACE_DEBUG ((LM_DEBUG,
01040 ACE_TEXT ("\n")));
01041
01042 ACE_DEBUG ((LM_DEBUG,
01043 ACE_TEXT ("Total handles to be deleted = %d\n"),
01044 this->handles_to_be_deleted_));
01045
01046 ACE_DEBUG ((LM_DEBUG,
01047 ACE_END_DUMP));
01048 #endif
01049 }
01050
01051
01052
01053 int
01054 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01055 {
01056 ACE_NOTSUP_RETURN (-1);
01057 }
01058
01059 #if defined (ACE_WIN32_VC8)
01060 # pragma warning (push)
01061 # pragma warning (disable:4355)
01062 # endif
01063 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01064 ACE_Timer_Queue *tq,
01065 ACE_Reactor_Notify *notify)
01066 : signal_handler_ (0),
01067 delete_signal_handler_ (false),
01068 timer_queue_ (0),
01069 delete_timer_queue_ (false),
01070 delete_handler_rep_ (false),
01071 notify_handler_ (0),
01072 delete_notify_handler_ (false),
01073 lock_adapter_ (lock_),
01074 handler_rep_ (*this),
01075
01076 ok_to_wait_ (1),
01077
01078 wakeup_all_threads_ (0),
01079
01080 waiting_to_change_state_ (0),
01081 active_threads_ (0),
01082 owner_ (ACE_Thread::self ()),
01083 new_owner_ (0),
01084 change_state_thread_ (0),
01085 open_for_business_ (false),
01086 deactivated_ (0)
01087 {
01088 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01089 ACE_ERROR ((LM_ERROR,
01090 ACE_TEXT ("%p\n"),
01091 ACE_TEXT ("WFMO_Reactor")));
01092 }
01093
01094 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01095 int unused,
01096 ACE_Sig_Handler *sh,
01097 ACE_Timer_Queue *tq,
01098 ACE_Reactor_Notify *notify)
01099 : signal_handler_ (0),
01100 delete_signal_handler_ (false),
01101 timer_queue_ (0),
01102 delete_timer_queue_ (false),
01103 delete_handler_rep_ (false),
01104 notify_handler_ (0),
01105 delete_notify_handler_ (false),
01106 lock_adapter_ (lock_),
01107 handler_rep_ (*this),
01108
01109 ok_to_wait_ (1),
01110
01111 wakeup_all_threads_ (0),
01112
01113 waiting_to_change_state_ (0),
01114 active_threads_ (0),
01115 owner_ (ACE_Thread::self ()),
01116 new_owner_ (0),
01117 change_state_thread_ (0),
01118 open_for_business_ (false),
01119 deactivated_ (0)
01120 {
01121 ACE_UNUSED_ARG (unused);
01122
01123 if (this->open (size, 0, sh, tq, 0, notify) == -1)
01124 ACE_ERROR ((LM_ERROR,
01125 ACE_TEXT ("%p\n"),
01126 ACE_TEXT ("WFMO_Reactor")));
01127 }
01128 #if defined (ACE_WIN32_VC8)
01129 # pragma warning (pop)
01130 #endif
01131
01132 int
01133 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01134 {
01135 return -1;
01136 }
01137
01138 int
01139 ACE_WFMO_Reactor::open (size_t size,
01140 bool,
01141 ACE_Sig_Handler *sh,
01142 ACE_Timer_Queue *tq,
01143 int,
01144 ACE_Reactor_Notify *notify)
01145 {
01146
01147 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01148
01149
01150 if (this->open_for_business_)
01151 return -1;
01152
01153
01154 if (this->delete_timer_queue_)
01155 delete this->timer_queue_;
01156
01157 if (tq == 0)
01158 {
01159 ACE_NEW_RETURN (this->timer_queue_,
01160 ACE_Timer_Heap,
01161 -1);
01162 this->delete_timer_queue_ = true;
01163 }
01164 else
01165 {
01166 this->timer_queue_ = tq;
01167 this->delete_timer_queue_ = false;
01168 }
01169
01170
01171 if (this->delete_signal_handler_)
01172 delete this->signal_handler_;
01173
01174 if (sh == 0)
01175 {
01176 ACE_NEW_RETURN (this->signal_handler_,
01177 ACE_Sig_Handler,
01178 -1);
01179 this->delete_signal_handler_ = true;
01180 }
01181 else
01182 {
01183 this->signal_handler_ = sh;
01184 this->delete_signal_handler_ = false;
01185 }
01186
01187
01188 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01189 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01190
01191
01192 if (this->delete_handler_rep_)
01193 {
01194 if (this->handler_rep_.changes_required ())
01195 {
01196
01197 this->handler_rep_.make_changes ();
01198
01199
01200 this->wakeup_all_threads_.reset ();
01201 }
01202
01203 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01204 }
01205
01206
01207
01208 if (this->handler_rep_.open (size + 2) == -1)
01209 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
01210 ACE_TEXT ("opening handler repository")),
01211 -1);
01212 else
01213 this->delete_handler_rep_ = true;
01214
01215 if (this->notify_handler_ != 0 && this->delete_notify_handler_)
01216 delete this->notify_handler_;
01217
01218 this->notify_handler_ = notify;
01219
01220 if (this->notify_handler_ == 0)
01221 {
01222 ACE_NEW_RETURN (this->notify_handler_,
01223 ACE_WFMO_Reactor_Notify,
01224 -1);
01225
01226 if (this->notify_handler_ == 0)
01227 return -1;
01228 else
01229 this->delete_notify_handler_ = true;
01230 }
01231
01232
01233
01234
01235
01236 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01237 ACE_ERROR_RETURN ((LM_ERROR,
01238 ACE_TEXT ("%p\n"),
01239 ACE_TEXT ("opening notify handler ")),
01240 -1);
01241
01242
01243 if (this->register_handler (&this->wakeup_all_threads_handler_,
01244 this->wakeup_all_threads_.handle ()) == -1)
01245 ACE_ERROR_RETURN ((LM_ERROR,
01246 ACE_TEXT ("%p\n"),
01247 ACE_TEXT ("registering thread wakeup handler")),
01248 -1);
01249
01250
01251
01252 if (this->handler_rep_.changes_required ())
01253 {
01254
01255 this->handler_rep_.make_changes ();
01256
01257
01258 this->wakeup_all_threads_.reset ();
01259 }
01260
01261
01262 this->open_for_business_ = true;
01263
01264 return 0;
01265 }
01266
01267 int
01268 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01269 {
01270 if (this->signal_handler_ != 0 && this->delete_signal_handler_)
01271 delete this->signal_handler_;
01272 this->signal_handler_ = signal_handler;
01273 this->delete_signal_handler_ = false;
01274 return 0;
01275 }
01276
01277 ACE_Timer_Queue *
01278 ACE_WFMO_Reactor::timer_queue (void) const
01279 {
01280 return this->timer_queue_;
01281 }
01282
01283 int
01284 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01285 {
01286 if (this->timer_queue_ != 0 && this->delete_timer_queue_)
01287 delete this->timer_queue_;
01288 this->timer_queue_ = tq;
01289 this->delete_timer_queue_ = false;
01290 return 0;
01291 }
01292
01293 int
01294 ACE_WFMO_Reactor::close (void)
01295 {
01296
01297 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01298
01299
01300 if (!this->open_for_business_)
01301 return -1;
01302
01303
01304 this->open_for_business_ = false;
01305
01306 this->handler_rep_.close ();
01307
01308 return 0;
01309 }
01310
01311 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01312 {
01313
01314
01315
01316
01317 this->close ();
01318
01319
01320
01321 this->handler_rep_.make_changes ();
01322
01323 if (this->delete_timer_queue_)
01324 {
01325 delete this->timer_queue_;
01326 this->timer_queue_ = 0;
01327 this->delete_timer_queue_ = false;
01328 }
01329
01330 if (this->delete_signal_handler_)
01331 {
01332 delete this->signal_handler_;
01333 this->signal_handler_ = 0;
01334 this->delete_signal_handler_ = false;
01335 }
01336
01337 if (this->delete_notify_handler_)
01338 {
01339 delete this->notify_handler_;
01340 this->notify_handler_ = 0;
01341 this->delete_notify_handler_ = false;
01342 }
01343 }
01344
01345 int
01346 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01347 ACE_HANDLE io_handle,
01348 ACE_Event_Handler *event_handler,
01349 ACE_Reactor_Mask new_masks)
01350 {
01351
01352
01353
01354
01355 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01356
01357 ACE_UNUSED_ARG (event_handle);
01358 ACE_UNUSED_ARG (io_handle);
01359 ACE_UNUSED_ARG (event_handler);
01360 ACE_UNUSED_ARG (new_masks);
01361 ACE_NOTSUP_RETURN (-1);
01362
01363 #else
01364
01365
01366 if (io_handle == ACE_INVALID_HANDLE)
01367 io_handle = event_handler->get_handle ();
01368
01369 if (this->handler_rep_.invalid_handle (io_handle))
01370 {
01371 errno = ERROR_INVALID_HANDLE;
01372 return -1;
01373 }
01374
01375 long new_network_events = 0;
01376 bool delete_event = false;
01377 auto_ptr <ACE_Auto_Event> event;
01378
01379
01380
01381 ACE_Reactor_Mask old_masks;
01382 int found = this->handler_rep_.modify_network_events_i (io_handle,
01383 new_masks,
01384 old_masks,
01385 new_network_events,
01386 event_handle,
01387 delete_event,
01388 ACE_Reactor::ADD_MASK);
01389
01390
01391
01392 if (event_handle == ACE_INVALID_HANDLE)
01393 {
01394
01395
01396 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01397 event = tmp;
01398 event_handle = event->handle ();
01399 delete_event = true;
01400 }
01401
01402 int result = ::WSAEventSelect ((SOCKET) io_handle,
01403 event_handle,
01404 new_network_events);
01405
01406 if (found)
01407 return result;
01408 else if (result != SOCKET_ERROR &&
01409 this->handler_rep_.bind_i (1,
01410 event_handler,
01411 new_network_events,
01412 io_handle,
01413 event_handle,
01414 delete_event) != -1)
01415 {
01416
01417
01418 if (delete_event)
01419 {
01420
01421
01422
01423
01424
01425
01426
01427
01428 ACE_Errno_Guard guard (errno);
01429 event->handle (ACE_INVALID_HANDLE);
01430 event->remove ();
01431 }
01432 return 0;
01433 }
01434 else
01435 return -1;
01436
01437 #endif
01438
01439 }
01440
01441 int
01442 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01443 ACE_Reactor_Mask new_masks,
01444 int operation)
01445 {
01446
01447 if (this->handler_rep_.invalid_handle (io_handle))
01448 return -1;
01449
01450 long new_network_events = 0;
01451 bool delete_event = false;
01452 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01453
01454
01455
01456 ACE_Reactor_Mask old_masks;
01457 int found = this->handler_rep_.modify_network_events_i (io_handle,
01458 new_masks,
01459 old_masks,
01460 new_network_events,
01461 event_handle,
01462 delete_event,
01463 operation);
01464 if (found)
01465 {
01466 int result = ::WSAEventSelect ((SOCKET) io_handle,
01467 event_handle,
01468 new_network_events);
01469 if (result == 0)
01470 return old_masks;
01471 else
01472 return result;
01473 }
01474 else
01475 return -1;
01476 }
01477
01478
01479
01480 int
01481 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01482 ACE_Reactor_Mask new_masks,
01483 ACE_Reactor_Mask &old_masks,
01484 long &new_network_events,
01485 ACE_HANDLE &event_handle,
01486 bool &delete_event,
01487 int operation)
01488 {
01489 long *modified_network_events = &new_network_events;
01490 int found = 0;
01491 size_t i;
01492
01493
01494
01495
01496
01497 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01498 if (io_handle == this->current_info_[i].io_handle_ &&
01499 !this->current_info_[i].delete_entry_)
01500 {
01501 found = 1;
01502 modified_network_events = &this->current_info_[i].network_events_;
01503 delete_event = this->current_info_[i].delete_event_;
01504 event_handle = this->current_handles_[i];
01505 }
01506
01507
01508
01509
01510
01511 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01512 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01513 !this->current_suspended_info_[i].delete_entry_)
01514 {
01515 found = 1;
01516 modified_network_events = &this->current_suspended_info_[i].network_events_;
01517 delete_event = this->current_suspended_info_[i].delete_event_;
01518 event_handle = this->current_suspended_info_[i].event_handle_;
01519 }
01520
01521
01522
01523
01524
01525 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01526 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01527 !this->to_be_added_info_[i].delete_entry_)
01528 {
01529 found = 1;
01530 modified_network_events = &this->to_be_added_info_[i].network_events_;
01531 delete_event = this->to_be_added_info_[i].delete_event_;
01532 event_handle = this->to_be_added_info_[i].event_handle_;
01533 }
01534
01535 old_masks = this->bit_ops (*modified_network_events,
01536 new_masks,
01537 operation);
01538
01539 new_network_events = *modified_network_events;
01540
01541 return found;
01542 }
01543
01544 ACE_Event_Handler *
01545 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
01546 {
01547 long existing_masks_ignored = 0;
01548 return this->handler (handle, existing_masks_ignored);
01549 }
01550
01551 ACE_Event_Handler *
01552 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01553 long &existing_masks)
01554 {
01555 int found = 0;
01556 size_t i = 0;
01557 ACE_Event_Handler *event_handler = 0;
01558 existing_masks = 0;
01559
01560
01561
01562
01563
01564
01565
01566 for (i = 0; i < this->max_handlep1_ && !found; ++i)
01567 if ((handle == this->current_info_[i].io_handle_ ||
01568 handle == this->current_handles_[i]) &&
01569 !this->current_info_[i].delete_entry_)
01570 {
01571 found = 1;
01572 event_handler = this->current_info_[i].event_handler_;
01573 existing_masks = this->current_info_[i].network_events_;
01574 }
01575
01576
01577
01578
01579
01580 for (i = 0; i < this->suspended_handles_ && !found; ++i)
01581 if ((handle == this->current_suspended_info_[i].io_handle_ ||
01582 handle == this->current_suspended_info_[i].event_handle_) &&
01583 !this->current_suspended_info_[i].delete_entry_)
01584 {
01585 found = 1;
01586 event_handler = this->current_suspended_info_[i].event_handler_;
01587 existing_masks = this->current_suspended_info_[i].network_events_;
01588 }
01589
01590
01591
01592
01593
01594 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
01595 if ((handle == this->to_be_added_info_[i].io_handle_ ||
01596 handle == this->to_be_added_info_[i].event_handle_) &&
01597 !this->to_be_added_info_[i].delete_entry_)
01598 {
01599 found = 1;
01600 event_handler = this->to_be_added_info_[i].event_handler_;
01601 existing_masks = this->to_be_added_info_[i].network_events_;
01602 }
01603
01604 if (event_handler)
01605 event_handler->add_reference ();
01606
01607 return event_handler;
01608 }
01609
01610 int
01611 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01612 ACE_Reactor_Mask user_masks,
01613 ACE_Event_Handler **user_event_handler)
01614 {
01615 long existing_masks = 0;
01616 int found = 0;
01617
01618 ACE_Event_Handler_var safe_event_handler =
01619 this->handler (handle,
01620 existing_masks);
01621
01622 if (safe_event_handler.handler ())
01623 found = 1;
01624
01625 if (!found)
01626 return -1;
01627
01628
01629
01630 if (found &&
01631 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01632 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
01633 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01634 found = 0;
01635
01636 if (found &&
01637 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01638 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01639 found = 0;
01640
01641 if (found &&
01642 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01643 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01644 found = 0;
01645
01646 if (found &&
01647 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01648 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01649 found = 0;
01650
01651 if (found &&
01652 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01653 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01654 found = 0;
01655
01656 if (found &&
01657 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01658 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01659 found = 0;
01660
01661 if (found &&
01662 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01663 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01664 found = 0;
01665
01666 if (found &&
01667 user_event_handler)
01668 *user_event_handler = safe_event_handler.release ();
01669
01670 if (found)
01671 return 0;
01672 else
01673 return -1;
01674 }
01675
01676
01677
01678 int
01679 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01680 int alertable)
01681 {
01682 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01683
01684
01685 if (!this->open_for_business_ || this->deactivated_)
01686 return -1;
01687
01688
01689
01690
01691 ACE_Countdown_Time countdown (max_wait_time);
01692
01693 int result;
01694 do
01695 {
01696
01697
01698
01699 result = this->ok_to_wait (max_wait_time, alertable);
01700 if (result != 1)
01701 return result;
01702
01703
01704 ++this->active_threads_;
01705
01706
01707 this->lock_.release ();
01708
01709
01710
01711 countdown.update ();
01712
01713
01714 int timeout = this->calculate_timeout (max_wait_time);
01715
01716
01717 DWORD wait_status = this->wait_for_multiple_events (timeout,
01718 alertable);
01719
01720
01721 result = this->safe_dispatch (wait_status);
01722 if (0 == result)
01723 {
01724
01725
01726
01727
01728
01729
01730
01731
01732 countdown.update ();
01733 if (0 == max_wait_time || max_wait_time->usec () == 0)
01734 break;
01735 }
01736 }
01737 while (result == 0);
01738
01739 return result;
01740 }
01741
01742 int
01743 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01744 int alertable)
01745 {
01746
01747
01748
01749
01750
01751
01752
01753
01754
01755
01756
01757
01758 #if defined (ACE_HAS_WINCE)
01759 ACE_UNUSED_ARG (alertable);
01760 ACE_Time_Value timeout;
01761 if (max_wait_time != 0)
01762 {
01763 timeout = ACE_OS::gettimeofday ();
01764 timeout += *max_wait_time;
01765 }
01766 while (1)
01767 {
01768 int status;
01769 if (max_wait_time == 0)
01770 status = this->ok_to_wait_.wait ();
01771 else
01772 status = this->ok_to_wait_.wait (&timeout);
01773 if (status == -1)
01774 return -1;
01775
01776
01777 if (max_wait_time == 0)
01778 status = this->lock_.acquire ();
01779 else
01780 {
01781 status = this->lock_.acquire (timeout);
01782 }
01783 if (status == -1)
01784 return -1;
01785
01786
01787
01788 if (this->ok_to_wait_.wait (&ACE_Time_Value::zero, 0) == 0)
01789 break;
01790 this->lock_.release ();
01791 }
01792 return 1;
01793
01794 #else
01795 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01796 DWORD result = 0;
01797 while (1)
01798 {
01799 # if defined (ACE_HAS_PHARLAP)
01800
01801
01802 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01803 this->atomic_wait_array_,
01804 TRUE,
01805 timeout);
01806
01807 if (result != WAIT_IO_COMPLETION)
01808 break;
01809
01810 # else
01811 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01812 this->atomic_wait_array_,
01813 TRUE,
01814 timeout,
01815 alertable);
01816
01817 if (result != WAIT_IO_COMPLETION)
01818 break;
01819
01820 # endif
01821 }
01822
01823 switch (result)
01824 {
01825 case WAIT_TIMEOUT:
01826 errno = ETIME;
01827 return 0;
01828 case WAIT_FAILED:
01829 case WAIT_ABANDONED_0:
01830 ACE_OS::set_errno_to_last_error ();
01831 return -1;
01832 default:
01833 break;
01834 }
01835
01836
01837 return 1;
01838 #endif
01839 }
01840
01841 DWORD
01842 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01843 int alertable)
01844 {
01845
01846
01847
01848
01849 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01850
01851
01852 ACE_UNUSED_ARG (alertable);
01853 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01854 this->handler_rep_.handles (),
01855 FALSE,
01856 timeout);
01857 #else
01858 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01859 this->handler_rep_.handles (),
01860 FALSE,
01861 timeout,
01862 alertable);
01863 #endif
01864 }
01865
01866 DWORD
01867 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01868 {
01869 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01870 this->handler_rep_.handles () + slot,
01871 FALSE,
01872 0);
01873 }
01874
01875 int
01876 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01877 {
01878 ACE_Time_Value *time = 0;
01879 if (this->owner_ == ACE_Thread::self ())
01880 time = this->timer_queue_->calculate_timeout (max_wait_time);
01881 else
01882 time = max_wait_time;
01883
01884 if (time == 0)
01885 return INFINITE;
01886 else
01887 return time->msec ();
01888 }
01889
01890
01891 int
01892 ACE_WFMO_Reactor::expire_timers (void)
01893 {
01894
01895 if (ACE_Thread::self () == this->owner_)
01896
01897 return this->timer_queue_->expire ();
01898
01899 else
01900
01901 return 0;
01902 }
01903
01904 int
01905 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01906 {
01907
01908 int handlers_dispatched = this->expire_timers ();
01909
01910 switch (wait_status)
01911 {
01912 case WAIT_FAILED:
01913 ACE_OS::set_errno_to_last_error ();
01914 return -1;
01915
01916 case WAIT_TIMEOUT:
01917 errno = ETIME;
01918 return handlers_dispatched;
01919
01920 #ifndef ACE_HAS_WINCE
01921 case WAIT_IO_COMPLETION:
01922 return handlers_dispatched;
01923 #endif // ACE_HAS_WINCE
01924
01925 default:
01926
01927 handlers_dispatched += this->dispatch_handles (wait_status);
01928 return handlers_dispatched;
01929 }
01930 }
01931
01932
01933
01934
01935 int
01936 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01937 {
01938
01939
01940 DWORD dispatch_slot = 0;
01941
01942
01943 DWORD const max_handlep1 = this->handler_rep_.max_handlep1 ();
01944
01945
01946
01947 DWORD nCount = max_handlep1;
01948
01949 for (int number_of_handlers_dispatched = 1;
01950 ;
01951 ++number_of_handlers_dispatched)
01952 {
01953 const bool ok = (
01954 #if ! defined(__BORLANDC__) \
01955 && !defined (ghs) \
01956 && !defined (__MINGW32__) \
01957 && !defined (_MSC_VER)
01958
01959
01960
01961 wait_status >= WAIT_OBJECT_0 &&
01962 #endif
01963 wait_status <= (WAIT_OBJECT_0 + nCount));
01964
01965 if (ok)
01966 dispatch_slot += wait_status - WAIT_OBJECT_0;
01967 else
01968
01969 dispatch_slot += wait_status - WAIT_ABANDONED_0;
01970
01971
01972 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01973 return -1;
01974
01975
01976 ++dispatch_slot;
01977
01978
01979 if (dispatch_slot >= max_handlep1)
01980 return number_of_handlers_dispatched;
01981
01982
01983 nCount = max_handlep1 - dispatch_slot;
01984
01985
01986 wait_status = this->poll_remaining_handles (dispatch_slot);
01987 switch (wait_status)
01988 {
01989 case WAIT_FAILED:
01990 ACE_OS::set_errno_to_last_error ();
01991
01992 case WAIT_TIMEOUT:
01993
01994 return number_of_handlers_dispatched;
01995 }
01996 }
01997 }
01998
01999 int
02000 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
02001 DWORD max_handlep1)
02002 {
02003
02004 if (slot == max_handlep1)
02005 return this->dispatch_window_messages ();
02006
02007
02008
02009
02010
02011
02012
02013 else if (!this->handler_rep_.scheduled_for_deletion (slot))
02014 {
02015 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
02016
02017 if (this->handler_rep_.current_info ()[slot].io_entry_)
02018 return this->complex_dispatch_handler (slot,
02019 event_handle);
02020 else
02021 return this->simple_dispatch_handler (slot,
02022 event_handle);
02023 }
02024 else
02025
02026 return 0;
02027 }
02028
02029 int
02030 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
02031 ACE_HANDLE event_handle)
02032 {
02033
02034
02035
02036
02037 siginfo_t sig (event_handle);
02038
02039 ACE_Event_Handler *event_handler =
02040 this->handler_rep_.current_info ()[slot].event_handler_;
02041
02042 int requires_reference_counting =
02043 event_handler->reference_counting_policy ().value () ==
02044 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02045
02046 if (requires_reference_counting)
02047 {
02048 event_handler->add_reference ();
02049 }
02050
02051
02052 if (event_handler->handle_signal (0, &sig) == -1)
02053 this->handler_rep_.unbind (event_handle,
02054 ACE_Event_Handler::NULL_MASK);
02055
02056
02057 if (requires_reference_counting)
02058 {
02059 event_handler->remove_reference ();
02060 }
02061
02062 return 0;
02063 }
02064
02065 int
02066 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02067 ACE_HANDLE event_handle)
02068 {
02069
02070
02071 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02072 this->handler_rep_.current_info ()[slot];
02073
02074 WSANETWORKEVENTS events;
02075 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02076 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02077 event_handle,
02078 &events) == SOCKET_ERROR)
02079 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02080 else
02081 {
02082
02083
02084
02085
02086
02087
02088
02089
02090
02091
02092
02093 events.lNetworkEvents &= current_info.network_events_;
02094 while (events.lNetworkEvents != 0)
02095 {
02096 ACE_Event_Handler *event_handler =
02097 current_info.event_handler_;
02098
02099 int reference_counting_required =
02100 event_handler->reference_counting_policy ().value () ==
02101 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02102
02103
02104 if (reference_counting_required)
02105 {
02106 event_handler->add_reference ();
02107 }
02108
02109
02110 problems |= this->upcall (current_info.event_handler_,
02111 current_info.io_handle_,
02112 events);
02113
02114
02115 if (reference_counting_required)
02116 {
02117 event_handler->remove_reference ();
02118 }
02119
02120 if (this->handler_rep_.scheduled_for_deletion (slot))
02121 break;
02122 }
02123 }
02124
02125 if (problems != ACE_Event_Handler::NULL_MASK
02126 && !this->handler_rep_.scheduled_for_deletion (slot) )
02127 this->handler_rep_.unbind (event_handle, problems);
02128
02129 return 0;
02130 }
02131
02132 ACE_Reactor_Mask
02133 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02134 ACE_HANDLE io_handle,
02135 WSANETWORKEVENTS &events)
02136 {
02137
02138
02139 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02140
02141
02142
02143
02144
02145 long actual_events = events.lNetworkEvents;
02146 int action;
02147
02148 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02149 {
02150 action = event_handler->handle_output (io_handle);
02151 if (action <= 0)
02152 {
02153 ACE_CLR_BITS (actual_events, FD_WRITE);
02154 if (action == -1)
02155 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02156 }
02157 }
02158
02159 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
02160 {
02161 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
02162 {
02163
02164 action = event_handler->handle_output (io_handle);
02165 if (action <= 0)
02166 {
02167 ACE_CLR_BITS (actual_events, FD_CONNECT);
02168 if (action == -1)
02169 ACE_SET_BITS (problems,
02170 ACE_Event_Handler::CONNECT_MASK);
02171 }
02172 }
02173
02174 else
02175 {
02176 action = event_handler->handle_input (io_handle);
02177 if (action <= 0)
02178 {
02179 ACE_CLR_BITS (actual_events, FD_CONNECT);
02180 if (action == -1)
02181 ACE_SET_BITS (problems,
02182 ACE_Event_Handler::CONNECT_MASK);
02183 }
02184 }
02185 }
02186
02187 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02188 {
02189 action = event_handler->handle_exception (io_handle);
02190 if (action <= 0)
02191 {
02192 ACE_CLR_BITS (actual_events, FD_OOB);
02193 if (action == -1)
02194 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02195 }
02196 }
02197
02198 if (ACE_BIT_ENABLED (actual_events, FD_READ))
02199 {
02200 action = event_handler->handle_input (io_handle);
02201 if (action <= 0)
02202 {
02203 ACE_CLR_BITS (actual_events, FD_READ);
02204 if (action == -1)
02205 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02206 }
02207 }
02208
02209 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02210 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02211 {
02212 action = event_handler->handle_input (io_handle);
02213 if (action <= 0)
02214 {
02215 ACE_CLR_BITS (actual_events, FD_CLOSE);
02216 if (action == -1)
02217 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02218 }
02219 }
02220
02221 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02222 {
02223 action = event_handler->handle_input (io_handle);
02224 if (action <= 0)
02225 {
02226 ACE_CLR_BITS (actual_events, FD_ACCEPT);
02227 if (action == -1)
02228 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02229 }
02230 }
02231
02232 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02233 {
02234 action = event_handler->handle_qos (io_handle);
02235 if (action <= 0)
02236 {
02237 ACE_CLR_BITS (actual_events, FD_QOS);
02238 if (action == -1)
02239 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02240 }
02241 }
02242
02243 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02244 {
02245 action = event_handler->handle_group_qos (io_handle);
02246 if (action <= 0)
02247 {
02248 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02249 if (action == -1)
02250 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02251 }
02252 }
02253
02254 events.lNetworkEvents = actual_events;
02255 return problems;
02256 }
02257
02258
02259 int
02260 ACE_WFMO_Reactor::update_state (void)
02261 {
02262
02263 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02264
02265
02266 --this->active_threads_;
02267
02268
02269
02270 if (this->handler_rep_.changes_required () || this->new_owner ())
02271 {
02272 if (this->change_state_thread_ == 0)
02273
02274
02275 {
02276 this->change_state_thread_ = ACE_Thread::self ();
02277
02278 this->ok_to_wait_.reset ();
02279
02280 if (this->active_threads_ > 0)
02281
02282 {
02283
02284 this->wakeup_all_threads_.signal ();
02285
02286 monitor.release ();
02287
02288 this->waiting_to_change_state_.wait ();
02289
02290 monitor.acquire ();
02291 }
02292
02293
02294
02295
02296 while (this->handler_rep_.changes_required ())
02297
02298 this->handler_rep_.make_changes ();
02299 if (this->new_owner ())
02300
02301 this->change_owner ();
02302
02303 this->wakeup_all_threads_.reset ();
02304
02305 this->ok_to_wait_.signal ();
02306
02307 this->change_state_thread_ = 0;
02308 }
02309 else if (this->active_threads_ == 0)
02310
02311
02312
02313 this->waiting_to_change_state_.signal ();
02314 }
02315
02316
02317 else if (this->active_threads_ == 0)
02318
02319 this->wakeup_all_threads_.reset ();
02320
02321 return 0;
02322 }
02323
02324 void
02325 ACE_WFMO_Reactor::dump (void) const
02326 {
02327 #if defined (ACE_HAS_DUMP)
02328 ACE_TRACE ("ACE_WFMO_Reactor::dump");
02329
02330 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02331
02332 ACE_DEBUG ((LM_DEBUG,
02333 ACE_TEXT ("Count of currently active threads = %d\n"),
02334 this->active_threads_));
02335
02336 ACE_DEBUG ((LM_DEBUG,
02337 ACE_TEXT ("ID of owner thread = %d\n"),
02338 this->owner_));
02339
02340 this->handler_rep_.dump ();
02341 this->signal_handler_->dump ();
02342 this->timer_queue_->dump ();
02343
02344 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02345 #endif
02346 }
02347
02348 int
02349 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & ,
02350 ACE_Handle_Set & )
02351 {
02352 return -1;
02353 }
02354
02355 int
02356 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & )
02357 {
02358 return 0;
02359 }
02360
02361 ACE_HANDLE
02362 ACE_WFMO_Reactor_Notify::notify_handle (void)
02363 {
02364 return ACE_INVALID_HANDLE;
02365 }
02366
02367 int
02368 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02369 ACE_Notification_Buffer &)
02370 {
02371 return 0;
02372 }
02373
02374 int
02375 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02376 {
02377 return 0;
02378 }
02379
02380 int
02381 ACE_WFMO_Reactor_Notify::close (void)
02382 {
02383 return -1;
02384 }
02385
02386 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02387 : timer_queue_ (0),
02388 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02389 max_notifies * sizeof (ACE_Notification_Buffer)),
02390 max_notify_iterations_ (-1)
02391 {
02392 }
02393
02394 int
02395 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02396 ACE_Timer_Queue *timer_queue,
02397 int ignore_notify)
02398 {
02399 ACE_UNUSED_ARG (ignore_notify);
02400 timer_queue_ = timer_queue;
02401 return wfmo_reactor->register_handler (this);
02402 }
02403
02404 ACE_HANDLE
02405 ACE_WFMO_Reactor_Notify::get_handle (void) const
02406 {
02407 return this->wakeup_one_thread_.handle ();
02408 }
02409
02410
02411
02412 int
02413 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02414 siginfo_t *siginfo,
02415 ucontext_t *)
02416 {
02417 ACE_UNUSED_ARG (signum);
02418
02419
02420 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02421 return -1;
02422
02423
02424
02425
02426
02427
02428 for (int i = 1; ; ++i)
02429 {
02430 ACE_Message_Block *mb = 0;
02431
02432 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02433 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02434 {
02435 if (errno == EWOULDBLOCK)
02436
02437
02438 return 0;
02439 else
02440 return -1;
02441 }
02442 else
02443 {
02444 ACE_Notification_Buffer *buffer =
02445 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02446
02447
02448
02449
02450
02451 if (buffer->eh_ != 0)
02452 {
02453 ACE_Event_Handler *event_handler =
02454 buffer->eh_;
02455
02456 bool const requires_reference_counting =
02457 event_handler->reference_counting_policy ().value () ==
02458 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02459
02460 int result = 0;
02461
02462 switch (buffer->mask_)
02463 {
02464 case ACE_Event_Handler::READ_MASK:
02465 case ACE_Event_Handler::ACCEPT_MASK:
02466 result = event_handler->handle_input (ACE_INVALID_HANDLE);
02467 break;
02468 case ACE_Event_Handler::WRITE_MASK:
02469 result = event_handler->handle_output (ACE_INVALID_HANDLE);
02470 break;
02471 case ACE_Event_Handler::EXCEPT_MASK:
02472 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02473 break;
02474 case ACE_Event_Handler::QOS_MASK:
02475 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02476 break;
02477 case ACE_Event_Handler::GROUP_QOS_MASK:
02478 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02479 break;
02480 default:
02481 ACE_ERROR ((LM_ERROR,
02482 ACE_TEXT ("invalid mask = %d\n"),
02483 buffer->mask_));
02484 break;
02485 }
02486
02487 if (result == -1)
02488 event_handler->handle_close (ACE_INVALID_HANDLE,
02489 ACE_Event_Handler::EXCEPT_MASK);
02490
02491 if (requires_reference_counting)
02492 {
02493 event_handler->remove_reference ();
02494 }
02495 }
02496
02497
02498
02499 mb->release ();
02500
02501
02502
02503
02504 if (i == this->max_notify_iterations_)
02505 {
02506
02507
02508 if (!this->message_queue_.is_empty ())
02509 this->wakeup_one_thread_.signal ();
02510
02511
02512 return 0;
02513 }
02514 }
02515 }
02516 }
02517
02518
02519
02520
02521
02522 int
02523 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
02524 ACE_Reactor_Mask mask,
02525 ACE_Time_Value *timeout)
02526 {
02527 if (event_handler != 0)
02528 {
02529 ACE_Message_Block *mb = 0;
02530 ACE_NEW_RETURN (mb,
02531 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02532 -1);
02533
02534 ACE_Notification_Buffer *buffer =
02535 (ACE_Notification_Buffer *) mb->base ();
02536 buffer->eh_ = event_handler;
02537 buffer->mask_ = mask;
02538
02539
02540
02541
02542 if (timeout != 0)
02543 *timeout += timer_queue_->gettimeofday ();
02544
02545 if (this->message_queue_.enqueue_tail
02546 (mb, timeout) == -1)
02547 {
02548 mb->release ();
02549 return -1;
02550 }
02551
02552 event_handler->add_reference ();
02553 }
02554
02555 return this->wakeup_one_thread_.signal ();
02556 }
02557
02558 void
02559 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02560 {
02561 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02562
02563 if (iterations == 0)
02564 iterations = 1;
02565
02566 this->max_notify_iterations_ = iterations;
02567 }
02568
02569 int
02570 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02571 {
02572 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02573 return this->max_notify_iterations_;
02574 }
02575
02576 int
02577 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02578 ACE_Reactor_Mask mask)
02579 {
02580 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02581
02582
02583
02584
02585
02586
02587 if (this->message_queue_.is_empty ())
02588 return 0;
02589
02590
02591
02592
02593
02594
02595
02596 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02597
02598
02599
02600 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02601
02602 size_t queue_size = this->message_queue_.message_count ();
02603 int number_purged = 0;
02604
02605 size_t index;
02606
02607 for (index = 0; index < queue_size; ++index)
02608 {
02609 ACE_Message_Block *mb = 0;
02610 if (-1 == this->message_queue_.dequeue_head (mb))
02611 return -1;
02612
02613 ACE_Notification_Buffer *buffer =
02614 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02615
02616
02617
02618
02619
02620 if ((0 != buffer->eh_) &&
02621 (0 == eh || eh == buffer->eh_) &&
02622 ACE_BIT_DISABLED (buffer->mask_, ~mask))
02623
02624
02625 {
02626 ACE_Event_Handler *event_handler = buffer->eh_;
02627
02628 event_handler->remove_reference ();
02629
02630 mb->release ();
02631 ++number_purged;
02632 }
02633 else
02634 {
02635
02636
02637
02638
02639 if ((0 != buffer->eh_) &&
02640 (0 == eh || eh == buffer->eh_))
02641 ACE_CLR_BITS(buffer->mask_, mask);
02642 if (-1 == local_queue.enqueue_head (mb))
02643 return -1;
02644 }
02645 }
02646
02647 if (this->message_queue_.message_count ())
02648 {
02649 ACE_ASSERT (0);
02650 return -1;
02651 }
02652
02653
02654
02655 queue_size = local_queue.message_count ();
02656 for (index = 0; index < queue_size; ++index)
02657 {
02658 ACE_Message_Block *mb = 0;
02659 if (-1 == local_queue.dequeue_head (mb))
02660 {
02661 ACE_ASSERT (0);
02662 return -1;
02663 }
02664
02665 if (-1 == this->message_queue_.enqueue_head (mb))
02666 {
02667 ACE_ASSERT (0);
02668 return -1;
02669 }
02670 }
02671
02672 return number_purged;
02673 }
02674
02675 void
02676 ACE_WFMO_Reactor_Notify::dump (void) const
02677 {
02678 #if defined (ACE_HAS_DUMP)
02679 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02680 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02681 this->timer_queue_->dump ();
02682 ACE_DEBUG ((LM_DEBUG,
02683 ACE_TEXT ("Max. iteration: %d\n"),
02684 this->max_notify_iterations_));
02685 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02686 #endif
02687 }
02688
02689 void
02690 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02691 {
02692 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02693 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02694
02695
02696 this->notify_handler_->max_notify_iterations (iterations);
02697 }
02698
02699 int
02700 ACE_WFMO_Reactor::max_notify_iterations (void)
02701 {
02702 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02703 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02704
02705 return this->notify_handler_->max_notify_iterations ();
02706 }
02707
02708 int
02709 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02710 ACE_Reactor_Mask mask)
02711 {
02712 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02713 if (this->notify_handler_ == 0)
02714 return 0;
02715 else
02716 return this->notify_handler_->purge_pending_notifications (eh, mask);
02717 }
02718
02719 int
02720 ACE_WFMO_Reactor::resumable_handler (void)
02721 {
02722 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02723 return 0;
02724 }
02725
02726
02727
02728 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02729 int
02730 WSAEventSelect (SOCKET ,
02731 WSAEVENT ,
02732 long )
02733 {
02734 return -1;
02735 }
02736
02737 int
02738 WSAEnumNetworkEvents (SOCKET ,
02739 WSAEVENT ,
02740 LPWSANETWORKEVENTS )
02741 {
02742 return -1;
02743 }
02744 #endif
02745
02746 ACE_END_VERSIONED_NAMESPACE_DECL
02747
02748 #endif