00001
00002
00003 #include "orbsvcs/Event/EC_MT_Dispatching.h"
00004
00005 ACE_RCSID(Event, EC_MT_Dispatching, "$Id: EC_MT_Dispatching.cpp 77613 2007-03-08 17:59:53Z fields_t $")
00006
00007
00008 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00009
00010 TAO_EC_MT_Dispatching::TAO_EC_MT_Dispatching (int nthreads,
00011 int thread_creation_flags,
00012 int thread_priority,
00013 int force_activate,
00014 TAO_EC_Queue_Full_Service_Object* service_object)
00015 : nthreads_ (nthreads),
00016 thread_creation_flags_ (thread_creation_flags),
00017 thread_priority_ (thread_priority),
00018 force_activate_ (force_activate),
00019 task_(0, service_object),
00020 active_ (0),
00021 queue_full_service_object_ (service_object)
00022 {
00023 this->task_.open (&this->thread_manager_);
00024 }
00025
00026 void
00027 TAO_EC_MT_Dispatching::activate (void)
00028 {
00029 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00030
00031 if (this->active_ != 0)
00032 return;
00033
00034 this->active_ = 1;
00035
00036 if (this->task_.activate (this->thread_creation_flags_,
00037 this->nthreads_,
00038 1,
00039 this->thread_priority_) == -1)
00040 {
00041 if (this->force_activate_ != 0)
00042 {
00043 ACE_DEBUG ((LM_DEBUG,
00044 "EC (%P|%t) activating dispatching queue at"
00045 " default priority\n"));
00046 if (this->task_.activate (THR_BOUND, this->nthreads_) == -1)
00047 ACE_ERROR ((LM_ERROR,
00048 "EC (%P|%t) cannot activate dispatching queue.\n"));
00049 }
00050 }
00051 }
00052
00053 void
00054 TAO_EC_MT_Dispatching::shutdown (void)
00055 {
00056 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00057
00058 if (this->active_ == 0)
00059 return;
00060
00061 for (int i = 0; i < this->nthreads_; ++i)
00062 {
00063 this->task_.putq (new TAO_EC_Shutdown_Task_Command);
00064 }
00065 this->thread_manager_.wait ();
00066 }
00067
00068 void
00069 TAO_EC_MT_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
00070 RtecEventComm::PushConsumer_ptr consumer,
00071 const RtecEventComm::EventSet& event,
00072 TAO_EC_QOS_Info& qos_info)
00073 {
00074 RtecEventComm::EventSet event_copy = event;
00075 this->push_nocopy (proxy, consumer, event_copy, qos_info);
00076 }
00077
00078 void
00079 TAO_EC_MT_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
00080 RtecEventComm::PushConsumer_ptr consumer,
00081 RtecEventComm::EventSet& event,
00082 TAO_EC_QOS_Info&)
00083 {
00084
00085 if (this->active_ == 0)
00086 this->activate ();
00087
00088 this->task_.push (proxy, consumer, event);
00089 }
00090
00091 TAO_END_VERSIONED_NAMESPACE_DECL