#include <Dev_Poll_Reactor.h>
Inheritance diagram for ACE_Dev_Poll_Reactor_Notify:
Initialization and Termination Methods | |
Methods called when initializing and terminating this event handler. | |
virtual int | open (ACE_Reactor_Impl *, ACE_Timer_Queue *timer_queue=0, int disable_notify=0) |
virtual int | close (void) |
virtual int | notify (ACE_Event_Handler *eh=0, ACE_Reactor_Mask mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0) |
virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
virtual ACE_HANDLE | notify_handle (void) |
virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
Verify whether the buffer has dispatchable info or not. | |
virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
virtual int | handle_input (ACE_HANDLE handle) |
virtual void | max_notify_iterations (int) |
virtual int | max_notify_iterations (void) |
virtual int | purge_pending_notifications (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
virtual void | dump (void) const |
Dump the state of an object. | |
ACE_Dev_Poll_Reactor * | dp_reactor_ |
ACE_Pipe | notification_pipe_ |
int | max_notify_iterations_ |
Public Member Functions | |
ACE_Dev_Poll_Reactor_Notify (void) | |
Constructor. |
This event handler is used internally by the ACE_Dev_Poll_Reactor as a means to allow a thread other then the one running the event loop to unblock the event loop.
Definition at line 166 of file Dev_Poll_Reactor.h.
|
Constructor.
Definition at line 45 of file Dev_Poll_Reactor.cpp.
00046 : dp_reactor_ (0) 00047 , notification_pipe_ () 00048 , max_notify_iterations_ (-1) 00049 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00050 , alloc_queue_ () 00051 , notify_queue_ () 00052 , free_queue_ () 00053 , notify_queue_lock_ () 00054 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00055 { 00056 } |
|
Implements ACE_Reactor_Notify. Definition at line 115 of file Dev_Poll_Reactor.cpp. References ACE_TRACE, ACE_Unbounded_Queue_Iterator< T >::advance(), ACE_Pipe::close(), ACE_Unbounded_Queue_Iterator< T >::next(), and notification_pipe_.
00116 { 00117 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close"); 00118 00119 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00120 // Free up the dynamically allocated resources. 00121 ACE_Notification_Buffer **b; 00122 00123 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); 00124 alloc_iter.next (b) != 0; 00125 alloc_iter.advance ()) 00126 { 00127 delete [] *b; 00128 *b = 0; 00129 } 00130 00131 this->alloc_queue_.reset (); 00132 this->notify_queue_.reset (); 00133 this->free_queue_.reset (); 00134 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00135 00136 return this->notification_pipe_.close (); 00137 } |
|
This method's interface is not very compatibile with this Reactor's design. It's not clear why this method is pure virtual either. Implements ACE_Reactor_Notify. Definition at line 225 of file Dev_Poll_Reactor.cpp. References ACE_NOTSUP_RETURN, ACE_TRACE, handle_input(), notification_pipe_, and ACE_Pipe::read_handle().
00228 { 00229 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications"); 00230 00231 // This method is unimplemented in the ACE_Dev_Poll_Reactor. 00232 // Instead, the notification handler is invoked as part of the IO 00233 // event set. Doing so alters the some documented semantics that 00234 // state that the notifications are handled before IO events. 00235 // Enforcing such semantics does not appear to be beneficial, and 00236 // also serves to slow down event dispatching particularly with this 00237 // ACE_Dev_Poll_Reactor. 00238 00239 #if 0 00240 ACE_HANDLE read_handle = 00241 this->notification_pipe_.read_handle (); 00242 00243 // Note that we do not check if the handle has received any events. 00244 // Instead a non-blocking "speculative" read is performed. If the 00245 // read returns with errno == EWOULDBLOCK then no notifications are 00246 // dispatched. See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe() 00247 // for details. 00248 if (read_handle != ACE_INVALID_HANDLE) 00249 { 00250 --number_of_active_handles; 00251 00252 return this->handle_input (read_handle); 00253 } 00254 else 00255 return 0; 00256 #else 00257 ACE_NOTSUP_RETURN (-1); 00258 #endif /* 0 */ 00259 } |
|
Handle one notify call represented in buffer. This could be because of a thread trying to unblock the Reactor_Impl. Implements ACE_Reactor_Notify. Definition at line 403 of file Dev_Poll_Reactor.cpp. References ACE_ERROR, ACE_LIB_TEXT, ACE_TRACE, ACE_Notification_Buffer::eh_, ACE_Event_Handler::handle_close(), ACE_Event_Handler::handle_exception(), ACE_Event_Handler::handle_input(), ACE_Event_Handler::handle_output(), LM_ERROR, and ACE_Notification_Buffer::mask_. Referenced by handle_input().
00404 { 00405 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify"); 00406 00407 // If eh == 0 then another thread is unblocking the 00408 // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's 00409 // internal structures. Otherwise, we need to dispatch the 00410 // appropriate handle_* method on the ACE_Event_Handler 00411 // pointer we've been passed. 00412 if (buffer.eh_ != 0) 00413 { 00414 int result = 0; 00415 00416 // Guard the handler's refcount. Recall that when the notify 00417 // was queued, the refcount was incremented, so it need not be 00418 // now. The guard insures that it is decremented properly. 00419 ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false); 00420 00421 switch (buffer.mask_) 00422 { 00423 case ACE_Event_Handler::READ_MASK: 00424 case ACE_Event_Handler::ACCEPT_MASK: 00425 result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); 00426 break; 00427 case ACE_Event_Handler::WRITE_MASK: 00428 result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); 00429 break; 00430 case ACE_Event_Handler::EXCEPT_MASK: 00431 result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); 00432 break; 00433 default: 00434 // Should we bail out if we get an invalid mask? 00435 ACE_ERROR ((LM_ERROR, 00436 ACE_LIB_TEXT ("dispatch_notify invalid mask = %d\n"), 00437 buffer.mask_)); 00438 } 00439 if (result == -1) 00440 buffer.eh_->handle_close (ACE_INVALID_HANDLE, buffer.mask_); 00441 } 00442 00443 return 1; 00444 } |
|
Dump the state of an object.
Implements ACE_Reactor_Notify. Definition at line 561 of file Dev_Poll_Reactor.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump(), LM_DEBUG, and notification_pipe_.
00562 { 00563 #if defined (ACE_HAS_DUMP) 00564 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump"); 00565 00566 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); 00567 ACE_DEBUG ((LM_DEBUG, 00568 ACE_LIB_TEXT ("dp_reactor_ = %@"), 00569 this->dp_reactor_)); 00570 this->notification_pipe_.dump (); 00571 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); 00572 #endif /* ACE_HAS_DUMP */ 00573 } |
|
Called back by the ACE_Dev_Poll_Reactor when a thread wants to unblock us. Reimplemented from ACE_Event_Handler. Definition at line 341 of file Dev_Poll_Reactor.cpp. References ACE_TRACE, dispatch_notify(), max_notify_iterations_, and read_notify_pipe(). Referenced by dispatch_notifications().
00342 { 00343 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input"); 00344 00345 // @@ We may end up dispatching this event handler twice: once when 00346 // performing the speculative read on the notification pipe 00347 // handle, and once more when dispatching the IO events. 00348 00349 // Precondition: this->select_reactor_.token_.current_owner () == 00350 // ACE_Thread::self (); 00351 00352 int number_dispatched = 0; 00353 int result = 0; 00354 ACE_Notification_Buffer buffer; 00355 00356 while ((result = this->read_notify_pipe (handle, buffer)) > 0) 00357 { 00358 // Dispatch the buffer 00359 // NOTE: We count only if we made any dispatches ie. upcalls. 00360 if (this->dispatch_notify (buffer) > 0) 00361 ++number_dispatched; 00362 00363 // Bail out if we've reached the <notify_threshold_>. Note that 00364 // by default <notify_threshold_> is -1, so we'll loop until all 00365 // the available notifications have been dispatched. 00366 if (number_dispatched == this->max_notify_iterations_) 00367 break; 00368 } 00369 00370 if (result == -1) 00371 { 00372 // Reassign number_dispatched to -1 if things have gone 00373 // seriously wrong. 00374 number_dispatched = -1; 00375 } 00376 00377 // Enqueue ourselves into the list of waiting threads. When we 00378 // reacquire the token we'll be off and running again with ownership 00379 // of the token. The postcondition of this call is that 00380 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. 00381 //this->select_reactor_->renew (); 00382 00383 return number_dispatched; 00384 } |
|
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify. Definition at line 395 of file Dev_Poll_Reactor.cpp. References ACE_NOTSUP_RETURN, and ACE_TRACE.
00396 { 00397 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable"); 00398 00399 ACE_NOTSUP_RETURN (-1); 00400 } |
|
Get the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its event loop. Implements ACE_Reactor_Notify. Definition at line 459 of file Dev_Poll_Reactor.cpp. References ACE_TRACE, and max_notify_iterations_.
00460 { 00461 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations"); 00462 00463 return this->max_notify_iterations_; 00464 } |
|
Set the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of the event loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead. Implements ACE_Reactor_Notify. Definition at line 447 of file Dev_Poll_Reactor.cpp. References ACE_TRACE, and max_notify_iterations_.
00448 { 00449 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations"); 00450 00451 // Must always be > 0 or < 0 to optimize the loop exit condition. 00452 if (iterations == 0) 00453 iterations = 1; 00454 00455 this->max_notify_iterations_ = iterations; 00456 } |
|
Called by a thread when it wants to unblock the Reactor_Impl. This wakes up the Reactor_Impl if currently blocked. Pass over both the Event_Handler and the mask to allow the caller to dictate which Event_Handler method the Reactor_Impl will invoke. The ACE_Time_Value indicates how long to block trying to notify the Reactor_Impl. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses). Implements ACE_Reactor_Notify. Definition at line 140 of file Dev_Poll_Reactor.cpp. References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, dp_reactor_, ETIME, ACE_Dev_Poll_Handler_Guard::release(), ACE::send(), and ssize_t.
00143 { 00144 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify"); 00145 00146 // Just consider this method a "no-op" if there's no 00147 // ACE_Dev_Poll_Reactor configured. 00148 if (this->dp_reactor_ == 0) 00149 return 0; 00150 00151 ACE_Notification_Buffer buffer (eh, mask); 00152 00153 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00154 00155 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); 00156 00157 // Locate a free buffer in the queue. Enlarge the queue if needed. 00158 ACE_Notification_Buffer *temp = 0; 00159 00160 if (free_queue_.dequeue_head (temp) == -1) 00161 { 00162 // Grow the queue of available buffers. 00163 ACE_Notification_Buffer *temp1; 00164 00165 ACE_NEW_RETURN (temp1, 00166 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], 00167 -1); 00168 00169 if (this->alloc_queue_.enqueue_head (temp1) == -1) 00170 return -1; 00171 00172 // Start at 1 and enqueue only 00173 // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since 00174 // the first one will be used right now. 00175 for (size_t i = 1; 00176 i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; 00177 ++i) 00178 this->free_queue_.enqueue_head (temp1 + i); 00179 00180 temp = temp1; 00181 } 00182 00183 ACE_ASSERT (temp != 0); 00184 *temp = buffer; 00185 00186 ACE_Dev_Poll_Handler_Guard eh_guard (eh); 00187 00188 if (notify_queue_.enqueue_tail (temp) == -1) 00189 return -1; 00190 00191 // Now pop the pipe to force the callback for dispatching when ready. If 00192 // the send fails due to a full pipe, don't fail - assume the already-sent 00193 // pipe bytes will cause the entire notification queue to be processed. 00194 ssize_t n = ACE::send (this->notification_pipe_.write_handle (), 00195 (char *) &buffer, 00196 1, // Only need one byte to pop the pipe 00197 timeout); 00198 if (n == -1 && (errno != ETIME && errno != EAGAIN)) 00199 return -1; 00200 00201 // Since the notify is queued (and maybe already delivered by now) 00202 // we can simply release the guard. The dispatch of this notification 00203 // will decrement the reference count. 00204 eh_guard.release (); 00205 00206 return 0; 00207 #else 00208 00209 ACE_Dev_Poll_Handler_Guard eh_guard (eh); 00210 00211 ssize_t n = ACE::send (this->notification_pipe_.write_handle (), 00212 (char *) &buffer, 00213 sizeof buffer, 00214 timeout); 00215 if (n == -1) 00216 return -1; 00217 00218 eh_guard.release (); 00219 00220 return 0; 00221 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00222 } |
|
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Reactor_Impl. Implements ACE_Reactor_Notify. Definition at line 387 of file Dev_Poll_Reactor.cpp. References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle().
00388 { 00389 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle"); 00390 00391 return this->notification_pipe_.read_handle (); 00392 } |
|
Implements ACE_Reactor_Notify. Definition at line 59 of file Dev_Poll_Reactor.cpp. References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_Timer_Queue, ACE_TRACE, dp_reactor_, ACE_OS::fcntl(), notification_pipe_, ACE_Pipe::open(), and ACE::set_flags().
00062 { 00063 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open"); 00064 00065 if (disable_notify_pipe == 0) 00066 { 00067 this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r); 00068 00069 if (this->dp_reactor_ == 0) 00070 { 00071 errno = EINVAL; 00072 return -1; 00073 } 00074 00075 if (this->notification_pipe_.open () == -1) 00076 return -1; 00077 00078 #if defined (F_SETFD) 00079 // close-on-exec 00080 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1); 00081 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1); 00082 #endif /* F_SETFD */ 00083 00084 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00085 ACE_Notification_Buffer *temp; 00086 00087 ACE_NEW_RETURN (temp, 00088 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], 00089 -1); 00090 00091 if (this->alloc_queue_.enqueue_head (temp) == -1) 00092 return -1; 00093 00094 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) 00095 if (free_queue_.enqueue_head (temp + i) == -1) 00096 return -1; 00097 00098 if (ACE::set_flags (this->notification_pipe_.write_handle (), 00099 ACE_NONBLOCK) == -1) 00100 return -1; 00101 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00102 00103 // Set the read handle into non-blocking mode since we need to 00104 // perform a "speculative" read when determining if their are 00105 // notifications to dispatch. 00106 if (ACE::set_flags (this->notification_pipe_.read_handle (), 00107 ACE_NONBLOCK) == -1) 00108 return -1; 00109 } 00110 00111 return 0; 00112 } |
|
Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. Returns the number of notifications purged. Returns -1 on error. Implements ACE_Reactor_Notify. Definition at line 467 of file Dev_Poll_Reactor.cpp. References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_NOTSUP_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue< T >::dequeue_head(), ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue< T >::enqueue_head(), LM_ERROR, ACE_Notification_Buffer::mask_, and ACE_Unbounded_Queue< T >::size().
00470 { 00471 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications"); 00472 00473 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00474 00475 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); 00476 00477 if (this->notify_queue_.is_empty ()) 00478 return 0; 00479 00480 ACE_Notification_Buffer *temp; 00481 ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; 00482 00483 size_t queue_size = this->notify_queue_.size (); 00484 int number_purged = 0; 00485 size_t i; 00486 for (i = 0; i < queue_size; ++i) 00487 { 00488 if (-1 == this->notify_queue_.dequeue_head (temp)) 00489 ACE_ERROR_RETURN ((LM_ERROR, 00490 ACE_LIB_TEXT ("%p\n"), 00491 ACE_LIB_TEXT ("dequeue_head")), 00492 -1); 00493 00494 // If this is not a Reactor notify (it is for a particular 00495 // handler), and it matches the specified handler (or purging 00496 // all), and applying the mask would totally eliminate the 00497 // notification, then release it and count the number purged. 00498 if ((0 != temp->eh_) && 00499 (0 == eh || eh == temp->eh_) && 00500 ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing 00501 // notification mask 00502 // is left with 00503 // nothing when 00504 // applying the mask. 00505 { 00506 if (this->free_queue_.enqueue_head (temp) == -1) 00507 ACE_ERROR_RETURN ((LM_ERROR, 00508 ACE_LIB_TEXT ("%p\n"), 00509 ACE_LIB_TEXT ("enqueue_head")), 00510 -1); 00511 ++number_purged; 00512 } 00513 else 00514 { 00515 // To preserve it, move it to the local_queue. 00516 // But first, if this is not a Reactor notify (it is for a 00517 // particular handler), and it matches the specified handler 00518 // (or purging all), then apply the mask. 00519 if ((0 != temp->eh_) && 00520 (0 == eh || eh == temp->eh_)) 00521 ACE_CLR_BITS(temp->mask_, mask); 00522 if (-1 == local_queue.enqueue_head (temp)) 00523 return -1; 00524 } 00525 } 00526 00527 if (this->notify_queue_.size ()) 00528 { 00529 // Should be empty! 00530 ACE_ASSERT (0); 00531 return -1; 00532 } 00533 00534 // Now put it back in the notify queue. 00535 queue_size = local_queue.size (); 00536 for (i = 0; i < queue_size; ++i) 00537 { 00538 if (-1 == local_queue.dequeue_head (temp)) 00539 ACE_ERROR_RETURN ((LM_ERROR, 00540 ACE_LIB_TEXT ("%p\n"), 00541 ACE_LIB_TEXT ("dequeue_head")), 00542 -1); 00543 00544 if (-1 == this->notify_queue_.enqueue_head (temp)) 00545 ACE_ERROR_RETURN ((LM_ERROR, 00546 ACE_LIB_TEXT ("%p\n"), 00547 ACE_LIB_TEXT ("enqueue_head")), 00548 -1); 00549 } 00550 00551 return number_purged; 00552 00553 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00554 ACE_UNUSED_ARG (eh); 00555 ACE_UNUSED_ARG (mask); 00556 ACE_NOTSUP_RETURN (-1); 00557 #endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ 00558 } |
|
Read one notify call on the handle into buffer. This could be because of a thread trying to unblock the Reactor_Impl. Implements ACE_Reactor_Notify. Definition at line 262 of file Dev_Poll_Reactor.cpp. References ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_TRACE, EWOULDBLOCK, LM_ERROR, ACE::recv(), and ssize_t. Referenced by handle_input().
00264 { 00265 ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe"); 00266 00267 // This is a (non-blocking) "speculative" read, i.e., we attempt to 00268 // read even if no event was polled on the read handle. A 00269 // speculative read is necessary since notifications must be 00270 // dispatched before IO events. We can avoid the speculative read 00271 // by "walking" the array of pollfd structures returned from 00272 // `/dev/poll' or `/dev/epoll' but that is potentially much more 00273 // expensive than simply checking for an EWOULDBLOCK. 00274 size_t to_read; 00275 char *read_p; 00276 bool have_one = false; 00277 00278 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) 00279 // For the queued case, we'll try to read one byte (since that's what 00280 // the notify() tried to put in) but we don't need it - notifications can 00281 // be queued even if the pipe fills, so there may be more notifications 00282 // queued than there are bytes in the pipe. 00283 char b; 00284 read_p = &b; 00285 to_read = 1; 00286 ACE_Notification_Buffer *temp; 00287 00288 // New scope to release the guard before trying the recv(). 00289 { 00290 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); 00291 00292 if (notify_queue_.is_empty ()) 00293 return 0; 00294 else if (notify_queue_.dequeue_head (temp) == -1) 00295 ACE_ERROR_RETURN ((LM_ERROR, 00296 ACE_LIB_TEXT ("%p\n"), 00297 ACE_LIB_TEXT ("read_notify_pipe: dequeue_head")), 00298 -1); 00299 buffer = *temp; 00300 have_one = true; 00301 if (free_queue_.enqueue_head (temp) == -1) 00302 ACE_ERROR ((LM_ERROR, 00303 ACE_LIB_TEXT ("%p\n"), 00304 ACE_LIB_TEXT ("read_notify_pipe: enqueue_head"))); 00305 } 00306 00307 #else 00308 to_read = sizeof buffer; 00309 read_p = (char *)&buffer; 00310 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ 00311 00312 ssize_t n = ACE::recv (handle, read_p, to_read); 00313 00314 if (n > 0) 00315 { 00316 // Check to see if we've got a short read. 00317 if (static_cast<size_t>(n) != to_read) 00318 { 00319 size_t remainder = to_read - n; 00320 00321 // If so, try to recover by reading the remainder. If this 00322 // doesn't work we're in big trouble since the input stream 00323 // won't be aligned correctly. I'm not sure quite what to 00324 // do at this point. It's probably best just to return -1. 00325 if (ACE::recv (handle, &read_p[n], remainder) <= 0) 00326 return -1; 00327 } 00328 00329 return 1; 00330 } 00331 00332 // Return -1 if things have gone seriously wrong. 00333 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) 00334 return -1; 00335 00336 return have_one ? 1 : 0; 00337 } |
|
Keep a back pointer to the ACE_Dev_Poll_Reactor. If this value if NULL then the ACE_Dev_Poll_Reactor has been initialized with disable_notify_pipe. Definition at line 267 of file Dev_Poll_Reactor.h. |
|
Keeps track of the maximum number of times that the ACE_Dev_Poll_Reactor_Notify::handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty." Definition at line 283 of file Dev_Poll_Reactor.h. Referenced by handle_input(), and max_notify_iterations(). |
|
Contains the ACE_HANDLE the ACE_Dev_Poll_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Dev_Poll_Reactor will write to. Definition at line 274 of file Dev_Poll_Reactor.h. Referenced by close(), dispatch_notifications(), dump(), notify_handle(), and open(). |