00001
00002
00003 #include "orbsvcs/CosEvent/CEC_MT_Dispatching.h"
00004
00005 ACE_RCSID(CosEvent, CEC_MT_Dispatching, "$Id: CEC_MT_Dispatching.cpp 76589 2007-01-25 18:04:11Z elliott_c $")
00006
00007 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00008
00009
00010
00011
00012 TAO_CEC_MT_Dispatching::TAO_CEC_MT_Dispatching (int nthreads,
00013 int thread_creation_flags,
00014 int thread_priority,
00015 int force_activate)
00016 : nthreads_ (nthreads),
00017 thread_creation_flags_ (thread_creation_flags),
00018 thread_priority_ (thread_priority),
00019 force_activate_ (force_activate),
00020 task_ (&this->thread_manager_),
00021 active_ (0)
00022 {
00023 }
00024
00025 void
00026 TAO_CEC_MT_Dispatching::activate (void)
00027 {
00028 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00029
00030 if (this->active_ != 0)
00031 return;
00032
00033 this->active_ = 1;
00034
00035 if (this->task_.activate (this->thread_creation_flags_,
00036 this->nthreads_,
00037 1,
00038 this->thread_priority_) == -1)
00039 {
00040 if (this->force_activate_ != 0)
00041 {
00042 if (this->task_.activate (THR_BOUND, this->nthreads_) == -1)
00043 ACE_ERROR ((LM_ERROR,
00044 "EC (%P|%t) cannot activate dispatching queue"));
00045 }
00046 }
00047 }
00048
00049 void
00050 TAO_CEC_MT_Dispatching::shutdown (void)
00051 {
00052 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00053
00054 if (this->active_ == 0)
00055 return;
00056
00057 for (int i = 0; i < this->nthreads_; ++i)
00058 {
00059 this->task_.putq (new TAO_CEC_Shutdown_Task_Command);
00060 }
00061 this->thread_manager_.wait ();
00062 }
00063
00064 void
00065 TAO_CEC_MT_Dispatching::push (TAO_CEC_ProxyPushSupplier* proxy,
00066 const CORBA::Any& event)
00067 {
00068 CORBA::Any event_copy = event;
00069 this->push_nocopy (proxy, event_copy);
00070 }
00071
00072 void
00073 TAO_CEC_MT_Dispatching::push_nocopy (TAO_CEC_ProxyPushSupplier* proxy,
00074 CORBA::Any& event)
00075 {
00076
00077 if (this->active_ == 0)
00078 this->activate ();
00079
00080 this->task_.push (proxy, event);
00081 }
00082
00083 #if defined (TAO_HAS_TYPED_EVENT_CHANNEL)
00084 void
00085 TAO_CEC_MT_Dispatching::invoke (TAO_CEC_ProxyPushSupplier* proxy,
00086 const TAO_CEC_TypedEvent& typed_event)
00087 {
00088 TAO_CEC_TypedEvent typed_event_copy = typed_event;
00089 this->invoke_nocopy (proxy, typed_event_copy);
00090 }
00091
00092 void
00093 TAO_CEC_MT_Dispatching::invoke_nocopy (TAO_CEC_ProxyPushSupplier* proxy,
00094 TAO_CEC_TypedEvent& typed_event)
00095 {
00096
00097 if (this->active_ == 0)
00098 this->activate ();
00099
00100 this->task_.invoke (proxy, typed_event);
00101 }
00102 #endif
00103
00104 TAO_END_VERSIONED_NAMESPACE_DECL