TAO_EC_TPC_Dispatching Class Reference

Dispatching strategy that isolates deliveries to a consumer from any other. More...

#include <EC_TPC_Dispatching.h>

Inheritance diagram for TAO_EC_TPC_Dispatching:

Inheritance graph
[legend]
Collaboration diagram for TAO_EC_TPC_Dispatching:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_EC_TPC_Dispatching (int nthreads, int thread_creation_flags, int thread_priority, int force_activate, TAO_EC_Queue_Full_Service_Object *so)
 ~TAO_EC_TPC_Dispatching ()
virtual void activate (void)
virtual void shutdown (void)
virtual void push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info)
virtual void push_nocopy (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet &event, TAO_EC_QOS_Info &qos_info)
int add_consumer (RtecEventComm::PushConsumer_ptr consumer)
int remove_consumer (RtecEventComm::PushConsumer_ptr consumer)

Private Types

typedef ACE_Hash_Map_Manager_Ex<
RtecEventComm::PushConsumer_ptr,
TAO_EC_Dispatching_Task *,
ACE_Pointer_Hash< RtecEventComm::PushConsumer_ptr >,
ACE_Equal_To< RtecEventComm::PushConsumer_ptr >,
ACE_Null_Mutex
MAPTYPE

Private Attributes

ACE_Thread_Manager thread_manager_
int nthreads_
 The number of active tasks.

int thread_creation_flags_
int thread_priority_
 The priority of the dispatching threads.

int force_activate_
MAPTYPE consumer_task_map_
ACE_SYNCH_MUTEX lock_
TAO_EC_Queue_Full_Service_Objectqueue_full_service_object_

Detailed Description

Dispatching strategy that isolates deliveries to a consumer from any other.

This strategy uses a thread per consumer, and was specifically designed to isolate the effects of an ill-behaved consumer from affecting other consumers.

Definition at line 39 of file EC_TPC_Dispatching.h.


Member Typedef Documentation

typedef ACE_Hash_Map_Manager_Ex<RtecEventComm::PushConsumer_ptr,TAO_EC_Dispatching_Task*,ACE_Pointer_Hash<RtecEventComm::PushConsumer_ptr>,ACE_Equal_To<RtecEventComm::PushConsumer_ptr>,ACE_Null_Mutex> TAO_EC_TPC_Dispatching::MAPTYPE [private]
 

Definition at line 82 of file EC_TPC_Dispatching.h.


Constructor & Destructor Documentation

TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching int  nthreads,
int  thread_creation_flags,
int  thread_priority,
int  force_activate,
TAO_EC_Queue_Full_Service_Object so
 

Definition at line 18 of file EC_TPC_Dispatching.cpp.

References ACE_ASSERT, and TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE.

00023   : nthreads_(nthreads)
00024   , thread_creation_flags_ (thread_creation_flags)
00025   , thread_priority_ (thread_priority)
00026   , force_activate_ (force_activate)
00027   , consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
00028   , queue_full_service_object_(so)
00029 {
00030   ACE_ASSERT (this->queue_full_service_object_ != 0);
00031 }

TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching  ) 
 

Definition at line 33 of file EC_TPC_Dispatching.cpp.

00034 {
00035   // No other dispatching strategy has a DTOR body.  I can only
00036   // assume that it's guaranteed that shutdown() is called before
00037   // the DTOR, so the tear-down logic needs to go in the shutdown,
00038   // and the DTOR need not call shutdown.
00039 }


Member Function Documentation

void TAO_EC_TPC_Dispatching::activate void   )  [virtual]
 

Initialize all the data structures, activate any internal threads, etc.

Implements TAO_EC_Dispatching.

Definition at line 127 of file EC_TPC_Dispatching.cpp.

00128 {
00129 }

int TAO_EC_TPC_Dispatching::add_consumer RtecEventComm::PushConsumer_ptr  consumer  ) 
 

Definition at line 42 of file EC_TPC_Dispatching.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_Task_Base::activate(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), consumer_task_map_, LM_DEBUG, LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), TAO_EC_TPC_debug_level, and ACE_Task_Base::wait().

