#include <SequencePushConsumer.h>
Inheritance diagram for TAO_Notify_SequencePushConsumer:
Public Member Functions | |
TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier *proxy) | |
virtual | ~TAO_Notify_SequencePushConsumer () |
void | init (CosNotifyComm::SequencePushConsumer_ptr push_consumer) |
Init the Consumer. | |
virtual bool | enqueue_if_necessary (TAO_Notify_Method_Request_Event *request) |
virtual bool | dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon) |
Attempt to dispatch event from a queue. | |
virtual void | push (const CORBA::Any &event) |
Push to this consumer. | |
virtual void | push (const CosNotification::StructuredEvent &event) |
Push to this consumer. | |
virtual void | push (const CosNotification::EventBatch &event) |
Push a batch of events to this consumer. | |
virtual ACE_CString | get_ior (void) const |
Retrieve the ior of this peer. | |
virtual void | reconnect_from_consumer (TAO_Notify_Consumer *old_consumer) |
Protected Attributes | |
CosNotifyComm::SequencePushConsumer_var | push_consumer_ |
The Consumer. | |
Private Member Functions | |
virtual void | release (void) |
TAO_Notify_Destroy_Callback methods. |
|
Definition at line 25 of file SequencePushConsumer.cpp.
00026 : TAO_Notify_Consumer (proxy) 00027 { 00028 } |
|
Definition at line 30 of file SequencePushConsumer.cpp.
00031 { 00032 } |
|
Attempt to dispatch event from a queue. Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded, depending on QoS parameters). Undelivered, undiscarded requests are left at the front of the queue. Overridden in the sequence consumer to dispatch as an EventBatch.
Reimplemented from TAO_Notify_Consumer. Definition at line 57 of file SequencePushConsumer.cpp. References ACE_ASSERT, ACE_CATCHANY, ACE_DEBUG, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify_Method_Request_Event::complete(), TAO_Notify_Event::convert(), DEBUG_LEVEL, TAO_Notify_ProxySupplier::destroy(), TAO_Notify_Consumer::dispatch_batch(), TAO_Notify_Method_Request_Event::event(), CosNotification::EventBatch, TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::proxy_supplier(), ACE_Message_Block::release(), ACE_Guard< ACE_LOCK >::release(), TAO_Notify_Method_Request_Event::sequence(), TAO_Notify_Method_Request_Event::should_retry(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00058 { 00059 bool result = true; 00060 if (DEBUG_LEVEL > 0) 00061 { 00062 ACE_DEBUG ( (LM_DEBUG, 00063 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"), 00064 requests.size ())); 00065 } 00066 00067 long queue_size = requests.size (); 00068 CORBA::Long max_batch_size = queue_size; 00069 if (this->max_batch_size_.is_valid () ) 00070 { 00071 max_batch_size = this->max_batch_size_.value (); 00072 } 00073 CORBA::Long batch_size = queue_size; 00074 if (batch_size > max_batch_size) 00075 { 00076 batch_size = max_batch_size; 00077 } 00078 if (batch_size > 0) 00079 { 00080 CosNotification::EventBatch batch (batch_size); 00081 batch.length (batch_size); 00082 00083 Request_Queue completed; 00084 00085 CORBA::Long pos = 0; 00086 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00087 while (pos < batch_size && requests.dequeue_head (request) == 0) 00088 { 00089 if (DEBUG_LEVEL > 0) 00090 { 00091 ACE_DEBUG ( (LM_DEBUG, 00092 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"), 00093 request)); 00094 } 00095 00096 const TAO_Notify_Event * ev = request->event (); 00097 ev->convert (batch [pos]); 00098 ++pos; 00099 00100 // note enqueue at head, use queue as stack. 
00101 completed.enqueue_head (request); 00102 } 00103 batch.length (pos); 00104 ACE_ASSERT (pos > 0); 00105 00106 ace_mon.release (); 00107 TAO_Notify_Consumer::DispatchStatus status = 00108 this->dispatch_batch (batch); 00109 ace_mon.acquire (); 00110 switch (status) 00111 { 00112 case DISPATCH_SUCCESS: 00113 { 00114 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00115 while (completed.dequeue_head (request) == 0) 00116 { 00117 request->complete (); 00118 request->release (); 00119 } 00120 result = true; 00121 break; 00122 } 00123 case DISPATCH_FAIL: 00124 { 00125 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00126 while (completed.dequeue_head (request) == 0) 00127 { 00128 if (request->should_retry ()) 00129 { 00130 if (DEBUG_LEVEL > 0) 00131 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00132 static_cast <int> (this->proxy ()->id ()), 00133 request->sequence ())); 00134 requests.enqueue_head (request); 00135 result = false; 00136 } 00137 else 00138 { 00139 if (DEBUG_LEVEL > 0) 00140 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00141 static_cast<int> (this->proxy ()->id ()), 00142 request->sequence ())); 00143 request->complete (); 00144 request->release (); 00145 } 00146 } 00147 while (requests.dequeue_head (request) == 0) 00148 { 00149 if (request->should_retry ()) 00150 { 00151 if (DEBUG_LEVEL > 0) 00152 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00153 static_cast<int> (this->proxy ()->id ()), 00154 request->sequence ())); 00155 requests.enqueue_head (request); 00156 result = false; 00157 } 00158 else 00159 { 00160 if (DEBUG_LEVEL > 0) 00161 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00162 static_cast<int> (this->proxy ()->id ()), 00163 request->sequence ())); 00164 request->complete (); 00165 request->release (); 00166 } 00167 } 00168 ace_mon.release(); 00169 ACE_DECLARE_NEW_ENV; 00170 ACE_TRY 00171 { 00172 
this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); 00173 ACE_TRY_CHECK; 00174 } 00175 ACE_CATCHANY 00176 { 00177 // todo is there something meaningful we can do here? 00178 ; 00179 } 00180 ACE_ENDTRY; 00181 ace_mon.acquire(); 00182 break; 00183 } 00184 case DISPATCH_RETRY: 00185 case DISPATCH_DISCARD: 00186 { 00187 TAO_Notify_Method_Request_Event_Queueable * request = 0; 00188 while (completed.dequeue_head (request) == 0) 00189 { 00190 if (request->should_retry ()) 00191 { 00192 if (DEBUG_LEVEL > 0) 00193 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"), 00194 static_cast<int> (this->proxy ()->id ()), 00195 request->sequence ())); 00196 requests.enqueue_head (request); 00197 result = false; 00198 } 00199 else 00200 { 00201 if (DEBUG_LEVEL > 0) 00202 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"), 00203 static_cast<int> (this->proxy ()->id ()), 00204 request->sequence ())); 00205 request->complete (); 00206 request->release (); 00207 } 00208 } 00209 break; 00210 } 00211 default: 00212 { 00213 result = false; 00214 break; 00215 } 00216 } 00217 } 00218 return result; 00219 } |
|
Add request to a queue if necessary. For a sequence consumer it is always necessary. Reimplemented from TAO_Notify_Consumer. Definition at line 222 of file SequencePushConsumer.cpp. References ACE_CHECK_RETURN, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, DEBUG_LEVEL, TAO_Notify_Consumer::dispatch_pending(), TAO_Notify_Consumer::enqueue_request(), TAO_Notify_PropertyBase_T< TYPE >::is_valid(), LM_DEBUG, TAO_Notify_Consumer::pending_events(), TAO_Notify_Consumer::schedule_timer(), and TAO_Notify_PropertyBase_T< TYPE >::value().
00225 { 00226 if (DEBUG_LEVEL > 0) 00227 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n")); 00228 this->enqueue_request (request ACE_ENV_ARG_PARAMETER); 00229 ACE_CHECK_RETURN (false); 00230 00231 size_t mbs = static_cast<size_t>(this->max_batch_size_.value()); 00232 00233 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0) 00234 { 00235 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); 00236 ACE_CHECK_RETURN (false); 00237 } 00238 else 00239 { 00240 schedule_timer (false); 00241 } 00242 return true; 00243 } |
|
Retrieve the ior of this peer.
Implements TAO_Notify_Peer. Definition at line 267 of file SequencePushConsumer.cpp. References ACE_CATCHANY, ACE_CString, ACE_DECLARE_NEW_CORBA_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_TRY, ACE_TRY_CHECK, and TAO_Singleton< TYPE, ACE_LOCK >::instance().
00268 { 00269 ACE_CString result; 00270 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb (); 00271 ACE_DECLARE_NEW_CORBA_ENV; 00272 ACE_TRY 00273 { 00274 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER); 00275 ACE_TRY_CHECK; 00276 result = static_cast<const char*> (ior.in ()); 00277 } 00278 ACE_CATCHANY 00279 { 00280 result.fast_clear(); 00281 } 00282 ACE_ENDTRY; 00283 return result; 00284 } |
|
Init the Consumer.
Definition at line 35 of file SequencePushConsumer.cpp. References ACE_ASSERT, ACE_THROW, and CORBA::is_nil(). Referenced by TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer(), and reconnect_from_consumer().
00036 { 00037 // Initialize only once 00038 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) ); 00039 00040 if (CORBA::is_nil (push_consumer)) 00041 { 00042 ACE_THROW (CORBA::BAD_PARAM()); 00043 } 00044 00045 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); 00046 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); 00047 } |
|
Push a batch of events to this consumer.
Implements TAO_Notify_Consumer. Definition at line 260 of file SequencePushConsumer.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and CosNotification::EventBatch.
00261 { 00262 this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); 00263 ACE_CHECK; 00264 } |
|
Push to this consumer.
Implements TAO_Notify_Consumer. Definition at line 253 of file SequencePushConsumer.cpp.
00254 {
00255 //NOP
00256 }
|
|
Push to this consumer.
Implements TAO_Notify_Consumer. Definition at line 247 of file SequencePushConsumer.cpp.
00248 {
00249 //NOP
00250 }
|
|
On reconnect, we need to move events from the old consumer to the new one. Implements TAO_Notify_Consumer. Definition at line 287 of file SequencePushConsumer.cpp. References ACE_ASSERT, ACE_CHECK, ACE_ENV_ARG_PARAMETER, init(), push_consumer_, and TAO_Notify_Consumer::schedule_timer().
00289 { 00290 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer); 00291 ACE_ASSERT(tmp != 0); 00292 this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER); 00293 ACE_CHECK; 00294 this->schedule_timer(false); 00295 } |
|
TAO_Notify_Destroy_Callback methods.
Implements TAO_Notify_Peer. Definition at line 50 of file SequencePushConsumer.cpp.
00051 { 00052 delete this; 00053 //@@ inform factory 00054 } |
|
The Consumer.
Definition at line 84 of file SequencePushConsumer.h. Referenced by reconnect_from_consumer(). |