#include <SequencePushConsumer.h>
Inheritance diagram for TAO_Notify_SequencePushConsumer:
Public Member Functions | |
TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy) | |
virtual | ~TAO_Notify_SequencePushConsumer () |
void | init (CosNotifyComm::SequencePushConsumer_ptr push_consumer) |
Init the Consumer. | |
virtual bool | enqueue_if_necessary (TAO_Notify_Method_Request_Event *request) |
virtual bool | dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon) |
virtual void | push (const CORBA::Any &event) |
Push <event> to this consumer. | |
virtual void | push (const CosNotification::StructuredEvent &event) |
Push event to this consumer. | |
virtual void | push (const CosNotification::EventBatch &event) |
Push a batch of events to this consumer. | |
virtual ACE_CString | get_ior (void) const |
Retrieve the ior of this peer. | |
virtual void | reconnect_from_consumer (TAO_Notify_Consumer *old_consumer) |
Protected Attributes | |
CosNotifyComm::SequencePushConsumer_var | push_consumer_ |
The Consumer. | |
Private Member Functions | |
virtual void | release (void) |
TAO_Notify_Destroy_Callback methods. |
Definition at line 42 of file SequencePushConsumer.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer | ( | TAO_Notify_ProxySupplier * | proxy | ) |
Definition at line 28 of file SequencePushConsumer.cpp.
00029 : TAO_Notify_Consumer (proxy) 00030 { 00031 }
TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer | ( | ) | [virtual] |
bool TAO_Notify_SequencePushConsumer::dispatch_from_queue | ( | Request_Queue & | requests, | |
ACE_Guard< TAO_SYNCH_MUTEX > & | ace_mon | |||
) | [virtual] |
Definition at line 100 of file SequencePushConsumer.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), TAO_Notify_Event::convert(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), TAO_Notify_Consumer::dispatch_batch(), TAO_Notify_Consumer::DISPATCH_DISCARD, TAO_Notify_Consumer::DISPATCH_FAIL, TAO_Notify_Consumer::DISPATCH_RETRY, TAO_Notify_Consumer::DISPATCH_SUCCESS, TAO_Notify_Method_Request_Event::event(), LM_DEBUG, TAO_Notify_Consumer::proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), TAO_Notify_Method_Request_Event::sequence(), and TAO_Notify_Method_Request_Event::should_retry().
00101 { 00102 bool result = true; 00103 if (DEBUG_LEVEL > 0) 00104 { 00105 ACE_DEBUG ( (LM_DEBUG, 00106 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), 00107 requests.size ())); 00108 } 00109 00110 CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ()); 00111 CORBA::Long max_batch_size = queue_size; 00112 if (this->max_batch_size_.is_valid () ) 00113 { 00114 max_batch_size = this->max_batch_size_.value (); 00115 } 00116 CORBA::Long batch_size = queue_size; 00117 if (batch_size > max_batch_size) 00118 { 00119 batch_size = max_batch_size; 00120 } 00121 if (batch_size > 0) 00122 { 00123 CosNotification::EventBatch batch (batch_size); 00124 batch.length (batch_size); 00125 00126 Request_Queue completed; 00127 00128 CORBA::Long pos = 0; 00129 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00130 while (pos < batch_size && requests.dequeue_head (request) == 0) 00131 { 00132 if (DEBUG_LEVEL > 0) 00133 { 00134 ACE_DEBUG ( (LM_DEBUG, 00135 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), 00136 request)); 00137 } 00138 00139 const TAO_Notify_Event * ev = request->event (); 00140 ev->convert (batch [pos]); 00141 ++pos; 00142 00143 // note enqueue at head, use queue as stack. 
00144 completed.enqueue_head (request); 00145 } 00146 batch.length (pos); 00147 ACE_ASSERT (pos > 0); 00148 00149 ace_mon.release (); 00150 TAO_Notify_Consumer::DispatchStatus status = 00151 this->dispatch_batch (batch); 00152 ace_mon.acquire (); 00153 switch (status) 00154 { 00155 case DISPATCH_SUCCESS: 00156 { 00157 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00158 while (completed.dequeue_head (request) == 0) 00159 { 00160 request->complete (); 00161 request->release (); 00162 } 00163 result = true; 00164 break; 00165 } 00166 case DISPATCH_FAIL: 00167 { 00168 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00169 while (completed.dequeue_head (request) == 0) 00170 { 00171 if (request->should_retry ()) 00172 { 00173 if (DEBUG_LEVEL > 0) 00174 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00175 static_cast <int> (this->proxy ()->id ()), 00176 request->sequence ())); 00177 requests.enqueue_head (request); 00178 result = false; 00179 } 00180 else 00181 { 00182 if (DEBUG_LEVEL > 0) 00183 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00184 static_cast<int> (this->proxy ()->id ()), 00185 request->sequence ())); 00186 request->complete (); 00187 request->release (); 00188 } 00189 } 00190 while (requests.dequeue_head (request) == 0) 00191 { 00192 if (request->should_retry ()) 00193 { 00194 if (DEBUG_LEVEL > 0) 00195 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00196 static_cast<int> (this->proxy ()->id ()), 00197 request->sequence ())); 00198 requests.enqueue_head (request); 00199 result = false; 00200 } 00201 else 00202 { 00203 if (DEBUG_LEVEL > 0) 00204 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00205 static_cast<int> (this->proxy ()->id ()), 00206 request->sequence ())); 00207 request->complete (); 00208 request->release (); 00209 } 00210 } 00211 ace_mon.release(); 00212 try 00213 { 00214 this->proxy_supplier ()->destroy (); 00215 } 
00216 catch (const CORBA::Exception&) 00217 { 00218 // todo is there something meaningful we can do here? 00219 ; 00220 } 00221 ace_mon.acquire(); 00222 break; 00223 } 00224 case DISPATCH_RETRY: 00225 case DISPATCH_DISCARD: 00226 { 00227 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00228 while (completed.dequeue_head (request) == 0) 00229 { 00230 if (request->should_retry ()) 00231 { 00232 if (DEBUG_LEVEL > 0) 00233 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00234 static_cast<int> (this->proxy ()->id ()), 00235 request->sequence ())); 00236 requests.enqueue_head (request); 00237 result = false; 00238 } 00239 else 00240 { 00241 if (DEBUG_LEVEL > 0) 00242 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00243 static_cast<int> (this->proxy ()->id ()), 00244 request->sequence ())); 00245 request->complete (); 00246 request->release (); 00247 } 00248 } 00249 break; 00250 } 00251 default: 00252 { 00253 result = false; 00254 break; 00255 } 00256 } 00257 } 00258 return result; 00259 }
bool TAO_Notify_SequencePushConsumer::enqueue_if_necessary | ( | TAO_Notify_Method_Request_Event * | request | ) | [virtual] |
Add the request to a queue if necessary. For a sequence consumer it is always necessary.
Reimplemented from TAO_Notify_Consumer.
Definition at line 262 of file SequencePushConsumer.cpp.
References ACE_DEBUG, DEBUG_LEVEL, TAO_Notify_Consumer::enqueue_request(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::max_batch_size_, TAO_Notify_Consumer::pacing_, TAO_Notify_Consumer::schedule_timer(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00264 { 00265 if (DEBUG_LEVEL > 0) 00266 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); 00267 this->enqueue_request (request); 00268 00269 size_t mbs = static_cast<size_t>(this->max_batch_size_.value()); 00270 00271 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0) 00272 { 00273 this->dispatch_pending (); 00274 } 00275 else 00276 { 00277 schedule_timer (false); 00278 } 00279 return true; 00280 }
ACE_CString TAO_Notify_SequencePushConsumer::get_ior | ( | void | ) | const [virtual] |
Retrieve the ior of this peer.
Implements TAO_Notify_Peer.
Definition at line 310 of file SequencePushConsumer.cpp.
References ACE_String_Base< CHAR >::fast_clear(), TAO_Notify_Properties::instance(), and TAO_Notify_Properties::orb().
00311 { 00312 ACE_CString result; 00313 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); 00314 try 00315 { 00316 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ()); 00317 result = static_cast<const char*> (ior.in ()); 00318 } 00319 catch (const CORBA::Exception&) 00320 { 00321 result.fast_clear(); 00322 } 00323 return result; 00324 }
void TAO_Notify_SequencePushConsumer::init | ( | CosNotifyComm::SequencePushConsumer_ptr | push_consumer | ) |
Init the Consumer.
Definition at line 38 of file SequencePushConsumer.cpp.
References ACE_ASSERT, ACE_DEBUG, TAO_Notify_Properties::dispatching_orb(), TAO_Pseudo_Var_T< T >::in(), TAO_Notify_Properties::instance(), CORBA::is_nil(), LM_DEBUG, CORBA::ORB::object_to_string(), TAO_Notify_Properties::orb(), TAO_Notify_Consumer::publish_, push_consumer_, CORBA::ORB::string_to_object(), and TAO_debug_level.
Referenced by reconnect_from_consumer().
00039 { 00040 // Initialize only once 00041 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) ); 00042 00043 if (CORBA::is_nil (push_consumer)) 00044 { 00045 throw CORBA::BAD_PARAM(); 00046 } 00047 00048 if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ()) 00049 { 00050 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); 00051 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); 00052 } 00053 else 00054 { 00055 // "Port" consumer's object reference from receiving ORB to dispatching ORB. 00056 CORBA::String_var temp = 00057 TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer); 00058 00059 CORBA::Object_var obj = 00060 TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in()); 00061 00062 try 00063 { 00064 CosNotifyComm::SequencePushConsumer_var new_push_consumer = 00065 CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in()); 00066 00067 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in()); 00068 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in()); 00069 00070 //--cj verify dispatching ORB 00071 if (TAO_debug_level >= 10) 00072 { 00073 ACE_DEBUG ((LM_DEBUG, 00074 "(%P|%t) Sequence push init dispatching ORB id is %s.\n", 00075 obj->_stubobj()->orb_core()->orbid())); 00076 } 00077 //--cj end 00078 } 00079 catch (const CORBA::TRANSIENT& ex) 00080 { 00081 ex._tao_print_exception ( 00082 "Got a TRANSIENT in NS_SequencePushConsumer::init"); 00083 ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this)); 00084 } 00085 catch (const CORBA::Exception&) 00086 { 00087 // _narrow failed 00088 } 00089 } 00090 }
void TAO_Notify_SequencePushConsumer::push | ( | const CosNotification::EventBatch & | event | ) | [virtual] |
Push a batch of events to this consumer.
Implements TAO_Notify_Consumer.
Definition at line 297 of file SequencePushConsumer.cpp.
References ACE_DEBUG, LM_DEBUG, push_consumer_, and TAO_debug_level.
00298 { 00299 //--cj verify dispatching ORB 00300 if (TAO_debug_level >= 10) { 00301 ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n", 00302 this->push_consumer_->_stubobj()->orb_core()->orbid())); 00303 } 00304 //--cj end 00305 00306 this->push_consumer_->push_structured_events (event_batch); 00307 }
void TAO_Notify_SequencePushConsumer::push | ( | const CosNotification::StructuredEvent & | event | ) | [virtual] |
Push event to this consumer.
Implements TAO_Notify_Consumer.
Definition at line 290 of file SequencePushConsumer.cpp.
void TAO_Notify_SequencePushConsumer::push | ( | const CORBA::Any & | event | ) | [virtual] |
Push <event> to this consumer.
Implements TAO_Notify_Consumer.
Definition at line 284 of file SequencePushConsumer.cpp.
void TAO_Notify_SequencePushConsumer::reconnect_from_consumer | ( | TAO_Notify_Consumer * | old_consumer | ) | [virtual] |
On reconnect we need to move events from the old consumer to the new one.
Implements TAO_Notify_Consumer.
Definition at line 327 of file SequencePushConsumer.cpp.
References ACE_ASSERT, init(), push_consumer_, and TAO_Notify_Consumer::schedule_timer().
00328 { 00329 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer); 00330 ACE_ASSERT(tmp != 0); 00331 this->init(tmp->push_consumer_.in()); 00332 this->schedule_timer(false); 00333 }
void TAO_Notify_SequencePushConsumer::release | ( | void | ) | [private, virtual] |
TAO_Notify_Destroy_Callback methods.
Implements TAO_Notify_Peer.
Definition at line 93 of file SequencePushConsumer.cpp.
CosNotifyComm::SequencePushConsumer_var TAO_Notify_SequencePushConsumer::push_consumer_ [protected] |
The Consumer.
Definition at line 82 of file SequencePushConsumer.h.
Referenced by init(), push(), and reconnect_from_consumer().