Priority_Reactor.cpp

Go to the documentation of this file.
00001 // $Id: Priority_Reactor.cpp 80826 2008-03-04 14:51:23Z wotte $
00002 
00003 #include "ace/Priority_Reactor.h"
00004 #include "ace/Malloc_T.h"
00005 
00006 ACE_RCSID(ace, Priority_Reactor, "$Id: Priority_Reactor.cpp 80826 2008-03-04 14:51:23Z wotte $")
00007 
00008 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00009 
00010 typedef ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple> QUEUE_ITERATOR;
00011 // Its iterator.
00012 
00013 typedef ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_SYNCH_NULL_MUTEX> TUPLE_ALLOCATOR;
00014 // Defines the memory allocator used, no need for locking because it
00015 // is only used in one thread of control.
00016 
00017 ACE_ALLOC_HOOK_DEFINE(ACE_Priority_Reactor)
00018 
// Number of distinct priority levels, i.e. the number of dispatch
// buckets.  The expansion is parenthesized so that it stays a single
// operand wherever it is used (e.g. "2 * npriorities").

#define npriorities \
        (ACE_Event_Handler::HI_PRIORITY - ACE_Event_Handler::LO_PRIORITY + 1)
00023 
00024 void
00025 ACE_Priority_Reactor::init_bucket (void)
00026 {
00027   // Allocate enough space for all the handles.
00028   // TODO: This can be wrong, maybe we should use other kind of
00029   // allocator here?
00030   ACE_NEW (this->tuple_allocator_,
00031            TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE));
00032 
00033   // The event handlers are assigned to a new As the Event
00034   ACE_NEW (this->bucket_,
00035            QUEUE *[npriorities]);
00036 
00037   // This loops "ensures" exception safety.
00038   for (int i = 0; i < npriorities; ++i)
00039     ACE_NEW (this->bucket_[i],
00040              QUEUE (this->tuple_allocator_));
00041 }
00042 
00043 ACE_Priority_Reactor::ACE_Priority_Reactor (ACE_Sig_Handler *sh,
00044                                             ACE_Timer_Queue *tq)
00045   : ACE_Select_Reactor(sh, tq),
00046     bucket_ (0),
00047     tuple_allocator_ (0)
00048 {
00049   ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
00050   this->init_bucket ();
00051 }
00052 
00053 ACE_Priority_Reactor::ACE_Priority_Reactor (size_t size,
00054                                             int rs,
00055                                             ACE_Sig_Handler *sh,
00056                                             ACE_Timer_Queue *tq)
00057   : ACE_Select_Reactor (size, rs, sh, tq),
00058     bucket_ (0),
00059     tuple_allocator_ (0)
00060 {
00061   ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
00062   this->init_bucket ();
00063 }
00064 
00065 ACE_Priority_Reactor::~ACE_Priority_Reactor (void)
00066 {
00067   ACE_TRACE ("ACE_Priority_Reactor::~ACE_Priority_Reactor");
00068 
00069   for (int i = 0; i < npriorities; ++i)
00070     delete this->bucket_[i];
00071 
00072   delete[] this->bucket_;
00073   delete tuple_allocator_;
00074 }
00075 
00076 int
00077 ACE_Priority_Reactor::build_bucket (ACE_Handle_Set &dispatch_mask,
00078                                     int &min_priority,
00079                                     int &max_priority)
00080 {
00081   ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
00082 
00083   for (ACE_HANDLE handle;
00084        (handle = handle_iter ()) != ACE_INVALID_HANDLE;
00085        )
00086     {
00087       ACE_Event_Handler *event_handler = 
00088         this->handler_rep_.find (handle);
00089       if (event_handler == 0)
00090         return -1;
00091 
00092       ACE_Event_Tuple et (event_handler,
00093                           handle);
00094       int prio = et.event_handler_->priority ();
00095 
00096       // If the priority is out of range assign the minimum priority.
00097       if (prio < ACE_Event_Handler::LO_PRIORITY
00098           || prio > ACE_Event_Handler::HI_PRIORITY)
00099         prio = ACE_Event_Handler::LO_PRIORITY;
00100 
00101       if (bucket_[prio]->enqueue_tail (et) == -1)
00102         return -1;
00103 
00104       // Update the priority ranges....
00105       if (min_priority > prio)
00106         min_priority = prio;
00107       if (max_priority < prio)
00108         max_priority = prio;
00109     }
00110 
00111   return 0;
00112 }
00113 
00114 int
00115 ACE_Priority_Reactor::dispatch_io_set (int number_of_active_handles,
00116                                        int& number_dispatched,
00117                                        int mask,
00118                                        ACE_Handle_Set& dispatch_mask,
00119                                        ACE_Handle_Set& ready_mask,
00120                                        ACE_EH_PTMF callback)
00121 {
00122   ACE_TRACE ("ACE_Priority_Reactor::dispatch_io_set");
00123 
00124   if (number_of_active_handles == 0)
00125     return 0;
00126 
00127   // The range for which there exists any Event_Tuple is computed on
00128   // the ordering loop, minimizing iterations on the dispatching loop.
00129   int min_priority =
00130     ACE_Event_Handler::HI_PRIORITY;
00131   int max_priority =
00132     ACE_Event_Handler::LO_PRIORITY;
00133 
00134   if (this->build_bucket (dispatch_mask,
00135                           min_priority,
00136                           max_priority) == -1)
00137     return -1;
00138 
00139   for (int i = max_priority; i >= min_priority; --i)
00140     {
00141       while (!bucket_[i]->is_empty ()
00142              && number_dispatched < number_of_active_handles)
00143         {
00144 
00145           ACE_Event_Tuple et;
00146 
00147           bucket_[i]->dequeue_head (et);
00148 
00149           this->notify_handle (et.handle_,
00150                                mask,
00151                                ready_mask,
00152                                et.event_handler_,
00153                                callback);
00154           number_dispatched++;
00155 
00156           // clear the bit from that dispatch mask,
00157           // so when we need to restart the iteration (rebuilding the iterator...)
00158           // we will not dispatch the already dipatched handlers
00159           this->clear_dispatch_mask (et.handle_,
00160                                      mask);
00161 
00162           if (this->state_changed_)
00163             this->state_changed_ = false; // so it will not rebuild it ...
00164         }
00165 
00166       // Even if we are aborting the loop due to this->state_changed
00167       // or another error we still want to cleanup the buckets.
00168       bucket_[i]->reset ();
00169     }
00170 
00171   return 0;
00172 }
00173 
00174 void
00175 ACE_Priority_Reactor::dump (void) const
00176 {
00177 #if defined (ACE_HAS_DUMP)
00178   ACE_TRACE ("ACE_Priority_Reactor::dump");
00179 
00180   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00181 
00182   ACE_Select_Reactor::dump ();
00183 
00184   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00185 #endif /* ACE_HAS_DUMP */
00186 }
00187 
00188 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:18:41 2010 for ACE by  doxygen 1.4.7