Consumer.cpp

Go to the documentation of this file.
00001 // $Id: Consumer.cpp 79324 2007-08-13 11:20:01Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/Consumer.h"
00004 
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/Consumer.inl"
00007 #endif /* __ACE_INLINE__ */
00008 
00009 ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id: Consumer.cpp 79324 2007-08-13 11:20:01Z elliott_c $")
00010 
00011 #include "orbsvcs/Notify/Timer.h"
00012 #include "orbsvcs/Notify/ProxySupplier.h"
00013 #include "orbsvcs/Notify/Method_Request_Event.h"
00014 
00015 #include "orbsvcs/Time_Utilities.h"
00016 
00017 #include "tao/debug.h"
00018 #include "tao/corba.h"
00019 
00020 #include "ace/Bound_Ptr.h"
00021 #include "ace/Unbounded_Queue.h"
00022 
00023 #ifndef DEBUG_LEVEL
00024 # define DEBUG_LEVEL TAO_debug_level
00025 #endif //DEBUG_LEVEL
00026 
00027 static const int DEFAULT_RETRY_TIMEOUT = 10;//120; // Note : This should be a config param or qos setting
00028 
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , have_not_yet_verified_publish_ (true)
00035 , pacing_ (proxy->qos_properties_.pacing_interval ())
00036 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00037 , timer_id_ (-1)
00038 , timer_ (0)
00039 {
00040   Request_Queue* pending_events = 0;
00041   ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00042   this->pending_events_.reset( pending_events );
00043 
00044   this->timer_.reset( this->proxy ()->timer () );
00045 }
00046 
00047 TAO_Notify_Consumer::~TAO_Notify_Consumer ()
00048 {
00049   if (this->timer_.isSet())
00050   {
00051     this->cancel_timer ();
00052     this->timer_.reset ();
00053   }
00054 }
00055 
00056 TAO_Notify_Proxy*
00057 TAO_Notify_Consumer::proxy (void)
00058 {
00059   return this->proxy_supplier ();
00060 }
00061 
00062 void
00063 TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
00064 {
00065   this->max_batch_size_ = qos_properties.maximum_batch_size ();
00066 }
00067 
00068 void
00069 TAO_Notify_Consumer::resume (void)
00070 {
00071   this->is_suspended_ = 0;
00072 
00073   this->dispatch_pending ();
00074 }
00075 
00076 void
00077 TAO_Notify_Consumer::enqueue_request (
00078   TAO_Notify_Method_Request_Event * request)
00079 {
00080   TAO_Notify_Event::Ptr event (
00081     request->event ()->queueable_copy ());
00082 
00083   TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00084   ACE_NEW_THROW_EX (queue_entry,
00085     TAO_Notify_Method_Request_Event_Queueable (*request, event),
00086     CORBA::NO_MEMORY ());
00087 
00088   if (DEBUG_LEVEL  > 3) ACE_DEBUG ( (LM_DEBUG,
00089     ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00090     static_cast<int> (this->proxy ()->id ()),
00091     request->sequence (),
00092     request
00093     ));
00094   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00095   this->pending_events().enqueue_tail (queue_entry);
00096 }
00097 
00098 bool
00099 TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request)
00100 {
00101   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00102   if (! this->pending_events().is_empty ())
00103     {
00104       if (DEBUG_LEVEL > 3)
00105         ACE_DEBUG ((LM_DEBUG,
00106                     ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00107                     static_cast<int> (this->proxy ()->id ()),
00108                     request->sequence ()
00109                     ));
00110       TAO_Notify_Event::Ptr event (
00111         request->event ()->queueable_copy ());
00112       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00113       ACE_NEW_THROW_EX (queue_entry,
00114                         TAO_Notify_Method_Request_Event_Queueable (*request,
00115                                                                    event),
00116                         CORBA::NO_MEMORY ());
00117       this->pending_events().enqueue_tail (queue_entry);
00118       this->schedule_timer (false);
00119       return true;
00120     }
00121   if (this->is_suspended_ == 1)
00122     {
00123       if (DEBUG_LEVEL > 3)
00124         ACE_DEBUG ((LM_DEBUG,
00125                     ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00126                     static_cast<int> (this->proxy ()->id ()),
00127                     request->sequence ()
00128                     ));
00129       TAO_Notify_Event::Ptr event (
00130         request->event ()->queueable_copy ());
00131       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00132       ACE_NEW_THROW_EX (queue_entry,
00133                         TAO_Notify_Method_Request_Event_Queueable (*request,
00134                                                                    event),
00135                         CORBA::NO_MEMORY ());
00136       this->pending_events().enqueue_tail (queue_entry);
00137       this->schedule_timer (false);
00138       return true;
00139     }
00140   return false;
00141 }
00142 
00143 void
00144 TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request)
00145 {
00146   // Increment reference counts (safely) to prevent this object and its proxy
00147   // from being deleted while the push is in progress.
00148   TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00149   bool queued = enqueue_if_necessary (request);
00150   if (!queued)
00151     {
00152       DispatchStatus status = this->dispatch_request (request);
00153       switch (status)
00154         {
00155         case DISPATCH_SUCCESS:
00156           {
00157             request->complete ();
00158             break;
00159           }
00160         case DISPATCH_RETRY:
00161           {
00162             if (DEBUG_LEVEL > 1)
00163               ACE_DEBUG ((LM_DEBUG,
00164                           ACE_TEXT ("Consumer %d enqueing event %d due ")
00165                           ACE_TEXT ("to failed dispatch.\n"),
00166                           static_cast<int> (this->proxy ()->id ()),
00167                           request->sequence ()));
00168             this->enqueue_request (request);
00169             this->schedule_timer (true);
00170             break;
00171           }
00172         case DISPATCH_DISCARD:
00173           {
00174             if (DEBUG_LEVEL  > 0)
00175               ACE_DEBUG ((LM_DEBUG,
00176                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00177                                     "direct dispatch. Discarding event:%d.\n"),
00178                           static_cast<int> (this->proxy ()->id ()),
00179                           request->sequence ()
00180                           ));
00181             request->complete ();
00182             break;
00183           }
00184         case DISPATCH_FAIL:
00185           {
00186             if (DEBUG_LEVEL  > 0)
00187               ACE_DEBUG ((LM_DEBUG,
00188                           ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00189                                     "direct dispatch :%d. Discarding event.\n"),
00190                           static_cast<int> (this->proxy ()->id ()),
00191                           request->sequence ()
00192                           ));
00193             request->complete ();
00194             try
00195               {
00196                 this->proxy_supplier ()->destroy ();
00197               }
00198             catch (const CORBA::Exception&)
00199               {
00200                 // todo is there something meaningful we can do here?
00201                 ;
00202               }
00203             break;
00204           }
00205         }
00206     }
00207 }
00208 
00209 TAO_Notify_Consumer::DispatchStatus
00210 TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request)
00211 {
00212   DispatchStatus result = DISPATCH_SUCCESS;
00213   try
00214     {
00215       request->event ()->push (this);
00216       if (DEBUG_LEVEL  > 8)
00217         ACE_DEBUG ((LM_DEBUG,
00218                     ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00219                     static_cast<int> (this->proxy ()->id ()),
00220                     request->sequence ()
00221                     ));
00222     }
00223   catch (const CORBA::OBJECT_NOT_EXIST& ex)
00224     {
00225       if (DEBUG_LEVEL  > 0)
00226         {
00227           ACE_DEBUG ((LM_ERROR,
00228                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00229                                 "(request) %s\n"),
00230                       static_cast<int> (this->proxy ()->id ()),
00231                       ex._info ().c_str ()
00232                       ));
00233         }
00234       result = DISPATCH_FAIL;
00235     }
00236   catch (const CORBA::TRANSIENT& ex)
00237     {
00238       if (DEBUG_LEVEL  > 0)
00239         ACE_DEBUG ((LM_ERROR,
00240                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00241                               "(request) Transient (minor=%d) %s\n"),
00242                     static_cast<int> (this->proxy ()->id ()),
00243                     ex.minor (),
00244                     ex._info ().c_str ()
00245                     ));
00246       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00247       switch (ex.minor () & 0xfffff000u)
00248         {
00249         case CORBA::OMGVMCID:
00250           switch (ex.minor () & 0x00000fffu)
00251             {
00252             case 2: // No usable profile
00253             case 3: // Request cancelled
00254             case 4: // POA destroyed
00255               result = DISPATCH_FAIL;
00256               break;
00257             default:
00258               result = DISPATCH_DISCARD;
00259             }
00260           break;
00261 
00262         case TAO::VMCID:
00263         default:
00264           switch (ex.minor () & BITS_5_THRU_12_MASK)
00265             {
00266             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00267               result = DISPATCH_FAIL;
00268               break;
00269             case TAO_POA_DISCARDING:
00270             case TAO_POA_HOLDING:
00271             default:
00272               result = DISPATCH_RETRY;
00273             } break;
00274         }
00275     }
00276   catch (const CORBA::TIMEOUT& ex)
00277     {
00278       if (DEBUG_LEVEL  > 0)
00279         ACE_DEBUG ((LM_ERROR,
00280                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00281                               "(request) %s\n"),
00282                     this->proxy ()->id (),
00283                     ex._info().c_str ()
00284                     ));
00285       result = DISPATCH_FAIL;
00286     }
00287   catch (const CORBA::COMM_FAILURE& ex)
00288     {
00289       if (DEBUG_LEVEL  > 0)
00290         ACE_DEBUG ((LM_ERROR,
00291                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00292                               "(request) %s\n"),
00293                     this->proxy ()->id (),
00294                     ex._info().c_str ()
00295                     ));
00296       result = DISPATCH_FAIL;
00297     }
00298   catch (const CORBA::SystemException& ex)
00299     {
00300       if (DEBUG_LEVEL  > 0)
00301         {
00302           ACE_DEBUG ((LM_ERROR,
00303                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00304                                 "(request) SystemException %s\n"),
00305                       static_cast<int> (this->proxy ()->id ()),
00306                       ex._info ().c_str ()
00307                       ));
00308         }
00309       result = DISPATCH_DISCARD;
00310     }
00311   catch (const CORBA::Exception&)
00312     {
00313       ACE_ERROR ( (LM_ERROR,
00314                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00315                              "(request) Caught unexpected exception "
00316                              "pushing event to consumer.\n"),
00317                    static_cast<int> (this->proxy ()->id ())
00318                    ));
00319       result = DISPATCH_DISCARD;
00320     }
00321 
00322   // for persistent events that haven't timed out
00323   // convert "FAIL" & "DISCARD" to "RETRY"
00324   // for transient events, convert RETRY to DISCARD (hey, best_effort.)
00325   if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00326     {
00327       if (request->should_retry ())
00328         {
00329           result = DISPATCH_RETRY;
00330         }
00331     }
00332   else if (result == DISPATCH_RETRY)
00333     {
00334       if (! request->should_retry ())
00335         {
00336           result = DISPATCH_DISCARD;
00337         }
00338     }
00339 
00340   return result;
00341 }
00342 
00343 TAO_Notify_Consumer::DispatchStatus
00344 TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
00345 {
00346   DispatchStatus result = DISPATCH_SUCCESS;
00347   try
00348     {
00349       this->push (batch);
00350     }
00351   catch (const CORBA::OBJECT_NOT_EXIST& ex)
00352     {
00353       if (DEBUG_LEVEL  > 0)
00354         ACE_DEBUG ((LM_ERROR,
00355                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00356                               "%d::dispatch_batch() %s\n"),
00357                     static_cast<int> (this->proxy ()->id ()),
00358                     ex._info ().c_str ()
00359                     ));
00360       result = DISPATCH_FAIL;
00361     }
00362   catch (const CORBA::TRANSIENT& ex)
00363     {
00364       if (DEBUG_LEVEL  > 0)
00365         ACE_DEBUG ((LM_ERROR,
00366                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00367                               "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00368                     static_cast<int> (this->proxy ()->id ()),
00369                     ex.minor (),
00370                     ex._info ().c_str ()
00371                     ));
00372       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00373       switch (ex.minor () & 0xfffff000u)
00374         {
00375         case CORBA::OMGVMCID:
00376           switch (ex.minor () & 0x00000fffu)
00377             {
00378             case 2: // No usable profile
00379             case 3: // Request cancelled
00380             case 4: // POA destroyed
00381               result = DISPATCH_FAIL;
00382               break;
00383             default:
00384               result = DISPATCH_DISCARD;
00385             }
00386           break;
00387 
00388         case TAO::VMCID:
00389         default:
00390           switch (ex.minor () & BITS_5_THRU_12_MASK)
00391             {
00392             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00393               result = DISPATCH_FAIL;
00394               break;
00395             case TAO_POA_DISCARDING:
00396             case TAO_POA_HOLDING:
00397             default:
00398               result = DISPATCH_RETRY;
00399             } break;
00400         }
00401     }
00402   catch (const CORBA::TIMEOUT& ex)
00403     {
00404       if (DEBUG_LEVEL  > 0)
00405         ACE_DEBUG ((LM_ERROR,
00406                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00407                               "%u::dispatch_batch() %s\n"),
00408                     this->proxy ()->id (),
00409                     ex._info().c_str ()
00410                     ));
00411       result = DISPATCH_FAIL;
00412     }
00413   catch (const CORBA::COMM_FAILURE& ex)
00414     {
00415       if (DEBUG_LEVEL  > 0)
00416         ACE_DEBUG ((LM_ERROR,
00417                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00418                               "%u::dispatch_batch() %s\n"),
00419                     this->proxy ()->id (),
00420                     ex._info().c_str ()
00421                     ));
00422       result = DISPATCH_FAIL;
00423     }
00424   catch (const CORBA::SystemException& ex)
00425     {
00426       if (DEBUG_LEVEL  > 0)
00427         {
00428           ACE_DEBUG ((LM_ERROR,
00429                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00430                                 "%d::dispatch_batch() SystemException %s\n"),
00431                       static_cast<int>  (this->proxy ()->id ()),
00432                       ex._info ().c_str ()
00433                       ));
00434         }
00435       result = DISPATCH_DISCARD;
00436     }
00437   catch (const CORBA::Exception&)
00438     {
00439       ACE_ERROR ((LM_ERROR,
00440                   ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00441                             "%d::dispatch_batch() Caught unexpected "
00442                             "exception pushing batch to consumer.\n"),
00443                   static_cast<int> (this->proxy ()->id ())
00444                   ));
00445       result = DISPATCH_DISCARD;
00446     }
00447   return result;
00448 }
00449 
00450 void
00451 TAO_Notify_Consumer::dispatch_pending (void)
00452 {
00453   if (DEBUG_LEVEL  > 5)
00454     ACE_DEBUG ((LM_DEBUG,
00455                 ACE_TEXT ("Consumer %d dispatching pending events.  Queue size: %d\n"),
00456                 static_cast<int> (this->proxy ()->id ()),
00457                 this->pending_events().size ()
00458                 ));
00459 
00460   // lock ourselves in memory for the duration
00461   TAO_Notify_Consumer::Ptr self_grd (this);
00462 
00463   // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
00464   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00465   bool ok = true;
00466   while (ok
00467          && !this->proxy_supplier ()->has_shutdown ()
00468          && !this->pending_events().is_empty ())
00469     {
00470       if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00471         {
00472           this->schedule_timer (true);
00473           ok = false;
00474         }
00475     }
00476 }
00477 
00478 
00479 // virtual: this is the default, overridden for SequencePushConsumer
00480 bool
00481 TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00482 {
00483   bool result = true;
00484   TAO_Notify_Method_Request_Event_Queueable * request;
00485   if (requests.dequeue_head (request) == 0)
00486     {
00487       ace_mon.release ();
00488       DispatchStatus status = this->dispatch_request (request);
00489       switch (status)
00490         {
00491         case DISPATCH_SUCCESS:
00492           {
00493             request->complete ();
00494             request->release ();
00495             result = true;
00496             ace_mon.acquire ();
00497             break;
00498           }
00499         case DISPATCH_RETRY:
00500           {
00501             if (DEBUG_LEVEL  > 0)
00502               ACE_DEBUG ((LM_DEBUG,
00503                           ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00504                           static_cast<int> (this->proxy ()->id ()),
00505                           request->sequence ()
00506                           ));
00507             ace_mon.acquire ();
00508             requests.enqueue_head (request); // put the failed event back where it was
00509             result = false;
00510             break;
00511           }
00512         case DISPATCH_DISCARD:
00513           {
00514             if (DEBUG_LEVEL  > 0)
00515               ACE_DEBUG ((LM_DEBUG,
00516                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00517                                     "dispatch. Discarding event:%d.\n"),
00518                           static_cast<int> (this->proxy ()->id ()),
00519                           request->sequence ()
00520                           ));
00521             request->complete ();
00522             ace_mon.acquire ();
00523             result = true;
00524             break;
00525           }
00526         case DISPATCH_FAIL:
00527           {
00528             if (DEBUG_LEVEL  > 0)
00529               ACE_DEBUG ((LM_DEBUG,
00530                           ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00531                                     "Discarding event %d.\n"),
00532                           static_cast<int> (this->proxy ()->id ()),
00533                           request->sequence ()
00534                           ));
00535             request->complete ();
00536             ace_mon.acquire ();
00537             while (requests.dequeue_head (request) == 0)
00538               {
00539                 ace_mon.release ();
00540                 request->complete ();
00541                 ace_mon.acquire ();
00542               }
00543             ace_mon.release ();
00544             try
00545               {
00546                 this->proxy_supplier ()->destroy ();
00547               }
00548             catch (const CORBA::Exception&)
00549               {
00550                 // todo is there something reasonable to do here?
00551               }
00552             ace_mon.acquire ();
00553             result = true;
00554             break;
00555           }
00556         default:
00557           {
00558             ace_mon.acquire ();
00559             result = false;
00560             break;
00561           }
00562         }
00563     }
00564   return result;
00565 }
00566 
00567 //@@todo: rather than is_error, use pacing interval so it will be configurable
00568 //@@todo: find some way to use batch buffering stratgy for sequence consumers.
00569 void
00570 TAO_Notify_Consumer::schedule_timer (bool is_error)
00571 {
00572   if (this->timer_id_ != -1)
00573     {
00574       return; // We only want a single timeout scheduled.
00575     }
00576   // don't schedule timer if there's nothing that can be done
00577   if (this->is_suspended ())
00578     {
00579       return;
00580     }
00581 
00582   ACE_ASSERT (this->timer_.get() != 0);
00583 
00584   // If we're scheduling the timer due to an error then we want to
00585   // use the retry timeout, otherwise we'll assume that the pacing
00586   // interval is sufficient for now.
00587   ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00588 
00589   if (! is_error)
00590     {
00591       if (this->pacing_.is_valid ())
00592         {
00593           tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00594         }
00595     }
00596 
00597   if (DEBUG_LEVEL  > 5)
00598     {
00599       ACE_DEBUG ((LM_DEBUG,
00600                   ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00601                   static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00602     }
00603 
00604   this->timer_id_ =
00605     this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00606   if (this->timer_id_ == -1)
00607     {
00608       ACE_ERROR ((LM_ERROR,
00609                   ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00610                             "Error scheduling timer.\n"),
00611                   static_cast<int> (this->proxy ()->id ())
00612                   ));
00613     }
00614 }
00615 
00616 void
00617 TAO_Notify_Consumer::cancel_timer (void)
00618 {
00619   if (this->timer_.isSet() && this->timer_id_ != -1)
00620     {
00621       if (DEBUG_LEVEL  > 5)
00622         ACE_DEBUG ((LM_DEBUG,
00623                     ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00624                     static_cast<int> (this->proxy ()->id ())
00625                     ));
00626 
00627       this->timer_->cancel_timer (timer_id_);
00628     }
00629   this->timer_id_ = -1;
00630 }
00631 
00632 int
00633 TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
00634 {
00635   TAO_Notify_Consumer::Ptr grd (this);
00636   this->timer_id_ = -1;  // This must come first, because dispatch_pending may try to resched
00637   try
00638     {
00639       this->dispatch_pending ();
00640     }
00641   catch (...)
00642     {
00643     }
00644 
00645   return 0;
00646 }
00647 
00648 void
00649 TAO_Notify_Consumer::shutdown (void)
00650 {
00651   if (this->timer_.isSet ())
00652     {
00653       this->cancel_timer ();
00654       this->timer_.reset ();
00655     }
00656 }
00657 
00658 void
00659 TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed)
00660 {
00661   if (this->have_not_yet_verified_publish_)
00662     {
00663       this->have_not_yet_verified_publish_ = false; // no need to check again
00664       if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"))
00665         this->publish_ = CosNotifyComm::NotifyPublish::_nil();
00666     }
00667   if (! CORBA::is_nil (this->publish_.in ()))
00668     this->publish_->offer_change (added, removed);
00669 }
00670 
00671 TAO_SYNCH_MUTEX*
00672 TAO_Notify_Consumer::proxy_lock (void)
00673 {
00674   return &this->proxy_->lock_;
00675 }
00676 
00677 TAO_Notify_ProxySupplier*
00678 TAO_Notify_Consumer::proxy_supplier (void)
00679 {
00680   return this->proxy_;
00681 }
00682 
00683 void
00684 TAO_Notify_Consumer::assume_pending_events (TAO_Notify_Consumer& rhs)
00685 {
00686   // No need to lock the this proxy's lock.  It should have been locked
00687   // by the caller.
00688 
00689   // If the original consumer has pending events
00690   if (!rhs.pending_events ().is_empty ())
00691     {
00692       // We will take them away and cancel it's timer
00693       this->pending_events_.reset (rhs.pending_events_.release ());
00694       if (rhs.timer_.isSet ())
00695         {
00696           rhs.cancel_timer ();
00697         }
00698 
00699       // Schedule a new timer for us, which will use the default
00700       // timer value (unless we have a valid pacing interval).
00701       this->schedule_timer ();
00702     }
00703 }
00704 
00705 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:28 2010 for TAO_CosNotification by  doxygen 1.4.7