#include <SequencePushConsumer.h>
Inheritance diagram for TAO_Notify_SequencePushConsumer:
Public Member Functions | |
TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy) | |
virtual | ~TAO_Notify_SequencePushConsumer () |
void | init (CosNotifyComm::SequencePushConsumer_ptr push_consumer) |
Init the Consumer. | |
virtual bool | enqueue_if_necessary (TAO_Notify_Method_Request_Event *request) |
virtual bool | dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon) |
Attempt to dispatch event from a queue. | |
virtual void | push (const CORBA::Any &event) |
Push to this consumer. | |
virtual void | push (const CosNotification::StructuredEvent &event) |
Push to this consumer. | |
virtual void | push (const CosNotification::EventBatch &event) |
Push a batch of events to this consumer. | |
virtual ACE_CString | get_ior (void) const |
Retrieve the IOR of this peer. | |
virtual void | reconnect_from_consumer (TAO_Notify_Consumer *old_consumer) |
Protected Attributes | |
CosNotifyComm::SequencePushConsumer_var | push_consumer_ |
The Consumer. | |
Private Member Functions | |
virtual void | release (void) |
TAO_Notify_Destroy_Callback methods. |
|
Definition at line 28 of file SequencePushConsumer.cpp.
00029 : TAO_Notify_Consumer (proxy) 00030 { 00031 } |
|
Definition at line 33 of file SequencePushConsumer.cpp.
00034 { 00035 } |
|
Attempt to dispatch event from a queue. Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded, depending on QoS parameters). Undelivered, undiscarded requests are left at the front of the queue. Overridden in sequence consumer to dispatch as an EventBatch.
Reimplemented from TAO_Notify_Consumer. Definition at line 100 of file SequencePushConsumer.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), TAO_Notify_Event::convert(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), TAO_Notify_Consumer::dispatch_batch(), TAO_Notify_Method_Request_Event::event(), CosNotification::EventBatch, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00101 { 00102 bool result = true; 00103 if (DEBUG_LEVEL > 0) 00104 { 00105 ACE_DEBUG ( (LM_DEBUG, 00106 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), 00107 requests.size ())); 00108 } 00109 00110 CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ()); 00111 CORBA::Long max_batch_size = queue_size; 00112 if (this->max_batch_size_.is_valid () ) 00113 { 00114 max_batch_size = this->max_batch_size_.value (); 00115 } 00116 CORBA::Long batch_size = queue_size; 00117 if (batch_size > max_batch_size) 00118 { 00119 batch_size = max_batch_size; 00120 } 00121 if (batch_size > 0) 00122 { 00123 CosNotification::EventBatch batch (batch_size); 00124 batch.length (batch_size); 00125 00126 Request_Queue completed; 00127 00128 CORBA::Long pos = 0; 00129 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00130 while (pos < batch_size && requests.dequeue_head (request) == 0) 00131 { 00132 if (DEBUG_LEVEL > 0) 00133 { 00134 ACE_DEBUG ( (LM_DEBUG, 00135 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), 00136 request)); 00137 } 00138 00139 const TAO_Notify_Event * ev = request->event (); 00140 ev->convert (batch [pos]); 00141 ++pos; 00142 00143 // note enqueue at head, use queue as stack. 
00144 completed.enqueue_head (request); 00145 } 00146 batch.length (pos); 00147 ACE_ASSERT (pos > 0); 00148 00149 ace_mon.release (); 00150 TAO_Notify_Consumer::DispatchStatus status = 00151 this->dispatch_batch (batch); 00152 ace_mon.acquire (); 00153 switch (status) 00154 { 00155 case DISPATCH_SUCCESS: 00156 { 00157 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00158 while (completed.dequeue_head (request) == 0) 00159 { 00160 request->complete (); 00161 request->release (); 00162 } 00163 result = true; 00164 break; 00165 } 00166 case DISPATCH_FAIL: 00167 { 00168 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00169 while (completed.dequeue_head (request) == 0) 00170 { 00171 if (request->should_retry ()) 00172 { 00173 if (DEBUG_LEVEL > 0) 00174 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00175 static_cast <int> (this->proxy ()->id ()), 00176 request->sequence ())); 00177 requests.enqueue_head (request); 00178 result = false; 00179 } 00180 else 00181 { 00182 if (DEBUG_LEVEL > 0) 00183 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00184 static_cast<int> (this->proxy ()->id ()), 00185 request->sequence ())); 00186 request->complete (); 00187 request->release (); 00188 } 00189 } 00190 while (requests.dequeue_head (request) == 0) 00191 { 00192 if (request->should_retry ()) 00193 { 00194 if (DEBUG_LEVEL > 0) 00195 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00196 static_cast<int> (this->proxy ()->id ()), 00197 request->sequence ())); 00198 requests.enqueue_head (request); 00199 result = false; 00200 } 00201 else 00202 { 00203 if (DEBUG_LEVEL > 0) 00204 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00205 static_cast<int> (this->proxy ()->id ()), 00206 request->sequence ())); 00207 request->complete (); 00208 request->release (); 00209 } 00210 } 00211 ace_mon.release(); 00212 try 00213 { 00214 this->proxy_supplier ()->destroy (); 00215 } 
00216 catch (const CORBA::Exception&) 00217 { 00218 // todo is there something meaningful we can do here? 00219 ; 00220 } 00221 ace_mon.acquire(); 00222 break; 00223 } 00224 case DISPATCH_RETRY: 00225 case DISPATCH_DISCARD: 00226 { 00227 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00228 while (completed.dequeue_head (request) == 0) 00229 { 00230 if (request->should_retry ()) 00231 { 00232 if (DEBUG_LEVEL > 0) 00233 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00234 static_cast<int> (this->proxy ()->id ()), 00235 request->sequence ())); 00236 requests.enqueue_head (request); 00237 result = false; 00238 } 00239 else 00240 { 00241 if (DEBUG_LEVEL > 0) 00242 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00243 static_cast<int> (this->proxy ()->id ()), 00244 request->sequence ())); 00245 request->complete (); 00246 request->release (); 00247 } 00248 } 00249 break; 00250 } 00251 default: 00252 { 00253 result = false; 00254 break; 00255 } 00256 } 00257 } 00258 return result; 00259 } |
|
Add request to a queue if necessary. For a sequence consumer, it is always necessary. Reimplemented from TAO_Notify_Consumer. Definition at line 262 of file SequencePushConsumer.cpp. References ACE_DEBUG, DEBUG_LEVEL, TAO_Notify_Consumer::dispatch_pending(), TAO_Notify_Consumer::enqueue_request(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::pending_events(), TAO_Notify_Consumer::schedule_timer(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00264 { 00265 if (DEBUG_LEVEL > 0) 00266 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); 00267 this->enqueue_request (request); 00268 00269 size_t mbs = static_cast<size_t>(this->max_batch_size_.value()); 00270 00271 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0) 00272 { 00273 this->dispatch_pending (); 00274 } 00275 else 00276 { 00277 schedule_timer (false); 00278 } 00279 return true; 00280 } |
|
Retrieve the IOR of this peer.
Implements TAO_Notify_Peer. Definition at line 310 of file SequencePushConsumer.cpp. References ACE_CString, TAO_Notify_Properties::instance(), and TAO_Notify_Properties::orb().
00311 { 00312 ACE_CString result; 00313 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); 00314 try 00315 { 00316 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ()); 00317 result = static_cast<const char*> (ior.in ()); 00318 } 00319 catch (const CORBA::Exception&) 00320 { 00321 result.fast_clear(); 00322 } 00323 return result; 00324 } |
|
Init the Consumer.
Definition at line 38 of file SequencePushConsumer.cpp. References ACE_ASSERT, ACE_DEBUG, TAO_Notify_Properties::dispatching_orb(), TAO_Notify_Properties::instance(), CORBA::is_nil(), LM_DEBUG, TAO_Notify_Properties::orb(), TAO_Notify_Properties::separate_dispatching_orb(), and TAO_debug_level. Referenced by TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer(), and reconnect_from_consumer().
00039 { 00040 // Initialize only once 00041 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) ); 00042 00043 if (CORBA::is_nil (push_consumer)) 00044 { 00045 throw CORBA::BAD_PARAM(); 00046 } 00047 00048 if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ()) 00049 { 00050 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); 00051 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); 00052 } 00053 else 00054 { 00055 // "Port" consumer's object reference from receiving ORB to dispatching ORB. 00056 CORBA::String_var temp = 00057 TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer); 00058 00059 CORBA::Object_var obj = 00060 TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in()); 00061 00062 try 00063 { 00064 CosNotifyComm::SequencePushConsumer_var new_push_consumer = 00065 CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in()); 00066 00067 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in()); 00068 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in()); 00069 00070 //--cj verify dispatching ORB 00071 if (TAO_debug_level >= 10) 00072 { 00073 ACE_DEBUG ((LM_DEBUG, 00074 "(%P|%t) Sequence push init dispatching ORB id is %s.\n", 00075 obj->_stubobj()->orb_core()->orbid())); 00076 } 00077 //--cj end 00078 } 00079 catch (const CORBA::TRANSIENT& ex) 00080 { 00081 ex._tao_print_exception ( 00082 "Got a TRANSIENT in NS_SequencePushConsumer::init"); 00083 ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this)); 00084 } 00085 catch (const CORBA::Exception&) 00086 { 00087 // _narrow failed 00088 } 00089 } 00090 } |
|
Push a batch of events to this consumer.
Implements TAO_Notify_Consumer. Definition at line 297 of file SequencePushConsumer.cpp. References ACE_DEBUG, CosNotification::EventBatch, LM_DEBUG, and TAO_debug_level.
00298 { 00299 //--cj verify dispatching ORB 00300 if (TAO_debug_level >= 10) { 00301 ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n", 00302 this->push_consumer_->_stubobj()->orb_core()->orbid())); 00303 } 00304 //--cj end 00305 00306 this->push_consumer_->push_structured_events (event_batch); 00307 } |
|
Push to this consumer.
Implements TAO_Notify_Consumer. Definition at line 290 of file SequencePushConsumer.cpp.
00291 {
00292 //NOP
00293 }
|
|
Push to this consumer.
Implements TAO_Notify_Consumer. Definition at line 284 of file SequencePushConsumer.cpp.
00285 {
00286 //NOP
00287 }
|
|
On reconnect we need to move events from the old consumer to the new one. Implements TAO_Notify_Consumer. Definition at line 327 of file SequencePushConsumer.cpp. References ACE_ASSERT, init(), push_consumer_, and TAO_Notify_Consumer::schedule_timer().
00328 { 00329 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer); 00330 ACE_ASSERT(tmp != 0); 00331 this->init(tmp->push_consumer_.in()); 00332 this->schedule_timer(false); 00333 } |
|
TAO_Notify_Destroy_Callback methods.
Implements TAO_Notify_Peer. Definition at line 93 of file SequencePushConsumer.cpp.
00094 { 00095 delete this; 00096 //@@ inform factory 00097 } |
|
The Consumer.
Definition at line 82 of file SequencePushConsumer.h. Referenced by reconnect_from_consumer(). |