#include <WFMO_Reactor.h>
Inheritance diagram for ACE_WFMO_Reactor_Notify:
Public Member Functions | |
ACE_WFMO_Reactor_Notify (size_t max_notifies=1024) | |
Constructor. | |
virtual int | open (ACE_Reactor_Impl *wfmo_reactor, ACE_Timer_Queue *timer_queue, int disable_notify=0) |
Initialization. is stored to call . | |
virtual int | close (void) |
No-op. | |
virtual int | notify (ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0) |
virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
No-op. | |
virtual ACE_HANDLE | get_handle (void) const |
Returns a handle to the . | |
virtual ACE_HANDLE | notify_handle (void) |
virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
Verify whether the buffer has dispatchable info or not. | |
virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
void | max_notify_iterations (int) |
int | max_notify_iterations (void) |
virtual int | purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
virtual void | dump (void) const |
Dump the state of an object. | |
Private Member Functions | |
virtual int | handle_signal (int signum, siginfo_t *=0, ucontext_t *=0) |
Private Attributes | |
ACE_Timer_Queue * | timer_queue_ |
Pointer to the wfmo_reactor's timer queue. | |
ACE_Auto_Event | wakeup_one_thread_ |
ACE_Message_Queue< ACE_MT_SYNCH > | message_queue_ |
int | max_notify_iterations_ |
This implementation is necessary for cases where the is run in a multi-threaded program. In this case, we need to be able to unblock when updates occur other than in the main thread. To do this, we signal an auto-reset event the is listening on. If an ACE_Event_Handler and is passed to , the appropriate <handle_*> method is dispatched.
Definition at line 495 of file WFMO_Reactor.h.
|
Constructor.
Definition at line 2341 of file WFMO_Reactor.cpp.
02342 : timer_queue_ (0), 02343 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer), 02344 max_notifies * sizeof (ACE_Notification_Buffer)), 02345 max_notify_iterations_ (-1) 02346 { 02347 } |
|
No-op.
Implements ACE_Reactor_Notify. Definition at line 2336 of file WFMO_Reactor.cpp.
02337 {
02338 return -1;
02339 }
|
|
No-op.
Implements ACE_Reactor_Notify. Definition at line 2304 of file WFMO_Reactor.cpp.
02306 {
02307 return -1;
02308 }
|
|
Handle one of the notify call on the . This could be because of a thread trying to unblock the Implements ACE_Reactor_Notify. Definition at line 2330 of file WFMO_Reactor.cpp.
02331 {
02332 return 0;
02333 }
|
|
Dump the state of an object.
Implements ACE_Reactor_Notify. Definition at line 2631 of file WFMO_Reactor.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::dump(), and LM_DEBUG.
02632 { 02633 #if defined (ACE_HAS_DUMP) 02634 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump"); 02635 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); 02636 this->timer_queue_->dump (); 02637 ACE_DEBUG ((LM_DEBUG, 02638 ACE_LIB_TEXT ("Max. iteration: %d\n"), 02639 this->max_notify_iterations_)); 02640 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); 02641 #endif /* ACE_HAS_DUMP */ 02642 } |
|
Returns a handle to the .
Reimplemented from ACE_Event_Handler. Definition at line 2360 of file WFMO_Reactor.cpp. References ACE_Event::handle(), and wakeup_one_thread_.
02361 { 02362 return this->wakeup_one_thread_.handle (); 02363 } |
|
Called when the notification event waited on by is signaled. This dequeues all pending and dispatches them. Reimplemented from ACE_Event_Handler. Definition at line 2368 of file WFMO_Reactor.cpp. References ACE_ERROR, ACE_LIB_TEXT, ACE_Message_Block::base(), ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head(), ACE_Notification_Buffer::eh_, EWOULDBLOCK, ACE_Event::handle(), ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), ACE_Message_Queue< ACE_MT_SYNCH >::is_empty(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::reference_counting_policy(), ACE_Message_Block::release(), ACE_Event_Handler::remove_reference(), siginfo_t::si_handle_, ACE_Event::signal(), ucontext_t, and wakeup_one_thread_.
02371 { 02372 ACE_UNUSED_ARG (signum); 02373 02374 // Just check for sanity... 02375 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ()) 02376 return -1; 02377 02378 // This will get called when <WFMO_Reactor->wakeup_one_thread_> event 02379 // is signaled. 02380 // ACE_DEBUG ((LM_DEBUG, 02381 // ACE_LIB_TEXT ("(%t) waking up to handle internal notifications\n"))); 02382 02383 for (int i = 1; ; ++i) 02384 { 02385 ACE_Message_Block *mb = 0; 02386 // Copy ACE_Time_Value::zero since dequeue_head will modify it. 02387 ACE_Time_Value zero_timeout (ACE_Time_Value::zero); 02388 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1) 02389 { 02390 if (errno == EWOULDBLOCK) 02391 // We've reached the end of the processing, return 02392 // normally. 02393 return 0; 02394 else 02395 return -1; // Something weird happened... 02396 } 02397 else 02398 { 02399 ACE_Notification_Buffer *buffer = 02400 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ()); 02401 02402 // If eh == 0 then we've got major problems! Otherwise, we 02403 // need to dispatch the appropriate handle_* method on the 02404 // ACE_Event_Handler pointer we've been passed. 02405 02406 if (buffer->eh_ != 0) 02407 { 02408 ACE_Event_Handler *event_handler = 02409 buffer->eh_; 02410 02411 int requires_reference_counting = 02412 event_handler->reference_counting_policy ().value () == 02413 ACE_Event_Handler::Reference_Counting_Policy::ENABLED; 02414 02415 int result = 0; 02416 02417 switch (buffer->mask_) 02418 { 02419 case ACE_Event_Handler::READ_MASK: 02420 case ACE_Event_Handler::ACCEPT_MASK: 02421 result = event_handler->handle_input (ACE_INVALID_HANDLE); 02422 break; 02423 case ACE_Event_Handler::WRITE_MASK: 02424 result = event_handler->handle_output (ACE_INVALID_HANDLE); 02425 break; 02426 case ACE_Event_Handler::EXCEPT_MASK: 02427 result = event_handler->handle_exception (ACE_INVALID_HANDLE); 02428 break; 02429 case ACE_Event_Handler::QOS_MASK: 02430 result = event_handler->handle_qos (ACE_INVALID_HANDLE); 02431 break; 02432 case ACE_Event_Handler::GROUP_QOS_MASK: 02433 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE); 02434 break; 02435 default: 02436 ACE_ERROR ((LM_ERROR, 02437 ACE_LIB_TEXT ("invalid mask = %d\n"), 02438 buffer->mask_)); 02439 break; 02440 } 02441 02442 if (result == -1) 02443 event_handler->handle_close (ACE_INVALID_HANDLE, 02444 ACE_Event_Handler::EXCEPT_MASK); 02445 02446 if (requires_reference_counting) 02447 { 02448 event_handler->remove_reference (); 02449 } 02450 } 02451 02452 // Make sure to delete the memory regardless of success or 02453 // failure! 02454 mb->release (); 02455 02456 // Bail out if we've reached the <max_notify_iterations_>. 02457 // Note that by default <max_notify_iterations_> is -1, so 02458 // we'll loop until we're done. 02459 if (i == this->max_notify_iterations_) 02460 { 02461 // If there are still notification in the queue, we need 02462 // to wake up again 02463 if (!this->message_queue_.is_empty ()) 02464 this->wakeup_one_thread_.signal (); 02465 02466 // Break the loop as we have reached max_notify_iterations_ 02467 return 0; 02468 } 02469 } 02470 } 02471 } |
|
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify. Definition at line 2311 of file WFMO_Reactor.cpp.
02312 {
02313 return 0;
02314 }
|
|
Get the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop. Implements ACE_Reactor_Notify. Definition at line 2525 of file WFMO_Reactor.cpp. References ACE_TRACE.
02526 { 02527 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations"); 02528 return this->max_notify_iterations_; 02529 } |
|
Set the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead. Implements ACE_Reactor_Notify. Definition at line 2514 of file WFMO_Reactor.cpp. References ACE_TRACE.
02515 { 02516 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations"); 02517 // Must always be > 0 or < 0 to optimize the loop exit condition. 02518 if (iterations == 0) 02519 iterations = 1; 02520 02521 this->max_notify_iterations_ = iterations; 02522 } |
|
Special trick to unblock when updates occur. All we do is enqueue and onto the ACE_Message_Queue and wakeup the by signaling its handle. The indicates how long to blocking trying to notify the . If == 0, the caller will block until action is possible, else will wait until the relative time specified in elapses). Implements ACE_Reactor_Notify. Definition at line 2478 of file WFMO_Reactor.cpp. References ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_Event_Handler::add_reference(), ACE_Message_Block::base(), ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_tail(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::gettimeofday(), ACE_Notification_Buffer::mask_, ACE_Message_Block::release(), ACE_Event::signal(), and wakeup_one_thread_.
02481 { 02482 if (event_handler != 0) 02483 { 02484 ACE_Message_Block *mb = 0; 02485 ACE_NEW_RETURN (mb, 02486 ACE_Message_Block (sizeof (ACE_Notification_Buffer)), 02487 -1); 02488 02489 ACE_Notification_Buffer *buffer = 02490 (ACE_Notification_Buffer *) mb->base (); 02491 buffer->eh_ = event_handler; 02492 buffer->mask_ = mask; 02493 02494 // Convert from relative time to absolute time by adding the 02495 // current time of day. This is what <ACE_Message_Queue> 02496 // expects. 02497 if (timeout != 0) 02498 *timeout += timer_queue_->gettimeofday (); 02499 02500 if (this->message_queue_.enqueue_tail 02501 (mb, timeout) == -1) 02502 { 02503 mb->release (); 02504 return -1; 02505 } 02506 02507 event_handler->add_reference (); 02508 } 02509 02510 return this->wakeup_one_thread_.signal (); 02511 } |
|
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Implements ACE_Reactor_Notify. Definition at line 2317 of file WFMO_Reactor.cpp.
02318 {
02319 return ACE_INVALID_HANDLE;
02320 }
|
|
Initialization. is stored to call .
Implements ACE_Reactor_Notify. Definition at line 2350 of file WFMO_Reactor.cpp. References ACE_Timer_Queue, and ACE_Reactor_Impl::register_handler().
02353 { 02354 ACE_UNUSED_ARG (ignore_notify); 02355 timer_queue_ = timer_queue; 02356 return wfmo_reactor->register_handler (this); 02357 } |
|
Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error. Implements ACE_Reactor_Notify. Definition at line 2532 of file WFMO_Reactor.cpp. References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_GUARD_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Message_Block::base(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head(), ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_head(), ACE_Message_Queue<>::enqueue_head(), ACE_Message_Queue< ACE_MT_SYNCH >::is_empty(), ACE_Notification_Buffer::mask_, ACE_Message_Queue<>::message_count(), ACE_Message_Queue< ACE_MT_SYNCH >::message_count(), ACE_Message_Block::release(), and ACE_Event_Handler::remove_reference().
02534 { 02535 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications"); 02536 02537 // Go over message queue and take out all the matching event 02538 // handlers. If eh == 0, purge all. Note that reactor notifies (no 02539 // handler specified) are never purged, as this may lose a needed 02540 // notify the reactor queued for itself. 02541 02542 if (this->message_queue_.is_empty ()) 02543 return 0; 02544 02545 // Guard against new and/or delivered notifications while purging. 02546 // WARNING!!! The use of the notification queue's lock object for 02547 // this guard makes use of the knowledge that on Win32, the mutex 02548 // protecting the queue is really a CriticalSection, which is 02549 // recursive. This is how we can get away with locking it down here 02550 // and still calling member functions on the queue object. 02551 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1); 02552 02553 // first, copy all to our own local queue. Since we've locked everyone out 02554 // of here, there's no need to use any synchronization on this queue. 02555 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue; 02556 02557 size_t queue_size = this->message_queue_.message_count (); 02558 int number_purged = 0; 02559 02560 size_t index; 02561 02562 for (index = 0; index < queue_size; ++index) 02563 { 02564 ACE_Message_Block *mb = 0; 02565 if (-1 == this->message_queue_.dequeue_head (mb)) 02566 return -1; // This shouldn't happen... 02567 02568 ACE_Notification_Buffer *buffer = 02569 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ()); 02570 02571 // If this is not a Reactor notify (it is for a particular handler), 02572 // and it matches the specified handler (or purging all), 02573 // and applying the mask would totally eliminate the notification, then 02574 // release it and count the number purged. 02575 if ((0 != buffer->eh_) && 02576 (0 == eh || eh == buffer->eh_) && 02577 ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask 02578 // is left with nothing when 02579 // applying the mask 02580 { 02581 ACE_Event_Handler *event_handler = buffer->eh_; 02582 02583 event_handler->remove_reference (); 02584 02585 mb->release (); 02586 ++number_purged; 02587 } 02588 else 02589 { 02590 // To preserve it, move it to the local_queue. But first, if 02591 // this is not a Reactor notify (it is for a 02592 // particularhandler), and it matches the specified handler 02593 // (or purging all), then apply the mask 02594 if ((0 != buffer->eh_) && 02595 (0 == eh || eh == buffer->eh_)) 02596 ACE_CLR_BITS(buffer->mask_, mask); 02597 if (-1 == local_queue.enqueue_head (mb)) 02598 return -1; 02599 } 02600 } 02601 02602 if (this->message_queue_.message_count ()) 02603 { // Should be empty! 02604 ACE_ASSERT (0); 02605 return -1; 02606 } 02607 02608 // Now copy back from the local queue to the class queue, taking 02609 // care to preserve the original order... 02610 queue_size = local_queue.message_count (); 02611 for (index = 0; index < queue_size; ++index) 02612 { 02613 ACE_Message_Block *mb = 0; 02614 if (-1 == local_queue.dequeue_head (mb)) 02615 { 02616 ACE_ASSERT (0); 02617 return -1; 02618 } 02619 02620 if (-1 == this->message_queue_.enqueue_head (mb)) 02621 { 02622 ACE_ASSERT (0); 02623 return -1; 02624 } 02625 } 02626 02627 return number_purged; 02628 } |
|
Read one of the notify call on the into the . This could be because of a thread trying to unblock the Implements ACE_Reactor_Notify. Definition at line 2323 of file WFMO_Reactor.cpp.
02325 {
02326 return 0;
02327 }
|
|
Keeps track of the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_input> method will iterate and dispatch the that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop. By default, this is set to -1, which means "iterate until the queue is empty." Definition at line 610 of file WFMO_Reactor.h. |
|
Message queue that keeps track of pending . This queue must be thread-safe because it can be called by multiple threads of control. Definition at line 600 of file WFMO_Reactor.h. |
|
Pointer to the wfmo_reactor's timer queue.
Definition at line 584 of file WFMO_Reactor.h. |
|
An auto event is used so that we can it to wakeup one thread up (e.g., when the method is called). Definition at line 595 of file WFMO_Reactor.h. Referenced by get_handle(), handle_signal(), and notify(). |