ACE_WFMO_Reactor_Notify Class Reference

Unblock the ACE_WFMO_Reactor from its event loop, passing it an optional ACE_Event_Handler to dispatch.

#include <WFMO_Reactor.h>

[Inheritance diagram for ACE_WFMO_Reactor_Notify]

[Collaboration diagram for ACE_WFMO_Reactor_Notify]

Public Member Functions

 ACE_WFMO_Reactor_Notify (size_t max_notifies=1024)
 Constructor.

virtual int open (ACE_Reactor_Impl *wfmo_reactor, ACE_Timer_Queue *timer_queue, int disable_notify=0)
 Initialization. timer_queue is stored to call gettimeofday().

virtual int close (void)
 No-op.

virtual int notify (ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
 No-op.

virtual ACE_HANDLE get_handle (void) const
 Returns a handle to the ACE_Auto_Event.

virtual ACE_HANDLE notify_handle (void)
virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.

virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
void max_notify_iterations (int)
int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.


Private Member Functions

virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)

Private Attributes

ACE_Timer_Queue * timer_queue_
 Pointer to the wfmo_reactor's timer queue.

ACE_Auto_Event wakeup_one_thread_
ACE_Message_Queue< ACE_MT_SYNCH > message_queue_
int max_notify_iterations_

Detailed Description

Unblock the ACE_WFMO_Reactor from its event loop, passing it an optional ACE_Event_Handler to dispatch.

This implementation is necessary for cases where the ACE_WFMO_Reactor is run in a multi-threaded program. In this case, we need to be able to unblock WaitForMultipleObjects when updates occur in a thread other than the main ACE_WFMO_Reactor thread. To do this, we signal an auto-reset event that the ACE_WFMO_Reactor is listening on. If an ACE_Event_Handler and an ACE_Reactor_Mask are passed to notify(), the appropriate <handle_*> method is dispatched.
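
The wake-up mechanism described above can be sketched with standard C++ only. This is a minimal illustration, not the ACE implementation: NotifySketch, wait_and_dispatch() and the boolean flag standing in for the auto-reset event are all hypothetical names, and a condition variable replaces the Win32 event that WaitForMultipleObjects would wait on.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for ACE_WFMO_Reactor_Notify: a thread-safe queue of
// callbacks plus a flag that plays the role of the auto-reset event.
class NotifySketch {
public:
  // Called from any thread: queue an optional callback, then wake the loop.
  void notify(std::function<void()> callback = nullptr) {
    std::lock_guard<std::mutex> guard(lock_);
    if (callback)
      pending_.push_back(std::move(callback));
    signaled_ = true;       // analogous to signaling the auto-reset event
    wakeup_.notify_one();   // wake one waiting thread
  }

  // Called from the event-loop thread: block until signaled, then dispatch
  // everything queued so far. Returns the number of callbacks dispatched.
  int wait_and_dispatch() {
    std::deque<std::function<void()>> batch;
    {
      std::unique_lock<std::mutex> guard(lock_);
      wakeup_.wait(guard, [this] { return signaled_; });
      signaled_ = false;    // auto-reset: the signal is consumed by one waiter
      batch.swap(pending_);
    }
    for (auto &callback : batch)
      callback();           // dispatch outside the lock
    return static_cast<int>(batch.size());
  }

private:
  std::mutex lock_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> pending_;
  bool signaled_ = false;
};
```

A call to notify() with no callback only wakes the loop, which corresponds to calling notify() with a null event handler in the class documented here.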

Definition at line 495 of file WFMO_Reactor.h.


Constructor & Destructor Documentation

ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies = 1024)
 

Constructor.

Definition at line 2341 of file WFMO_Reactor.cpp.

02342   : timer_queue_ (0),
02343     message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02344                     max_notifies * sizeof (ACE_Notification_Buffer)),
02345     max_notify_iterations_ (-1)
02346 {
02347 }


Member Function Documentation

int ACE_WFMO_Reactor_Notify::close (void) [virtual]
 

No-op.

Implements ACE_Reactor_Notify.

Definition at line 2336 of file WFMO_Reactor.cpp.

02337 {
02338   return -1;
02339 }

int ACE_WFMO_Reactor_Notify::dispatch_notifications (int & number_of_active_handles, ACE_Handle_Set & rd_mask) [virtual]
 

No-op.

Implements ACE_Reactor_Notify.

Definition at line 2304 of file WFMO_Reactor.cpp.

02306 {
02307   return -1;
02308 }

int ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer & buffer) [virtual]


Handle one of the notify calls on the handle. This could be because of a thread trying to unblock the Reactor_Impl. In this implementation the method does nothing and returns 0.

Implements ACE_Reactor_Notify.

Definition at line 2330 of file WFMO_Reactor.cpp.

02331 {
02332   return 0;
02333 }

void ACE_WFMO_Reactor_Notify::dump (void) const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 2631 of file WFMO_Reactor.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::dump(), and LM_DEBUG.

02632 {
02633 #if defined (ACE_HAS_DUMP)
02634   ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02635   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02636   this->timer_queue_->dump ();
02637   ACE_DEBUG ((LM_DEBUG,
02638               ACE_LIB_TEXT ("Max. iteration: %d\n"),
02639               this->max_notify_iterations_));
02640   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02641 #endif /* ACE_HAS_DUMP */
02642 }

ACE_HANDLE ACE_WFMO_Reactor_Notify::get_handle (void) const [virtual]


Returns a handle to the ACE_Auto_Event.

Reimplemented from ACE_Event_Handler.

Definition at line 2360 of file WFMO_Reactor.cpp.

References ACE_Event::handle(), and wakeup_one_thread_.

02361 {
02362   return this->wakeup_one_thread_.handle ();
02363 }

int ACE_WFMO_Reactor_Notify::handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0) [private, virtual]


Called when the notification event waited on by the ACE_WFMO_Reactor is signaled. This dequeues all pending ACE_Event_Handlers and dispatches them.

Reimplemented from ACE_Event_Handler.

Definition at line 2368 of file WFMO_Reactor.cpp.

References ACE_ERROR, ACE_LIB_TEXT, ACE_Message_Block::base(), ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head(), ACE_Notification_Buffer::eh_, EWOULDBLOCK, ACE_Event::handle(), ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_group_qos(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), ACE_Event_Handler::handle_qos(), ACE_Message_Queue< ACE_MT_SYNCH >::is_empty(), LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::reference_counting_policy(), ACE_Message_Block::release(), ACE_Event_Handler::remove_reference(), siginfo_t::si_handle_, ACE_Event::signal(), ucontext_t, and wakeup_one_thread_.

02371 {
02372   ACE_UNUSED_ARG (signum);
02373 
02374   // Just check for sanity...
02375   if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02376     return -1;
02377 
02378   // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
02379   // is signaled.
02380   //  ACE_DEBUG ((LM_DEBUG,
02381   //             ACE_LIB_TEXT ("(%t) waking up to handle internal notifications\n")));
02382 
02383   for (int i = 1; ; ++i)
02384     {
02385       ACE_Message_Block *mb = 0;
02386       // Copy ACE_Time_Value::zero since dequeue_head will modify it.
02387       ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02388       if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02389         {
02390           if (errno == EWOULDBLOCK)
02391             // We've reached the end of the processing, return
02392             // normally.
02393             return 0;
02394           else
02395             return -1; // Something weird happened...
02396         }
02397       else
02398         {
02399           ACE_Notification_Buffer *buffer =
02400             reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
02401 
02402           // If eh == 0 then we've got major problems!  Otherwise, we
02403           // need to dispatch the appropriate handle_* method on the
02404           // ACE_Event_Handler pointer we've been passed.
02405 
02406           if (buffer->eh_ != 0)
02407             {
02408               ACE_Event_Handler *event_handler =
02409                 buffer->eh_;
02410 
02411               int requires_reference_counting =
02412                 event_handler->reference_counting_policy ().value () ==
02413                 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02414 
02415               int result = 0;
02416 
02417               switch (buffer->mask_)
02418                 {
02419                 case ACE_Event_Handler::READ_MASK:
02420                 case ACE_Event_Handler::ACCEPT_MASK:
02421                   result = event_handler->handle_input (ACE_INVALID_HANDLE);
02422                   break;
02423                 case ACE_Event_Handler::WRITE_MASK:
02424                   result = event_handler->handle_output (ACE_INVALID_HANDLE);
02425                   break;
02426                 case ACE_Event_Handler::EXCEPT_MASK:
02427                   result = event_handler->handle_exception (ACE_INVALID_HANDLE);
02428                   break;
02429                 case ACE_Event_Handler::QOS_MASK:
02430                   result = event_handler->handle_qos (ACE_INVALID_HANDLE);
02431                   break;
02432                 case ACE_Event_Handler::GROUP_QOS_MASK:
02433                   result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
02434                   break;
02435                 default:
02436                   ACE_ERROR ((LM_ERROR,
02437                               ACE_LIB_TEXT ("invalid mask = %d\n"),
02438                               buffer->mask_));
02439                   break;
02440                 }
02441 
02442               if (result == -1)
02443                 event_handler->handle_close (ACE_INVALID_HANDLE,
02444                                              ACE_Event_Handler::EXCEPT_MASK);
02445 
02446               if (requires_reference_counting)
02447                 {
02448                   event_handler->remove_reference ();
02449                 }
02450             }
02451 
02452           // Make sure to delete the memory regardless of success or
02453           // failure!
02454           mb->release ();
02455 
02456           // Bail out if we've reached the <max_notify_iterations_>.
02457           // Note that by default <max_notify_iterations_> is -1, so
02458           // we'll loop until we're done.
02459           if (i == this->max_notify_iterations_)
02460             {
02461               // If there are still notifications in the queue, we need
02462               // to wake up again
02463               if (!this->message_queue_.is_empty ())
02464                 this->wakeup_one_thread_.signal ();
02465 
02466               // Break the loop as we have reached max_notify_iterations_
02467               return 0;
02468             }
02469         }
02470     }
02471 }
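
The loop-exit logic in the listing above can be isolated in a short standard C++ sketch. The names drain() and DrainResult are hypothetical, and a deque of integers stands in for the queue of notification buffers. The point it illustrates is that a negative limit never equals the loop counter, so the loop runs until the queue is empty, while a positive limit stops early and asks for another wake-up if work remains.

```cpp
#include <cassert>
#include <deque>

// Hypothetical result type for this sketch.
struct DrainResult {
  int dispatched;   // how many entries were processed
  bool resignal;    // whether the wake-up event must be signaled again
};

// Drain up to max_iterations entries from the queue. A negative limit means
// "drain until empty", mirroring the i == max_notify_iterations_ test.
inline DrainResult drain(std::deque<int> &queue, int max_iterations) {
  assert(max_iterations != 0);     // the setter coerces 0 to 1
  for (int i = 1; ; ++i) {
    if (queue.empty())
      return {i - 1, false};       // nothing left: normal return
    queue.pop_front();             // stands in for dequeue + handle_* dispatch
    if (i == max_iterations)
      return {i, !queue.empty()};  // limit hit: re-signal if work remains
  }
}
```

With five queued entries and a limit of 2, the first call processes two entries and requests a re-signal; a later call with a limit of -1 processes the remaining three.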

int ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & buffer) [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 2311 of file WFMO_Reactor.cpp.

02312 {
02313   return 0;
02314 }

int ACE_WFMO_Reactor_Notify::max_notify_iterations (void) [virtual]


Get the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop.

Implements ACE_Reactor_Notify.

Definition at line 2525 of file WFMO_Reactor.cpp.

References ACE_TRACE.

02526 {
02527   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02528   return this->max_notify_iterations_;
02529 }

void ACE_WFMO_Reactor_Notify::max_notify_iterations (int) [virtual]


Set the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a small value such as 1 or 2 increases fairness (and thus prevents starvation) at the expense of slightly higher dispatching overhead. A value of 0 is stored as 1.

Implements ACE_Reactor_Notify.

Definition at line 2514 of file WFMO_Reactor.cpp.

References ACE_TRACE.

02515 {
02516   ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02517   // Must always be > 0 or < 0 to optimize the loop exit condition.
02518   if (iterations == 0)
02519     iterations = 1;
02520 
02521   this->max_notify_iterations_ = iterations;
02522 }

int ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler * event_handler = 0, ACE_Reactor_Mask mask = ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value * timeout = 0) [virtual]


Special trick to unblock WaitForMultipleObjects when updates occur. All we do is enqueue event_handler and mask onto the ACE_Message_Queue and wake up the WFMO_Reactor by signaling its ACE_Event handle. The ACE_Time_Value indicates how long to block while trying to notify the WFMO_Reactor. If timeout == 0, the caller will block until action is possible; otherwise it will wait until the relative time specified in timeout elapses.

Implements ACE_Reactor_Notify.

Definition at line 2478 of file WFMO_Reactor.cpp.

References ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_Event_Handler::add_reference(), ACE_Message_Block::base(), ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_tail(), ACE_Timer_Queue_T< TYPE, FUNCTOR, ACE_LOCK >::gettimeofday(), ACE_Notification_Buffer::mask_, ACE_Message_Block::release(), ACE_Event::signal(), and wakeup_one_thread_.

02481 {
02482   if (event_handler != 0)
02483     {
02484       ACE_Message_Block *mb = 0;
02485       ACE_NEW_RETURN (mb,
02486                       ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02487                       -1);
02488 
02489       ACE_Notification_Buffer *buffer =
02490         (ACE_Notification_Buffer *) mb->base ();
02491       buffer->eh_ = event_handler;
02492       buffer->mask_ = mask;
02493 
02494       // Convert from relative time to absolute time by adding the
02495       // current time of day.  This is what <ACE_Message_Queue>
02496       // expects.
02497       if (timeout != 0)
02498         *timeout += timer_queue_->gettimeofday ();
02499 
02500       if (this->message_queue_.enqueue_tail
02501           (mb, timeout) == -1)
02502         {
02503           mb->release ();
02504           return -1;
02505         }
02506 
02507       event_handler->add_reference ();
02508     }
02509 
02510   return this->wakeup_one_thread_.signal ();
02511 }
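
The relative-to-absolute timeout conversion in the listing above can be shown in isolation. This is a sketch under stated assumptions: TimeValue is a hypothetical stand-in for ACE_Time_Value, add_in_place() and to_absolute() are illustrative helpers, and both inputs are assumed to be normalized (microseconds below 1,000,000).

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Time_Value: seconds plus microseconds.
struct TimeValue {
  long sec;
  long usec;
};

// Add rhs to lhs in place, carrying microsecond overflow into seconds.
// Assumes both values are normalized (usec < 1000000).
inline void add_in_place(TimeValue &lhs, const TimeValue &rhs) {
  lhs.sec += rhs.sec;
  lhs.usec += rhs.usec;
  if (lhs.usec >= 1000000) {
    lhs.sec += 1;
    lhs.usec -= 1000000;
  }
}

// Mirror of the conversion step in notify(): a null timeout is left alone
// (block indefinitely); otherwise the caller's value is overwritten with an
// absolute deadline.
inline TimeValue *to_absolute(TimeValue *timeout, const TimeValue &now) {
  if (timeout != 0)
    add_in_place(*timeout, now);
  return timeout;
}
```

Because the conversion writes through the caller's pointer, passing the same timeout object to a second call would add the current time twice. Callers who reuse a timeout would likely want to reset it to the relative value before each call.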

ACE_HANDLE ACE_WFMO_Reactor_Notify::notify_handle (void) [virtual]


Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Reactor_Impl. This implementation has no notify pipe and returns ACE_INVALID_HANDLE.

Implements ACE_Reactor_Notify.

Definition at line 2317 of file WFMO_Reactor.cpp.

02318 {
02319   return ACE_INVALID_HANDLE;
02320 }

int ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl * wfmo_reactor, ACE_Timer_Queue * timer_queue, int disable_notify = 0) [virtual]


Initialization. timer_queue is stored to call gettimeofday().

Implements ACE_Reactor_Notify.

Definition at line 2350 of file WFMO_Reactor.cpp.

References ACE_Timer_Queue, and ACE_Reactor_Impl::register_handler().

02353 {
02354   ACE_UNUSED_ARG (ignore_notify);
02355   timer_queue_ = timer_queue;
02356   return wfmo_reactor->register_handler (this);
02357 }

int ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK) [virtual]


Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. If eh == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 2532 of file WFMO_Reactor.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_GUARD_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Message_Block::base(), ACE_Message_Queue<>::dequeue_head(), ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head(), ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_head(), ACE_Message_Queue<>::enqueue_head(), ACE_Message_Queue< ACE_MT_SYNCH >::is_empty(), ACE_Notification_Buffer::mask_, ACE_Message_Queue<>::message_count(), ACE_Message_Queue< ACE_MT_SYNCH >::message_count(), ACE_Message_Block::release(), and ACE_Event_Handler::remove_reference().

02534 {
02535   ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02536 
02537   // Go over message queue and take out all the matching event
02538   // handlers.  If eh == 0, purge all. Note that reactor notifies (no
02539   // handler specified) are never purged, as this may lose a needed
02540   // notify the reactor queued for itself.
02541 
02542   if (this->message_queue_.is_empty ())
02543     return 0;
02544 
02545   // Guard against new and/or delivered notifications while purging.
02546   // WARNING!!! The use of the notification queue's lock object for
02547   // this guard makes use of the knowledge that on Win32, the mutex
02548   // protecting the queue is really a CriticalSection, which is
02549   // recursive. This is how we can get away with locking it down here
02550   // and still calling member functions on the queue object.
02551   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02552 
02553   // first, copy all to our own local queue. Since we've locked everyone out
02554   // of here, there's no need to use any synchronization on this queue.
02555   ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02556 
02557   size_t queue_size  = this->message_queue_.message_count ();
02558   int number_purged = 0;
02559 
02560   size_t index;
02561 
02562   for (index = 0; index < queue_size; ++index)
02563     {
02564       ACE_Message_Block *mb = 0;
02565       if (-1 == this->message_queue_.dequeue_head (mb))
02566         return -1;        // This shouldn't happen...
02567 
02568       ACE_Notification_Buffer *buffer =
02569         reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
02570 
02571       // If this is not a Reactor notify (it is for a particular handler),
02572       // and it matches the specified handler (or purging all),
02573       // and applying the mask would totally eliminate the notification, then
02574       // release it and count the number purged.
02575       if ((0 != buffer->eh_) &&
02576           (0 == eh || eh == buffer->eh_) &&
02577           ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
02578                                                    // is left with nothing when
02579                                                    // applying the mask
02580         {
02581           ACE_Event_Handler *event_handler = buffer->eh_;
02582 
02583           event_handler->remove_reference ();
02584 
02585           mb->release ();
02586           ++number_purged;
02587         }
02588       else
02589         {
02590           // To preserve it, move it to the local_queue.  But first, if
02591           // this is not a Reactor notify (it is for a
02592           // particular handler), and it matches the specified handler
02593           // (or purging all), then apply the mask
02594           if ((0 != buffer->eh_) &&
02595               (0 == eh || eh == buffer->eh_))
02596             ACE_CLR_BITS(buffer->mask_, mask);
02597           if (-1 == local_queue.enqueue_head (mb))
02598             return -1;
02599         }
02600     }
02601 
02602   if (this->message_queue_.message_count ())
02603     { // Should be empty!
02604       ACE_ASSERT (0);
02605       return -1;
02606     }
02607 
02608   // Now copy back from the local queue to the class queue, taking
02609   // care to preserve the original order...
02610   queue_size  = local_queue.message_count ();
02611   for (index = 0; index < queue_size; ++index)
02612     {
02613       ACE_Message_Block  *mb = 0;
02614       if (-1 == local_queue.dequeue_head (mb))
02615         {
02616           ACE_ASSERT (0);
02617           return -1;
02618         }
02619 
02620       if (-1 == this->message_queue_.enqueue_head (mb))
02621         {
02622           ACE_ASSERT (0);
02623           return -1;
02624         }
02625     }
02626 
02627   return number_purged;
02628 }
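
The mask handling in the listing above is easier to follow in a reduced form. The sketch below is not the ACE code: Pending, Mask, purge() and the three bit values are hypothetical, and a single pass over a std::deque replaces the two-queue copy. It keeps the same rules: wake-up-only entries (null handler) are never purged, an entry is dropped only when the purge mask covers every bit it carries, and a partially covered entry keeps its remaining bits.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical mask bits for this sketch; the real values are defined by
// ACE_Event_Handler.
typedef unsigned long Mask;
const Mask READ = 1, WRITE = 2, EXCEPT = 4;

// Hypothetical stand-in for ACE_Notification_Buffer.
struct Pending {
  const void *handler;   // null means "wake-up only"
  Mask mask;
};

// Purge entries for handler (or for every handler when handler == 0).
// Returns the number of entries removed; surviving entries keep their order.
inline int purge(std::deque<Pending> &queue, const void *handler, Mask mask) {
  std::deque<Pending> kept;
  int purged = 0;
  for (std::size_t i = 0; i < queue.size(); ++i) {
    Pending entry = queue[i];
    bool matches = entry.handler != 0 &&
                   (handler == 0 || handler == entry.handler);
    if (matches && (entry.mask & ~mask) == 0) {
      ++purged;              // nothing would be left after masking: drop it
      continue;
    }
    if (matches)
      entry.mask &= ~mask;   // clear only the purged bits
    kept.push_back(entry);
  }
  queue.swap(kept);
  return purged;
}
```

Purging READ for all handlers from a queue holding a READ entry, a READ|WRITE entry, a wake-up entry and a WRITE entry removes only the first one and leaves the second with WRITE alone.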

int ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer & buffer) [virtual]


Read one of the notify calls on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl. In this implementation the method does nothing and returns 0.

Implements ACE_Reactor_Notify.

Definition at line 2323 of file WFMO_Reactor.cpp.

02325 {
02326   return 0;
02327 }


Member Data Documentation

int ACE_WFMO_Reactor_Notify::max_notify_iterations_ [private]
 

Keeps track of the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue> loop. By default, this is set to -1, which means "iterate until the queue is empty."

Definition at line 610 of file WFMO_Reactor.h.

ACE_Message_Queue<ACE_MT_SYNCH> ACE_WFMO_Reactor_Notify::message_queue_ [private]
 

Message queue that keeps track of pending ACE_Event_Handlers. This queue must be thread-safe because it can be called by multiple threads of control.

Definition at line 600 of file WFMO_Reactor.h.

ACE_Timer_Queue* ACE_WFMO_Reactor_Notify::timer_queue_ [private]
 

Pointer to the wfmo_reactor's timer queue.

Definition at line 584 of file WFMO_Reactor.h.

ACE_Auto_Event ACE_WFMO_Reactor_Notify::wakeup_one_thread_ [private]
 

An auto event is used so that we can signal it to wake one thread up (e.g., when the notify() method is called).

Definition at line 595 of file WFMO_Reactor.h.

Referenced by get_handle(), handle_signal(), and notify().


The documentation for this class was generated from the following files:
WFMO_Reactor.h
WFMO_Reactor.cpp

Generated on Thu Nov 9 11:32:38 2006 for ACE by doxygen 1.3.6