#include <EC_Kokyu_Filter_Builder.h>
Inheritance diagram for TAO_EC_Kokyu_Filter_Builder:
Public Member Functions | |
TAO_EC_Kokyu_Filter_Builder (TAO_EC_Event_Channel_Base *ec) | |
constructor. | |
virtual | ~TAO_EC_Kokyu_Filter_Builder (void) |
destructor... | |
TAO_EC_Filter * | build (TAO_EC_ProxyPushSupplier *supplier, RtecEventChannelAdmin::ConsumerQOS &qos) const |
Private Member Functions | |
TAO_EC_Filter * | recursive_build (TAO_EC_ProxyPushSupplier *supplier, RtecEventChannelAdmin::ConsumerQOS &qos, CORBA::ULong &pos, RtecScheduler::Scheduler_ptr scheduler, RtecScheduler::handle_t parent_info) const |
Recursively build the filter tree. | |
void | recursive_name (RtecEventChannelAdmin::ConsumerQOS &qos, CORBA::ULong &pos, RtecScheduler::Scheduler_ptr scheduler, ACE_CString &name) const |
Build the name recursively... | |
CORBA::ULong | count_children (RtecEventChannelAdmin::ConsumerQOS &qos, CORBA::ULong pos) const |
Private Attributes | |
TAO_EC_Event_Channel_Base * | event_channel_ |
The event channel. |
The sched filtering mechanisms in the Event channel (source/type based filtering + disjunctions and conjunctions) are constructed using this class.
Definition at line 42 of file EC_Kokyu_Filter_Builder.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE TAO_EC_Kokyu_Filter_Builder::TAO_EC_Kokyu_Filter_Builder | ( | TAO_EC_Event_Channel_Base * | ec | ) |
constructor.
Definition at line 10 of file EC_Kokyu_Filter_Builder.inl.
00011 : event_channel_ (ec) 00012 { 00013 }
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_EC_Kokyu_Filter_Builder::~TAO_EC_Kokyu_Filter_Builder | ( | void | ) | [virtual] |
TAO_EC_Filter * TAO_EC_Kokyu_Filter_Builder::build | ( | TAO_EC_ProxyPushSupplier * | supplier, | |
RtecEventChannelAdmin::ConsumerQOS & | qos | |||
) | const [virtual] |
Create the filter, the caller must assume ownership of the filter returned.
Implements TAO_EC_Filter_Builder.
Definition at line 45 of file EC_Kokyu_Filter_Builder.cpp.
References ACE_DEBUG, ACE_ES_BITMASK_DESIGNATOR, ACE_ES_CONJUNCTION_DESIGNATOR, ACE_ES_DISJUNCTION_DESIGNATOR, ACE_ES_EVENT_DEADLINE_TIMEOUT, ACE_ES_EVENT_INTERVAL_TIMEOUT, ACE_ES_EVENT_TIMEOUT, ACE_ES_GLOBAL_DESIGNATOR, ACE_ES_LOGICAL_AND_DESIGNATOR, ACE_ES_MASKED_TYPE_DESIGNATOR, ACE_ES_NEGATION_DESIGNATOR, ACE_ES_NULL_DESIGNATOR, ACE_String_Base< CHAR >::c_str(), RtecEventChannelAdmin::ConsumerQOS::dependencies, designator(), event_channel_, TAO_EC_Kokyu_Filter::get_qos_info(), TAO_Pseudo_Var_T< T >::in(), LM_DEBUG, RtecBase::ONE_WAY_CALL, recursive_build(), and TAO_EC_Event_Channel_Base::scheduler().
00048 { 00049 CORBA::ULong i=0,found=0; 00050 CORBA::ULong pos = 0; 00051 CORBA::Long npos = -1; 00052 int establish_final_consumer_dependency=0; 00053 00054 CORBA::Object_var tmp = 00055 this->event_channel_->scheduler (); 00056 00057 RtecScheduler::Scheduler_var scheduler = 00058 RtecScheduler::Scheduler::_narrow (tmp.in ()); 00059 00060 #ifdef EC_KOKYU_LOGGING 00061 for (i=0; i<qos.dependencies.length (); ++i) 00062 { 00063 ACE_DEBUG ((LM_DEBUG, 00064 "consumerqos[%d] event.header.type = %s," 00065 "rt_info = %d\n", 00066 i, 00067 designator (qos.dependencies[i].event.header.type), 00068 qos.dependencies[i].rt_info)); 00069 } 00070 #endif 00071 00072 //find the first entry which is not a designator. We are going to 00073 //assume that this entry will have the rt_info of the connecting 00074 //consumer (ProxyPushSupplier), which is passed into this function. 00075 for (i=0; !found && i<qos.dependencies.length (); ++i) 00076 { 00077 switch (qos.dependencies[i].event.header.type) 00078 { 00079 case ACE_ES_CONJUNCTION_DESIGNATOR: 00080 case ACE_ES_DISJUNCTION_DESIGNATOR: 00081 case ACE_ES_NEGATION_DESIGNATOR: 00082 case ACE_ES_LOGICAL_AND_DESIGNATOR: 00083 case ACE_ES_BITMASK_DESIGNATOR: 00084 case ACE_ES_MASKED_TYPE_DESIGNATOR: 00085 case ACE_ES_NULL_DESIGNATOR: 00086 establish_final_consumer_dependency = 1; 00087 continue; 00088 00089 case ACE_ES_GLOBAL_DESIGNATOR: 00090 case ACE_ES_EVENT_TIMEOUT: 00091 case ACE_ES_EVENT_INTERVAL_TIMEOUT: 00092 case ACE_ES_EVENT_DEADLINE_TIMEOUT: 00093 continue; 00094 00095 default: 00096 npos = i; 00097 found = 1; 00098 break; 00099 } 00100 } 00101 00102 ACE_CString final_consumer_rep_name; 00103 RtecScheduler::handle_t h_final_consumer_rt_info = 0; 00104 RtecScheduler::handle_t h_final_consumer_rep_rt_info = 0; 00105 00106 #ifdef EC_KOKYU_LOGGING 00107 ACE_DEBUG ((LM_DEBUG, "consumer rt_info found in consumerqos[%d] \n", npos)); 00108 #endif 00109 00110 if (npos >= 0 && establish_final_consumer_dependency == 1) 00111 { 00112 //Hopefully this will have the final consumer's rt_info 00113 h_final_consumer_rt_info = qos.dependencies[npos].rt_info; 00114 00115 #ifdef EC_KOKYU_LOGGING 00116 ACE_DEBUG ((LM_DEBUG, "about to get rt_info = %d\n", 00117 h_final_consumer_rep_rt_info)); 00118 #endif 00119 00120 RtecScheduler::RT_Info_var final_consumer_rt_info = 00121 scheduler->get ( h_final_consumer_rt_info); 00122 00123 final_consumer_rep_name = final_consumer_rt_info->entry_point.in (); 00124 final_consumer_rep_name += "#rep"; 00125 00126 #ifdef EC_KOKYU_LOGGING 00127 ACE_DEBUG ((LM_DEBUG, "about to create consumer rep %s\n", 00128 final_consumer_rep_name.c_str ())); 00129 #endif 00130 00131 //create an rt_info corresponding to this rep. 00132 h_final_consumer_rep_rt_info = 00133 scheduler->create (final_consumer_rep_name.c_str ()); 00134 #ifdef EC_KOKYU_LOGGING 00135 ACE_DEBUG ((LM_DEBUG, "consumer rep created\n")); 00136 #endif 00137 00138 } 00139 00140 //We are passing the final consumer as the parent. The final 00141 //consumer is the one which is connecting to the ProxyPushSupplier 00142 //passed in to this function. 00143 00144 TAO_EC_Filter* filter = 00145 this->recursive_build (supplier, qos, pos, 00146 scheduler.in (), 00147 h_final_consumer_rep_rt_info //parent_info 00148 ); 00149 00150 #ifdef EC_KOKYU_LOGGING 00151 ACE_DEBUG ((LM_DEBUG, 00152 "Filter_Builder::Verifying whether root filter" 00153 " dependency can be established\n")); 00154 #endif 00155 00156 if (npos >= 0 && establish_final_consumer_dependency == 1) 00157 { 00158 #ifdef EC_KOKYU_LOGGING 00159 ACE_DEBUG ((LM_DEBUG, 00160 "Filter_Builder::root filter dependency " 00161 "can be established\n")); 00162 #endif 00163 TAO_EC_Kokyu_Filter* kokyu_filter = 00164 dynamic_cast<TAO_EC_Kokyu_Filter*> (filter); 00165 00166 //add the dependency between the root in the filter hierarchy and 00167 //the final consumer 00168 TAO_EC_QOS_Info qos_info; 00169 kokyu_filter->get_qos_info (qos_info); 00170 00171 scheduler->add_dependency (h_final_consumer_rt_info, 00172 qos_info.rt_info, 00173 1, 00174 RtecBase::ONE_WAY_CALL); 00175 } 00176 return filter; 00177 }
CORBA::ULong TAO_EC_Kokyu_Filter_Builder::count_children | ( | RtecEventChannelAdmin::ConsumerQOS & | qos, | |
CORBA::ULong | pos | |||
) | const [private] |
Count the number of children of the current node, i.e. until a conjunction or disjunction starts.
Definition at line 463 of file EC_Kokyu_Filter_Builder.cpp.
References ACE_ES_CONJUNCTION_DESIGNATOR, ACE_ES_DISJUNCTION_DESIGNATOR, and RtecEventChannelAdmin::ConsumerQOS::dependencies.
Referenced by recursive_build(), and recursive_name().
00465 { 00466 CORBA::ULong l = qos.dependencies.length (); 00467 CORBA::ULong i; 00468 for (i = pos; i != l; ++i) 00469 { 00470 const RtecEventComm::Event& e = qos.dependencies[i].event; 00471 if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR 00472 || e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) 00473 break; 00474 } 00475 return i - 1; 00476 }
TAO_EC_Filter * TAO_EC_Kokyu_Filter_Builder::recursive_build | ( | TAO_EC_ProxyPushSupplier * | supplier, | |
RtecEventChannelAdmin::ConsumerQOS & | qos, | |||
CORBA::ULong & | pos, | |||
RtecScheduler::Scheduler_ptr | scheduler, | |||
RtecScheduler::handle_t | parent_info | |||
) | const [private] |
Recursively build the filter tree.
Definition at line 180 of file EC_Kokyu_Filter_Builder.cpp.
References ACE_DEBUG, ACE_ES_CONJUNCTION_DESIGNATOR, ACE_ES_DISJUNCTION_DESIGNATOR, ACE_ES_EVENT_DEADLINE_TIMEOUT, ACE_ES_EVENT_INTERVAL_TIMEOUT, ACE_ES_EVENT_TIMEOUT, ACE_ES_GLOBAL_DESIGNATOR, ACE_NEW_RETURN, ACE_String_Base< CHAR >::c_str(), count_children(), RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_EC_Kokyu_Filter::get_qos_info(), RtecEventComm::Event::header, LM_DEBUG, recursive_name(), TAO_EC_QOS_Info::rt_info, ACE_OS::sprintf(), and RtecBase::TWO_WAY_CALL.
Referenced by build().
00186 { 00187 const RtecEventComm::Event& e = qos.dependencies[pos].event; 00188 00189 #ifdef EC_KOKYU_LOGGING 00190 ACE_DEBUG ((LM_DEBUG, "Filter_Builder::In recursive build\n")); 00191 #endif 00192 00193 if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR) 00194 { 00195 #ifdef EC_KOKYU_LOGGING 00196 ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Conjuction designator\n")); 00197 #endif 00198 CORBA::ULong npos = pos; 00199 ACE_CString name; 00200 this->recursive_name (qos, npos, 00201 scheduler, name); 00202 00203 pos++; // Consume the designator 00204 00205 CORBA::ULong n = this->count_children (qos, pos); 00206 00207 RtecBase::handle_t conj_rt_info = parent_info; 00208 00209 TAO_EC_Filter** children; 00210 ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0); 00211 for (CORBA::ULong i = 0; i != n; ++i) 00212 { 00213 children[i] = this->recursive_build (supplier, qos, pos, 00214 scheduler, 00215 conj_rt_info); 00216 } 00217 00218 TAO_EC_Kokyu_Filter *filter; 00219 ACE_NEW_RETURN (filter, 00220 TAO_EC_Kokyu_Filter (name.c_str (), 00221 conj_rt_info, 00222 scheduler, 00223 new TAO_EC_Conjunction_Filter(children, 00224 n), 00225 conj_rt_info, 00226 conj_rt_info, 00227 RtecScheduler::CONJUNCTION), 00228 0); 00229 TAO_EC_QOS_Info qos_info; 00230 filter->get_qos_info (qos_info); 00231 // @@ 00232 return filter; 00233 } 00234 00235 else if (e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) 00236 { 00237 #ifdef EC_KOKYU_LOGGING 00238 ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Disjunction designator\n")); 00239 #endif 00240 CORBA::ULong npos = pos; 00241 ACE_CString name; 00242 this->recursive_name (qos, npos, 00243 scheduler, name); 00244 00245 pos++; // Consume the designator 00246 00247 RtecBase::handle_t disj_rt_info = parent_info; 00248 00249 CORBA::ULong n = this->count_children (qos, pos); 00250 00251 TAO_EC_Filter** children; 00252 ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0); 00253 for (CORBA::ULong i = 0; i != n; ++i) 00254 { 00255 children[i] = this->recursive_build (supplier, qos, pos, 00256 scheduler, 00257 disj_rt_info); 00258 } 00259 TAO_EC_Kokyu_Filter *filter; 00260 ACE_NEW_RETURN (filter, 00261 TAO_EC_Kokyu_Filter (name.c_str (), 00262 disj_rt_info, 00263 scheduler, 00264 new TAO_EC_Disjunction_Filter (children, 00265 n), 00266 disj_rt_info, 00267 disj_rt_info, 00268 RtecScheduler::DISJUNCTION), 00269 0); 00270 00271 TAO_EC_QOS_Info qos_info; 00272 filter->get_qos_info (qos_info); 00273 // @@ 00274 return filter; 00275 } 00276 else if (e.header.type == ACE_ES_EVENT_TIMEOUT 00277 || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT 00278 || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT) 00279 { 00280 #ifdef EC_KOKYU_LOGGING 00281 ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Timeout designator\n")); 00282 #endif 00283 // @@ We need a unique name for each timeout, assigned by the 00284 // application? 00285 char buf[64]; 00286 00287 //get the rt_info for the timer consumer 00288 RtecBase::handle_t h_consumer_rt_info = qos.dependencies[pos].rt_info; 00289 00290 //build a unique name using the cosumer_rt_info 00291 ACE_OS::sprintf (buf, "TIMEOUT:%umsec:%d", 00292 static_cast<u_int> ((e.header.creation_time / 10000)), 00293 h_consumer_rt_info); 00294 ACE_CString name = buf; 00295 00296 TAO_EC_QOS_Info qos_info; 00297 qos_info.rt_info = 00298 scheduler->create (name.c_str ()); 00299 00300 // Convert the time to the proper units.... 00301 RtecScheduler::Period_t period = 00302 static_cast<RtecScheduler::Period_t> (e.header.creation_time); 00303 00304 #if 1 //by VS original code replaced with this 00305 RtecScheduler::RT_Info* consumer_rt_info_ptr; 00306 00307 consumer_rt_info_ptr = scheduler->get (h_consumer_rt_info); 00308 scheduler->set (qos_info.rt_info, 00309 consumer_rt_info_ptr->criticality, 00310 0, // worst_cast_execution_time 00311 0, // typical_cast_execution_time 00312 0, // cached_cast_execution_time 00313 period, 00314 consumer_rt_info_ptr->importance, 00315 0, // quantum 00316 1, // threads 00317 RtecScheduler::OPERATION); 00318 00319 scheduler->add_dependency (qos_info.rt_info, 00320 h_consumer_rt_info, 00321 1, 00322 RtecBase::TWO_WAY_CALL); 00323 #endif //by VS 00324 00325 pos++; 00326 return new TAO_EC_Timeout_Filter (this->event_channel_, 00327 supplier, 00328 qos_info, 00329 e.header.type, 00330 e.header.creation_time); 00331 } 00332 00333 #if 1 //added by VS 00334 else if (e.header.type == ACE_ES_GLOBAL_DESIGNATOR) 00335 { 00336 pos++; 00337 return this->recursive_build (supplier, qos, pos, 00338 scheduler, 00339 parent_info); 00340 } 00341 else 00342 { 00343 #ifdef EC_KOKYU_LOGGING 00344 ACE_DEBUG ((LM_DEBUG, 00345 "Kokyu_Filter_Builder::No designator for this entry. " 00346 "Must be a body\n")); 00347 #endif 00348 } 00349 #endif 00350 00351 //probably because of a global designator, the parent_info could be 0. 00352 if (parent_info == 0) 00353 { 00354 //In this case, the parent_info is the same as the one supplied 00355 //in the consumer qos. 00356 parent_info = qos.dependencies[pos].rt_info; 00357 } 00358 00359 RtecScheduler::RT_Info_var info = 00360 scheduler->get (parent_info); 00361 00362 ACE_CString name = info->entry_point.in (); 00363 00364 pos++; 00365 TAO_EC_Kokyu_Filter *filter; 00366 ACE_NEW_RETURN (filter, 00367 TAO_EC_Kokyu_Filter (name.c_str (), 00368 parent_info, 00369 scheduler, 00370 new TAO_EC_Type_Filter (e.header), 00371 parent_info, 00372 parent_info, 00373 RtecScheduler::OPERATION), 00374 0); 00375 00376 TAO_EC_QOS_Info qos_info; 00377 filter->get_qos_info (qos_info); 00378 // @@ 00379 return filter; 00380 }
void TAO_EC_Kokyu_Filter_Builder::recursive_name | ( | RtecEventChannelAdmin::ConsumerQOS & | qos, | |
CORBA::ULong & | pos, | |||
RtecScheduler::Scheduler_ptr | scheduler, | |||
ACE_CString & | name | |||
) | const [private] |
Build the name recursively...
Definition at line 383 of file EC_Kokyu_Filter_Builder.cpp.
References ACE_ES_CONJUNCTION_DESIGNATOR, ACE_ES_DISJUNCTION_DESIGNATOR, ACE_ES_EVENT_DEADLINE_TIMEOUT, ACE_ES_EVENT_INTERVAL_TIMEOUT, ACE_ES_EVENT_TIMEOUT, count_children(), RtecEventChannelAdmin::ConsumerQOS::dependencies, RtecEventComm::Event::header, and ACE_OS::sprintf().
Referenced by recursive_build().
00388 { 00389 const RtecEventComm::Event& e = qos.dependencies[pos].event; 00390 00391 if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR) 00392 { 00393 pos++; // Consume the designator 00394 CORBA::ULong n = this->count_children (qos, pos); 00395 00396 for (CORBA::ULong i = 0; i != n; ++i) 00397 { 00398 ACE_CString child_name; 00399 this->recursive_name (qos, pos, 00400 scheduler, 00401 child_name); 00402 00403 if (i == 0) 00404 name += "("; 00405 else 00406 name += "&&"; 00407 name += child_name; 00408 } 00409 name += ")"; 00410 return; 00411 } 00412 00413 else if (e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) 00414 { 00415 pos++; // Consume the designator 00416 CORBA::ULong n = this->count_children (qos, pos); 00417 00418 for (CORBA::ULong i = 0; i != n; ++i) 00419 { 00420 ACE_CString child_name; 00421 00422 this->recursive_name (qos, pos, 00423 scheduler, 00424 child_name); 00425 00426 if (i == 0) 00427 name += "("; 00428 else 00429 name += "||"; 00430 name += child_name; 00431 } 00432 name += ")"; 00433 return; 00434 } 00435 00436 else if (e.header.type == ACE_ES_EVENT_TIMEOUT 00437 || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT 00438 || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT) 00439 { 00440 pos++; 00441 00442 char buf[64]; 00443 ACE_OS::sprintf (buf, "TIMEOUT:%umsec", 00444 static_cast<u_int> ((e.header.creation_time / 10000))); 00445 name = buf; 00446 00447 return; 00448 } 00449 00450 RtecScheduler::handle_t body_info = qos.dependencies[pos].rt_info; 00451 00452 RtecScheduler::RT_Info_var info = 00453 scheduler->get (body_info); 00454 00455 name = info->entry_point.in (); 00456 name += "#rep"; 00457 00458 pos++; 00459 }