#include <EC_TPC_Dispatching.h>
Inheritance diagram for TAO_EC_TPC_Dispatching:
This strategy uses a thread per consumer, and was specifically designed to isolate the effects of an ill-behaved consumer from affecting other consumers.
Definition at line 39 of file EC_TPC_Dispatching.h.
|
Definition at line 66 of file EC_TPC_Dispatching.h. |
|
Definition at line 18 of file EC_TPC_Dispatching.cpp. References ACE_ASSERT, and TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE.
00019 : consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE) 00020 , queue_full_service_object_(so) 00021 { 00022 ACE_ASSERT (this->queue_full_service_object_ != 0); 00023 } |
|
Definition at line 25 of file EC_TPC_Dispatching.cpp.
00026 { 00027 // No other dispatching strategy has a DTOR body. I can only 00028 // assume that it's guaranteed that shutdown() is called before 00029 // the DTOR, so the tear-down logic needs to go in the shutdown, 00030 // and the DTOR need not call shutdown. 00031 } |
|
Initialize all the data structures, activate any internal threads, etc. Implements TAO_EC_Dispatching. Definition at line 121 of file EC_TPC_Dispatching.cpp.
00122 { 00123 } |
|
Definition at line 34 of file EC_TPC_Dispatching.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Task_Base::activate(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), consumer_task_map_, EC_TPC_debug_level, LM_DEBUG, LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS, TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY, and ACE_Task_Base::wait().
00036 { 00037 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00038 00039 // Duplicate the pointer and hold it safely 00040 RtecEventComm::PushConsumer_var pc = 00041 RtecEventComm::PushConsumer::_duplicate(consumer); 00042 00043 if (EC_TPC_debug_level > 0) 00044 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in())); 00045 00046 TAO_EC_Dispatching_Task* dtask = 00047 new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_, 00048 this->queue_full_service_object_); 00049 00050 if (EC_TPC_debug_level > 0) 00051 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask)); 00052 00053 if ((dtask->activate (TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS, 00054 1, // we only want one thread to dispatch to a consumer 00055 1, // magic number?? 00056 TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY)) == -1) 00057 { 00058 ACE_DEBUG ((LM_WARNING, 00059 "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate" 00060 " dispatching task for consumer (%@)\n", 00061 consumer)); 00062 delete dtask; 00063 return -1; 00064 } 00065 00066 int bindresult = 00067 this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()), 00068 dtask); 00069 const char* explanation = 0; 00070 if (bindresult == -1) 00071 explanation = "general failure"; 00072 else if (bindresult == 1) 00073 explanation = "entry already exists"; 00074 00075 if (explanation != 0) 00076 { 00077 ACE_DEBUG ((LM_WARNING, 00078 "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)" 00079 " and dispatch task in map (%s): %p\n", 00080 consumer, explanation)); 00081 dtask->putq (new TAO_EC_Shutdown_Task_Command); 00082 dtask->wait (); 00083 delete dtask; 00084 return -1; 00085 } 00086 00087 return 0; 00088 } |
|
The consumer represented by proxy should receive event. It can use the information in qos_info to determine the event priority (among other things). Implements TAO_EC_Dispatching. Definition at line 162 of file EC_TPC_Dispatching.cpp. References ACE_ENV_ARG_PARAMETER, RtecEventComm::EventSet, and push_nocopy().
00167 { 00168 RtecEventComm::EventSet event_copy = event; 00169 this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER); 00170 } |
|
Implements TAO_EC_Dispatching. Referenced by push(). |
|
Definition at line 91 of file EC_TPC_Dispatching.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, consumer_task_map_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), CORBA::release(), and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind().
00093 { 00094 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); 00095 00096 TAO_EC_Dispatching_Task* dtask = 0; 00097 00098 if (this->consumer_task_map_.find (consumer, dtask) == -1) 00099 { 00100 ACE_DEBUG ((LM_WARNING, 00101 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to" 00102 " find consumer (%@) in map\n", consumer)); 00103 return -1; 00104 } 00105 00106 // Must have found it...first try to unbind 00107 if (this->consumer_task_map_.unbind (consumer) == -1) 00108 { 00109 ACE_DEBUG ((LM_WARNING, 00110 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to" 00111 " unbind consumer (%@) and task in map\n", consumer)); 00112 return -1; 00113 } 00114 00115 dtask->putq (new TAO_EC_Shutdown_Task_Command); 00116 CORBA::release (consumer); // This matches the _duplicate in add_consumer 00117 return 0; 00118 } |
|
Deactivate any internal threads and cleanup internal data structures, it should only return once the threads have finished their jobs. Implements TAO_EC_Dispatching. Definition at line 126 of file EC_TPC_Dispatching.cpp. References ACE_GUARD, ACE_SYNCH_MUTEX, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), consumer_task_map_, CORBA::release(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind_all(), and ACE_Thread_Manager::wait().
00127 { 00128 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); 00129 00130 // The MT_Dispatching strategy sends a TAO_EC_Shutdown_Task_Command 00131 // to the dispatching task. Is that what we should do here? 00132 MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin (); 00133 while (! iter.done()) 00134 { 00135 MAPTYPE::ENTRY* entry = 0; 00136 if (! iter.next(entry)) 00137 continue; 00138 00139 entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command); 00140 iter.advance (); 00141 } 00142 00143 this->thread_manager_.wait (); // Wait for the threads to terminate 00144 00145 // Now iterate again and call CORBA::release on the ext_id; 00146 // we don't have to delete int_id_ b/c that happens in its close() method. 00147 iter = this->consumer_task_map_.begin (); 00148 while (! iter.done()) 00149 { 00150 MAPTYPE::ENTRY* entry = 0; 00151 if (! iter.next(entry)) 00152 continue; 00153 00154 CORBA::release (entry->ext_id_); 00155 iter.advance (); 00156 } 00157 00158 this->consumer_task_map_.unbind_all (); 00159 } |
|
Definition at line 70 of file EC_TPC_Dispatching.h. Referenced by add_consumer(), remove_consumer(), and shutdown(). |
|
Definition at line 75 of file EC_TPC_Dispatching.h. |
|
Definition at line 78 of file EC_TPC_Dispatching.h. |
|
Definition at line 64 of file EC_TPC_Dispatching.h. |