#include <EC_Priority_Dispatching.h>
Inheritance diagram for TAO_EC_Priority_Dispatching:
Public Member Functions | |
TAO_EC_Priority_Dispatching (TAO_EC_Event_Channel_Base *ec) | |
virtual void | activate (void) |
virtual void | shutdown (void) |
virtual void | push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info) |
virtual void | push_nocopy (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info) |
Private Attributes | |
ACE_Thread_Manager | thread_manager_ |
Use our own thread manager. | |
int | ntasks_ |
The number of active tasks. | |
TAO_EC_Dispatching_Task ** | tasks_ |
The tasks. | |
RtecScheduler::Scheduler_var | scheduler_ |
The scheduler. |
This strategy uses multiple queues, each serviced by a thread at different priority. This minimizes priority inversion because the consumers at higher priority are serviced before consumers at lower priority. It is more flexible than using the supplier thread to dispatch because it allows high-priority suppliers to push events to low-priority consumers (and vice-versa). It also isolates the supplier threads from the time spent on upcalls to the consumer objects, making the system easier to analyze and schedule.
Definition at line 52 of file EC_Priority_Dispatching.h.
TAO_EC_Priority_Dispatching::TAO_EC_Priority_Dispatching | ( | TAO_EC_Event_Channel_Base * | ec | ) |
The scheduler is used to find the range of priorities and similar info.
Definition at line 17 of file EC_Priority_Dispatching.cpp.
References TAO_Pseudo_Var_T< T >::in().
00018 : ntasks_ (0), 00019 tasks_ (0) 00020 { 00021 CORBA::Object_var tmp = ec->scheduler (); 00022 this->scheduler_ = RtecScheduler::Scheduler::_narrow (tmp.in ()); 00023 }
void TAO_EC_Priority_Dispatching::activate | ( | void | ) | [virtual] |
Initialize all the data structures, activate any internal threads, etc.
Implements TAO_EC_Dispatching.
Definition at line 26 of file EC_Priority_Dispatching.cpp.
References ACE_ERROR, ACE_NEW, ACE_SCHED_FIFO, ACE_SCHED_OTHER, ACE_Scheduler_MAX_PRIORITIES, ACE_Scheduler_Rates, ACE_SCOPE_THREAD, LM_ERROR, ACE_Sched_Params::next_priority(), ntasks_, ACE_Sched_Params::priority_max(), ACE_Sched_Params::priority_min(), scheduler_, ACE_OS::sprintf(), THR_BOUND, and THR_SCHED_FIFO.
Referenced by push_nocopy().
00027 { 00028 if (this->tasks_ != 0) 00029 return; 00030 00031 // @@ Query the scheduler to obtain this.... 00032 this->ntasks_ = ACE_Scheduler_MAX_PRIORITIES; 00033 ACE_NEW (this->tasks_, TAO_EC_Dispatching_Task*[this->ntasks_]); 00034 00035 // @@ Query the scheduler to obtain the priorities! 00036 int priority = 00037 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + 00038 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; 00039 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority); 00040 00041 for (int i = 0; i < this->ntasks_; ++i) 00042 { 00043 try 00044 { 00045 RtecScheduler::Period_t period = 00046 ACE_CU64_TO_CU32 (ACE_Scheduler_Rates[i]); 00047 char buf[128]; 00048 ACE_OS::sprintf (buf, "Dispatching_Task-%d.us", period); 00049 00050 RtecScheduler::handle_t rt_info = 00051 this->scheduler_->create (buf); 00052 00053 this->scheduler_->set (rt_info, 00054 RtecScheduler::VERY_LOW_CRITICALITY, 00055 0, // worst_cast_execution_time 00056 0, // typical_cast_execution_time 00057 0, // cached_cast_execution_time 00058 period, 00059 RtecScheduler::VERY_LOW_IMPORTANCE, 00060 0, // quantum 00061 1, // threads 00062 RtecScheduler::OPERATION); 00063 } 00064 catch (const CORBA::Exception&) 00065 { 00066 // Ignore exceptions.. 00067 } 00068 00069 ACE_NEW (this->tasks_[i], 00070 TAO_EC_Dispatching_Task (&this->thread_manager_)); 00071 00072 // @@ Query the scheduler to obtain the priority... 00073 long flags = THR_BOUND | THR_SCHED_FIFO; 00074 if (this->tasks_[i]->activate (flags, 1, 1, priority) == -1) 00075 { 00076 flags = THR_BOUND; 00077 priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, 00078 ACE_SCOPE_THREAD); 00079 if (this->tasks_[i]->activate (flags, 1, 1, priority) == -1) 00080 ACE_ERROR ((LM_ERROR, 00081 "EC (%P|%t) cannot activate queue %d", i)); 00082 } 00083 } 00084 }
void TAO_EC_Priority_Dispatching::push | ( | TAO_EC_ProxyPushSupplier * | proxy, | |
RtecEventComm::PushConsumer_ptr | consumer, | |||
const RtecEventComm::EventSet & | event, | |||
TAO_EC_QOS_Info & | qos_info | |||
) | [virtual] |
The consumer represented by proxy should receive event. It can use the information in qos_info to determine the event priority (among other things).
Implements TAO_EC_Dispatching.
Definition at line 105 of file EC_Priority_Dispatching.cpp.
References push_nocopy().
00109 { 00110 RtecEventComm::EventSet event_copy = event; 00111 this->push_nocopy (proxy, consumer, event_copy, qos_info); 00112 }
void TAO_EC_Priority_Dispatching::push_nocopy | ( | TAO_EC_ProxyPushSupplier * | proxy, | |
RtecEventComm::PushConsumer_ptr | consumer, | |||
RtecEventComm::EventSet & | event, | |||
TAO_EC_QOS_Info & | qos_info | |||
) | [virtual] |
Implements TAO_EC_Dispatching.
Definition at line 115 of file EC_Priority_Dispatching.cpp.
References activate(), TAO_EC_QOS_Info::preemption_priority, TAO_EC_Dispatching_Task::push(), and tasks_.
Referenced by push().
00119 { 00120 if (this->tasks_ == 0) 00121 this->activate (); 00122 00123 int i = qos_info.preemption_priority; 00124 if (i < 0 || i >= this->ntasks_) 00125 { 00126 // @@ Throw something? 00127 i = 0; 00128 } 00129 00130 00131 this->tasks_[i]->push (proxy, consumer, event); 00132 }
void TAO_EC_Priority_Dispatching::shutdown | ( | void | ) | [virtual] |
Deactivate any internal threads and clean up internal data structures. This method should only return once the threads have finished their jobs.
Implements TAO_EC_Dispatching.
Definition at line 87 of file EC_Priority_Dispatching.cpp.
References ntasks_, thread_manager_, and ACE_Thread_Manager::wait().
00088 { 00089 if (this->tasks_ == 0) 00090 return; 00091 00092 for (int i = 0; i < this->ntasks_; ++i) 00093 this->tasks_[i]->putq (new TAO_EC_Shutdown_Task_Command); 00094 00095 this->thread_manager_.wait (); 00096 00097 for (int j = 0; j < this->ntasks_; ++j) 00098 delete this->tasks_[j]; 00099 00100 delete[] this->tasks_; 00101 this->tasks_ = 0; 00102 }
int TAO_EC_Priority_Dispatching::ntasks_ [private] |
The number of active tasks.
Definition at line 76 of file EC_Priority_Dispatching.h.
Referenced by activate(), and shutdown().
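ACE_Thread_Manager TAO_EC_Priority_Dispatching::thread_manager_ [private] |
Use our own thread manager.
Definition at line 73 of file EC_Priority_Dispatching.h.
Referenced by shutdown().
RtecScheduler::Scheduler_var TAO_EC_Priority_Dispatching::scheduler_ [private] |
The scheduler.
Referenced by activate().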