00001
00002
00003 #include "orbsvcs/Event/EC_TPC_Dispatching.h"
00004 #include "orbsvcs/Event/EC_Defaults.h"
00005
00006 #include <ace/Dynamic_Service.h>
00007
00008 ACE_RCSID(Event, EC_TPC_Dispatching, "EC_TPC_Dispatching.cpp,v 1.9 2006/04/04 07:00:59 jwillemsen Exp")
00009
// Size passed to consumer_task_map_ at construction when the build has not
// already supplied its own value for this macro.
#ifndef TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE
#  define TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE 32
#endif /* TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE */
00013
00014 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 extern unsigned long EC_TPC_debug_level;
00017
00018 TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching (TAO_EC_Queue_Full_Service_Object* so)
00019 : consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
00020 , queue_full_service_object_(so)
00021 {
00022 ACE_ASSERT (this->queue_full_service_object_ != 0);
00023 }
00024
00025 TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching ()
00026 {
00027
00028
00029
00030
00031 }
00032
00033 int
00034 TAO_EC_TPC_Dispatching::add_consumer (RtecEventComm::PushConsumer_ptr consumer
00035 ACE_ENV_ARG_DECL_NOT_USED)
00036 {
00037 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00038
00039
00040 RtecEventComm::PushConsumer_var pc =
00041 RtecEventComm::PushConsumer::_duplicate(consumer);
00042
00043 if (EC_TPC_debug_level > 0)
00044 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in()));
00045
00046 TAO_EC_Dispatching_Task* dtask =
00047 new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_,
00048 this->queue_full_service_object_);
00049
00050 if (EC_TPC_debug_level > 0)
00051 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask));
00052
00053 if ((dtask->activate (TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS,
00054 1,
00055 1,
00056 TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY)) == -1)
00057 {
00058 ACE_DEBUG ((LM_WARNING,
00059 "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate"
00060 " dispatching task for consumer (%@)\n",
00061 consumer));
00062 delete dtask;
00063 return -1;
00064 }
00065
00066 int bindresult =
00067 this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()),
00068 dtask);
00069 const char* explanation = 0;
00070 if (bindresult == -1)
00071 explanation = "general failure";
00072 else if (bindresult == 1)
00073 explanation = "entry already exists";
00074
00075 if (explanation != 0)
00076 {
00077 ACE_DEBUG ((LM_WARNING,
00078 "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)"
00079 " and dispatch task in map (%s): %p\n",
00080 consumer, explanation));
00081 dtask->putq (new TAO_EC_Shutdown_Task_Command);
00082 dtask->wait ();
00083 delete dtask;
00084 return -1;
00085 }
00086
00087 return 0;
00088 }
00089
00090 int
00091 TAO_EC_TPC_Dispatching::remove_consumer (RtecEventComm::PushConsumer_ptr consumer
00092 ACE_ENV_ARG_DECL_NOT_USED)
00093 {
00094 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00095
00096 TAO_EC_Dispatching_Task* dtask = 0;
00097
00098 if (this->consumer_task_map_.find (consumer, dtask) == -1)
00099 {
00100 ACE_DEBUG ((LM_WARNING,
00101 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00102 " find consumer (%@) in map\n", consumer));
00103 return -1;
00104 }
00105
00106
00107 if (this->consumer_task_map_.unbind (consumer) == -1)
00108 {
00109 ACE_DEBUG ((LM_WARNING,
00110 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00111 " unbind consumer (%@) and task in map\n", consumer));
00112 return -1;
00113 }
00114
00115 dtask->putq (new TAO_EC_Shutdown_Task_Command);
00116 CORBA::release (consumer);
00117 return 0;
00118 }
00119
00120 void
00121 TAO_EC_TPC_Dispatching::activate (void)
00122 {
00123 }
00124
00125 void
00126 TAO_EC_TPC_Dispatching::shutdown (void)
00127 {
00128 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
00129
00130
00131
00132 MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin ();
00133 while (! iter.done())
00134 {
00135 MAPTYPE::ENTRY* entry = 0;
00136 if (! iter.next(entry))
00137 continue;
00138
00139 entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command);
00140 iter.advance ();
00141 }
00142
00143 this->thread_manager_.wait ();
00144
00145
00146
00147 iter = this->consumer_task_map_.begin ();
00148 while (! iter.done())
00149 {
00150 MAPTYPE::ENTRY* entry = 0;
00151 if (! iter.next(entry))
00152 continue;
00153
00154 CORBA::release (entry->ext_id_);
00155 iter.advance ();
00156 }
00157
00158 this->consumer_task_map_.unbind_all ();
00159 }
00160
00161 void
00162 TAO_EC_TPC_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
00163 RtecEventComm::PushConsumer_ptr consumer,
00164 const RtecEventComm::EventSet& event,
00165 TAO_EC_QOS_Info& qos_info
00166 ACE_ENV_ARG_DECL)
00167 {
00168 RtecEventComm::EventSet event_copy = event;
00169 this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER);
00170 }
00171
00172 void
00173 TAO_EC_TPC_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
00174 RtecEventComm::PushConsumer_ptr consumer,
00175 RtecEventComm::EventSet& event,
00176 TAO_EC_QOS_Info&
00177 ACE_ENV_ARG_DECL)
00178 {
00179 if (EC_TPC_debug_level > 0)
00180 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer));
00181
00182 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
00183 TAO_EC_Dispatching_Task* dtask;
00184
00185 if (this->consumer_task_map_.find (consumer, dtask) == -1)
00186 {
00187 ACE_DEBUG ((LM_WARNING,
00188 "EC (%P|%t): TPC_Dispatching::push_nocopy failed to"
00189 " find consumer (%@) in map\n", consumer));
00190 }
00191 else
00192 {
00193 dtask->push (proxy, consumer, event ACE_ENV_ARG_PARAMETER);
00194 ACE_CHECK;
00195 }
00196 }
00197
00198 TAO_END_VERSIONED_NAMESPACE_DECL