#include <EC_Priority_Dispatching.h>
Inheritance diagram for TAO_EC_Priority_Dispatching:
Public Member Functions | |
TAO_EC_Priority_Dispatching (TAO_EC_Event_Channel_Base *ec) | |
virtual void | activate (void) |
virtual void | shutdown (void) |
virtual void | push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info ACE_ENV_ARG_DECL) |
virtual void | push_nocopy (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info ACE_ENV_ARG_DECL) |
Private Attributes | |
ACE_Thread_Manager | thread_manager_ |
Use our own thread manager. | |
int | ntasks_ |
The number of active tasks. | |
TAO_EC_Dispatching_Task ** | tasks_ |
The tasks.. | |
RtecScheduler::Scheduler_var | scheduler_ |
The scheduler. |
This strategy uses multiple queues, each serviced by a thread at different priority. This minimizes priority inversion because the consumers at higher priority are serviced before consumers at lower priority. It is more flexible than using the supplier thread to dispatch because it allows high-priority suppliers to push events to low-priority consumers (and vice-versa). It also isolates the supplier threads from the time spent on upcalls to the consumer objects, making the system easier to analyze and schedule.
Definition at line 52 of file EC_Priority_Dispatching.h.
|
The scheduler is used to find the range of priorities and similar info. Definition at line 17 of file EC_Priority_Dispatching.cpp. References TAO_EC_Event_Channel_Base::scheduler().
00018 : ntasks_ (0), 00019 tasks_ (0) 00020 { 00021 CORBA::Object_var tmp = ec->scheduler (); 00022 this->scheduler_ = RtecScheduler::Scheduler::_narrow (tmp.in ()); 00023 } |
|
Initialize all the data structures, activate any internal threads, etc. Implements TAO_EC_Dispatching. Definition at line 26 of file EC_Priority_Dispatching.cpp. References ACE_CATCHANY, ACE_CU64_TO_CU32, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_NEW, ACE_SCHED_FIFO, ACE_SCHED_OTHER, ACE_Scheduler_MAX_PRIORITIES, ACE_Scheduler_Rates, ACE_SCOPE_THREAD, ACE_TRY, ACE_TRY_CHECK, ACE_Task_Base::activate(), LM_ERROR, ACE_Sched_Params::next_priority(), ntasks_, ACE_Sched_Params::priority_max(), ACE_Sched_Params::priority_min(), ACE_OS::sprintf(), and tasks_. Referenced by push_nocopy().
00027 { 00028 if (this->tasks_ != 0) 00029 return; 00030 00031 // @@ Query the scheduler to obtain this.... 00032 this->ntasks_ = ACE_Scheduler_MAX_PRIORITIES; 00033 ACE_NEW (this->tasks_, TAO_EC_Dispatching_Task*[this->ntasks_]); 00034 00035 // @@ Query the scheduler to obtain the priorities! 00036 int priority = 00037 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + 00038 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; 00039 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority); 00040 00041 ACE_DECLARE_NEW_CORBA_ENV; 00042 for (int i = 0; i < this->ntasks_; ++i) 00043 { 00044 ACE_TRY 00045 { 00046 RtecScheduler::Period_t period = 00047 ACE_CU64_TO_CU32 (ACE_Scheduler_Rates[i]); 00048 char buf[128]; 00049 ACE_OS::sprintf (buf, "Dispatching_Task-%d.us", period); 00050 00051 RtecScheduler::handle_t rt_info = 00052 this->scheduler_->create (buf ACE_ENV_ARG_PARAMETER); 00053 ACE_TRY_CHECK; 00054 00055 this->scheduler_->set (rt_info, 00056 RtecScheduler::VERY_LOW_CRITICALITY, 00057 0, // worst_cast_execution_time 00058 0, // typical_cast_execution_time 00059 0, // cached_cast_execution_time 00060 period, 00061 RtecScheduler::VERY_LOW_IMPORTANCE, 00062 0, // quantum 00063 1, // threads 00064 RtecScheduler::OPERATION 00065 ACE_ENV_ARG_PARAMETER); 00066 ACE_TRY_CHECK; 00067 } 00068 ACE_CATCHANY 00069 { 00070 // Ignore exceptions.. 00071 } 00072 ACE_ENDTRY; 00073 00074 ACE_NEW (this->tasks_[i], 00075 TAO_EC_Dispatching_Task (&this->thread_manager_)); 00076 00077 // @@ Query the scheduler to obtain the priority... 00078 long flags = THR_BOUND | THR_SCHED_FIFO; 00079 if (this->tasks_[i]->activate (flags, 1, 1, priority) == -1) 00080 { 00081 flags = THR_BOUND; 00082 priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, 00083 ACE_SCOPE_THREAD); 00084 if (this->tasks_[i]->activate (flags, 1, 1, priority) == -1) 00085 ACE_ERROR ((LM_ERROR, 00086 "EC (%P|%t) cannot activate queue %d", i)); 00087 } 00088 } 00089 } |
|
The consumer represented by proxy should receive event. It can use the information in qos_info to determine the event priority (among other things). Implements TAO_EC_Dispatching. Definition at line 110 of file EC_Priority_Dispatching.cpp. References ACE_ENV_ARG_PARAMETER, RtecEventComm::EventSet, and push_nocopy().
00115 { 00116 RtecEventComm::EventSet event_copy = event; 00117 this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER); 00118 } |
|
Implements TAO_EC_Dispatching. Definition at line 121 of file EC_Priority_Dispatching.cpp. References ACE_ENV_ARG_PARAMETER, activate(), RtecEventComm::EventSet, ntasks_, TAO_EC_Dispatching_Task::push(), and tasks_. Referenced by push().
00126 { 00127 if (this->tasks_ == 0) 00128 this->activate (); 00129 00130 int i = qos_info.preemption_priority; 00131 if (i < 0 || i >= this->ntasks_) 00132 { 00133 // @@ Throw something? 00134 i = 0; 00135 } 00136 00137 00138 this->tasks_[i]->push (proxy, consumer, event ACE_ENV_ARG_PARAMETER); 00139 } |
|
Deactivate any internal threads and cleanup internal data structures, it should only return once the threads have finished their jobs. Implements TAO_EC_Dispatching. Definition at line 92 of file EC_Priority_Dispatching.cpp. References ntasks_, ACE_Task< ACE_SYNCH >::putq(), tasks_, and ACE_Thread_Manager::wait().
00093 { 00094 if (this->tasks_ == 0) 00095 return; 00096 00097 for (int i = 0; i < this->ntasks_; ++i) 00098 this->tasks_[i]->putq (new TAO_EC_Shutdown_Task_Command); 00099 00100 this->thread_manager_.wait (); 00101 00102 for (int j = 0; j < this->ntasks_; ++j) 00103 delete this->tasks_[j]; 00104 00105 delete[] this->tasks_; 00106 this->tasks_ = 0; 00107 } |
|
The number of active tasks.
Definition at line 78 of file EC_Priority_Dispatching.h. Referenced by activate(), push_nocopy(), and shutdown(). |
|
The scheduler.
Definition at line 84 of file EC_Priority_Dispatching.h. |
|
The tasks..
Definition at line 81 of file EC_Priority_Dispatching.h. Referenced by activate(), push_nocopy(), and shutdown(). |
|
Use our own thread manager.
Definition at line 75 of file EC_Priority_Dispatching.h. |