EC_TPC_Dispatching.cpp

// EC_TPC_Dispatching.cpp,v 1.9 2006/04/04 07:00:59 jwillemsen Exp

#include "orbsvcs/Event/EC_TPC_Dispatching.h"
#include "orbsvcs/Event/EC_Defaults.h"

#include <ace/Dynamic_Service.h>

ACE_RCSID(Event, EC_TPC_Dispatching, "EC_TPC_Dispatching.cpp,v 1.9 2006/04/04 07:00:59 jwillemsen Exp")

#if !defined(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
#define TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE 32
#endif

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

extern unsigned long EC_TPC_debug_level;
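
// Thread-per-consumer dispatching: add_consumer() spawns one
// TAO_EC_TPC_Dispatching_Task (and one thread) per connected consumer and
// records it in consumer_task_map_; push()/push_nocopy() look the consumer
// up in that map and enqueue the event on its task; remove_consumer() and
// shutdown() enqueue a TAO_EC_Shutdown_Task_Command so the thread drains
// its queue and exits, then release the duplicated consumer reference.
//
// Rough usage sketch (hypothetical caller code, variable names invented
// here; the ACE_ENV/emulated-exception macros are omitted):
//
//   TAO_EC_TPC_Dispatching dispatching (queue_full_service_object);
//   dispatching.add_consumer (consumer.in ());    // spawns a task + thread
//   dispatching.push (proxy, consumer.in (), events, qos_info);
//   dispatching.remove_consumer (consumer.in ()); // stops that task
//   dispatching.shutdown ();                      // stops all remaining tasks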

TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching (TAO_EC_Queue_Full_Service_Object* so)
  : consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
  , queue_full_service_object_(so)
{
  ACE_ASSERT (this->queue_full_service_object_ != 0);
}

TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching ()
{
  // No other dispatching strategy has a DTOR body.  I can only
  // assume that it's guaranteed that shutdown() is called before
  // the DTOR, so the tear-down logic needs to go in the shutdown,
  // and the DTOR need not call shutdown.
}

int
TAO_EC_TPC_Dispatching::add_consumer (RtecEventComm::PushConsumer_ptr consumer
                                      ACE_ENV_ARG_DECL_NOT_USED)
{
  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);

  // Duplicate the pointer and hold it safely
  RtecEventComm::PushConsumer_var pc =
    RtecEventComm::PushConsumer::_duplicate(consumer);

  if (EC_TPC_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in()));

  TAO_EC_Dispatching_Task* dtask =
    new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_,
                                     this->queue_full_service_object_);

  if (EC_TPC_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask));

  if ((dtask->activate (TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS,
                        1, // we only want one thread to dispatch to a consumer
                        1, // force_active: spawn even if the task is already active
                        TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY)) == -1)
    {
      ACE_DEBUG ((LM_WARNING,
                  "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate"
                  " dispatching task for consumer (%@)\n",
                  consumer));
      delete dtask;
      return -1;
    }

  int bindresult =
    this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()),
                                   dtask);
  const char* explanation = 0;
  if (bindresult == -1)
    explanation = "general failure";
  else if (bindresult == 1)
    explanation = "entry already exists";

  if (explanation != 0)
    {
      ACE_DEBUG ((LM_WARNING,
                  "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)"
                  " and dispatch task in map (%s)\n",
                  consumer, explanation));
      dtask->putq (new TAO_EC_Shutdown_Task_Command);
      dtask->wait ();
      delete dtask;
      return -1;
    }

  return 0;
}
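
// Note on reference counting: add_consumer() duplicates the consumer twice,
// once into the local _var (released automatically when it goes out of
// scope) and once as the key stored in consumer_task_map_.  The stored
// reference is balanced by the CORBA::release() in remove_consumer() below
// or in shutdown().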

int
TAO_EC_TPC_Dispatching::remove_consumer (RtecEventComm::PushConsumer_ptr consumer
                                         ACE_ENV_ARG_DECL_NOT_USED)
{
  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);

  TAO_EC_Dispatching_Task* dtask = 0;

  if (this->consumer_task_map_.find (consumer, dtask) == -1)
    {
      ACE_DEBUG ((LM_WARNING,
                  "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
                  " find consumer (%@) in map\n", consumer));
      return -1;
    }

  // Must have found it...first try to unbind
  if (this->consumer_task_map_.unbind (consumer) == -1)
    {
      ACE_DEBUG ((LM_WARNING,
                  "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
                  " unbind consumer (%@) and task in map\n", consumer));
      return -1;
    }

  dtask->putq (new TAO_EC_Shutdown_Task_Command);
  CORBA::release (consumer);  // Matches the _duplicate stored as the map key in add_consumer
  return 0;
}

void
TAO_EC_TPC_Dispatching::activate (void)
{
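  // Nothing to do here: each consumer's dispatching task is activated
  // individually in add_consumer().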
}

void
TAO_EC_TPC_Dispatching::shutdown (void)
{
  ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);

  // The MT_Dispatching strategy sends a TAO_EC_Shutdown_Task_Command
  // to the dispatching task.  Is that what we should do here?
  MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin ();
  while (! iter.done())
    {
      MAPTYPE::ENTRY* entry = 0;
      if (! iter.next(entry))
        continue;

      entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command);
      iter.advance ();
    }

  this->thread_manager_.wait (); // Wait for the threads to terminate

  // Now iterate again and call CORBA::release on the ext_id;
  // we don't have to delete int_id_ b/c that happens in its close() method.
  iter = this->consumer_task_map_.begin ();
  while (! iter.done())
    {
      MAPTYPE::ENTRY* entry = 0;
      if (! iter.next(entry))
        continue;

      CORBA::release (entry->ext_id_);
      iter.advance ();
    }

  this->consumer_task_map_.unbind_all ();
}

void
TAO_EC_TPC_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
                              RtecEventComm::PushConsumer_ptr consumer,
                              const RtecEventComm::EventSet& event,
                              TAO_EC_QOS_Info& qos_info
                              ACE_ENV_ARG_DECL)
{
  RtecEventComm::EventSet event_copy = event;
  this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER);
}

void
TAO_EC_TPC_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
                                     RtecEventComm::PushConsumer_ptr consumer,
                                     RtecEventComm::EventSet& event,
                                     TAO_EC_QOS_Info&
                                     ACE_ENV_ARG_DECL)
{
  if (EC_TPC_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer));

  ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
  TAO_EC_Dispatching_Task* dtask = 0;

  if (this->consumer_task_map_.find (consumer, dtask) == -1)
    {
      ACE_DEBUG ((LM_WARNING,
                  "EC (%P|%t): TPC_Dispatching::push_nocopy failed to"
                  " find consumer (%@) in map\n", consumer));
    }
  else
    {
      dtask->push (proxy, consumer, event ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

TAO_END_VERSIONED_NAMESPACE_DECL
