SequencePushConsumer.cpp

Go to the documentation of this file.
00001 // SequencePushConsumer.cpp,v 1.27 2006/03/15 21:29:09 jtc Exp
00002 
00003 #include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
00004 
00005 ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "SequencePushConsumer.cpp,v 1.27 2006/03/15 21:29:09 jtc Exp")
00006 
00007 #include "ace/Reactor.h"
00008 #include "tao/debug.h"
00009 #include "orbsvcs/Notify/QoSProperties.h"
00010 #include "orbsvcs/Notify/ProxySupplier.h"
00011 #include "orbsvcs/Notify/Worker_Task.h"
00012 #include "orbsvcs/Notify/Consumer.h"
00013 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00014 #include "orbsvcs/Notify/Method_Request_Event.h"
00015 #include "orbsvcs/Notify/Timer.h"
00016 #include "orbsvcs/Notify/Proxy.h"
00017 #include "orbsvcs/Notify/Properties.h"
00018 //#define DEBUG_LEVEL 10
00019 #ifndef DEBUG_LEVEL
00020 # define DEBUG_LEVEL TAO_debug_level
00021 #endif //DEBUG_LEVEL
00022 
00023 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
00026 : TAO_Notify_Consumer (proxy)
00027 {
00028 }
00029 
00030 TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
00031 {
00032 }
00033 
00034 void
00035 TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL)
00036 {
00037   // Initialize only once
00038   ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00039 
00040   if (CORBA::is_nil (push_consumer))
00041   {
00042     ACE_THROW (CORBA::BAD_PARAM());
00043   }
00044 
00045   this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00046   this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00047 }
00048 
00049 void
00050 TAO_Notify_SequencePushConsumer::release (void)
00051 {
00052   delete this;
00053   //@@ inform factory
00054 }
00055 
00056 bool
00057 TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00058 {
00059   bool result = true;
00060   if (DEBUG_LEVEL > 0)
00061   {
00062     ACE_DEBUG ( (LM_DEBUG,
00063       ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00064       requests.size ()));
00065   }
00066 
00067   long queue_size = requests.size ();
00068   CORBA::Long max_batch_size = queue_size;
00069   if (this->max_batch_size_.is_valid () )
00070   {
00071     max_batch_size = this->max_batch_size_.value ();
00072   }
00073   CORBA::Long batch_size = queue_size;
00074   if (batch_size > max_batch_size)
00075   {
00076     batch_size = max_batch_size;
00077   }
00078   if (batch_size > 0)
00079   {
00080     CosNotification::EventBatch batch (batch_size);
00081     batch.length (batch_size);
00082 
00083     Request_Queue completed;
00084 
00085     CORBA::Long pos = 0;
00086     TAO_Notify_Method_Request_Event_Queueable * request = 0;
00087     while (pos < batch_size && requests.dequeue_head (request) == 0)
00088     {
00089       if (DEBUG_LEVEL > 0)
00090       {
00091         ACE_DEBUG ( (LM_DEBUG,
00092           ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00093           request));
00094       }
00095 
00096       const TAO_Notify_Event * ev = request->event ();
00097       ev->convert (batch [pos]);
00098       ++pos;
00099 
00100       // note enqueue at head, use queue as stack.
00101       completed.enqueue_head (request);
00102     }
00103     batch.length (pos);
00104     ACE_ASSERT (pos > 0);
00105 
00106     ace_mon.release ();
00107     TAO_Notify_Consumer::DispatchStatus status =
00108       this->dispatch_batch (batch);
00109     ace_mon.acquire ();
00110     switch (status)
00111     {
00112     case DISPATCH_SUCCESS:
00113       {
00114         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00115         while (completed.dequeue_head (request) == 0)
00116         {
00117           request->complete ();
00118           request->release ();
00119         }
00120         result = true;
00121         break;
00122       }
00123     case DISPATCH_FAIL:
00124       {
00125         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00126         while (completed.dequeue_head (request) == 0)
00127         {
00128           if (request->should_retry ())
00129           {
00130             if (DEBUG_LEVEL > 0)
00131               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00132                           static_cast <int> (this->proxy ()->id ()),
00133                           request->sequence ()));
00134             requests.enqueue_head (request);
00135             result = false;
00136           }
00137           else
00138           {
00139             if (DEBUG_LEVEL > 0)
00140               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00141                           static_cast<int> (this->proxy ()->id ()),
00142                           request->sequence ()));
00143             request->complete ();
00144             request->release ();
00145           }
00146         }
00147         while (requests.dequeue_head (request) == 0)
00148         {
00149           if (request->should_retry ())
00150           {
00151             if (DEBUG_LEVEL > 0)
00152               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00153                           static_cast<int> (this->proxy ()->id ()),
00154                           request->sequence ()));
00155             requests.enqueue_head (request);
00156             result = false;
00157           }
00158           else
00159           {
00160             if (DEBUG_LEVEL > 0)
00161               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00162                           static_cast<int> (this->proxy ()->id ()),
00163                           request->sequence ()));
00164             request->complete ();
00165             request->release ();
00166           }
00167         }
00168         ace_mon.release();
00169         ACE_DECLARE_NEW_ENV;
00170         ACE_TRY
00171         {
00172           this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00173           ACE_TRY_CHECK;
00174         }
00175         ACE_CATCHANY
00176         {
00177           // todo is there something meaningful we can do here?
00178           ;
00179         }
00180         ACE_ENDTRY;
00181         ace_mon.acquire();
00182         break;
00183       }
00184     case DISPATCH_RETRY:
00185     case DISPATCH_DISCARD:
00186       {
00187         TAO_Notify_Method_Request_Event_Queueable *  request = 0;
00188         while (completed.dequeue_head (request) == 0)
00189         {
00190           if (request->should_retry ())
00191           {
00192             if (DEBUG_LEVEL > 0)
00193               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00194                           static_cast<int> (this->proxy ()->id ()),
00195                           request->sequence ()));
00196             requests.enqueue_head (request);
00197             result = false;
00198           }
00199           else
00200           {
00201             if (DEBUG_LEVEL > 0)
00202               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00203                           static_cast<int> (this->proxy ()->id ()),
00204                           request->sequence ()));
00205             request->complete ();
00206             request->release ();
00207           }
00208         }
00209         break;
00210       }
00211     default:
00212       {
00213         result = false;
00214         break;
00215       }
00216     }
00217   }
00218   return result;
00219 }
00220 
00221 bool
00222 TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
00223   TAO_Notify_Method_Request_Event * request
00224   ACE_ENV_ARG_DECL)
00225 {
00226   if (DEBUG_LEVEL > 0)
00227     ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00228   this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00229   ACE_CHECK_RETURN (false);
00230 
00231   size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00232 
00233   if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00234   {
00235     this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00236     ACE_CHECK_RETURN (false);
00237   }
00238   else
00239   {
00240     schedule_timer (false);
00241   }
00242   return true;
00243 }
00244 
00245 
00246 void
00247 TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/ ACE_ENV_ARG_DECL_NOT_USED)
00248 {
00249   //NOP
00250 }
00251 
00252 void
00253 TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /*notification*/ ACE_ENV_ARG_DECL_NOT_USED)
00254 {
00255   //NOP
00256 }
00257 
00258 
00259 void
00260 TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
00261 {
00262   this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
00263   ACE_CHECK;
00264 }
00265 
00266 ACE_CString
00267 TAO_Notify_SequencePushConsumer::get_ior (void) const
00268 {
00269   ACE_CString result;
00270   CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00271   ACE_DECLARE_NEW_CORBA_ENV;
00272   ACE_TRY
00273   {
00274     CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in () ACE_ENV_ARG_PARAMETER);
00275     ACE_TRY_CHECK;
00276     result = static_cast<const char*> (ior.in ());
00277   }
00278   ACE_CATCHANY
00279   {
00280     result.fast_clear();
00281   }
00282   ACE_ENDTRY;
00283   return result;
00284 }
00285 
00286 void
00287 TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
00288                                                           ACE_ENV_ARG_DECL)
00289 {
00290   TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00291   ACE_ASSERT(tmp != 0);
00292   this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
00293   ACE_CHECK;
00294   this->schedule_timer(false);
00295 }
00296 
00297 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:16 2006 for TAO_CosNotification by doxygen 1.3.6