00001
00002
00003 #include "orbsvcs/CosEvent/CEC_MT_Dispatching.h"
00004
00005 ACE_RCSID(CosEvent, CEC_MT_Dispatching, "CEC_MT_Dispatching.cpp,v 1.9 2006/03/14 06:14:24 jtc Exp")
00006
00007 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00008
00009
00010
00011
00012 TAO_CEC_MT_Dispatching::TAO_CEC_MT_Dispatching (int nthreads,
00013 int thread_creation_flags,
00014 int thread_priority,
00015 int force_activate)
00016 : nthreads_ (nthreads),
00017 thread_creation_flags_ (thread_creation_flags),
00018 thread_priority_ (thread_priority),
00019 force_activate_ (force_activate),
00020 task_ (&this->thread_manager_),
00021 active_ (0)
00022 {
00023 }
00024
00025 void
00026 TAO_CEC_MT_Dispatching::activate (void)
00027 {
00028 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00029
00030 if (this->active_ != 0)
00031 return;
00032
00033 this->active_ = 1;
00034
00035 if (this->task_.activate (this->thread_creation_flags_,
00036 this->nthreads_,
00037 1,
00038 this->thread_priority_) == -1)
00039 {
00040 if (this->force_activate_ != 0)
00041 {
00042 if (this->task_.activate (THR_BOUND, this->nthreads_) == -1)
00043 ACE_ERROR ((LM_ERROR,
00044 "EC (%P|%t) cannot activate dispatching queue"));
00045 }
00046 }
00047 }
00048
00049 void
00050 TAO_CEC_MT_Dispatching::shutdown (void)
00051 {
00052 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00053
00054 if (this->active_ == 0)
00055 return;
00056
00057 for (int i = 0; i < this->nthreads_; ++i)
00058 {
00059 this->task_.putq (new TAO_CEC_Shutdown_Task_Command);
00060 }
00061 this->thread_manager_.wait ();
00062 }
00063
00064 void
00065 TAO_CEC_MT_Dispatching::push (TAO_CEC_ProxyPushSupplier* proxy,
00066 const CORBA::Any& event
00067 ACE_ENV_ARG_DECL)
00068 {
00069 CORBA::Any event_copy = event;
00070 this->push_nocopy (proxy, event_copy ACE_ENV_ARG_PARAMETER);
00071 }
00072
00073 void
00074 TAO_CEC_MT_Dispatching::push_nocopy (TAO_CEC_ProxyPushSupplier* proxy,
00075 CORBA::Any& event
00076 ACE_ENV_ARG_DECL)
00077 {
00078
00079 if (this->active_ == 0)
00080 this->activate ();
00081
00082 this->task_.push (proxy, event ACE_ENV_ARG_PARAMETER);
00083 }
00084
00085 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00086 void
00087 TAO_CEC_MT_Dispatching::invoke (TAO_CEC_ProxyPushSupplier* proxy,
00088 const TAO_CEC_TypedEvent& typed_event
00089 ACE_ENV_ARG_DECL)
00090 {
00091 TAO_CEC_TypedEvent typed_event_copy = typed_event;
00092 this->invoke_nocopy (proxy, typed_event_copy ACE_ENV_ARG_PARAMETER);
00093 }
00094
00095 void
00096 TAO_CEC_MT_Dispatching::invoke_nocopy (TAO_CEC_ProxyPushSupplier* proxy,
00097 TAO_CEC_TypedEvent& typed_event
00098 ACE_ENV_ARG_DECL)
00099 {
00100
00101 if (this->active_ == 0)
00102 this->activate ();
00103
00104 this->task_.invoke (proxy, typed_event ACE_ENV_ARG_PARAMETER);
00105 }
00106 #endif
00107
00108 TAO_END_VERSIONED_NAMESPACE_DECL