00001
00002
00003 #include "ace/Priority_Reactor.h"
00004 #include "ace/Malloc_T.h"
00005
00006 ACE_RCSID(ace, Priority_Reactor, "$Id: Priority_Reactor.cpp 80826 2008-03-04 14:51:23Z wotte $")
00007
00008 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00009
// Iterator type over an unbounded queue of ACE_Event_Tuple entries.
// NOTE(review): no definition visible in this part of the file uses this
// alias -- confirm whether it is still needed.
00010 typedef ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple> QUEUE_ITERATOR;
00011
00012
// Allocator for the queue nodes that hold ACE_Event_Tuple entries.
// init_bucket() creates a single instance and hands it to every priority
// queue.  NOTE(review): ACE_SYNCH_NULL_MUTEX presumably means the allocator
// does no locking of its own -- confirm against the ACE headers.
00013 typedef ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_SYNCH_NULL_MUTEX> TUPLE_ALLOCATOR;
00014
00015
00016
// Invokes ACE's allocation-hook definition macro for ACE_Priority_Reactor.
// NOTE(review): the macro's expansion lives elsewhere in ACE and is not
// visible here -- presumably it pairs with an ACE_ALLOC_HOOK_DECLARE in the
// class declaration; confirm against the macro definition.
00017 ACE_ALLOC_HOOK_DEFINE(ACE_Priority_Reactor)
00018
00019
00020
// Number of distinct handler priority levels, i.e. how many dispatch buckets
// exist: one for every value from ACE_Event_Handler::LO_PRIORITY through
// ACE_Event_Handler::HI_PRIORITY, inclusive.  The expansion is parenthesized
// so that it behaves as a single operand wherever it is used (for example
// `2 * npriorities` or `x - npriorities`).
#define npriorities \
  (ACE_Event_Handler::HI_PRIORITY - ACE_Event_Handler::LO_PRIORITY + 1)
00023
00024 void
00025 ACE_Priority_Reactor::init_bucket (void)
00026 {
00027
00028
00029
00030 ACE_NEW (this->tuple_allocator_,
00031 TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE));
00032
00033
00034 ACE_NEW (this->bucket_,
00035 QUEUE *[npriorities]);
00036
00037
00038 for (int i = 0; i < npriorities; ++i)
00039 ACE_NEW (this->bucket_[i],
00040 QUEUE (this->tuple_allocator_));
00041 }
00042
00043 ACE_Priority_Reactor::ACE_Priority_Reactor (ACE_Sig_Handler *sh,
00044 ACE_Timer_Queue *tq)
00045 : ACE_Select_Reactor(sh, tq),
00046 bucket_ (0),
00047 tuple_allocator_ (0)
00048 {
00049 ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
00050 this->init_bucket ();
00051 }
00052
00053 ACE_Priority_Reactor::ACE_Priority_Reactor (size_t size,
00054 int rs,
00055 ACE_Sig_Handler *sh,
00056 ACE_Timer_Queue *tq)
00057 : ACE_Select_Reactor (size, rs, sh, tq),
00058 bucket_ (0),
00059 tuple_allocator_ (0)
00060 {
00061 ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor");
00062 this->init_bucket ();
00063 }
00064
00065 ACE_Priority_Reactor::~ACE_Priority_Reactor (void)
00066 {
00067 ACE_TRACE ("ACE_Priority_Reactor::~ACE_Priority_Reactor");
00068
00069 for (int i = 0; i < npriorities; ++i)
00070 delete this->bucket_[i];
00071
00072 delete[] this->bucket_;
00073 delete tuple_allocator_;
00074 }
00075
00076 int
00077 ACE_Priority_Reactor::build_bucket (ACE_Handle_Set &dispatch_mask,
00078 int &min_priority,
00079 int &max_priority)
00080 {
00081 ACE_Handle_Set_Iterator handle_iter (dispatch_mask);
00082
00083 for (ACE_HANDLE handle;
00084 (handle = handle_iter ()) != ACE_INVALID_HANDLE;
00085 )
00086 {
00087 ACE_Event_Handler *event_handler =
00088 this->handler_rep_.find (handle);
00089 if (event_handler == 0)
00090 return -1;
00091
00092 ACE_Event_Tuple et (event_handler,
00093 handle);
00094 int prio = et.event_handler_->priority ();
00095
00096
00097 if (prio < ACE_Event_Handler::LO_PRIORITY
00098 || prio > ACE_Event_Handler::HI_PRIORITY)
00099 prio = ACE_Event_Handler::LO_PRIORITY;
00100
00101 if (bucket_[prio]->enqueue_tail (et) == -1)
00102 return -1;
00103
00104
00105 if (min_priority > prio)
00106 min_priority = prio;
00107 if (max_priority < prio)
00108 max_priority = prio;
00109 }
00110
00111 return 0;
00112 }
00113
00114 int
00115 ACE_Priority_Reactor::dispatch_io_set (int number_of_active_handles,
00116 int& number_dispatched,
00117 int mask,
00118 ACE_Handle_Set& dispatch_mask,
00119 ACE_Handle_Set& ready_mask,
00120 ACE_EH_PTMF callback)
00121 {
00122 ACE_TRACE ("ACE_Priority_Reactor::dispatch_io_set");
00123
00124 if (number_of_active_handles == 0)
00125 return 0;
00126
00127
00128
00129 int min_priority =
00130 ACE_Event_Handler::HI_PRIORITY;
00131 int max_priority =
00132 ACE_Event_Handler::LO_PRIORITY;
00133
00134 if (this->build_bucket (dispatch_mask,
00135 min_priority,
00136 max_priority) == -1)
00137 return -1;
00138
00139 for (int i = max_priority; i >= min_priority; --i)
00140 {
00141 while (!bucket_[i]->is_empty ()
00142 && number_dispatched < number_of_active_handles)
00143 {
00144
00145 ACE_Event_Tuple et;
00146
00147 bucket_[i]->dequeue_head (et);
00148
00149 this->notify_handle (et.handle_,
00150 mask,
00151 ready_mask,
00152 et.event_handler_,
00153 callback);
00154 number_dispatched++;
00155
00156
00157
00158
00159 this->clear_dispatch_mask (et.handle_,
00160 mask);
00161
00162 if (this->state_changed_)
00163 this->state_changed_ = false;
00164 }
00165
00166
00167
00168 bucket_[i]->reset ();
00169 }
00170
00171 return 0;
00172 }
00173
00174 void
00175 ACE_Priority_Reactor::dump (void) const
00176 {
00177 #if defined (ACE_HAS_DUMP)
00178 ACE_TRACE ("ACE_Priority_Reactor::dump");
00179
00180 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00181
00182 ACE_Select_Reactor::dump ();
00183
00184 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00185 #endif
00186 }
00187
00188 ACE_END_VERSIONED_NAMESPACE_DECL