#include <EC_TPC_Dispatching.h>
Inheritance diagram for TAO_EC_TPC_Dispatching (diagram not reproduced): TAO_EC_TPC_Dispatching derives from TAO_EC_Dispatching.
Public Member Functions

    TAO_EC_TPC_Dispatching (int nthreads, int thread_creation_flags, int thread_priority, int force_activate, TAO_EC_Queue_Full_Service_Object *so)
    ~TAO_EC_TPC_Dispatching ()
    virtual void activate (void)
    virtual void shutdown (void)
    virtual void push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info)
    virtual void push_nocopy (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info)
    int add_consumer (RtecEventComm::PushConsumer_ptr consumer)
    int remove_consumer (RtecEventComm::PushConsumer_ptr consumer)
Private Types

    typedef ACE_Hash_Map_Manager_Ex< RtecEventComm::PushConsumer_ptr, TAO_EC_Dispatching_Task *, ACE_Pointer_Hash< RtecEventComm::PushConsumer_ptr >, ACE_Equal_To< RtecEventComm::PushConsumer_ptr >, ACE_Null_Mutex > MAPTYPE
Private Attributes

    ACE_Thread_Manager thread_manager_
    int nthreads_
        The number of active tasks.
    int thread_creation_flags_
    int thread_priority_
        The priority of the dispatching threads.
    int force_activate_
    MAPTYPE consumer_task_map_
    ACE_SYNCH_MUTEX lock_
    TAO_EC_Queue_Full_Service_Object *queue_full_service_object_
Detailed Description

This strategy dedicates one dispatching thread to each consumer; it was designed specifically to keep an ill-behaved consumer from affecting event delivery to the other consumers.

Definition at line 39 of file EC_TPC_Dispatching.h.
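To make the design concrete, here is a minimal, self-contained sketch of the thread-per-consumer pattern this class implements, using standard C++ threads and a per-consumer queue in place of the ACE task and CORBA types (all names below are illustrative, not part of TAO):

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <thread>

    // One worker thread per consumer: a slow or blocked consumer only
    // stalls its own queue, never the queues of other consumers.
    class ConsumerTask {
    public:
      ConsumerTask () : thread_ (&ConsumerTask::run, this) {}
      ~ConsumerTask () { push (""); thread_.join (); }  // empty event = shutdown

      void push (std::string event) {
        { std::lock_guard<std::mutex> g (lock_); queue_.push_back (std::move (event)); }
        cond_.notify_one ();
      }

    private:
      void run () {
        for (;;) {
          std::unique_lock<std::mutex> g (lock_);
          cond_.wait (g, [this] { return !queue_.empty (); });
          std::string event = std::move (queue_.front ());
          queue_.pop_front ();
          if (event.empty ())
            return;             // shutdown command, cf. TAO_EC_Shutdown_Task_Command
          g.unlock ();
          deliver (event);      // may block without harming other consumers
        }
      }
      void deliver (const std::string &) { /* push to the actual consumer here */ }

      std::mutex lock_;
      std::condition_variable cond_;
      std::deque<std::string> queue_;
      std::thread thread_;      // declared last so it starts after the other members
    };

In these terms, TAO_EC_TPC_Dispatching owns the consumer-to-task map: add_consumer() creates one such task per consumer, push()/push_nocopy() enqueue into the matching task, and shutdown() sends every task a stop command and waits for the threads to finish.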
Member Typedef Documentation

typedef ACE_Hash_Map_Manager_Ex< RtecEventComm::PushConsumer_ptr, TAO_EC_Dispatching_Task *, ACE_Pointer_Hash< RtecEventComm::PushConsumer_ptr >, ACE_Equal_To< RtecEventComm::PushConsumer_ptr >, ACE_Null_Mutex > TAO_EC_TPC_Dispatching::MAPTYPE [private]

Definition at line 82 of file EC_TPC_Dispatching.h.
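The map hashes each consumer by the pointer value of its object reference (ACE_Pointer_Hash) and does no locking of its own (ACE_Null_Mutex), since every access is serialized by lock_. A standalone sketch of the same template instantiation, with a stand-in key and value type (header locations are from recent ACE versions and may differ in older ones):

    #include "ace/Hash_Map_Manager_T.h"  // ACE_Hash_Map_Manager_Ex
    #include "ace/Functor_T.h"           // ACE_Pointer_Hash, ACE_Equal_To
    #include "ace/Null_Mutex.h"          // ACE_Null_Mutex

    struct Dispatching_Task;  // stand-in for TAO_EC_Dispatching_Task

    typedef ACE_Hash_Map_Manager_Ex<void *, Dispatching_Task *,
                                    ACE_Pointer_Hash<void *>,
                                    ACE_Equal_To<void *>,
                                    ACE_Null_Mutex> Task_Map;

    int demo (void *consumer, Dispatching_Task *task)
    {
      Task_Map map (16);  // initial table size, cf. TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE
      if (map.bind (consumer, task) != 0)  // returns 1 on duplicate key, -1 on error
        return -1;
      Dispatching_Task *found = 0;
      return map.find (consumer, found);   // 0 on success, fills 'found'
    }

The bind() return values (0, 1, -1) are exactly the three cases add_consumer() distinguishes below.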
Constructor & Destructor Documentation

TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching (int nthreads, int thread_creation_flags, int thread_priority, int force_activate, TAO_EC_Queue_Full_Service_Object *so)

Definition at line 18 of file EC_TPC_Dispatching.cpp. References ACE_ASSERT, and TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE.

      : nthreads_ (nthreads)
      , thread_creation_flags_ (thread_creation_flags)
      , thread_priority_ (thread_priority)
      , force_activate_ (force_activate)
      , consumer_task_map_ (TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
      , queue_full_service_object_ (so)
    {
      ACE_ASSERT (this->queue_full_service_object_ != 0);
    }
TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching ()

Definition at line 33 of file EC_TPC_Dispatching.cpp.

    {
      // No other dispatching strategy has a DTOR body. I can only
      // assume that it's guaranteed that shutdown() is called before
      // the DTOR, so the tear-down logic needs to go in the shutdown,
      // and the DTOR need not call shutdown.
    }
Member Function Documentation

void TAO_EC_TPC_Dispatching::activate (void) [virtual]

Initialize all the data structures, activate any internal threads, etc. Implements TAO_EC_Dispatching. The body is empty because this strategy creates its dispatching threads lazily, one per consumer, in add_consumer().

Definition at line 127 of file EC_TPC_Dispatching.cpp.

    {
    }
int TAO_EC_TPC_Dispatching::add_consumer (RtecEventComm::PushConsumer_ptr consumer)

Definition at line 42 of file EC_TPC_Dispatching.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Task_Base::activate(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), consumer_task_map_, LM_DEBUG, LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), TAO_EC_TPC_debug_level, and ACE_Task_Base::wait(). Referenced by TAO_EC_TPC_ProxyPushSupplier::connect_push_consumer().

    {
      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);

      // Duplicate the pointer and hold it safely
      RtecEventComm::PushConsumer_var pc =
        RtecEventComm::PushConsumer::_duplicate (consumer);

      if (TAO_EC_TPC_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in ()));

      TAO_EC_Dispatching_Task *dtask =
        new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_,
                                         this->queue_full_service_object_);

      if (TAO_EC_TPC_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in (), dtask));

      if ((dtask->activate (this->thread_creation_flags_,
                            1, // we only want one thread to dispatch to a consumer
                            1, // magic number??
                            this->thread_priority_)) == -1)
        {
          ACE_DEBUG ((LM_WARNING,
                      "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate"
                      " dispatching task for consumer (%@)\n",
                      consumer));
          delete dtask;
          return -1;
        }

      int bindresult =
        this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate (pc.in ()),
                                       dtask);
      const char *explanation = 0;
      if (bindresult == -1)
        explanation = "general failure";
      else if (bindresult == 1)
        explanation = "entry already exists";

      if (explanation != 0)
        {
          ACE_DEBUG ((LM_WARNING,
                      "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)"
                      " and dispatch task in map (%s): %p\n",
                      consumer, explanation));
          dtask->putq (new TAO_EC_Shutdown_Task_Command);
          dtask->wait ();
          delete dtask;
          return -1;
        }

      return 0;
    }
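For readers puzzled by the two bare 1 arguments (including the author's own "magic number??" comment): the relevant leading parameters of ACE_Task_Base::activate() are the thread-creation flags, the thread count, the force_active flag, and the priority. A paraphrased sketch of that prefix of the ACE signature (defaults abbreviated; consult your ACE version for the full parameter list):

    // Paraphrased prefix of ACE_Task_Base::activate(); later parameters omitted.
    virtual int activate (long flags = THR_NEW_LWP | THR_JOINABLE,
                          int n_threads = 1,
                          int force_active = 0,  // the second "1" passed above
                          long priority = ACE_DEFAULT_THREAD_PRIORITY /* , ... */);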
void TAO_EC_TPC_Dispatching::push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info) [virtual]

The consumer represented by proxy should receive event. It can use the information in qos_info to determine the event priority (among other things). The event set is copied before delegating to push_nocopy(), because delivery happens asynchronously on the consumer's dispatching thread and may outlive the caller's set. Implements TAO_EC_Dispatching.

Definition at line 168 of file EC_TPC_Dispatching.cpp. References RtecEventComm::EventSet, and push_nocopy().

    {
      RtecEventComm::EventSet event_copy = event;
      this->push_nocopy (proxy, consumer, event_copy, qos_info);
    }
void TAO_EC_TPC_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info) [virtual]

Implements TAO_EC_Dispatching.

Definition at line 178 of file EC_TPC_Dispatching.cpp. References ACE_DEBUG, ACE_GUARD, ACE_SYNCH_MUTEX, consumer_task_map_, RtecEventComm::EventSet, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), LM_DEBUG, LM_WARNING, TAO_EC_Dispatching_Task::push(), and TAO_EC_TPC_debug_level. Referenced by push().

    {
      if (TAO_EC_TPC_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer));

      ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
      TAO_EC_Dispatching_Task *dtask;

      if (this->consumer_task_map_.find (consumer, dtask) == -1)
        {
          ACE_DEBUG ((LM_WARNING,
                      "EC (%P|%t): TPC_Dispatching::push_nocopy failed to"
                      " find consumer (%@) in map\n", consumer));
        }
      else
        {
          dtask->push (proxy, consumer, event);
        }
    }
int TAO_EC_TPC_Dispatching::remove_consumer (RtecEventComm::PushConsumer_ptr consumer)

Definition at line 98 of file EC_TPC_Dispatching.cpp. References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, consumer_task_map_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), CORBA::release(), and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(). Referenced by TAO_EC_TPC_ProxyPushConsumer::disconnect_push_consumer(), and TAO_EC_TPC_ProxyPushSupplier::disconnect_push_supplier().

    {
      ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);

      TAO_EC_Dispatching_Task *dtask = 0;

      if (this->consumer_task_map_.find (consumer, dtask) == -1)
        {
          ACE_DEBUG ((LM_WARNING,
                      "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
                      " find consumer (%@) in map\n", consumer));
          return -1;
        }

      // Must have found it...first try to unbind
      if (this->consumer_task_map_.unbind (consumer) == -1)
        {
          ACE_DEBUG ((LM_WARNING,
                      "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
                      " unbind consumer (%@) and task in map\n", consumer));
          return -1;
        }

      dtask->putq (new TAO_EC_Shutdown_Task_Command);
      CORBA::release (consumer); // This matches the _duplicate in add_consumer
      return 0;
    }
void TAO_EC_TPC_Dispatching::shutdown (void) [virtual]

Deactivate any internal threads and clean up internal data structures; this should return only once the threads have finished their jobs. Implements TAO_EC_Dispatching.

Definition at line 132 of file EC_TPC_Dispatching.cpp. References ACE_GUARD, ACE_SYNCH_MUTEX, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), consumer_task_map_, CORBA::release(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind_all(), and ACE_Thread_Manager::wait().

    {
      ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);

      // The MT_Dispatching strategy sends a TAO_EC_Shutdown_Task_Command
      // to the dispatching task. Is that what we should do here?
      MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin ();
      while (! iter.done ())
        {
          MAPTYPE::ENTRY* entry = 0;
          if (! iter.next (entry))
            continue;

          entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command);
          iter.advance ();
        }

      this->thread_manager_.wait (); // Wait for the threads to terminate

      // Now iterate again and call CORBA::release on the ext_id;
      // we don't have to delete int_id_ b/c that happens in its close() method.
      iter = this->consumer_task_map_.begin ();
      while (! iter.done ())
        {
          MAPTYPE::ENTRY* entry = 0;
          if (! iter.next (entry))
            continue;

          CORBA::release (entry->ext_id_);
          iter.advance ();
        }

      this->consumer_task_map_.unbind_all ();
    }
Member Data Documentation

MAPTYPE TAO_EC_TPC_Dispatching::consumer_task_map_ [private]

Definition at line 86 of file EC_TPC_Dispatching.h. Referenced by add_consumer(), push_nocopy(), remove_consumer(), and shutdown().
int TAO_EC_TPC_Dispatching::force_activate_ [private]

If activation at the requested priority fails, then we fall back on the defaults for thread activation.

Definition at line 80 of file EC_TPC_Dispatching.h.
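Note that the add_consumer() body quoted above does not implement this fallback; it simply deletes the task and returns -1 when activation fails. A hedged sketch of the retry pattern this flag describes, modeled on how TAO's multi-threaded dispatching strategy uses the same flag (the exact flags and placement are illustrative):

    // Illustrative fallback pattern for force_activate_; not the quoted
    // add_consumer() code.
    if (dtask->activate (this->thread_creation_flags_, 1, 1,
                         this->thread_priority_) == -1
        && this->force_activate_ != 0)
      {
        // Retry with the ACE default creation flags and priority.
        dtask->activate (THR_NEW_LWP | THR_JOINABLE, 1, 1);
      }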
ACE_SYNCH_MUTEX TAO_EC_TPC_Dispatching::lock_ [private]

Definition at line 91 of file EC_TPC_Dispatching.h.
int TAO_EC_TPC_Dispatching::nthreads_ [private]

The number of active tasks.

Definition at line 69 of file EC_TPC_Dispatching.h.
TAO_EC_Queue_Full_Service_Object * TAO_EC_TPC_Dispatching::queue_full_service_object_ [private]

Definition at line 94 of file EC_TPC_Dispatching.h.
int TAO_EC_TPC_Dispatching::thread_creation_flags_ [private]

The flags (THR_BOUND, THR_NEW_LWP, etc.) used to create the dispatching threads.

Definition at line 73 of file EC_TPC_Dispatching.h.
ACE_Thread_Manager TAO_EC_TPC_Dispatching::thread_manager_ [private]

Definition at line 66 of file EC_TPC_Dispatching.h.
int TAO_EC_TPC_Dispatching::thread_priority_ [private]

The priority of the dispatching threads.

Definition at line 76 of file EC_TPC_Dispatching.h.