Referenced by TAO_EC_TPC_ProxyPushSupplier::connect_push_consumer().

00043 {
00044   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00045 
00046   // Duplicate the pointer and hold it safely
00047   RtecEventComm::PushConsumer_var pc =
00048     RtecEventComm::PushConsumer::_duplicate(consumer);
00049 
00050   if (TAO_EC_TPC_debug_level > 0)
00051     ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in()));
00052 
00053   TAO_EC_Dispatching_Task* dtask =
00054     new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_,
00055                                      this->queue_full_service_object_);
00056 
00057   if (TAO_EC_TPC_debug_level > 0)
00058     ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask));
00059 
00060   if ((dtask->activate (this->thread_creation_flags_,
00061                        1, // we only want one thread to dispatch to a consumer
00062                        1, // magic number??
00063                         this->thread_priority_)) == -1)
00064     {
00065       ACE_DEBUG ((LM_WARNING,
00066                   "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate"
00067                   " dispatching task for consumer (%@)\n",
00068                   consumer));
00069       delete dtask;
00070       return -1;
00071     }
00072 
00073   int bindresult =
00074     this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()),
00075                                    dtask);
00076   const char* explanation = 0;
00077   if (bindresult == -1)
00078     explanation = "general failure";
00079   else if (bindresult == 1)
00080     explanation = "entry already exists";
00081 
00082   if (explanation != 0)
00083     {
00084       ACE_DEBUG ((LM_WARNING,
00085                   "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)"
00086                   " and dispatch task in map (%s): %p\n",
00087                   consumer, explanation));
00088       dtask->putq (new TAO_EC_Shutdown_Task_Command);
00089       dtask->wait ();
00090       delete dtask;
00091       return -1;
00092     }
00093 
00094   return 0;
00095 }

void TAO_EC_TPC_Dispatching::push TAO_EC_ProxyPushSupplier proxy,
RtecEventComm::PushConsumer_ptr  consumer,
const RtecEventComm::EventSet event,
TAO_EC_QOS_Info qos_info
[virtual]
 

The consumer represented by proxy should receive event. It can use the information in qos_info to determine the event priority (among other things).

Implements TAO_EC_Dispatching.

Definition at line 168 of file EC_TPC_Dispatching.cpp.

References RtecEventComm::EventSet, and push_nocopy().

00172 {
00173   RtecEventComm::EventSet event_copy = event;
00174   this->push_nocopy (proxy, consumer, event_copy, qos_info);
00175 }

void TAO_EC_TPC_Dispatching::push_nocopy TAO_EC_ProxyPushSupplier proxy,
RtecEventComm::PushConsumer_ptr  consumer,
RtecEventComm::EventSet event,
TAO_EC_QOS_Info qos_info
[virtual]
 

Implements TAO_EC_Dispatching.

Definition at line 178 of file EC_TPC_Dispatching.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_SYNCH_MUTEX, consumer_task_map_, RtecEventComm::EventSet, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), LM_DEBUG, LM_WARNING, TAO_EC_Dispatching_Task::push(), and TAO_EC_TPC_debug_level.

Referenced by push().

00182 {
00183   if (TAO_EC_TPC_debug_level > 0)
00184     ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer));
00185 
00186   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
00187   TAO_EC_Dispatching_Task* dtask;
00188 
00189   if (this->consumer_task_map_.find (consumer, dtask) == -1)
00190     {
00191       ACE_DEBUG ((LM_WARNING,
00192                   "EC (%P|%t): TPC_Dispatching::push_nocopy failed to"
00193                   " find consumer (%@) in map\n", consumer));
00194     }
00195   else
00196     {
00197       dtask->push (proxy, consumer, event);
00198     }
00199 }

int TAO_EC_TPC_Dispatching::remove_consumer RtecEventComm::PushConsumer_ptr  consumer  ) 
 

Definition at line 98 of file EC_TPC_Dispatching.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, consumer_task_map_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), LM_WARNING, ACE_Task< ACE_SYNCH >::putq(), CORBA::release(), and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind().

Referenced by TAO_EC_TPC_ProxyPushConsumer::disconnect_push_consumer(), and TAO_EC_TPC_ProxyPushSupplier::disconnect_push_supplier().

