00001
00002
00003 #include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
00004
00005 ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id: SequencePushConsumer.cpp 79230 2007-08-06 18:18:07Z elliott_c $")
00006
00007 #include "ace/Truncate.h"
00008 #include "ace/Reactor.h"
00009 #include "tao/debug.h"
00010 #include "tao/Stub.h"
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/Notify/QoSProperties.h"
00013 #include "orbsvcs/Notify/ProxySupplier.h"
00014 #include "orbsvcs/Notify/Worker_Task.h"
00015 #include "orbsvcs/Notify/Consumer.h"
00016 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00017 #include "orbsvcs/Notify/Method_Request_Event.h"
00018 #include "orbsvcs/Notify/Timer.h"
00019 #include "orbsvcs/Notify/Proxy.h"
00020 #include "orbsvcs/Notify/Properties.h"
00021
00022 #ifndef DEBUG_LEVEL
00023 # define DEBUG_LEVEL TAO_debug_level
00024 #endif //DEBUG_LEVEL
00025
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
00029 : TAO_Notify_Consumer (proxy)
00030 {
00031 }
00032
00033 TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
00034 {
00035 }
00036
00037 void
00038 TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
00039 {
00040
00041 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00042
00043 if (CORBA::is_nil (push_consumer))
00044 {
00045 throw CORBA::BAD_PARAM();
00046 }
00047
00048 if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
00049 {
00050 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00051 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00052 }
00053 else
00054 {
00055
00056 CORBA::String_var temp =
00057 TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
00058
00059 CORBA::Object_var obj =
00060 TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
00061
00062 try
00063 {
00064 CosNotifyComm::SequencePushConsumer_var new_push_consumer =
00065 CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in());
00066
00067 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in());
00068 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
00069
00070
00071 if (TAO_debug_level >= 10)
00072 {
00073 ACE_DEBUG ((LM_DEBUG,
00074 "(%P|%t) Sequence push init dispatching ORB id is %s.\n",
00075 obj->_stubobj()->orb_core()->orbid()));
00076 }
00077
00078 }
00079 catch (const CORBA::TRANSIENT& ex)
00080 {
00081 ex._tao_print_exception (
00082 "Got a TRANSIENT in NS_SequencePushConsumer::init");
00083 ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this));
00084 }
00085 catch (const CORBA::Exception&)
00086 {
00087
00088 }
00089 }
00090 }
00091
00092 void
00093 TAO_Notify_SequencePushConsumer::release (void)
00094 {
00095 delete this;
00096
00097 }
00098
00099 bool
00100 TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00101 {
00102 bool result = true;
00103 if (DEBUG_LEVEL > 0)
00104 {
00105 ACE_DEBUG ( (LM_DEBUG,
00106 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00107 requests.size ()));
00108 }
00109
00110 CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ());
00111 CORBA::Long max_batch_size = queue_size;
00112 if (this->max_batch_size_.is_valid () )
00113 {
00114 max_batch_size = this->max_batch_size_.value ();
00115 }
00116 CORBA::Long batch_size = queue_size;
00117 if (batch_size > max_batch_size)
00118 {
00119 batch_size = max_batch_size;
00120 }
00121 if (batch_size > 0)
00122 {
00123 CosNotification::EventBatch batch (batch_size);
00124 batch.length (batch_size);
00125
00126 Request_Queue completed;
00127
00128 CORBA::Long pos = 0;
00129 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00130 while (pos < batch_size && requests.dequeue_head (request) == 0)
00131 {
00132 if (DEBUG_LEVEL > 0)
00133 {
00134 ACE_DEBUG ( (LM_DEBUG,
00135 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00136 request));
00137 }
00138
00139 const TAO_Notify_Event * ev = request->event ();
00140 ev->convert (batch [pos]);
00141 ++pos;
00142
00143
00144 completed.enqueue_head (request);
00145 }
00146 batch.length (pos);
00147 ACE_ASSERT (pos > 0);
00148
00149 ace_mon.release ();
00150 TAO_Notify_Consumer::DispatchStatus status =
00151 this->dispatch_batch (batch);
00152 ace_mon.acquire ();
00153 switch (status)
00154 {
00155 case DISPATCH_SUCCESS:
00156 {
00157 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00158 while (completed.dequeue_head (request) == 0)
00159 {
00160 request->complete ();
00161 request->release ();
00162 }
00163 result = true;
00164 break;
00165 }
00166 case DISPATCH_FAIL:
00167 {
00168 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00169 while (completed.dequeue_head (request) == 0)
00170 {
00171 if (request->should_retry ())
00172 {
00173 if (DEBUG_LEVEL > 0)
00174 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00175 static_cast <int> (this->proxy ()->id ()),
00176 request->sequence ()));
00177 requests.enqueue_head (request);
00178 result = false;
00179 }
00180 else
00181 {
00182 if (DEBUG_LEVEL > 0)
00183 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00184 static_cast<int> (this->proxy ()->id ()),
00185 request->sequence ()));
00186 request->complete ();
00187 request->release ();
00188 }
00189 }
00190 while (requests.dequeue_head (request) == 0)
00191 {
00192 if (request->should_retry ())
00193 {
00194 if (DEBUG_LEVEL > 0)
00195 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00196 static_cast<int> (this->proxy ()->id ()),
00197 request->sequence ()));
00198 requests.enqueue_head (request);
00199 result = false;
00200 }
00201 else
00202 {
00203 if (DEBUG_LEVEL > 0)
00204 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00205 static_cast<int> (this->proxy ()->id ()),
00206 request->sequence ()));
00207 request->complete ();
00208 request->release ();
00209 }
00210 }
00211 ace_mon.release();
00212 try
00213 {
00214 this->proxy_supplier ()->destroy ();
00215 }
00216 catch (const CORBA::Exception&)
00217 {
00218
00219 ;
00220 }
00221 ace_mon.acquire();
00222 break;
00223 }
00224 case DISPATCH_RETRY:
00225 case DISPATCH_DISCARD:
00226 {
00227 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00228 while (completed.dequeue_head (request) == 0)
00229 {
00230 if (request->should_retry ())
00231 {
00232 if (DEBUG_LEVEL > 0)
00233 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00234 static_cast<int> (this->proxy ()->id ()),
00235 request->sequence ()));
00236 requests.enqueue_head (request);
00237 result = false;
00238 }
00239 else
00240 {
00241 if (DEBUG_LEVEL > 0)
00242 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00243 static_cast<int> (this->proxy ()->id ()),
00244 request->sequence ()));
00245 request->complete ();
00246 request->release ();
00247 }
00248 }
00249 break;
00250 }
00251 default:
00252 {
00253 result = false;
00254 break;
00255 }
00256 }
00257 }
00258 return result;
00259 }
00260
00261 bool
00262 TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
00263 TAO_Notify_Method_Request_Event * request)
00264 {
00265 if (DEBUG_LEVEL > 0)
00266 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00267 this->enqueue_request (request);
00268
00269 size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00270
00271 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00272 {
00273 this->dispatch_pending ();
00274 }
00275 else
00276 {
00277 schedule_timer (false);
00278 }
00279 return true;
00280 }
00281
00282
00283 void
00284 TAO_Notify_SequencePushConsumer::push (const CORBA::Any& )
00285 {
00286
00287 }
00288
00289 void
00290 TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& )
00291 {
00292
00293 }
00294
00295
00296 void
00297 TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
00298 {
00299
00300 if (TAO_debug_level >= 10) {
00301 ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n",
00302 this->push_consumer_->_stubobj()->orb_core()->orbid()));
00303 }
00304
00305
00306 this->push_consumer_->push_structured_events (event_batch);
00307 }
00308
00309 ACE_CString
00310 TAO_Notify_SequencePushConsumer::get_ior (void) const
00311 {
00312 ACE_CString result;
00313 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00314 try
00315 {
00316 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ());
00317 result = static_cast<const char*> (ior.in ());
00318 }
00319 catch (const CORBA::Exception&)
00320 {
00321 result.fast_clear();
00322 }
00323 return result;
00324 }
00325
00326 void
00327 TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
00328 {
00329 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00330 ACE_ASSERT(tmp != 0);
00331 this->init(tmp->push_consumer_.in());
00332 this->schedule_timer(false);
00333 }
00334
00335 TAO_END_VERSIONED_NAMESPACE_DECL