00001
00002
00003 #include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
00004
00005 ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "SequencePushConsumer.cpp,v 1.27 2006/03/15 21:29:09 jtc Exp")
00006
00007 #include "ace/Reactor.h"
00008 #include "tao/debug.h"
00009 #include "orbsvcs/Notify/QoSProperties.h"
00010 #include "orbsvcs/Notify/ProxySupplier.h"
00011 #include "orbsvcs/Notify/Worker_Task.h"
00012 #include "orbsvcs/Notify/Consumer.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014 #include "orbsvcs/Notify/Method_Request_Event.h"
00015 #include "orbsvcs/Notify/Timer.h"
00016 #include "orbsvcs/Notify/Proxy.h"
00017 #include "orbsvcs/Notify/Properties.h"
00018
// Debug verbosity threshold used by the logging checks in this file.
// Falls back to TAO_debug_level unless DEBUG_LEVEL was already defined.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00022
00023 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
00026 : TAO_Notify_Consumer (proxy)
00027 {
00028 }
00029
00030 TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
00031 {
00032 }
00033
00034 void
00035 TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL)
00036 {
00037
00038 ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00039
00040 if (CORBA::is_nil (push_consumer))
00041 {
00042 ACE_THROW (CORBA::BAD_PARAM());
00043 }
00044
00045 this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00046 this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00047 }
00048
00049 void
00050 TAO_Notify_SequencePushConsumer::release (void)
00051 {
00052 delete this;
00053
00054 }
00055
00056 bool
00057 TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00058 {
00059 bool result = true;
00060 if (DEBUG_LEVEL > 0)
00061 {
00062 ACE_DEBUG ( (LM_DEBUG,
00063 ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00064 requests.size ()));
00065 }
00066
00067 long queue_size = requests.size ();
00068 CORBA::Long max_batch_size = queue_size;
00069 if (this->max_batch_size_.is_valid () )
00070 {
00071 max_batch_size = this->max_batch_size_.value ();
00072 }
00073 CORBA::Long batch_size = queue_size;
00074 if (batch_size > max_batch_size)
00075 {
00076 batch_size = max_batch_size;
00077 }
00078 if (batch_size > 0)
00079 {
00080 CosNotification::EventBatch batch (batch_size);
00081 batch.length (batch_size);
00082
00083 Request_Queue completed;
00084
00085 CORBA::Long pos = 0;
00086 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00087 while (pos < batch_size && requests.dequeue_head (request) == 0)
00088 {
00089 if (DEBUG_LEVEL > 0)
00090 {
00091 ACE_DEBUG ( (LM_DEBUG,
00092 ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00093 request));
00094 }
00095
00096 const TAO_Notify_Event * ev = request->event ();
00097 ev->convert (batch [pos]);
00098 ++pos;
00099
00100
00101 completed.enqueue_head (request);
00102 }
00103 batch.length (pos);
00104 ACE_ASSERT (pos > 0);
00105
00106 ace_mon.release ();
00107 TAO_Notify_Consumer::DispatchStatus status =
00108 this->dispatch_batch (batch);
00109 ace_mon.acquire ();
00110 switch (status)
00111 {
00112 case DISPATCH_SUCCESS:
00113 {
00114 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00115 while (completed.dequeue_head (request) == 0)
00116 {
00117 request->complete ();
00118 request->release ();
00119 }
00120 result = true;
00121 break;
00122 }
00123 case DISPATCH_FAIL:
00124 {
00125 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00126 while (completed.dequeue_head (request) == 0)
00127 {
00128 if (request->should_retry ())
00129 {
00130 if (DEBUG_LEVEL > 0)
00131 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00132 static_cast <int> (this->proxy ()->id ()),
00133 request->sequence ()));
00134 requests.enqueue_head (request);
00135 result = false;
00136 }
00137 else
00138 {
00139 if (DEBUG_LEVEL > 0)
00140 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00141 static_cast<int> (this->proxy ()->id ()),
00142 request->sequence ()));
00143 request->complete ();
00144 request->release ();
00145 }
00146 }
00147 while (requests.dequeue_head (request) == 0)
00148 {
00149 if (request->should_retry ())
00150 {
00151 if (DEBUG_LEVEL > 0)
00152 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00153 static_cast<int> (this->proxy ()->id ()),
00154 request->sequence ()));
00155 requests.enqueue_head (request);
00156 result = false;
00157 }
00158 else
00159 {
00160 if (DEBUG_LEVEL > 0)
00161 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00162 static_cast<int> (this->proxy ()->id ()),
00163 request->sequence ()));
00164 request->complete ();
00165 request->release ();
00166 }
00167 }
00168 ace_mon.release();
00169 ACE_DECLARE_NEW_ENV;
00170 ACE_TRY
00171 {
00172 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00173 ACE_TRY_CHECK;
00174 }
00175 ACE_CATCHANY
00176 {
00177
00178 ;
00179 }
00180 ACE_ENDTRY;
00181 ace_mon.acquire();
00182 break;
00183 }
00184 case DISPATCH_RETRY:
00185 case DISPATCH_DISCARD:
00186 {
00187 TAO_Notify_Method_Request_Event_Queueable * request = 0;
00188 while (completed.dequeue_head (request) == 0)
00189 {
00190 if (request->should_retry ())
00191 {
00192 if (DEBUG_LEVEL > 0)
00193 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00194 static_cast<int> (this->proxy ()->id ()),
00195 request->sequence ()));
00196 requests.enqueue_head (request);
00197 result = false;
00198 }
00199 else
00200 {
00201 if (DEBUG_LEVEL > 0)
00202 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00203 static_cast<int> (this->proxy ()->id ()),
00204 request->sequence ()));
00205 request->complete ();
00206 request->release ();
00207 }
00208 }
00209 break;
00210 }
00211 default:
00212 {
00213 result = false;
00214 break;
00215 }
00216 }
00217 }
00218 return result;
00219 }
00220
00221 bool
00222 TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
00223 TAO_Notify_Method_Request_Event * request
00224 ACE_ENV_ARG_DECL)
00225 {
00226 if (DEBUG_LEVEL > 0)
00227 ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00228 this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00229 ACE_CHECK_RETURN (false);
00230
00231 size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00232
00233 if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00234 {
00235 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00236 ACE_CHECK_RETURN (false);
00237 }
00238 else
00239 {
00240 schedule_timer (false);
00241 }
00242 return true;
00243 }
00244
00245
00246 void
00247 TAO_Notify_SequencePushConsumer::push (const CORBA::Any& ACE_ENV_ARG_DECL_NOT_USED)
00248 {
00249
00250 }
00251
00252 void
00253 TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& ACE_ENV_ARG_DECL_NOT_USED)
00254 {
00255
00256 }
00257
00258
00259 void
00260 TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
00261 {
00262 this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
00263 ACE_CHECK;
00264 }
00265
00266 ACE_CString
00267 TAO_Notify_SequencePushConsumer::get_ior (void) const
00268 {
00269 ACE_CString result;
00270 CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00271 ACE_DECLARE_NEW_CORBA_ENV;
00272 ACE_TRY
00273 {
00274 CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
00275 ACE_TRY_CHECK;
00276 result = static_cast<const char*> (ior.in ());
00277 }
00278 ACE_CATCHANY
00279 {
00280 result.fast_clear();
00281 }
00282 ACE_ENDTRY;
00283 return result;
00284 }
00285
00286 void
00287 TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
00288 ACE_ENV_ARG_DECL)
00289 {
00290 TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00291 ACE_ASSERT(tmp != 0);
00292 this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
00293 ACE_CHECK;
00294 this->schedule_timer(false);
00295 }
00296
00297 TAO_END_VERSIONED_NAMESPACE_DECL