00001
00002
00003 #include "orbsvcs/Event/EC_MT_Dispatching.h"
00004
00005 ACE_RCSID(Event, EC_MT_Dispatching, "EC_MT_Dispatching.cpp,v 1.13 2006/03/14 06:14:25 jtc Exp")
00006
00007
00008 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00009
00010 TAO_EC_MT_Dispatching::TAO_EC_MT_Dispatching (int nthreads,
00011 int thread_creation_flags,
00012 int thread_priority,
00013 int force_activate,
00014 TAO_EC_Queue_Full_Service_Object* service_object)
00015 : nthreads_ (nthreads),
00016 thread_creation_flags_ (thread_creation_flags),
00017 thread_priority_ (thread_priority),
00018 force_activate_ (force_activate),
00019 active_ (0),
00020 queue_full_service_object_ (service_object)
00021 {
00022 this->task_.open (&this->thread_manager_);
00023 }
00024
00025 void
00026 TAO_EC_MT_Dispatching::activate (void)
00027 {
00028 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00029
00030 if (this->active_ != 0)
00031 return;
00032
00033 this->active_ = 1;
00034
00035 if (this->task_.activate (this->thread_creation_flags_,
00036 this->nthreads_,
00037 1,
00038 this->thread_priority_) == -1)
00039 {
00040 if (this->force_activate_ != 0)
00041 {
00042 ACE_DEBUG ((LM_DEBUG,
00043 "EC (%P|%t) activating dispatching queue at"
00044 " default priority\n"));
00045 if (this->task_.activate (THR_BOUND, this->nthreads_) == -1)
00046 ACE_ERROR ((LM_ERROR,
00047 "EC (%P|%t) cannot activate dispatching queue.\n"));
00048 }
00049 }
00050 }
00051
00052 void
00053 TAO_EC_MT_Dispatching::shutdown (void)
00054 {
00055 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00056
00057 if (this->active_ == 0)
00058 return;
00059
00060 for (int i = 0; i < this->nthreads_; ++i)
00061 {
00062 this->task_.putq (new TAO_EC_Shutdown_Task_Command);
00063 }
00064 this->thread_manager_.wait ();
00065 }
00066
00067 void
00068 TAO_EC_MT_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
00069 RtecEventComm::PushConsumer_ptr consumer,
00070 const RtecEventComm::EventSet& event,
00071 TAO_EC_QOS_Info& qos_info
00072 ACE_ENV_ARG_DECL)
00073 {
00074 RtecEventComm::EventSet event_copy = event;
00075 this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER);
00076 }
00077
00078 void
00079 TAO_EC_MT_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
00080 RtecEventComm::PushConsumer_ptr consumer,
00081 RtecEventComm::EventSet& event,
00082 TAO_EC_QOS_Info&
00083 ACE_ENV_ARG_DECL)
00084 {
00085
00086 if (this->active_ == 0)
00087 this->activate ();
00088
00089 this->task_.push (proxy, consumer, event ACE_ENV_ARG_PARAMETER);
00090 }
00091
00092 TAO_END_VERSIONED_NAMESPACE_DECL