Go to the documentation of this file.00001
00002
00003 #include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
00004
00005 ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id: SequencePushConsumer.cpp 84685 2009-03-02 22:49:17Z mesnier_p $")
00006
00007 #include "ace/Truncate.h"
00008 #include "ace/Reactor.h"
00009 #include "tao/debug.h"
00010 #include "tao/Stub.h"
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/Notify/QoSProperties.h"
00013 #include "orbsvcs/Notify/ProxySupplier.h"
00014 #include "orbsvcs/Notify/Worker_Task.h"
00015 #include "orbsvcs/Notify/Consumer.h"
00016 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00017 #include "orbsvcs/Notify/Method_Request_Event.h"
00018 #include "orbsvcs/Notify/Timer.h"
00019 #include "orbsvcs/Notify/Proxy.h"
00020 #include "orbsvcs/Notify/Properties.h"
00021
// Verbosity threshold used by the debug output in this file. Unless the
// build already supplies DEBUG_LEVEL, it tracks TAO_debug_level.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
00029 : TAO_Notify_Consumer (proxy)
00030 {
00031 }
00032
00033 TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
00034 {
00035 }
00036
00037 void
00038 TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
00039 {
00040
00041 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00042
00043 if (CORBA::is_nil (push_consumer))
00044 {
00045 throw CORBA::BAD_PARAM();
00046 }
00047
00048 if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
00049 {
00050 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00051 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00052 }
00053 else
00054 {
00055
00056 CORBA::String_var temp =
00057 TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
00058
00059 CORBA::Object_var obj =
00060 TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
00061
00062 try
00063 {
00064 CosNotifyComm::SequencePushConsumer_var new_push_consumer =
00065 CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in());
00066
00067 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in());
00068 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
00069
00070
00071 if (TAO_debug_level >= 10)
00072 {
00073 ACE_DEBUG ((LM_DEBUG,
00074 "(%P|%t) Sequence push init dispatching ORB id is %s.\n",
00075 obj->_stubobj()->orb_core()->orbid()));
00076 }
00077
00078 }
00079 catch (const CORBA::TRANSIENT& ex)
00080 {
00081 ex._tao_print_exception (
00082 "Got a TRANSIENT in NS_SequencePushConsumer::init");
00083 ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this));
00084 }
00085 catch (const CORBA::Exception&)
00086 {
00087
00088 }
00089 }
00090 }
00091
00092 void
00093 TAO_Notify_SequencePushConsumer::release (void)
00094 {
00095 delete this;
00096
00097 }
00098
00099 bool
00100 TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00101 {
00102 bool result = true;
00103 if (DEBUG_LEVEL > 0)
00104 {
00105 ACE_DEBUG ( (LM_DEBUG,
00106 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00107 requests.size ()));
00108 }
00109
00110 CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ());
00111 CORBA::Long max_batch_size = queue_size;
00112 if (this->max_batch_size_.is_valid () )
00113 {
00114 max_batch_size = this->max_batch_size_.value ();
00115 }
00116 CORBA::Long batch_size = queue_size;
00117 if (batch_size > max_batch_size)
00118 {
00119 batch_size = max_batch_size;
00120 }
00121 if (batch_size > 0)
00122 {
00123 CosNotification::EventBatch batch (batch_size);
00124 batch.length (batch_size);
00125
00126 Request_Queue completed;
00127
00128 CORBA::Long pos = 0;
00129 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00130 while (pos < batch_size && requests.dequeue_head (request) == 0)
00131 {
00132 if (DEBUG_LEVEL > 0)
00133 {
00134 ACE_DEBUG ( (LM_DEBUG,
00135 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00136 request));
00137 }
00138
00139 const TAO_Notify_Event * ev = request->event ();
00140 ev->convert (batch [pos]);
00141 ++pos;
00142
00143
00144 completed.enqueue_head (request);
00145 }
00146 batch.length (pos);
00147 ACE_ASSERT (pos > 0);
00148
00149 ace_mon.release ();
00150 bool from_timeout = false;
00151 TAO_Notify_Consumer::DispatchStatus status =
00152 this->dispatch_batch (batch);
00153 ace_mon.acquire ();
00154 switch (status)
00155 {
00156 case DISPATCH_SUCCESS:
00157 {
00158 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00159 while (completed.dequeue_head (request) == 0)
00160 {
00161 request->complete ();
00162 request->release ();
00163 }
00164 result = true;
00165 break;
00166 }
00167 case DISPATCH_FAIL_TIMEOUT:
00168 from_timeout = true;
00169
00170 case DISPATCH_FAIL:
00171 {
00172 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00173 while (completed.dequeue_head (request) == 0)
00174 {
00175 if (request->should_retry ())
00176 {
00177 if (DEBUG_LEVEL > 0)
00178 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00179 static_cast <int> (this->proxy ()->id ()),
00180 request->sequence ()));
00181 requests.enqueue_head (request);
00182 result = false;
00183 }
00184 else
00185 {
00186 if (DEBUG_LEVEL > 0)
00187 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00188 static_cast<int> (this->proxy ()->id ()),
00189 request->sequence ()));
00190 request->complete ();
00191 request->release ();
00192 }
00193 }
00194 while (requests.dequeue_head (request) == 0)
00195 {
00196 if (request->should_retry ())
00197 {
00198 if (DEBUG_LEVEL > 0)
00199 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00200 static_cast<int> (this->proxy ()->id ()),
00201 request->sequence ()));
00202 requests.enqueue_head (request);
00203 result = false;
00204 }
00205 else
00206 {
00207 if (DEBUG_LEVEL > 0)
00208 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00209 static_cast<int> (this->proxy ()->id ()),
00210 request->sequence ()));
00211 request->complete ();
00212 request->release ();
00213 }
00214 }
00215 ace_mon.release();
00216 try
00217 {
00218 this->proxy_supplier ()->destroy (from_timeout);
00219 }
00220 catch (const CORBA::Exception&)
00221 {
00222
00223 ;
00224 }
00225 ace_mon.acquire();
00226 break;
00227 }
00228 case DISPATCH_RETRY:
00229 case DISPATCH_DISCARD:
00230 {
00231 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00232 while (completed.dequeue_head (request) == 0)
00233 {
00234 if (request->should_retry ())
00235 {
00236 if (DEBUG_LEVEL > 0)
00237 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00238 static_cast<int> (this->proxy ()->id ()),
00239 request->sequence ()));
00240 requests.enqueue_head (request);
00241 result = false;
00242 }
00243 else
00244 {
00245 if (DEBUG_LEVEL > 0)
00246 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00247 static_cast<int> (this->proxy ()->id ()),
00248 request->sequence ()));
00249 request->complete ();
00250 request->release ();
00251 }
00252 }
00253 break;
00254 }
00255 default:
00256 {
00257 result = false;
00258 break;
00259 }
00260 }
00261 }
00262 return result;
00263 }
00264
00265 bool
00266 TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
00267 TAO_Notify_Method_Request_Event * request)
00268 {
00269 if (DEBUG_LEVEL > 0)
00270 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00271 this->enqueue_request (request);
00272
00273 size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00274
00275 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00276 {
00277 this->dispatch_pending ();
00278 }
00279 else
00280 {
00281 schedule_timer (false);
00282 }
00283 return true;
00284 }
00285
00286
00287 void
00288 TAO_Notify_SequencePushConsumer::push (const CORBA::Any& )
00289 {
00290
00291 }
00292
00293 void
00294 TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& )
00295 {
00296
00297 }
00298
00299
00300 void
00301 TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
00302 {
00303
00304 if (TAO_debug_level >= 10) {
00305 ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n",
00306 this->push_consumer_->_stubobj()->orb_core()->orbid()));
00307 }
00308
00309 last_ping_ = ACE_OS::gettimeofday ();
00310 this->push_consumer_->push_structured_events (event_batch);
00311 }
00312
00313 ACE_CString
00314 TAO_Notify_SequencePushConsumer::get_ior (void) const
00315 {
00316 ACE_CString result;
00317 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00318 try
00319 {
00320 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ());
00321 result = static_cast<const char*> (ior.in ());
00322 }
00323 catch (const CORBA::Exception&)
00324 {
00325 result.fast_clear();
00326 }
00327 return result;
00328 }
00329
00330 void
00331 TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
00332 {
00333 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00334 ACE_ASSERT(tmp != 0);
00335 this->init(tmp->push_consumer_.in());
00336 this->schedule_timer(false);
00337 }
00338
00339 CORBA::Object_ptr
00340 TAO_Notify_SequencePushConsumer::get_consumer (void)
00341 {
00342 return CosNotifyComm::SequencePushConsumer::_duplicate (this->push_consumer_.in ());
00343 }
00344
00345 TAO_END_VERSIONED_NAMESPACE_DECL