00001 // $Id: Routing_Slip_Queue.cpp 71526 2006-03-14 06:14:35Z jtc $ 00002 00003 #include "orbsvcs/Notify/Routing_Slip_Queue.h" 00004 00005 #include "tao/debug.h" 00006 #include "ace/Dynamic_Service.h" 00007 00008 //#define DEBUG_LEVEL 9 00009 #ifndef DEBUG_LEVEL 00010 # define DEBUG_LEVEL TAO_debug_level 00011 #endif //DEBUG_LEVEL 00012 00013 00014 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00015 00016 namespace TAO_Notify 00017 { 00018 Routing_Slip_Queue::Routing_Slip_Queue (size_t allowed) 00019 : allowed_ (allowed) 00020 , active_ (0) 00021 { 00022 } 00023 00024 Routing_Slip_Queue::~Routing_Slip_Queue () 00025 { 00026 } 00027 00028 void 00029 Routing_Slip_Queue::add (const Routing_Slip_Ptr & routing_slip) 00030 { 00031 Guard guard (internals_); 00032 ACE_ASSERT (guard.locked()); // check recursion 00033 if (this->allowed_ == 0) 00034 { 00035 ++this->active_; 00036 guard.release (); 00037 routing_slip->at_front_of_persist_queue (); 00038 // guard.acquire (); 00039 } 00040 else 00041 { 00042 this->queue_.enqueue_tail (routing_slip); 00043 dispatch (guard); 00044 } 00045 } 00046 00047 void Routing_Slip_Queue::complete () 00048 { 00049 Guard guard (internals_); 00050 ACE_ASSERT (guard.locked()); // check recursion 00051 ACE_ASSERT (this->active_ > 0); 00052 --this->active_; 00053 dispatch (guard); 00054 } 00055 00056 void 00057 Routing_Slip_Queue::dispatch (Guard & guard) 00058 { 00059 // we start out pretty nice, 00060 // but the more work we do for other people 00061 // the less nice we get. 00062 size_t nice = this->allowed_ + 1; 00063 while (nice > 0 && (this->active_ < this->allowed_)) 00064 { 00065 if (dispatch_one (guard)) 00066 { 00067 --nice; 00068 } 00069 else 00070 { 00071 // that's about as nice as I get. 00072 nice = 0; 00073 } 00074 } 00075 } 00076 00077 bool 00078 Routing_Slip_Queue::dispatch_one (Guard & guard) 00079 { 00080 bool ok = false; 00081 Routing_Slip_Ptr routing_slip; 00082 if (this->queue_.dequeue_head (routing_slip) == 0) 00083 { 00084 ++this->active_; 00085 guard.release (); 00086 routing_slip->at_front_of_persist_queue (); 00087 guard.acquire (); 00088 } 00089 return ok; 00090 } 00091 00092 void 00093 Routing_Slip_Queue::set_allowed (size_t allowed) 00094 { 00095 Guard guard (internals_); 00096 size_t allowed_was = this->allowed_; 00097 this->allowed_ = allowed; 00098 if (allowed == 0 && allowed_was != 0) 00099 { 00100 while (dispatch_one (guard)) 00101 { 00102 ; // work happens in dispatc_one 00103 } 00104 } 00105 else 00106 { 00107 dispatch (guard); 00108 } 00109 } 00110 } // namespace 00111 00112 TAO_END_VERSIONED_NAMESPACE_DECL