SequencePushConsumer.cpp

Go to the documentation of this file.
00001 // $Id: SequencePushConsumer.cpp 79230 2007-08-06 18:18:07Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/Sequence/SequencePushConsumer.h"
00004 
00005 ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id: SequencePushConsumer.cpp 79230 2007-08-06 18:18:07Z elliott_c $")
00006 
00007 #include "ace/Truncate.h"
00008 #include "ace/Reactor.h"
00009 #include "tao/debug.h"
00010 #include "tao/Stub.h" // For debug messages printing out ORBid.
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/Notify/QoSProperties.h"
00013 #include "orbsvcs/Notify/ProxySupplier.h"
00014 #include "orbsvcs/Notify/Worker_Task.h"
00015 #include "orbsvcs/Notify/Consumer.h"
00016 #include "orbsvcs/Notify/Method_Request_Dispatch.h"
00017 #include "orbsvcs/Notify/Method_Request_Event.h"
00018 #include "orbsvcs/Notify/Timer.h"
00019 #include "orbsvcs/Notify/Proxy.h"
00020 #include "orbsvcs/Notify/Properties.h"
00021 //#define DEBUG_LEVEL 10
00022 #ifndef DEBUG_LEVEL
00023 # define DEBUG_LEVEL TAO_debug_level
00024 #endif //DEBUG_LEVEL
00025 
00026 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
00029 : TAO_Notify_Consumer (proxy)
00030 {
00031 }
00032 
00033 TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
00034 {
00035 }
00036 
00037 void
00038 TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer)
00039 {
00040   // Initialize only once
00041   ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
00042 
00043   if (CORBA::is_nil (push_consumer))
00044   {
00045     throw CORBA::BAD_PARAM();
00046   }
00047 
00048   if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
00049     {
00050       this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
00051       this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
00052     }
00053   else
00054     {
00055       // "Port" consumer's object reference from receiving ORB to dispatching ORB.
00056       CORBA::String_var temp =
00057         TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
00058 
00059       CORBA::Object_var obj =
00060         TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
00061 
00062       try
00063         {
00064           CosNotifyComm::SequencePushConsumer_var new_push_consumer =
00065             CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in());
00066 
00067           this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer.in());
00068           this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
00069 
00070           //--cj verify dispatching ORB
00071           if (TAO_debug_level >= 10)
00072             {
00073               ACE_DEBUG ((LM_DEBUG,
00074                           "(%P|%t) Sequence push init dispatching ORB id is %s.\n",
00075                           obj->_stubobj()->orb_core()->orbid()));
00076             }
00077           //--cj end
00078         }
00079       catch (const CORBA::TRANSIENT& ex)
00080         {
00081           ex._tao_print_exception (
00082             "Got a TRANSIENT in NS_SequencePushConsumer::init");
00083           ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this));
00084         }
00085       catch (const CORBA::Exception&)
00086         {
00087           // _narrow failed
00088         }
00089     }
00090 }
00091 
00092 void
00093 TAO_Notify_SequencePushConsumer::release (void)
00094 {
00095   delete this;
00096   //@@ inform factory
00097 }
00098 
00099 bool
00100 TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00101 {
00102   bool result = true;
00103   if (DEBUG_LEVEL > 0)
00104   {
00105     ACE_DEBUG ( (LM_DEBUG,
00106       ACE_TEXT ("(%P|%t) SequencePushConsumer dispatch queued requests. queue size:%u\n"),
00107       requests.size ()));
00108   }
00109 
00110   CORBA::ULong queue_size = ACE_Utils::truncate_cast<CORBA::ULong> (requests.size ());
00111   CORBA::Long max_batch_size = queue_size;
00112   if (this->max_batch_size_.is_valid () )
00113   {
00114     max_batch_size = this->max_batch_size_.value ();
00115   }
00116   CORBA::Long batch_size = queue_size;
00117   if (batch_size > max_batch_size)
00118   {
00119     batch_size = max_batch_size;
00120   }
00121   if (batch_size > 0)
00122   {
00123     CosNotification::EventBatch batch (batch_size);
00124     batch.length (batch_size);
00125 
00126     Request_Queue completed;
00127 
00128     CORBA::Long pos = 0;
00129     TAO_Notify_Method_Request_Event_Queueable * request = 0;
00130     while (pos < batch_size && requests.dequeue_head (request) == 0)
00131     {
00132       if (DEBUG_LEVEL > 0)
00133       {
00134         ACE_DEBUG ( (LM_DEBUG,
00135           ACE_TEXT ("(%P|%t) Sequence Dispatch Method_Request_Dispatch @%@\n"),
00136           request));
00137       }
00138 
00139       const TAO_Notify_Event * ev = request->event ();
00140       ev->convert (batch [pos]);
00141       ++pos;
00142 
00143       // note enqueue at head, use queue as stack.
00144       completed.enqueue_head (request);
00145     }
00146     batch.length (pos);
00147     ACE_ASSERT (pos > 0);
00148 
00149     ace_mon.release ();
00150     TAO_Notify_Consumer::DispatchStatus status =
00151       this->dispatch_batch (batch);
00152     ace_mon.acquire ();
00153     switch (status)
00154     {
00155     case DISPATCH_SUCCESS:
00156       {
00157         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00158         while (completed.dequeue_head (request) == 0)
00159         {
00160           request->complete ();
00161           request->release ();
00162         }
00163         result = true;
00164         break;
00165       }
00166     case DISPATCH_FAIL:
00167       {
00168         TAO_Notify_Method_Request_Event_Queueable * request = 0;
00169         while (completed.dequeue_head (request) == 0)
00170         {
00171           if (request->should_retry ())
00172           {
00173             if (DEBUG_LEVEL > 0)
00174               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00175                           static_cast <int> (this->proxy ()->id ()),
00176                           request->sequence ()));
00177             requests.enqueue_head (request);
00178             result = false;
00179           }
00180           else
00181           {
00182             if (DEBUG_LEVEL > 0)
00183               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00184                           static_cast<int> (this->proxy ()->id ()),
00185                           request->sequence ()));
00186             request->complete ();
00187             request->release ();
00188           }
00189         }
00190         while (requests.dequeue_head (request) == 0)
00191         {
00192           if (request->should_retry ())
00193           {
00194             if (DEBUG_LEVEL > 0)
00195               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00196                           static_cast<int> (this->proxy ()->id ()),
00197                           request->sequence ()));
00198             requests.enqueue_head (request);
00199             result = false;
00200           }
00201           else
00202           {
00203             if (DEBUG_LEVEL > 0)
00204               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00205                           static_cast<int> (this->proxy ()->id ()),
00206                           request->sequence ()));
00207             request->complete ();
00208             request->release ();
00209           }
00210         }
00211         ace_mon.release();
00212         try
00213         {
00214           this->proxy_supplier ()->destroy ();
00215         }
00216         catch (const CORBA::Exception&)
00217         {
00218           // todo is there something meaningful we can do here?
00219           ;
00220         }
00221         ace_mon.acquire();
00222         break;
00223       }
00224     case DISPATCH_RETRY:
00225     case DISPATCH_DISCARD:
00226       {
00227         TAO_Notify_Method_Request_Event_Queueable *  request = 0;
00228         while (completed.dequeue_head (request) == 0)
00229         {
00230           if (request->should_retry ())
00231           {
00232             if (DEBUG_LEVEL > 0)
00233               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00234                           static_cast<int> (this->proxy ()->id ()),
00235                           request->sequence ()));
00236             requests.enqueue_head (request);
00237             result = false;
00238           }
00239           else
00240           {
00241             if (DEBUG_LEVEL > 0)
00242               ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Consumer %d: Discarding %d\n"),
00243                           static_cast<int> (this->proxy ()->id ()),
00244                           request->sequence ()));
00245             request->complete ();
00246             request->release ();
00247           }
00248         }
00249         break;
00250       }
00251     default:
00252       {
00253         result = false;
00254         break;
00255       }
00256     }
00257   }
00258   return result;
00259 }
00260 
00261 bool
00262 TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
00263   TAO_Notify_Method_Request_Event * request)
00264 {
00265   if (DEBUG_LEVEL > 0)
00266     ACE_DEBUG ( (LM_DEBUG, "SequencePushConsumer enqueing event.\n"));
00267   this->enqueue_request (request);
00268 
00269   size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
00270 
00271   if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
00272   {
00273     this->dispatch_pending ();
00274   }
00275   else
00276   {
00277     schedule_timer (false);
00278   }
00279   return true;
00280 }
00281 
00282 
00283 void
00284 TAO_Notify_SequencePushConsumer::push (const CORBA::Any& /*event*/)
00285 {
00286   //NOP
00287 }
00288 
00289 void
00290 TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /*notification*/)
00291 {
00292   //NOP
00293 }
00294 
00295 
00296 void
00297 TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch)
00298 {
00299   //--cj verify dispatching ORB
00300   if (TAO_debug_level >= 10) {
00301     ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n",
00302                 this->push_consumer_->_stubobj()->orb_core()->orbid()));
00303   }
00304   //--cj end
00305 
00306   this->push_consumer_->push_structured_events (event_batch);
00307 }
00308 
00309 ACE_CString
00310 TAO_Notify_SequencePushConsumer::get_ior (void) const
00311 {
00312   ACE_CString result;
00313   CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance ()->orb ();
00314   try
00315   {
00316     CORBA::String_var ior = orb->object_to_string (this->push_consumer_.in ());
00317     result = static_cast<const char*> (ior.in ());
00318   }
00319   catch (const CORBA::Exception&)
00320   {
00321     result.fast_clear();
00322   }
00323   return result;
00324 }
00325 
00326 void
00327 TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
00328 {
00329   TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
00330   ACE_ASSERT(tmp != 0);
00331   this->init(tmp->push_consumer_.in());
00332   this->schedule_timer(false);
00333 }
00334 
00335 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7