00001
00002
00003 #include "orbsvcs/Event/EC_TPC_Dispatching.h"
00004 #include "orbsvcs/Event/EC_Defaults.h"
00005
00006 #include <ace/Dynamic_Service.h>
00007
00008 ACE_RCSID(Event, EC_TPC_Dispatching, "$Id: EC_TPC_Dispatching.cpp 80166 2007-12-03 13:53:49Z sowayaa $")
00009
00010 #if !defined(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
00011 #define TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE 32
00012 #endif
00013
00014 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 extern unsigned long TAO_EC_TPC_debug_level;
00017
00018 TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching (int nthreads,
00019 int thread_creation_flags,
00020 int thread_priority,
00021 int force_activate,
00022 TAO_EC_Queue_Full_Service_Object* so)
00023 : nthreads_(nthreads)
00024 , thread_creation_flags_ (thread_creation_flags)
00025 , thread_priority_ (thread_priority)
00026 , force_activate_ (force_activate)
00027 , consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE)
00028 , queue_full_service_object_(so)
00029 {
00030 ACE_ASSERT (this->queue_full_service_object_ != 0);
00031 }
00032
00033 TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching ()
00034 {
00035
00036
00037
00038
00039 }
00040
00041 int
00042 TAO_EC_TPC_Dispatching::add_consumer (RtecEventComm::PushConsumer_ptr consumer)
00043 {
00044 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00045
00046
00047 RtecEventComm::PushConsumer_var pc =
00048 RtecEventComm::PushConsumer::_duplicate(consumer);
00049
00050 if (TAO_EC_TPC_debug_level > 0)
00051 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in()));
00052
00053 TAO_EC_Dispatching_Task* dtask =
00054 new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_,
00055 this->queue_full_service_object_);
00056
00057 if (TAO_EC_TPC_debug_level > 0)
00058 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask));
00059
00060 if ((dtask->activate (this->thread_creation_flags_,
00061 1,
00062 1,
00063 this->thread_priority_)) == -1)
00064 {
00065 ACE_DEBUG ((LM_WARNING,
00066 "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate"
00067 " dispatching task for consumer (%@)\n",
00068 consumer));
00069 delete dtask;
00070 return -1;
00071 }
00072
00073 int bindresult =
00074 this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()),
00075 dtask);
00076 const char* explanation = 0;
00077 if (bindresult == -1)
00078 explanation = "general failure";
00079 else if (bindresult == 1)
00080 explanation = "entry already exists";
00081
00082 if (explanation != 0)
00083 {
00084 ACE_DEBUG ((LM_WARNING,
00085 "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)"
00086 " and dispatch task in map (%s): %p\n",
00087 consumer, explanation));
00088 dtask->putq (new TAO_EC_Shutdown_Task_Command);
00089 dtask->wait ();
00090 delete dtask;
00091 return -1;
00092 }
00093
00094 return 0;
00095 }
00096
00097 int
00098 TAO_EC_TPC_Dispatching::remove_consumer (RtecEventComm::PushConsumer_ptr consumer)
00099 {
00100 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00101
00102 TAO_EC_Dispatching_Task* dtask = 0;
00103
00104 if (this->consumer_task_map_.find (consumer, dtask) == -1)
00105 {
00106 ACE_DEBUG ((LM_WARNING,
00107 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00108 " find consumer (%@) in map\n", consumer));
00109 return -1;
00110 }
00111
00112
00113 if (this->consumer_task_map_.unbind (consumer) == -1)
00114 {
00115 ACE_DEBUG ((LM_WARNING,
00116 "EC (%P|%t): TPC_Dispatching::remove_consumer failed to"
00117 " unbind consumer (%@) and task in map\n", consumer));
00118 return -1;
00119 }
00120
00121 dtask->putq (new TAO_EC_Shutdown_Task_Command);
00122 CORBA::release (consumer);
00123 return 0;
00124 }
00125
00126 void
00127 TAO_EC_TPC_Dispatching::activate (void)
00128 {
00129 }
00130
00131 void
00132 TAO_EC_TPC_Dispatching::shutdown (void)
00133 {
00134 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00135
00136
00137
00138 MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin ();
00139 while (! iter.done())
00140 {
00141 MAPTYPE::ENTRY* entry = 0;
00142 if (! iter.next(entry))
00143 continue;
00144
00145 entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command);
00146 iter.advance ();
00147 }
00148
00149 this->thread_manager_.wait ();
00150
00151
00152
00153 iter = this->consumer_task_map_.begin ();
00154 while (! iter.done())
00155 {
00156 MAPTYPE::ENTRY* entry = 0;
00157 if (! iter.next(entry))
00158 continue;
00159
00160 CORBA::release (entry->ext_id_);
00161 iter.advance ();
00162 }
00163
00164 this->consumer_task_map_.unbind_all ();
00165 }
00166
00167 void
00168 TAO_EC_TPC_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
00169 RtecEventComm::PushConsumer_ptr consumer,
00170 const RtecEventComm::EventSet& event,
00171 TAO_EC_QOS_Info& qos_info)
00172 {
00173 RtecEventComm::EventSet event_copy = event;
00174 this->push_nocopy (proxy, consumer, event_copy, qos_info);
00175 }
00176
00177 void
00178 TAO_EC_TPC_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
00179 RtecEventComm::PushConsumer_ptr consumer,
00180 RtecEventComm::EventSet& event,
00181 TAO_EC_QOS_Info&)
00182 {
00183 if (TAO_EC_TPC_debug_level > 0)
00184 ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer));
00185
00186 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00187 TAO_EC_Dispatching_Task* dtask;
00188
00189 if (this->consumer_task_map_.find (consumer, dtask) == -1)
00190 {
00191 ACE_DEBUG ((LM_WARNING,
00192 "EC (%P|%t): TPC_Dispatching::push_nocopy failed to"
00193 " find consumer (%@) in map\n", consumer));
00194 }
00195 else
00196 {
00197 dtask->push (proxy, consumer, event);
00198 }
00199 }
00200
00201 TAO_END_VERSIONED_NAMESPACE_DECL