00099 {
00100   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00101 
00102   TAO_EC_Dispatching_Task* dtask = 0;
00103 
00104   if (this->consumer_task_map_.find (consumer, dtask) == -1)
00105     {
00106       ACE_DEBUG ((LM_WARNING,
00107                   "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00108                   " find consumer (%@) in map\n", consumer));
00109       return -1;
00110     }
00111 
00112   // Must have found it...first try to unbind
00113   if (this->consumer_task_map_.unbind (consumer) == -1)
00114     {
00115       ACE_DEBUG ((LM_WARNING,
00116                   "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00117                   " unbind consumer (%@) and task in map\n",  consumer));
00118       return -1;
00119     }
00120 
00121   dtask->putq (new TAO_EC_Shutdown_Task_Command);
00122   CORBA::release (consumer);  // This matches the _duplicate in add_consumer
00123   return 0;
00124 }

void TAO_EC_TPC_Dispatching::shutdown void   )  [virtual]
 

Deactivate any internal threads and cleanup internal data structures, it should only return once the threads have finished their jobs.

Implements TAO_EC_Dispatching.

Definition at line 132 of file EC_TPC_Dispatching.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), consumer_task_map_, CORBA::release(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind_all(), and ACE_Thread_Manager::wait().

00133 {
00134   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
00135 
00136   // The MT_Dispatching strategy sends a TAO_EC_Shutdown_Task_Command
00137   // to the dispatching task.  Is that what we should do here?
00138   MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin ();
00139   while (! iter.done())
00140     {
00141       MAPTYPE::ENTRY* entry = 0;
00142       if (! iter.next(entry))
00143         continue;
00144 
00145       entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command);
00146       iter.advance ();
00147     }
00148 
00149   this->thread_manager_.wait (); // Wait for the threads to terminate
00150 
00151   // Now iterate again and call CORBA::release on the ext_id;
00152   // we don't have to delete int_id_ b/c that happens in its close() method.
00153   iter = this->consumer_task_map_.begin ();
00154   while (! iter.done())
00155     {
00156       MAPTYPE::ENTRY* entry = 0;
00157       if (! iter.next(entry))
00158         continue;
00159 
00160       CORBA::release (entry->ext_id_);
00161       iter.advance ();
00162     }
00163 
00164   this->consumer_task_map_.unbind_all ();
00165 }


Member Data Documentation

MAPTYPE TAO_EC_TPC_Dispatching::consumer_task_map_ [private]
 

Definition at line 86 of file EC_TPC_Dispatching.h.

Referenced by add_consumer(), push_nocopy(), remove_consumer(), and shutdown().

int TAO_EC_TPC_Dispatching::force_activate_ [private]
 

If activation at the requested priority fails then we fallback on the defaults for thread activation.

Definition at line 80 of file EC_TPC_Dispatching.h.

ACE_SYNCH_MUTEX TAO_EC_TPC_Dispatching::lock_ [private]
 

Definition at line 91 of file EC_TPC_Dispatching.h.

int TAO_EC_TPC_Dispatching::nthreads_ [private]
 

The number of active tasks.

Definition at line 69 of file EC_TPC_Dispatching.h.

TAO_EC_Queue_Full_Service_Object* TAO_EC_TPC_Dispatching::queue_full_service_object_ [private]
 

Definition at line 94 of file EC_TPC_Dispatching.h.

int TAO_EC_TPC_Dispatching::thread_creation_flags_ [private]
 

The flags (THR_BOUND, THR_NEW_LWP, etc.) used to create the dispatching threads.

Definition at line 73 of file EC_TPC_Dispatching.h.

ACE_Thread_Manager TAO_EC_TPC_Dispatching::thread_manager_ [private]
 

Definition at line 66 of file EC_TPC_Dispatching.h.

int TAO_EC_TPC_Dispatching::thread_priority_ [private]
 

The priority of the dispatching threads.

Definition at line 76 of file EC_TPC_Dispatching.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 13:43:17 2008 for TAO_RTEvent by doxygen 1.3.6